Skip to main content

forest/message_pool/msgpool/
selection.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4//! Contains routines for message selection APIs.
5//! Whenever a miner is ready to create a block for a tipset, it invokes the
6//! `select_messages` API which selects an appropriate set of messages such that
7//! it optimizes miner reward and chain capacity. See <https://docs.filecoin.io/mine/lotus/message-pool/#message-selection> for more details
8
9use std::cmp::Ordering;
10
11use crate::blocks::{BLOCK_MESSAGE_LIMIT, Tipset};
12use crate::message::{MessageRead as _, SignedMessage};
13use crate::message_pool::msg_chain::MsgChainNode;
14use crate::shim::crypto::SignatureType;
15use crate::shim::{address::Address, econ::TokenAmount};
16use crate::state_manager::IdToAddressCache;
17use ahash::{HashMap, HashMapExt};
18use anyhow::{Context, bail, ensure};
19use rand::prelude::SliceRandom;
20use tracing::{debug, error, warn};
21
22use crate::shim::crypto::Signature;
23use crate::utils::cache::SizeTrackingLruCache;
24use crate::utils::get_size::CidWrapper;
25
26use super::{MpoolCtx, msg_pool::MessagePool, provider::Provider, utils::recover_sig};
27use crate::message_pool::{
28    Error, add_to_selected_msgs,
29    msg_chain::{Chains, NodeKey, create_message_chains},
30    msgpool::{MIN_GAS, pending_store::PendingStore},
31};
32
33type Pending = HashMap<Address, HashMap<u64, SignedMessage>>;
34
35// A cap on maximum number of message to include in a block
36const MAX_BLOCK_MSGS: usize = 16000;
37const MAX_BLOCKS: usize = 15;
38const CBOR_GEN_LIMIT: usize = 8192; // Same limits as in
39// [cbor-gen](https://github.com/whyrusleeping/cbor-gen/blob/cba3eeea9ae8ec4db1b7283e3654d8c18979affe/gen.go#L32), which Lotus uses.
40
41/// A structure that holds the selected messages for a block.
42/// It tracks the gas limit and the limits for different signature types
43/// to ensure that the block does not exceed the limits set by the protocol.
44struct SelectedMessages {
45    /// The messages selected for inclusion in the block.
46    msgs: Vec<SignedMessage>,
47    /// The remaining gas limit for the block.
48    gas_limit: u64,
49    /// The remaining limit for `secp256k1` messages in the block.
50    secp_limit: u64,
51    /// The remaining limit for `bls` messages in the block.
52    bls_limit: u64,
53}
54
55impl Default for SelectedMessages {
56    fn default() -> Self {
57        SelectedMessages {
58            msgs: Vec::new(),
59            gas_limit: crate::shim::econ::BLOCK_GAS_LIMIT,
60            secp_limit: CBOR_GEN_LIMIT as u64,
61            bls_limit: CBOR_GEN_LIMIT as u64,
62        }
63    }
64}
65
66impl SelectedMessages {
67    fn new(msgs: Vec<SignedMessage>, gas_limit: u64) -> Self {
68        SelectedMessages {
69            msgs,
70            gas_limit,
71            ..Default::default()
72        }
73    }
74
75    /// Returns the number of messages selected for inclusion in the block.
76    fn len(&self) -> usize {
77        self.msgs.len()
78    }
79
80    /// Truncates the selected messages to the specified length.
81    fn truncate(&mut self, len: usize) {
82        self.msgs.truncate(len);
83    }
84
85    /// Reduces the gas limit by the specified amount. It ensures that the gas limit does not
86    /// go below zero (which would cause a panic).
87    fn reduce_gas_limit(&mut self, gas: u64) {
88        self.gas_limit = self.gas_limit.saturating_sub(gas);
89    }
90
91    /// Reduces the BLS limit by the specified amount. It ensures that the BLS message limit does not
92    /// go below zero (which would cause a panic).
93    fn reduce_bls_limit(&mut self, bls: u64) {
94        self.bls_limit = self.bls_limit.saturating_sub(bls);
95    }
96
97    /// Reduces the `Secp256k1` limit by the specified amount. It ensures that the `Secp256k1` message limit
98    /// does not go below zero (which would cause a panic).
99    fn reduce_secp_limit(&mut self, secp: u64) {
100        self.secp_limit = self.secp_limit.saturating_sub(secp);
101    }
102
103    /// Extends the selected messages with the given messages.
104    fn extend(&mut self, msgs: Vec<SignedMessage>) {
105        self.msgs.extend(msgs);
106    }
107
108    /// Tries to add a message chain to the selected messages. Returns an error if the chain can't
109    /// be added due to block constraints.
110    fn try_to_add(&mut self, message_chain: MsgChainNode) -> anyhow::Result<()> {
111        let msg_chain_len = message_chain.msgs.len();
112        ensure!(
113            BLOCK_MESSAGE_LIMIT >= msg_chain_len + self.len(),
114            "Message chain is too long to fit in the block: {} messages, limit is {BLOCK_MESSAGE_LIMIT}",
115            msg_chain_len + self.len(),
116        );
117        ensure!(
118            self.gas_limit >= message_chain.gas_limit,
119            "Message chain gas limit is too high: {} gas, limit is {}",
120            message_chain.gas_limit,
121            self.gas_limit
122        );
123
124        match message_chain.sig_type {
125            Some(SignatureType::Bls) => {
126                ensure!(
127                    self.bls_limit >= msg_chain_len as u64,
128                    "BLS limit is too low: {msg_chain_len} messages, limit is {}",
129                    self.bls_limit
130                );
131
132                self.extend(message_chain.msgs);
133                self.reduce_bls_limit(msg_chain_len as u64);
134                self.reduce_gas_limit(message_chain.gas_limit);
135            }
136            Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
137                ensure!(
138                    self.secp_limit >= msg_chain_len as u64,
139                    "Secp256k1 limit is too low: {msg_chain_len} messages, limit is {}",
140                    self.secp_limit
141                );
142
143                self.extend(message_chain.msgs);
144                self.reduce_secp_limit(msg_chain_len as u64);
145                self.reduce_gas_limit(message_chain.gas_limit);
146            }
147            None => {
148                // This is a message with no signature type, which is not allowed in the current
149                // implementation. This _should_ never happen, but we handle it gracefully.
150                warn!("Tried to add a message chain with no signature type");
151            }
152        }
153        Ok(())
154    }
155
156    /// Tries to add a message chain with dependencies to the selected messages.
157    /// It will trim or invalidate if appropriate.
158    fn try_to_add_with_deps(
159        &mut self,
160        idx: usize,
161        chains: &mut Chains,
162        base_fee: &TokenAmount,
163    ) -> anyhow::Result<()> {
164        let message_chain = chains
165            .get_mut_at(idx)
166            .context("Couldn't find required message chain")?;
167        // compute the dependencies that must be merged and the gas limit including deps
168        let mut chain_gas_limit = message_chain.gas_limit;
169        let mut chain_msg_limit = message_chain.msgs.len();
170        let mut dep_gas_limit = 0;
171        let mut dep_message_limit = 0;
172        let selected_messages_limit = match message_chain.sig_type {
173            Some(SignatureType::Bls) => self.bls_limit as usize,
174            Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
175                self.secp_limit as usize
176            }
177            None => {
178                // This is a message with no signature type, which is not allowed in the current
179                // implementation. This _should_ never happen, but we handle it gracefully.
180                bail!("Tried to add a message chain with no signature type");
181            }
182        };
183        let selected_messages_limit =
184            selected_messages_limit.min(BLOCK_MESSAGE_LIMIT.saturating_sub(self.len()));
185
186        let mut chain_deps = vec![];
187        let mut cur_chain = message_chain.prev;
188        let _ = message_chain; // drop the mutable borrow to avoid conflicts
189        while let Some(cur_chn) = cur_chain {
190            let node = chains.get(cur_chn).unwrap();
191            if !node.merged {
192                chain_deps.push(cur_chn);
193                chain_gas_limit += node.gas_limit;
194                chain_msg_limit += node.msgs.len();
195                dep_gas_limit += node.gas_limit;
196                dep_message_limit += node.msgs.len();
197                cur_chain = node.prev;
198            } else {
199                break;
200            }
201        }
202
203        // the chain doesn't fit as-is, so trim / invalidate it and return false
204        if chain_gas_limit > self.gas_limit || chain_msg_limit > selected_messages_limit {
205            // it doesn't all fit; now we have to take into account the dependent chains before
206            // making a decision about trimming or invalidating.
207            // if the dependencies exceed the block limits, then we must invalidate the chain
208            // as it can never be included.
209            // Otherwise we can just trim and continue
210            if dep_gas_limit > self.gas_limit || dep_message_limit >= selected_messages_limit {
211                chains.invalidate(chains.get_key_at(idx));
212            } else {
213                // dependencies fit, just trim it
214                chains.trim_msgs_at(
215                    idx,
216                    self.gas_limit.saturating_sub(dep_gas_limit),
217                    selected_messages_limit.saturating_sub(dep_message_limit),
218                    base_fee,
219                );
220            }
221
222            bail!("Chain doesn't fit in the block");
223        }
224        for dep in chain_deps.iter().rev() {
225            let cur_chain = chains.get_mut(*dep);
226            if let Some(node) = cur_chain {
227                node.merged = true;
228                self.extend(node.msgs.clone());
229            } else {
230                bail!("Couldn't find required dependent message chain");
231            }
232        }
233
234        let message_chain = chains
235            .get_mut_at(idx)
236            .context("Couldn't find required message chain")?;
237        message_chain.merged = true;
238        self.extend(message_chain.msgs.clone());
239        self.reduce_gas_limit(chain_gas_limit);
240
241        match message_chain.sig_type {
242            Some(SignatureType::Bls) => {
243                self.reduce_bls_limit(chain_msg_limit as u64);
244            }
245            Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
246                self.reduce_secp_limit(chain_msg_limit as u64);
247            }
248            None => {
249                // This is a message with no signature type, which is not allowed in the current
250                // implementation. This _should_ never happen, but we handle it gracefully.
251                bail!("Tried to add a message chain with no signature type");
252            }
253        }
254
255        Ok(())
256    }
257
258    fn trim_chain_at(&mut self, chains: &mut Chains, idx: usize, base_fee: &TokenAmount) {
259        let message_chain = match chains.get_at(idx) {
260            Some(message_chain) => message_chain,
261            None => {
262                error!("Tried to trim a message chain that doesn't exist");
263                return;
264            }
265        };
266        let msg_limit = BLOCK_MESSAGE_LIMIT.saturating_sub(self.len());
267        let msg_limit = match message_chain.sig_type {
268            Some(SignatureType::Bls) => std::cmp::min(self.bls_limit, msg_limit as u64),
269            Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
270                std::cmp::min(self.secp_limit, msg_limit as u64)
271            }
272            _ => {
273                // This is a message with no signature type, which is not allowed in the current
274                // implementation. This _should_ never happen, but we handle it gracefully.
275                error!("Tried to trim a message chain with no signature type");
276                return;
277            }
278        };
279
280        if message_chain.gas_limit > self.gas_limit || message_chain.msgs.len() > msg_limit as usize
281        {
282            chains.trim_msgs_at(idx, self.gas_limit, msg_limit as usize, base_fee);
283        }
284    }
285}
286
287impl<T> MessagePool<T>
288where
289    T: Provider,
290{
291    /// Forest employs a sophisticated algorithm for selecting messages
292    /// for inclusion from the pool, given the ticket quality of a miner.
293    /// This method selects messages for including in a block.
294    pub fn select_messages(&self, ts: &Tipset, tq: f64) -> Result<Vec<SignedMessage>, Error> {
295        let cur_ts = self.current_tipset();
296        // if the ticket quality is high enough that the first block has higher
297        // probability than any other block, then we don't bother with optimal
298        // selection because the first block will always have higher effective
299        // performance. Otherwise we select message optimally based on effective
300        // performance of chains.
301        let mut msgs = if tq > 0.84 {
302            self.select_messages_greedy(&cur_ts, ts)
303        } else {
304            self.select_messages_optimal(&cur_ts, ts, tq)
305        }?;
306
307        if msgs.len() > MAX_BLOCK_MSGS {
308            warn!(
309                "Message selection chose too many messages: {} > {MAX_BLOCK_MSGS}",
310                msgs.len(),
311            );
312            msgs.truncate(MAX_BLOCK_MSGS)
313        }
314
315        Ok(msgs.msgs)
316    }
317
318    fn select_messages_greedy(
319        &self,
320        cur_ts: &Tipset,
321        ts: &Tipset,
322    ) -> Result<SelectedMessages, Error> {
323        let base_fee = self.api.chain_compute_base_fee(ts)?;
324
325        // 0. Load messages from the target tipset; if it is the same as the current
326        // tipset in    the mpool, then this is just the pending messages
327        let mut pending = self.get_pending_messages(cur_ts, ts)?;
328
329        if pending.is_empty() {
330            return Ok(SelectedMessages::default());
331        }
332
333        // 0b. Select all priority messages that fit in the block
334        let selected_msgs = self.select_priority_messages(&mut pending, &base_fee, ts)?;
335
336        // check if block has been filled
337        if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
338            return Ok(selected_msgs);
339        }
340
341        // 1. Create a list of dependent message chains with maximal gas reward per
342        // limit consumed
343        let mut chains = Chains::new();
344        for (actor, mset) in pending.into_iter() {
345            create_message_chains(
346                self.api.as_ref(),
347                &actor,
348                &mset,
349                &base_fee,
350                ts,
351                &mut chains,
352                &self.chain_config,
353            )?;
354        }
355
356        Ok(merge_and_trim(
357            &mut chains,
358            selected_msgs,
359            &base_fee,
360            MIN_GAS,
361        ))
362    }
363
364    #[allow(clippy::indexing_slicing)]
365    fn select_messages_optimal(
366        &self,
367        cur_ts: &Tipset,
368        target_tipset: &Tipset,
369        ticket_quality: f64,
370    ) -> Result<SelectedMessages, Error> {
371        let base_fee = self.api.chain_compute_base_fee(target_tipset)?;
372
373        // 0. Load messages from the target tipset; if it is the same as the current
374        // tipset in    the mpool, then this is just the pending messages
375        let mut pending = self.get_pending_messages(cur_ts, target_tipset)?;
376
377        if pending.is_empty() {
378            return Ok(SelectedMessages::default());
379        }
380
381        // 0b. Select all priority messages that fit in the block
382        let mut selected_msgs =
383            self.select_priority_messages(&mut pending, &base_fee, target_tipset)?;
384
385        // check if block has been filled
386        if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
387            return Ok(selected_msgs);
388        }
389
390        // 1. Create a list of dependent message chains with maximal gas reward per
391        // limit consumed
392        let mut chains = Chains::new();
393        for (actor, mset) in pending.into_iter() {
394            create_message_chains(
395                self.api.as_ref(),
396                &actor,
397                &mset,
398                &base_fee,
399                target_tipset,
400                &mut chains,
401                &self.chain_config,
402            )?;
403        }
404
405        // 2. Sort the chains
406        chains.sort(false);
407
408        if chains.get_at(0).is_some_and(|it| it.gas_perf < 0.0) {
409            tracing::warn!(
410                "all messages in mpool have non-positive gas performance {}",
411                chains[0].gas_perf
412            );
413            return Ok(selected_msgs);
414        }
415
416        // 3. Partition chains into blocks (without trimming)
417        //    we use the full block_gas_limit (as opposed to the residual `gas_limit`
418        //    from the priority message selection) as we have to account for
419        //    what other block providers are doing
420        let mut next_chain = 0;
421        let mut partitions: Vec<Vec<NodeKey>> = vec![vec![]; MAX_BLOCKS];
422        let mut i = 0;
423        while i < MAX_BLOCKS && next_chain < chains.len() {
424            let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
425            let mut msg_limit = BLOCK_MESSAGE_LIMIT;
426            while next_chain < chains.len() {
427                let chain_key = chains.key_vec[next_chain];
428                next_chain += 1;
429                partitions[i].push(chain_key);
430                let chain = chains.get(chain_key).unwrap();
431                let chain_gas_limit = chain.gas_limit;
432                if gas_limit < chain_gas_limit {
433                    break;
434                }
435                gas_limit = gas_limit.saturating_sub(chain_gas_limit);
436                msg_limit = msg_limit.saturating_sub(chain.msgs.len());
437                if gas_limit < MIN_GAS || msg_limit == 0 {
438                    break;
439                }
440            }
441            i += 1;
442        }
443
444        // 4. Compute effective performance for each chain, based on the partition they
445        // fall into    The effective performance is the gas_perf of the chain *
446        // block probability
447        let block_prob = crate::message_pool::block_probabilities(ticket_quality);
448        let mut eff_chains = 0;
449        for i in 0..MAX_BLOCKS {
450            for k in &partitions[i] {
451                if let Some(node) = chains.get_mut(*k) {
452                    node.eff_perf = node.gas_perf * block_prob[i];
453                }
454            }
455            eff_chains += partitions[i].len();
456        }
457
458        // nullify the effective performance of chains that don't fit in any partition
459        for i in eff_chains..chains.len() {
460            if let Some(node) = chains.get_mut_at(i) {
461                node.set_null_effective_perf();
462            }
463        }
464
465        // 5. Re-sort the chains based on effective performance
466        chains.sort_effective();
467
468        // 6. Merge the head chains to produce the list of messages selected for
469        //    inclusion subject to the residual block limits
470        //    When a chain is merged in, all its previous dependent chains *must* also
471        //    be merged in or we'll have a broken block
472        let mut last = chains.len();
473        for i in 0..chains.len() {
474            // did we run out of performing chains?
475            if chains[i].gas_perf < 0.0 {
476                break;
477            }
478
479            // has it already been merged?
480            if chains[i].merged {
481                continue;
482            }
483
484            match selected_msgs.try_to_add_with_deps(i, &mut chains, &base_fee) {
485                Ok(_) => {
486                    // adjust the effective performance for all subsequent chains
487                    if let Some(next_key) = chains[i].next {
488                        let next_node = chains.get_mut(next_key).unwrap();
489                        if next_node.eff_perf > 0.0 {
490                            next_node.eff_perf += next_node.parent_offset;
491                            let mut next_next_key = next_node.next;
492                            while let Some(nnk) = next_next_key {
493                                let (nn_node, prev_perfs) = chains.get_mut_with_prev_eff(nnk);
494                                if let Some(nn_node) = nn_node {
495                                    if nn_node.eff_perf > 0.0 {
496                                        nn_node.set_eff_perf(prev_perfs);
497                                        next_next_key = nn_node.next;
498                                    } else {
499                                        break;
500                                    }
501                                } else {
502                                    break;
503                                }
504                            }
505                        }
506                    }
507
508                    // re-sort to account for already merged chains and effective performance
509                    // adjustments the sort *must* be stable or we end up getting
510                    // negative gasPerfs pushed up.
511                    chains.sort_range_effective(i + 1..);
512
513                    continue;
514                }
515                Err(e) => {
516                    debug!("Failed to add message chain with dependencies: {e:#}");
517                }
518            }
519
520            // we can't fit this chain and its dependencies because of block gasLimit -- we
521            // are at the edge
522            last = i;
523            break;
524        }
525
526        // 7. We have reached the edge of what can fit wholesale; if we still hae
527        // available    gasLimit to pack some more chains, then trim the last
528        // chain and push it down.
529        //
530        // Trimming invalidates subsequent dependent chains so that they can't be selected
531        // as their dependency cannot be (fully) included. We do this in a loop because the blocker
532        // might have been inordinately large and we might have to do it
533        // multiple times to satisfy tail packing.
534        'tail_loop: while selected_msgs.gas_limit >= MIN_GAS && last < chains.len() {
535            if !chains[last].valid {
536                // the chain has been invalidated, we can't use it
537                last += 1;
538                continue;
539            }
540
541            // trim if necessary
542            selected_msgs.trim_chain_at(&mut chains, last, &base_fee);
543
544            // push down if it hasn't been invalidated
545            if chains[last].valid {
546                for i in last..chains.len() - 1 {
547                    if chains[i].cmp_effective(&chains[i + 1]) == Ordering::Greater {
548                        break;
549                    }
550                    chains.key_vec.swap(i, i + 1);
551                }
552            }
553
554            // select the next (valid and fitting) chain and its dependencies for inclusion
555            let lst = last; // to make clippy happy, see: https://rust-lang.github.io/rust-clippy/master/index.html#mut_range_bound
556            for i in lst..chains.len() {
557                let chain = &mut chains[i];
558                // has the chain been invalidated
559                if !chain.valid {
560                    continue;
561                }
562
563                // has it already been merged?
564                if chain.merged {
565                    continue;
566                }
567
568                // if gasPerf < 0 we have no more profitable chains
569                if chain.gas_perf < 0.0 {
570                    break 'tail_loop;
571                }
572
573                match selected_msgs.try_to_add_with_deps(i, &mut chains, &base_fee) {
574                    Ok(_) => continue,
575                    Err(e) => debug!("Failed to add message chain with dependencies: {e:#}"),
576                }
577
578                continue 'tail_loop;
579            }
580
581            // the merge loop ended after processing all the chains and we we probably have
582            // still gas to spare; end the loop.
583            break;
584        }
585
586        // if we have room to spare, pick some random (non-negative) chains to fill
587        // the block
588        // we pick randomly so that we minimize the probability of
589        // duplication among all block producers
590        if selected_msgs.gas_limit >= MIN_GAS && selected_msgs.msgs.len() <= BLOCK_MESSAGE_LIMIT {
591            let pre_random_length = selected_msgs.len();
592
593            chains
594                .key_vec
595                .shuffle(&mut crate::utils::rand::forest_rng());
596
597            for i in 0..chains.len() {
598                if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
599                    break;
600                }
601
602                // has it been merged or invalidated?
603                if chains[i].merged || !chains[i].valid {
604                    continue;
605                }
606
607                // is it negative?
608                if chains[i].gas_perf < 0.0 {
609                    continue;
610                }
611
612                if selected_msgs
613                    .try_to_add_with_deps(i, &mut chains, &base_fee)
614                    .is_ok()
615                {
616                    // we added it, continue
617                    continue;
618                }
619
620                if chains[i].valid {
621                    // chain got trimmer on the previous call to `try_to_add_with_deps`, so it can
622                    // now be included.
623                    selected_msgs
624                        .try_to_add_with_deps(i, &mut chains, &base_fee)
625                        .context("Failed to add message chain with dependencies")?;
626                    continue;
627                }
628            }
629
630            if selected_msgs.len() != pre_random_length {
631                tracing::warn!(
632                    "optimal selection failed to pack a block; picked {} messages with random selection",
633                    selected_msgs.len() - pre_random_length
634                );
635            }
636        }
637
638        Ok(selected_msgs)
639    }
640
641    fn get_pending_messages(&self, cur_ts: &Tipset, ts: &Tipset) -> Result<Pending, Error> {
642        let snapshot = self.pending_store.snapshot();
643        let mut result: Pending = HashMap::with_capacity(snapshot.len());
644        for (a, mset) in snapshot {
645            result.insert(a, mset.msgs);
646        }
647
648        if cur_ts.epoch() == ts.epoch() && cur_ts == ts {
649            return Ok(result);
650        }
651
652        // Run head change to do reorg detection
653        run_head_change(
654            self.api.as_ref(),
655            &self.bls_sig_cache,
656            &self.pending_store,
657            &self.key_cache,
658            cur_ts.clone(),
659            ts.clone(),
660            &mut result,
661        )?;
662
663        Ok(result)
664    }
665
666    fn select_priority_messages(
667        &self,
668        pending: &mut Pending,
669        base_fee: &TokenAmount,
670        ts: &Tipset,
671    ) -> Result<SelectedMessages, Error> {
672        let result = Vec::with_capacity(self.config.size_limit_low() as usize);
673        let gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
674        let min_gas = 1298450;
675
676        // 1. Get priority actor chains
677        let priority = self.config.priority_addrs();
678        let mut chains = Chains::new();
679        for actor in priority.iter() {
680            // remove actor from pending set as we are processing these messages.
681            if let Some(mset) = pending.remove(actor) {
682                // create chains for the priority actor
683                create_message_chains(
684                    self.api.as_ref(),
685                    actor,
686                    &mset,
687                    base_fee,
688                    ts,
689                    &mut chains,
690                    &self.chain_config,
691                )?;
692            }
693        }
694
695        if chains.is_empty() {
696            return Ok(SelectedMessages::new(Vec::new(), gas_limit));
697        }
698
699        Ok(merge_and_trim(
700            &mut chains,
701            SelectedMessages::new(result, gas_limit),
702            base_fee,
703            min_gas,
704        ))
705    }
706}
707
708/// Returns merged and trimmed messages with the gas limit
709#[allow(clippy::indexing_slicing)]
710fn merge_and_trim(
711    chains: &mut Chains,
712    mut selected_msgs: SelectedMessages,
713    base_fee: &TokenAmount,
714    min_gas: u64,
715) -> SelectedMessages {
716    if chains.is_empty() {
717        return selected_msgs;
718    }
719
720    // 2. Sort the chains
721    chains.sort(true);
722
723    let first_chain_gas_perf = chains[0].gas_perf;
724
725    if !chains.is_empty() && first_chain_gas_perf < 0.0 {
726        warn!(
727            "all priority messages in mpool have negative gas performance bestGasPerf: {}",
728            first_chain_gas_perf
729        );
730        return selected_msgs;
731    }
732
733    // 3. Merge chains until the block limit, as long as they have non-negative gas
734    // performance
735    let mut last = chains.len();
736    for i in 0..chains.len() {
737        let node = &chains[i];
738
739        if node.gas_perf < 0.0 {
740            break;
741        }
742
743        if selected_msgs.try_to_add(node.clone()).is_ok() {
744            // there was room, we added the chain, keep going
745            continue;
746        }
747
748        // we can't fit this chain because of block gas limit -- we are at the edge
749        last = i;
750        break;
751    }
752
753    'tail_loop: while selected_msgs.gas_limit >= min_gas && last < chains.len() {
754        // trim, discard negative performing messages
755        selected_msgs.trim_chain_at(chains, last, base_fee);
756
757        // push down if it hasn't been invalidated
758        let node = &chains[last];
759        if node.valid {
760            for i in last..chains.len() - 1 {
761                // slot_chains
762                let cur_node = &chains[i];
763                let next_node = &chains[i + 1];
764                if cur_node.compare(next_node) == Ordering::Greater {
765                    break;
766                }
767
768                chains.key_vec.swap(i, i + 1);
769            }
770        }
771
772        // select the next (valid and fitting) chain for inclusion
773        let lst = last; // to make clippy happy, see: https://rust-lang.github.io/rust-clippy/master/index.html#mut_range_bound
774        for i in lst..chains.len() {
775            let chain = chains[i].clone();
776            if !chain.valid {
777                continue;
778            }
779
780            // if gas_perf < 0 then we have no more profitable chains
781            if chain.gas_perf < 0.0 {
782                break 'tail_loop;
783            }
784
785            // does it fit in the block?
786            if selected_msgs.try_to_add(chain).is_ok() {
787                // there was room, we added the chain, keep going
788                continue;
789            }
790
791            // this chain needs to be trimmed
792            last += i;
793            continue 'tail_loop;
794        }
795
796        break;
797    }
798
799    selected_msgs
800}
801
802/// Like `head_change`, except it doesn't change the state of the `MessagePool`.
803/// It simulates a head change call.
804// This logic should probably be implemented in the ChainStore. It handles
805// reorgs.
806pub(in crate::message_pool) fn run_head_change<T>(
807    api: &T,
808    bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
809    pending_store: &PendingStore,
810    key_cache: &IdToAddressCache,
811    from: Tipset,
812    to: Tipset,
813    rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
814) -> Result<(), Error>
815where
816    T: Provider,
817{
818    let mut left = from;
819    let mut right = to;
820    let mut left_chain = Vec::new();
821    let mut right_chain = Vec::new();
822    while left != right {
823        if left.epoch() > right.epoch() {
824            left_chain.push(left.clone());
825            let par = api.load_tipset(left.parents())?;
826            left = par;
827        } else {
828            right_chain.push(right.clone());
829            let par = api.load_tipset(right.parents())?;
830            right = par;
831        }
832    }
833    for ts in left_chain {
834        let mut msgs: Vec<SignedMessage> = Vec::new();
835        for block in ts.block_headers() {
836            let (umsg, smsgs) = api.messages_for_block(block)?;
837            msgs.extend(smsgs);
838            for msg in umsg {
839                let msg_cid = msg.cid();
840                if let Ok(smsg) = recover_sig(bls_sig_cache, msg) {
841                    msgs.push(smsg);
842                } else {
843                    tracing::debug!("could not recover signature for bls message {msg_cid}");
844                }
845            }
846        }
847        for msg in msgs {
848            add_to_selected_msgs(msg, rmsgs);
849        }
850    }
851
852    for ts in right_chain {
853        let mpool_ctx = MpoolCtx {
854            api,
855            key_cache,
856            pending_store,
857            ts: &ts,
858        };
859        for b in ts.block_headers() {
860            let (msgs, smsgs) = api.messages_for_block(b)?;
861
862            for msg in smsgs {
863                mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), rmsgs)?;
864            }
865            for msg in msgs {
866                mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, rmsgs)?;
867            }
868        }
869    }
870    Ok(())
871}
872
873#[cfg(test)]
874mod test_selection {
875    use std::sync::Arc;
876
877    use super::*;
878    use crate::db::MemoryDB;
879    use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
880    use crate::message_pool::msgpool::{
881        test_provider::{TestApi, mock_block},
882        tests::{create_fake_smsg, create_smsg},
883    };
884    use crate::shim::crypto::SignatureType;
885    use crate::shim::econ::BLOCK_GAS_LIMIT;
886    use tokio::task::JoinSet;
887
888    const TEST_GAS_LIMIT: i64 = 6955002;
889
890    fn make_test_mpool(joinset: &mut JoinSet<anyhow::Result<()>>) -> MessagePool<TestApi> {
891        let tma = TestApi::default();
892        let (tx, _rx) = flume::bounded(50);
893        MessagePool::new(tma, tx, Default::default(), Arc::default(), joinset).unwrap()
894    }
895
896    /// Creates a tipset with a mocked block and performs a head change to setup the
897    /// [`MessagePool`] for testing.
898    async fn mock_tipset(mpool: &MessagePool<TestApi>) -> Tipset {
899        let b1 = mock_block(1, 1);
900        let ts = Tipset::from(&b1);
901        mpool
902            .apply_head_change(Vec::new(), vec![Tipset::from(b1)])
903            .await
904            .unwrap();
905        ts
906    }
907
908    #[tokio::test]
909    async fn basic_message_selection() {
910        let mut joinset = JoinSet::new();
911        let mpool = make_test_mpool(&mut joinset);
912
913        let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
914        let mut w1 = Wallet::new(ks1);
915        let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
916
917        let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
918        let mut w2 = Wallet::new(ks2);
919        let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
920
921        let ts = mock_tipset(&mpool).await;
922
923        mpool
924            .api
925            .set_state_balance_raw(&a1, TokenAmount::from_whole(1));
926        mpool
927            .api
928            .set_state_balance_raw(&a2, TokenAmount::from_whole(1));
929
930        // we create 10 messages from each actor to another, with the first actor paying
931        // higher gas prices than the second; we expect message selection to
932        // order his messages first
933        for i in 0..10 {
934            let m = create_smsg(&a2, &a1, &mut w1, i, TEST_GAS_LIMIT, 2 * i + 1);
935            mpool.add(m).unwrap();
936        }
937        for i in 0..10 {
938            let m = create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1);
939            mpool.add(m).unwrap();
940        }
941
942        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
943
944        assert_eq!(msgs.len(), 20, "Expected 20 messages, got {}", msgs.len());
945
946        let mut next_nonce = 0;
947        for (i, msg) in msgs.iter().enumerate().take(10) {
948            assert_eq!(
949                msg.from(),
950                a1,
951                "first 10 returned messages should be from actor a1 {i}",
952            );
953            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
954            next_nonce += 1;
955        }
956
957        next_nonce = 0;
958        for (i, msg) in msgs.iter().enumerate().take(20).skip(10) {
959            assert_eq!(
960                msg.from(),
961                a2,
962                "next 10 returned messages should be from actor a2 {i}",
963            );
964            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
965            next_nonce += 1;
966        }
967
968        // now we make a block with all the messages and advance the chain
969        let b2 = mpool.api.next_block();
970        mpool.api.set_block_messages(&b2, msgs);
971        mpool
972            .apply_head_change(Vec::new(), vec![Tipset::from(b2)])
973            .await
974            .unwrap();
975
976        // we should now have no pending messages in the MessagePool
977        let remaining = mpool.pending_store.snapshot();
978        assert!(
979            remaining.is_empty(),
980            "Expected no pending messages, but got {}",
981            remaining.len()
982        );
983
984        // create a block and advance the chain without applying to the mpool
985        let mut msgs = Vec::with_capacity(20);
986        for i in 10..20 {
987            msgs.push(create_smsg(&a2, &a1, &mut w1, i, TEST_GAS_LIMIT, 2 * i + 1));
988            msgs.push(create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1));
989        }
990        let b3 = mpool.api.next_block();
991        let ts3 = Tipset::from(&b3);
992        mpool.api.set_block_messages(&b3, msgs);
993
994        // Set state sequence to 20 so that nonces 20..30 don't exceed the nonce gap.
995        mpool.api.set_state_sequence(&a1, 20);
996        mpool.api.set_state_sequence(&a2, 20);
997
998        // now create another set of messages and add them to the mpool
999        for i in 20..30 {
1000            mpool
1001                .add(create_smsg(
1002                    &a2,
1003                    &a1,
1004                    &mut w1,
1005                    i,
1006                    TEST_GAS_LIMIT,
1007                    2 * i + 200,
1008                ))
1009                .unwrap();
1010            mpool
1011                .add(create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1))
1012                .unwrap();
1013        }
1014        // select messages in the last tipset; this should include the missed messages
1015        // as well as the last messages we added, with the first actor's
1016        // messages first first we need to update the nonce on the api
1017        mpool.api.set_state_sequence(&a1, 10);
1018        mpool.api.set_state_sequence(&a2, 10);
1019        let msgs = mpool.select_messages(&ts3, 1.0).unwrap();
1020
1021        assert_eq!(
1022            msgs.len(),
1023            20,
1024            "Expected 20 messages, but got {}",
1025            msgs.len()
1026        );
1027
1028        let mut next_nonce = 20;
1029        for msg in msgs.iter().take(10) {
1030            assert_eq!(
1031                msg.from(),
1032                a1,
1033                "first 10 returned messages should be from actor a1"
1034            );
1035            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1036            next_nonce += 1;
1037        }
1038        next_nonce = 20;
1039        for msg in msgs.iter().take(20).skip(10) {
1040            assert_eq!(
1041                msg.from(),
1042                a2,
1043                "next 10 returned messages should be from actor a2"
1044            );
1045            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1046            next_nonce += 1;
1047        }
1048    }
1049
1050    #[tokio::test]
1051    async fn message_selection_trimming_gas() {
1052        let mut joinset = JoinSet::new();
1053        let mpool = make_test_mpool(&mut joinset);
1054        let ts = mock_tipset(&mpool).await;
1055        let api = mpool.api.clone();
1056
1057        let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1058        let mut w1 = Wallet::new(ks1);
1059        let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1060
1061        let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1062        let mut w2 = Wallet::new(ks2);
1063        let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1064
1065        api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1066        api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1067
1068        let nmsgs = (crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT) + 1;
1069
1070        // make many small chains for the two actors
1071        for i in 0..nmsgs {
1072            let bias = (nmsgs - i) / 3;
1073            let m = create_fake_smsg(
1074                &mpool,
1075                &a2,
1076                &a1,
1077                i as u64,
1078                TEST_GAS_LIMIT,
1079                (1 + i % 3 + bias) as u64,
1080            );
1081            mpool.add(m).unwrap();
1082            let m = create_fake_smsg(
1083                &mpool,
1084                &a1,
1085                &a2,
1086                i as u64,
1087                TEST_GAS_LIMIT,
1088                (1 + i % 3 + bias) as u64,
1089            );
1090            mpool.add(m).unwrap();
1091        }
1092
1093        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1094
1095        let expected = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1096        assert_eq!(msgs.len(), expected as usize);
1097        let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1098        assert!(m_gas_limit <= crate::shim::econ::BLOCK_GAS_LIMIT);
1099    }
1100
1101    #[tokio::test]
1102    async fn message_selection_trimming_msgs_basic() {
1103        let mut joinset = JoinSet::new();
1104        let mpool = make_test_mpool(&mut joinset);
1105        let ts = mock_tipset(&mpool).await;
1106        let api = mpool.api.clone();
1107
1108        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1109        let mut wallet = Wallet::new(keystore);
1110        let address = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
1111
1112        api.set_state_balance_raw(&address, TokenAmount::from_whole(1));
1113
1114        // create a larger than selectable chain
1115        for i in 0..BLOCK_MESSAGE_LIMIT {
1116            let msg = create_fake_smsg(&mpool, &address, &address, i as u64, 200_000, 100);
1117            mpool.add(msg).unwrap();
1118        }
1119
1120        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1121        assert_eq!(
1122            msgs.len(),
1123            CBOR_GEN_LIMIT,
1124            "Expected {CBOR_GEN_LIMIT} messages, got {}",
1125            msgs.len()
1126        );
1127
1128        // check that the gas limit is not exceeded
1129        let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1130        assert!(
1131            m_gas_limit <= BLOCK_GAS_LIMIT,
1132            "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1133        );
1134    }
1135
1136    #[tokio::test]
1137    async fn message_selection_trimming_msgs_two_senders() {
1138        let mut joinset = JoinSet::new();
1139        let mpool = make_test_mpool(&mut joinset);
1140        let ts = mock_tipset(&mpool).await;
1141        let api = mpool.api.clone();
1142
1143        let keystore_1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1144        let mut wallet_1 = Wallet::new(keystore_1);
1145        let address_1 = wallet_1.generate_addr(SignatureType::Secp256k1).unwrap();
1146
1147        let keystore_2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1148        let mut wallet_2 = Wallet::new(keystore_2);
1149        let address_2 = wallet_2.generate_addr(SignatureType::Bls).unwrap();
1150
1151        api.set_state_balance_raw(&address_1, TokenAmount::from_whole(1));
1152        api.set_state_balance_raw(&address_2, TokenAmount::from_whole(1));
1153
1154        // create 2 larger than selectable chains
1155        for i in 0..BLOCK_MESSAGE_LIMIT {
1156            let msg = create_smsg(
1157                &address_2,
1158                &address_1,
1159                &mut wallet_1,
1160                i as u64,
1161                300_000,
1162                100,
1163            );
1164            mpool.add(msg).unwrap();
1165            // higher has price, those should be preferred and fill the block up to
1166            // the [`CBOR_GEN_LIMIT`] messages.
1167            let msg = create_smsg(
1168                &address_1,
1169                &address_2,
1170                &mut wallet_2,
1171                i as u64,
1172                300_000,
1173                1000,
1174            );
1175            mpool.add(msg).unwrap();
1176        }
1177        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1178        // check that the gas limit is not exceeded
1179        let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1180        assert!(
1181            m_gas_limit <= BLOCK_GAS_LIMIT,
1182            "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1183        );
1184        let bls_msgs = msgs.iter().filter(|m| m.is_bls()).count();
1185        assert_eq!(
1186            CBOR_GEN_LIMIT, bls_msgs,
1187            "Expected {CBOR_GEN_LIMIT} bls messages, got {bls_msgs}."
1188        );
1189        assert_eq!(
1190            msgs.len(),
1191            BLOCK_MESSAGE_LIMIT,
1192            "Expected {BLOCK_MESSAGE_LIMIT} messages, got {}",
1193            msgs.len()
1194        );
1195    }
1196
1197    #[tokio::test]
1198    async fn message_selection_trimming_msgs_two_senders_complex() {
1199        let mut joinset = JoinSet::new();
1200        let mpool = make_test_mpool(&mut joinset);
1201        let ts = mock_tipset(&mpool).await;
1202        let api = mpool.api.clone();
1203
1204        let keystore_1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1205        let mut wallet_1 = Wallet::new(keystore_1);
1206        let address_1 = wallet_1.generate_addr(SignatureType::Secp256k1).unwrap();
1207
1208        let keystore_2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1209        let mut wallet_2 = Wallet::new(keystore_2);
1210        let address_2 = wallet_2.generate_addr(SignatureType::Bls).unwrap();
1211
1212        api.set_state_balance_raw(&address_1, TokenAmount::from_whole(1));
1213        api.set_state_balance_raw(&address_2, TokenAmount::from_whole(1));
1214
1215        // create two almost max-length chains of equal value
1216        let mut counter = 0;
1217        for i in 0..CBOR_GEN_LIMIT {
1218            counter += 1;
1219            let msg = create_smsg(
1220                &address_2,
1221                &address_1,
1222                &mut wallet_1,
1223                i as u64,
1224                300_000,
1225                100,
1226            );
1227            mpool.add(msg).unwrap();
1228            // higher has price, those should be preferred and fill the block up to
1229            // the [`CBOR_GEN_LIMIT`] messages.
1230            let msg = create_smsg(
1231                &address_1,
1232                &address_2,
1233                &mut wallet_2,
1234                i as u64,
1235                300_000,
1236                100,
1237            );
1238            mpool.add(msg).unwrap();
1239        }
1240
1241        // address_1 8192th message is worth more than address_2 8192th message
1242        let msg = create_smsg(
1243            &address_2,
1244            &address_1,
1245            &mut wallet_1,
1246            counter as u64,
1247            300_000,
1248            1000,
1249        );
1250        mpool.add(msg).unwrap();
1251
1252        let msg = create_smsg(
1253            &address_1,
1254            &address_2,
1255            &mut wallet_2,
1256            counter as u64,
1257            300_000,
1258            100,
1259        );
1260        mpool.add(msg).unwrap();
1261
1262        counter += 1;
1263
1264        // address 2 (uneselectable) message is worth so much!
1265        let msg = create_smsg(
1266            &address_2,
1267            &address_1,
1268            &mut wallet_1,
1269            counter as u64,
1270            400_000,
1271            1_000_000,
1272        );
1273        mpool.add(msg).unwrap();
1274
1275        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1276        // check that the gas limit is not exceeded
1277        let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1278        assert!(
1279            m_gas_limit <= BLOCK_GAS_LIMIT,
1280            "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1281        );
1282        // We should have taken the SECP chain from address_1.
1283        let secps_len = msgs.iter().filter(|m| m.is_secp256k1()).count();
1284        assert_eq!(
1285            CBOR_GEN_LIMIT, secps_len,
1286            "Expected {CBOR_GEN_LIMIT} secp messages, got {secps_len}."
1287        );
1288        // The remaining messages should be BLS messages.
1289        assert_eq!(
1290            msgs.len(),
1291            BLOCK_MESSAGE_LIMIT,
1292            "Expected {BLOCK_MESSAGE_LIMIT} messages, got {}",
1293            msgs.len()
1294        );
1295    }
1296
1297    #[tokio::test]
1298    async fn message_selection_priority() {
1299        let db = MemoryDB::default();
1300
1301        let mut joinset = JoinSet::new();
1302        let mut mpool = make_test_mpool(&mut joinset);
1303        let ts = mock_tipset(&mpool).await;
1304        let api = mpool.api.clone();
1305
1306        let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1307        let mut w1 = Wallet::new(ks1);
1308        let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1309
1310        let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1311        let mut w2 = Wallet::new(ks2);
1312        let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1313
1314        // set priority addrs to a1
1315        let mut mpool_cfg = mpool.get_config().clone();
1316        mpool_cfg.priority_addrs.push(a1);
1317        mpool.set_config(&db, mpool_cfg).unwrap();
1318
1319        // let gas_limit = 6955002;
1320        api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1321        api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1322
1323        let nmsgs = 10;
1324
1325        // make many small chains for the two actors
1326        for i in 0..nmsgs {
1327            let bias = (nmsgs - i) / 3;
1328            let m = create_smsg(
1329                &a2,
1330                &a1,
1331                &mut w1,
1332                i as u64,
1333                TEST_GAS_LIMIT,
1334                (1 + i % 3 + bias) as u64,
1335            );
1336            mpool.add(m).unwrap();
1337            let m = create_smsg(
1338                &a1,
1339                &a2,
1340                &mut w2,
1341                i as u64,
1342                TEST_GAS_LIMIT,
1343                (1 + i % 3 + bias) as u64,
1344            );
1345            mpool.add(m).unwrap();
1346        }
1347
1348        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1349
1350        assert_eq!(msgs.len(), 20);
1351
1352        let mut next_nonce = 0;
1353        for msg in msgs.iter().take(10) {
1354            assert_eq!(
1355                msg.from(),
1356                a1,
1357                "first 10 returned messages should be from actor a1"
1358            );
1359            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1360            next_nonce += 1;
1361        }
1362        next_nonce = 0;
1363        for msg in msgs.iter().take(20).skip(10) {
1364            assert_eq!(
1365                msg.from(),
1366                a2,
1367                "next 10 returned messages should be from actor a2"
1368            );
1369            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1370            next_nonce += 1;
1371        }
1372    }
1373
1374    #[tokio::test]
1375    async fn test_optimal_msg_selection1() {
1376        // this test uses just a single actor sending messages with a low tq
1377        // the chain dependent merging algorithm should pick messages from the actor
1378        // from the start
1379        let mut joinset = JoinSet::new();
1380        let mpool = make_test_mpool(&mut joinset);
1381        let ts = mock_tipset(&mpool).await;
1382        let api = mpool.api.clone();
1383
1384        // create two actors
1385        let mut w1 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1386        let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1387        let mut w2 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1388        let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1389
1390        api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1391        api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1392
1393        let n_msgs = 10 * crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1394
1395        // we create n_msgs messages from each actor to another, with the first actor paying
1396        // higher gas prices than the second; we expect message selection to
1397        // order his messages first
1398        for i in 0..(n_msgs as usize) {
1399            let bias = (n_msgs as usize - i) / 3;
1400            let m = create_fake_smsg(
1401                &mpool,
1402                &a2,
1403                &a1,
1404                i as u64,
1405                TEST_GAS_LIMIT,
1406                (1 + i % 3 + bias) as u64,
1407            );
1408            mpool.add(m).unwrap();
1409        }
1410
1411        let msgs = mpool.select_messages(&ts, 0.25).unwrap();
1412
1413        let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1414
1415        assert_eq!(msgs.len(), expected_msgs as usize);
1416
1417        for (next_nonce, m) in msgs.into_iter().enumerate() {
1418            assert_eq!(m.from(), a1, "Expected message from a1");
1419            assert_eq!(
1420                m.message().sequence,
1421                next_nonce as u64,
1422                "expected nonce {} but got {}",
1423                next_nonce,
1424                m.message().sequence
1425            );
1426        }
1427    }
1428
1429    #[tokio::test]
1430    async fn test_optimal_msg_selection2() {
1431        let mut joinset = JoinSet::new();
1432        // this test uses two actors sending messages to each other, with the first
1433        // actor paying (much) higher gas premium than the second.
1434        // We select with a low ticket quality; the chain dependent merging algorithm
1435        // should pick messages from the second actor from the start
1436        let mpool = make_test_mpool(&mut joinset);
1437        let ts = mock_tipset(&mpool).await;
1438        let api = mpool.api.clone();
1439
1440        // create two actors
1441        let mut w1 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1442        let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1443        let mut w2 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1444        let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1445
1446        api.set_state_balance_raw(&a1, TokenAmount::from_whole(1)); // in FIL
1447        api.set_state_balance_raw(&a2, TokenAmount::from_whole(1)); // in FIL
1448
1449        let n_msgs = 5 * crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1450        for i in 0..n_msgs as usize {
1451            let bias = (n_msgs as usize - i) / 3;
1452            let m = create_fake_smsg(
1453                &mpool,
1454                &a2,
1455                &a1,
1456                i as u64,
1457                TEST_GAS_LIMIT,
1458                (200000 + i % 3 + bias) as u64,
1459            );
1460            mpool.add(m).unwrap();
1461            let m = create_fake_smsg(
1462                &mpool,
1463                &a1,
1464                &a2,
1465                i as u64,
1466                TEST_GAS_LIMIT,
1467                (190000 + i % 3 + bias) as u64,
1468            );
1469            mpool.add(m).unwrap();
1470        }
1471
1472        let msgs = mpool.select_messages(&ts, 0.1).unwrap();
1473
1474        let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1475        assert_eq!(
1476            msgs.len(),
1477            expected_msgs as usize,
1478            "Expected {} messages, but got {}",
1479            expected_msgs,
1480            msgs.len()
1481        );
1482
1483        let mut n_from1 = 0;
1484        let mut n_from2 = 0;
1485        let mut next_nonce1 = 0;
1486        let mut next_nonce2 = 0;
1487
1488        for m in msgs {
1489            if m.from() == a1 {
1490                if m.message.sequence != next_nonce1 {
1491                    panic!(
1492                        "Expected nonce {}, but got {}",
1493                        next_nonce1, m.message.sequence
1494                    );
1495                }
1496                next_nonce1 += 1;
1497                n_from1 += 1;
1498            } else {
1499                if m.message.sequence != next_nonce2 {
1500                    panic!(
1501                        "Expected nonce {}, but got {}",
1502                        next_nonce2, m.message.sequence
1503                    );
1504                }
1505                next_nonce2 += 1;
1506                n_from2 += 1;
1507            }
1508        }
1509
1510        if n_from1 > n_from2 {
1511            panic!("Expected more msgs from a2 than a1");
1512        }
1513    }
1514
1515    #[tokio::test]
1516    async fn test_optimal_msg_selection3() {
1517        let mut joinset = JoinSet::new();
1518        // this test uses 10 actors sending a block of messages to each other, with the
1519        // the first actors paying higher gas premium than the subsequent
1520        // actors. We select with a low ticket quality; the chain dependent
1521        // merging algorithm should pick messages from the median actor from the
1522        // start
1523        let mpool = make_test_mpool(&mut joinset);
1524        let ts = mock_tipset(&mpool).await;
1525        let api = mpool.api.clone();
1526
1527        let n_actors = 10;
1528
1529        let mut actors = vec![];
1530        let mut wallets = vec![];
1531
1532        for _ in 0..n_actors {
1533            let mut wallet = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1534            let actor = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
1535
1536            actors.push(actor);
1537            wallets.push(wallet);
1538        }
1539
1540        for a in &mut actors {
1541            api.set_state_balance_raw(a, TokenAmount::from_whole(1));
1542        }
1543
1544        let n_msgs = 1 + crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1545        for i in 0..n_msgs {
1546            for j in 0..n_actors {
1547                let premium =
1548                    500000 + 10000 * (n_actors - j) + (n_msgs + 2 - i) / (30 * n_actors) + i % 3;
1549                let m = create_fake_smsg(
1550                    &mpool,
1551                    &actors[j as usize],
1552                    &actors[j as usize],
1553                    i as u64,
1554                    TEST_GAS_LIMIT,
1555                    premium as u64,
1556                );
1557                mpool.add(m).unwrap();
1558            }
1559        }
1560
1561        let msgs = mpool.select_messages(&ts, 0.1).unwrap();
1562        let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1563
1564        assert_eq!(
1565            msgs.len(),
1566            expected_msgs as usize,
1567            "Expected {} messages, but got {}",
1568            expected_msgs,
1569            msgs.len()
1570        );
1571
1572        let who_is = |addr| -> usize {
1573            for (i, a) in actors.iter().enumerate() {
1574                if a == &addr {
1575                    return i;
1576                }
1577            }
1578            // Lotus has -1, but since we don't have -ve indexes, set it some unrealistic
1579            // number
1580            9999999
1581        };
1582
1583        let mut nonces = vec![0; n_actors as usize];
1584        for m in &msgs {
1585            let who = who_is(m.from());
1586            if who < 3 {
1587                panic!("got message from {who}th actor",);
1588            }
1589
1590            let next_nonce: u64 = nonces[who];
1591            if m.message.sequence != next_nonce {
1592                panic!(
1593                    "expected nonce {} but got {}",
1594                    next_nonce, m.message.sequence
1595                );
1596            }
1597            nonces[who] += 1;
1598        }
1599    }
1600}