Skip to main content

forest/message_pool/msgpool/
selection.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4//! Contains routines for message selection APIs.
5//! Whenever a miner is ready to create a block for a tipset, it invokes the
6//! `select_messages` API which selects an appropriate set of messages such that
7//! it optimizes miner reward and chain capacity. See <https://docs.filecoin.io/mine/lotus/message-pool/#message-selection> for more details
8
9use std::{borrow::BorrowMut, cmp::Ordering};
10
11use crate::blocks::{BLOCK_MESSAGE_LIMIT, Tipset};
12use crate::message::{Message, SignedMessage};
13use crate::message_pool::msg_chain::MsgChainNode;
14use crate::shim::crypto::SignatureType;
15use crate::shim::{address::Address, econ::TokenAmount};
16use ahash::{HashMap, HashMapExt};
17use anyhow::{Context, bail, ensure};
18use parking_lot::RwLock;
19use rand::prelude::SliceRandom;
20use tracing::{debug, error, warn};
21
22use super::{msg_pool::MessagePool, provider::Provider};
23use crate::message_pool::{
24    Error, add_to_selected_msgs,
25    msg_chain::{Chains, NodeKey, create_message_chains},
26    msg_pool::MsgSet,
27    msgpool::MIN_GAS,
28    remove_from_selected_msgs,
29};
30
31type Pending = HashMap<Address, HashMap<u64, SignedMessage>>;
32
// A cap on the maximum number of messages to include in a block; `select_messages`
// truncates its result to this length.
const MAX_BLOCK_MSGS: usize = 16000;
// Number of partitions the sorted chains are split into by optimal selection.
const MAX_BLOCKS: usize = 15;
// Initial per-signature-type message budget of a block.
// Same limits as in
// [cbor-gen](https://github.com/whyrusleeping/cbor-gen/blob/cba3eeea9ae8ec4db1b7283e3654d8c18979affe/gen.go#L32), which Lotus uses.
const CBOR_GEN_LIMIT: usize = 8192;
38
/// A structure that holds the selected messages for a block.
///
/// Alongside the messages it tracks the *remaining* gas budget and the remaining
/// per-signature-type message budgets, so that chains which would exceed a limit
/// can be rejected or trimmed before they are added.
struct SelectedMessages {
    /// The messages selected for inclusion in the block.
    msgs: Vec<SignedMessage>,
    /// The remaining gas limit for the block.
    gas_limit: u64,
    /// The remaining limit for `secp256k1` messages in the block. Chains with a
    /// delegated signature type are counted against this limit as well.
    secp_limit: u64,
    /// The remaining limit for `bls` messages in the block.
    bls_limit: u64,
}
52
53impl Default for SelectedMessages {
54    fn default() -> Self {
55        SelectedMessages {
56            msgs: Vec::new(),
57            gas_limit: crate::shim::econ::BLOCK_GAS_LIMIT,
58            secp_limit: CBOR_GEN_LIMIT as u64,
59            bls_limit: CBOR_GEN_LIMIT as u64,
60        }
61    }
62}
63
64impl SelectedMessages {
65    fn new(msgs: Vec<SignedMessage>, gas_limit: u64) -> Self {
66        SelectedMessages {
67            msgs,
68            gas_limit,
69            ..Default::default()
70        }
71    }
72
73    /// Returns the number of messages selected for inclusion in the block.
74    fn len(&self) -> usize {
75        self.msgs.len()
76    }
77
78    /// Truncates the selected messages to the specified length.
79    fn truncate(&mut self, len: usize) {
80        self.msgs.truncate(len);
81    }
82
83    /// Reduces the gas limit by the specified amount. It ensures that the gas limit does not
84    /// go below zero (which would cause a panic).
85    fn reduce_gas_limit(&mut self, gas: u64) {
86        self.gas_limit = self.gas_limit.saturating_sub(gas);
87    }
88
89    /// Reduces the BLS limit by the specified amount. It ensures that the BLS message limit does not
90    /// go below zero (which would cause a panic).
91    fn reduce_bls_limit(&mut self, bls: u64) {
92        self.bls_limit = self.bls_limit.saturating_sub(bls);
93    }
94
95    /// Reduces the `Secp256k1` limit by the specified amount. It ensures that the `Secp256k1` message limit
96    /// does not go below zero (which would cause a panic).
97    fn reduce_secp_limit(&mut self, secp: u64) {
98        self.secp_limit = self.secp_limit.saturating_sub(secp);
99    }
100
101    /// Extends the selected messages with the given messages.
102    fn extend(&mut self, msgs: Vec<SignedMessage>) {
103        self.msgs.extend(msgs);
104    }
105
106    /// Tries to add a message chain to the selected messages. Returns an error if the chain can't
107    /// be added due to block constraints.
108    fn try_to_add(&mut self, message_chain: MsgChainNode) -> anyhow::Result<()> {
109        let msg_chain_len = message_chain.msgs.len();
110        ensure!(
111            BLOCK_MESSAGE_LIMIT >= msg_chain_len + self.len(),
112            "Message chain is too long to fit in the block: {} messages, limit is {BLOCK_MESSAGE_LIMIT}",
113            msg_chain_len + self.len(),
114        );
115        ensure!(
116            self.gas_limit >= message_chain.gas_limit,
117            "Message chain gas limit is too high: {} gas, limit is {}",
118            message_chain.gas_limit,
119            self.gas_limit
120        );
121
122        match message_chain.sig_type {
123            Some(SignatureType::Bls) => {
124                ensure!(
125                    self.bls_limit >= msg_chain_len as u64,
126                    "BLS limit is too low: {msg_chain_len} messages, limit is {}",
127                    self.bls_limit
128                );
129
130                self.extend(message_chain.msgs);
131                self.reduce_bls_limit(msg_chain_len as u64);
132                self.reduce_gas_limit(message_chain.gas_limit);
133            }
134            Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
135                ensure!(
136                    self.secp_limit >= msg_chain_len as u64,
137                    "Secp256k1 limit is too low: {msg_chain_len} messages, limit is {}",
138                    self.secp_limit
139                );
140
141                self.extend(message_chain.msgs);
142                self.reduce_secp_limit(msg_chain_len as u64);
143                self.reduce_gas_limit(message_chain.gas_limit);
144            }
145            None => {
146                // This is a message with no signature type, which is not allowed in the current
147                // implementation. This _should_ never happen, but we handle it gracefully.
148                warn!("Tried to add a message chain with no signature type");
149            }
150        }
151        Ok(())
152    }
153
154    /// Tries to add a message chain with dependencies to the selected messages.
155    /// It will trim or invalidate if appropriate.
156    fn try_to_add_with_deps(
157        &mut self,
158        idx: usize,
159        chains: &mut Chains,
160        base_fee: &TokenAmount,
161    ) -> anyhow::Result<()> {
162        let message_chain = chains
163            .get_mut_at(idx)
164            .context("Couldn't find required message chain")?;
165        // compute the dependencies that must be merged and the gas limit including deps
166        let mut chain_gas_limit = message_chain.gas_limit;
167        let mut chain_msg_limit = message_chain.msgs.len();
168        let mut dep_gas_limit = 0;
169        let mut dep_message_limit = 0;
170        let selected_messages_limit = match message_chain.sig_type {
171            Some(SignatureType::Bls) => self.bls_limit as usize,
172            Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
173                self.secp_limit as usize
174            }
175            None => {
176                // This is a message with no signature type, which is not allowed in the current
177                // implementation. This _should_ never happen, but we handle it gracefully.
178                bail!("Tried to add a message chain with no signature type");
179            }
180        };
181        let selected_messages_limit =
182            selected_messages_limit.min(BLOCK_MESSAGE_LIMIT.saturating_sub(self.len()));
183
184        let mut chain_deps = vec![];
185        let mut cur_chain = message_chain.prev;
186        let _ = message_chain; // drop the mutable borrow to avoid conflicts
187        while let Some(cur_chn) = cur_chain {
188            let node = chains.get(cur_chn).unwrap();
189            if !node.merged {
190                chain_deps.push(cur_chn);
191                chain_gas_limit += node.gas_limit;
192                chain_msg_limit += node.msgs.len();
193                dep_gas_limit += node.gas_limit;
194                dep_message_limit += node.msgs.len();
195                cur_chain = node.prev;
196            } else {
197                break;
198            }
199        }
200
201        // the chain doesn't fit as-is, so trim / invalidate it and return false
202        if chain_gas_limit > self.gas_limit || chain_msg_limit > selected_messages_limit {
203            // it doesn't all fit; now we have to take into account the dependent chains before
204            // making a decision about trimming or invalidating.
205            // if the dependencies exceed the block limits, then we must invalidate the chain
206            // as it can never be included.
207            // Otherwise we can just trim and continue
208            if dep_gas_limit > self.gas_limit || dep_message_limit >= selected_messages_limit {
209                chains.invalidate(chains.get_key_at(idx));
210            } else {
211                // dependencies fit, just trim it
212                chains.trim_msgs_at(
213                    idx,
214                    self.gas_limit.saturating_sub(dep_gas_limit),
215                    selected_messages_limit.saturating_sub(dep_message_limit),
216                    base_fee,
217                );
218            }
219
220            bail!("Chain doesn't fit in the block");
221        }
222        for dep in chain_deps.iter().rev() {
223            let cur_chain = chains.get_mut(*dep);
224            if let Some(node) = cur_chain {
225                node.merged = true;
226                self.extend(node.msgs.clone());
227            } else {
228                bail!("Couldn't find required dependent message chain");
229            }
230        }
231
232        let message_chain = chains
233            .get_mut_at(idx)
234            .context("Couldn't find required message chain")?;
235        message_chain.merged = true;
236        self.extend(message_chain.msgs.clone());
237        self.reduce_gas_limit(chain_gas_limit);
238
239        match message_chain.sig_type {
240            Some(SignatureType::Bls) => {
241                self.reduce_bls_limit(chain_msg_limit as u64);
242            }
243            Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
244                self.reduce_secp_limit(chain_msg_limit as u64);
245            }
246            None => {
247                // This is a message with no signature type, which is not allowed in the current
248                // implementation. This _should_ never happen, but we handle it gracefully.
249                bail!("Tried to add a message chain with no signature type");
250            }
251        }
252
253        Ok(())
254    }
255
256    fn trim_chain_at(&mut self, chains: &mut Chains, idx: usize, base_fee: &TokenAmount) {
257        let message_chain = match chains.get_at(idx) {
258            Some(message_chain) => message_chain,
259            None => {
260                error!("Tried to trim a message chain that doesn't exist");
261                return;
262            }
263        };
264        let msg_limit = BLOCK_MESSAGE_LIMIT.saturating_sub(self.len());
265        let msg_limit = match message_chain.sig_type {
266            Some(SignatureType::Bls) => std::cmp::min(self.bls_limit, msg_limit as u64),
267            Some(SignatureType::Secp256k1) | Some(SignatureType::Delegated) => {
268                std::cmp::min(self.secp_limit, msg_limit as u64)
269            }
270            _ => {
271                // This is a message with no signature type, which is not allowed in the current
272                // implementation. This _should_ never happen, but we handle it gracefully.
273                error!("Tried to trim a message chain with no signature type");
274                return;
275            }
276        };
277
278        if message_chain.gas_limit > self.gas_limit || message_chain.msgs.len() > msg_limit as usize
279        {
280            chains.trim_msgs_at(idx, self.gas_limit, msg_limit as usize, base_fee);
281        }
282    }
283}
284
285impl<T> MessagePool<T>
286where
287    T: Provider,
288{
289    /// Forest employs a sophisticated algorithm for selecting messages
290    /// for inclusion from the pool, given the ticket quality of a miner.
291    /// This method selects messages for including in a block.
292    pub fn select_messages(&self, ts: &Tipset, tq: f64) -> Result<Vec<SignedMessage>, Error> {
293        let cur_ts = self.current_tipset();
294        // if the ticket quality is high enough that the first block has higher
295        // probability than any other block, then we don't bother with optimal
296        // selection because the first block will always have higher effective
297        // performance. Otherwise we select message optimally based on effective
298        // performance of chains.
299        let mut msgs = if tq > 0.84 {
300            self.select_messages_greedy(&cur_ts, ts)
301        } else {
302            self.select_messages_optimal(&cur_ts, ts, tq)
303        }?;
304
305        if msgs.len() > MAX_BLOCK_MSGS {
306            warn!(
307                "Message selection chose too many messages: {} > {MAX_BLOCK_MSGS}",
308                msgs.len(),
309            );
310            msgs.truncate(MAX_BLOCK_MSGS)
311        }
312
313        Ok(msgs.msgs)
314    }
315
316    fn select_messages_greedy(
317        &self,
318        cur_ts: &Tipset,
319        ts: &Tipset,
320    ) -> Result<SelectedMessages, Error> {
321        let base_fee = self.api.chain_compute_base_fee(ts)?;
322
323        // 0. Load messages from the target tipset; if it is the same as the current
324        // tipset in    the mpool, then this is just the pending messages
325        let mut pending = self.get_pending_messages(cur_ts, ts)?;
326
327        if pending.is_empty() {
328            return Ok(SelectedMessages::default());
329        }
330
331        // 0b. Select all priority messages that fit in the block
332        let selected_msgs = self.select_priority_messages(&mut pending, &base_fee, ts)?;
333
334        // check if block has been filled
335        if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
336            return Ok(selected_msgs);
337        }
338
339        // 1. Create a list of dependent message chains with maximal gas reward per
340        // limit consumed
341        let mut chains = Chains::new();
342        for (actor, mset) in pending.into_iter() {
343            create_message_chains(
344                self.api.as_ref(),
345                &actor,
346                &mset,
347                &base_fee,
348                ts,
349                &mut chains,
350                &self.chain_config,
351            )?;
352        }
353
354        Ok(merge_and_trim(
355            &mut chains,
356            selected_msgs,
357            &base_fee,
358            MIN_GAS,
359        ))
360    }
361
    /// Optimal selection: chooses chains by *effective performance*, i.e. the gas
    /// performance of a chain weighted by the probability of the block partition it
    /// falls into for the given `ticket_quality`.
    ///
    /// Priority messages are selected first. The remaining chains are sorted, partitioned
    /// into up to `MAX_BLOCKS` blocks, re-sorted by effective performance and merged
    /// (together with their dependencies) while they fit. The last chain that does not
    /// fit is trimmed, and any room still left is filled with randomly picked chains.
    // Indexing into `chains` / `partitions` below is bounded by the surrounding loop ranges.
    #[allow(clippy::indexing_slicing)]
    fn select_messages_optimal(
        &self,
        cur_ts: &Tipset,
        target_tipset: &Tipset,
        ticket_quality: f64,
    ) -> Result<SelectedMessages, Error> {
        let base_fee = self.api.chain_compute_base_fee(target_tipset)?;

        // 0. Load messages from the target tipset; if it is the same as the current
        // tipset in the mpool, then this is just the pending messages
        let mut pending = self.get_pending_messages(cur_ts, target_tipset)?;

        if pending.is_empty() {
            return Ok(SelectedMessages::default());
        }

        // 0b. Select all priority messages that fit in the block
        let mut selected_msgs =
            self.select_priority_messages(&mut pending, &base_fee, target_tipset)?;

        // check if block has been filled
        if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
            return Ok(selected_msgs);
        }

        // 1. Create a list of dependent message chains with maximal gas reward per
        // limit consumed
        let mut chains = Chains::new();
        for (actor, mset) in pending.into_iter() {
            create_message_chains(
                self.api.as_ref(),
                &actor,
                &mset,
                &base_fee,
                target_tipset,
                &mut chains,
                &self.chain_config,
            )?;
        }

        // 2. Sort the chains
        chains.sort(false);

        // if even the first chain after sorting has negative gas performance, nothing is
        // worth adding beyond the priority messages
        if chains.get_at(0).is_some_and(|it| it.gas_perf < 0.0) {
            tracing::warn!(
                "all messages in mpool have non-positive gas performance {}",
                chains[0].gas_perf
            );
            return Ok(selected_msgs);
        }

        // 3. Partition chains into blocks (without trimming)
        //    we use the full block_gas_limit (as opposed to the residual `gas_limit`
        //    from the priority message selection) as we have to account for
        //    what other block providers are doing
        let mut next_chain = 0;
        let mut partitions: Vec<Vec<NodeKey>> = vec![vec![]; MAX_BLOCKS];
        let mut i = 0;
        while i < MAX_BLOCKS && next_chain < chains.len() {
            let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
            let mut msg_limit = BLOCK_MESSAGE_LIMIT;
            while next_chain < chains.len() {
                let chain_key = chains.key_vec[next_chain];
                next_chain += 1;
                // the chain is assigned to the partition before the fit check, so the chain
                // that overflows a partition still belongs to it
                partitions[i].push(chain_key);
                let chain = chains.get(chain_key).unwrap();
                let chain_gas_limit = chain.gas_limit;
                if gas_limit < chain_gas_limit {
                    break;
                }
                gas_limit = gas_limit.saturating_sub(chain_gas_limit);
                msg_limit = msg_limit.saturating_sub(chain.msgs.len());
                if gas_limit < MIN_GAS || msg_limit == 0 {
                    break;
                }
            }
            i += 1;
        }

        // 4. Compute effective performance for each chain, based on the partition they
        // fall into. The effective performance is the gas_perf of the chain *
        // block probability
        let block_prob = crate::message_pool::block_probabilities(ticket_quality);
        let mut eff_chains = 0;
        for i in 0..MAX_BLOCKS {
            for k in &partitions[i] {
                if let Some(node) = chains.get_mut(*k) {
                    node.eff_perf = node.gas_perf * block_prob[i];
                }
            }
            eff_chains += partitions[i].len();
        }

        // nullify the effective performance of chains that don't fit in any partition
        for i in eff_chains..chains.len() {
            if let Some(node) = chains.get_mut_at(i) {
                node.set_null_effective_perf();
            }
        }

        // 5. Re-sort the chains based on effective performance
        chains.sort_effective();

        // 6. Merge the head chains to produce the list of messages selected for
        //    inclusion subject to the residual block limits
        //    When a chain is merged in, all its previous dependent chains *must* also
        //    be merged in or we'll have a broken block
        // `last` is the index of the first chain that could not be merged wholesale;
        // it stays at `chains.len()` when every candidate was merged.
        let mut last = chains.len();
        for i in 0..chains.len() {
            // did we run out of performing chains?
            if chains[i].gas_perf < 0.0 {
                break;
            }

            // has it already been merged?
            if chains[i].merged {
                continue;
            }

            match selected_msgs.try_to_add_with_deps(i, &mut chains, &base_fee) {
                Ok(_) => {
                    // adjust the effective performance for all subsequent chains
                    if let Some(next_key) = chains[i].next {
                        let next_node = chains.get_mut(next_key).unwrap();
                        if next_node.eff_perf > 0.0 {
                            next_node.eff_perf += next_node.parent_offset;
                            let mut next_next_key = next_node.next;
                            while let Some(nnk) = next_next_key {
                                let (nn_node, prev_perfs) = chains.get_mut_with_prev_eff(nnk);
                                if let Some(nn_node) = nn_node {
                                    if nn_node.eff_perf > 0.0 {
                                        nn_node.set_eff_perf(prev_perfs);
                                        next_next_key = nn_node.next;
                                    } else {
                                        break;
                                    }
                                } else {
                                    break;
                                }
                            }
                        }
                    }

                    // re-sort to account for already merged chains and effective performance
                    // adjustments; the sort *must* be stable or we end up getting
                    // negative gasPerfs pushed up.
                    chains.sort_range_effective(i + 1..);

                    continue;
                }
                Err(e) => {
                    debug!("Failed to add message chain with dependencies: {e}");
                }
            }

            // we can't fit this chain and its dependencies because of block gasLimit -- we
            // are at the edge
            last = i;
            break;
        }

        // 7. We have reached the edge of what can fit wholesale; if we still have
        // available gasLimit to pack some more chains, then trim the last
        // chain and push it down.
        //
        // Trimming invalidates subsequent dependent chains so that they can't be selected
        // as their dependency cannot be (fully) included. We do this in a loop because the blocker
        // might have been inordinately large and we might have to do it
        // multiple times to satisfy tail packing.
        'tail_loop: while selected_msgs.gas_limit >= MIN_GAS && last < chains.len() {
            if !chains[last].valid {
                // the chain has been invalidated, we can't use it
                last += 1;
                continue;
            }

            // trim if necessary
            selected_msgs.trim_chain_at(&mut chains, last, &base_fee);

            // push down if it hasn't been invalidated
            if chains[last].valid {
                for i in last..chains.len() - 1 {
                    if chains[i].cmp_effective(&chains[i + 1]) == Ordering::Greater {
                        break;
                    }
                    chains.key_vec.swap(i, i + 1);
                }
            }

            // select the next (valid and fitting) chain and its dependencies for inclusion
            let lst = last; // to make clippy happy, see: https://rust-lang.github.io/rust-clippy/master/index.html#mut_range_bound
            for i in lst..chains.len() {
                let chain = &mut chains[i];
                // has the chain been invalidated
                if !chain.valid {
                    continue;
                }

                // has it already been merged?
                if chain.merged {
                    continue;
                }

                // if gasPerf < 0 we have no more profitable chains
                if chain.gas_perf < 0.0 {
                    break 'tail_loop;
                }

                match selected_msgs.try_to_add_with_deps(i, &mut chains, &base_fee) {
                    Ok(_) => continue,
                    Err(e) => debug!("Failed to add message chain with dependencies: {e}"),
                }

                // NOTE(review): `last` is not advanced here; progress appears to rely on
                // `try_to_add_with_deps` having trimmed or invalidated chain `i` -- confirm.
                continue 'tail_loop;
            }

            // the merge loop ended after processing all the chains and we probably
            // still have gas to spare; end the loop.
            break;
        }

        // if we have room to spare, pick some random (non-negative) chains to fill
        // the block
        // we pick randomly so that we minimize the probability of
        // duplication among all block producers
        if selected_msgs.gas_limit >= MIN_GAS && selected_msgs.msgs.len() <= BLOCK_MESSAGE_LIMIT {
            let pre_random_length = selected_msgs.len();

            chains
                .key_vec
                .shuffle(&mut crate::utils::rand::forest_rng());

            for i in 0..chains.len() {
                if selected_msgs.gas_limit < MIN_GAS || selected_msgs.len() >= BLOCK_MESSAGE_LIMIT {
                    break;
                }

                // has it been merged or invalidated?
                if chains[i].merged || !chains[i].valid {
                    continue;
                }

                // is it negative?
                if chains[i].gas_perf < 0.0 {
                    continue;
                }

                if selected_msgs
                    .try_to_add_with_deps(i, &mut chains, &base_fee)
                    .is_ok()
                {
                    // we added it, continue
                    continue;
                }

                if chains[i].valid {
                    // chain got trimmed on the previous call to `try_to_add_with_deps`, so it can
                    // now be included.
                    // NOTE(review): unlike the first attempt, a failure of this retry is
                    // propagated and aborts the whole selection -- confirm this is intended.
                    selected_msgs
                        .try_to_add_with_deps(i, &mut chains, &base_fee)
                        .context("Failed to add message chain with dependencies")?;
                    continue;
                }
            }

            if selected_msgs.len() != pre_random_length {
                tracing::warn!(
                    "optimal selection failed to pack a block; picked {} messages with random selection",
                    selected_msgs.len() - pre_random_length
                );
            }
        }

        Ok(selected_msgs)
    }
638
639    fn get_pending_messages(&self, cur_ts: &Tipset, ts: &Tipset) -> Result<Pending, Error> {
640        let mut result: Pending = HashMap::new();
641        let mut in_sync = false;
642        if cur_ts.epoch() == ts.epoch() && cur_ts == ts {
643            in_sync = true;
644        }
645
646        for (a, mset) in self.pending.read().iter() {
647            if in_sync {
648                result.insert(*a, mset.msgs.clone());
649            } else {
650                let mut mset_copy = HashMap::new();
651                for (nonce, m) in mset.msgs.iter() {
652                    mset_copy.insert(*nonce, m.clone());
653                }
654                result.insert(*a, mset_copy);
655            }
656        }
657
658        if in_sync {
659            return Ok(result);
660        }
661
662        // Run head change to do reorg detection
663        run_head_change(
664            self.api.as_ref(),
665            &self.pending,
666            cur_ts.clone(),
667            ts.clone(),
668            &mut result,
669        )?;
670
671        Ok(result)
672    }
673
674    fn select_priority_messages(
675        &self,
676        pending: &mut Pending,
677        base_fee: &TokenAmount,
678        ts: &Tipset,
679    ) -> Result<SelectedMessages, Error> {
680        let result = Vec::with_capacity(self.config.size_limit_low() as usize);
681        let gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT;
682        let min_gas = 1298450;
683
684        // 1. Get priority actor chains
685        let priority = self.config.priority_addrs();
686        let mut chains = Chains::new();
687        for actor in priority.iter() {
688            // remove actor from pending set as we are processing these messages.
689            if let Some(mset) = pending.remove(actor) {
690                // create chains for the priority actor
691                create_message_chains(
692                    self.api.as_ref(),
693                    actor,
694                    &mset,
695                    base_fee,
696                    ts,
697                    &mut chains,
698                    &self.chain_config,
699                )?;
700            }
701        }
702
703        if chains.is_empty() {
704            return Ok(SelectedMessages::new(Vec::new(), gas_limit));
705        }
706
707        Ok(merge_and_trim(
708            &mut chains,
709            SelectedMessages::new(result, gas_limit),
710            base_fee,
711            min_gas,
712        ))
713    }
714}
715
716/// Returns merged and trimmed messages with the gas limit
717#[allow(clippy::indexing_slicing)]
718fn merge_and_trim(
719    chains: &mut Chains,
720    mut selected_msgs: SelectedMessages,
721    base_fee: &TokenAmount,
722    min_gas: u64,
723) -> SelectedMessages {
724    if chains.is_empty() {
725        return selected_msgs;
726    }
727
728    // 2. Sort the chains
729    chains.sort(true);
730
731    let first_chain_gas_perf = chains[0].gas_perf;
732
733    if !chains.is_empty() && first_chain_gas_perf < 0.0 {
734        warn!(
735            "all priority messages in mpool have negative gas performance bestGasPerf: {}",
736            first_chain_gas_perf
737        );
738        return selected_msgs;
739    }
740
741    // 3. Merge chains until the block limit, as long as they have non-negative gas
742    // performance
743    let mut last = chains.len();
744    for i in 0..chains.len() {
745        let node = &chains[i];
746
747        if node.gas_perf < 0.0 {
748            break;
749        }
750
751        if selected_msgs.try_to_add(node.clone()).is_ok() {
752            // there was room, we added the chain, keep going
753            continue;
754        }
755
756        // we can't fit this chain because of block gas limit -- we are at the edge
757        last = i;
758        break;
759    }
760
761    'tail_loop: while selected_msgs.gas_limit >= min_gas && last < chains.len() {
762        // trim, discard negative performing messages
763        selected_msgs.trim_chain_at(chains, last, base_fee);
764
765        // push down if it hasn't been invalidated
766        let node = &chains[last];
767        if node.valid {
768            for i in last..chains.len() - 1 {
769                // slot_chains
770                let cur_node = &chains[i];
771                let next_node = &chains[i + 1];
772                if cur_node.compare(next_node) == Ordering::Greater {
773                    break;
774                }
775
776                chains.key_vec.swap(i, i + 1);
777            }
778        }
779
780        // select the next (valid and fitting) chain for inclusion
781        let lst = last; // to make clippy happy, see: https://rust-lang.github.io/rust-clippy/master/index.html#mut_range_bound
782        for i in lst..chains.len() {
783            let chain = chains[i].clone();
784            if !chain.valid {
785                continue;
786            }
787
788            // if gas_perf < 0 then we have no more profitable chains
789            if chain.gas_perf < 0.0 {
790                break 'tail_loop;
791            }
792
793            // does it fit in the block?
794            if selected_msgs.try_to_add(chain).is_ok() {
795                // there was room, we added the chain, keep going
796                continue;
797            }
798
799            // this chain needs to be trimmed
800            last += i;
801            continue 'tail_loop;
802        }
803
804        break;
805    }
806
807    selected_msgs
808}
809
810/// Like `head_change`, except it doesn't change the state of the `MessagePool`.
811/// It simulates a head change call.
812// This logic should probably be implemented in the ChainStore. It handles
813// reorgs.
814pub(in crate::message_pool) fn run_head_change<T>(
815    api: &T,
816    pending: &RwLock<HashMap<Address, MsgSet>>,
817    from: Tipset,
818    to: Tipset,
819    rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
820) -> Result<(), Error>
821where
822    T: Provider,
823{
824    let mut left = from;
825    let mut right = to;
826    let mut left_chain = Vec::new();
827    let mut right_chain = Vec::new();
828    while left != right {
829        if left.epoch() > right.epoch() {
830            left_chain.push(left.clone());
831            let par = api.load_tipset(left.parents())?;
832            left = par;
833        } else {
834            right_chain.push(right.clone());
835            let par = api.load_tipset(right.parents())?;
836            right = par;
837        }
838    }
839    for ts in left_chain {
840        let mut msgs: Vec<SignedMessage> = Vec::new();
841        for block in ts.block_headers() {
842            let (_, smsgs) = api.messages_for_block(block)?;
843            msgs.extend(smsgs);
844        }
845        for msg in msgs {
846            add_to_selected_msgs(msg, rmsgs);
847        }
848    }
849
850    for ts in right_chain {
851        for b in ts.block_headers() {
852            let (msgs, smsgs) = api.messages_for_block(b)?;
853
854            for msg in smsgs {
855                remove_from_selected_msgs(
856                    &msg.from(),
857                    pending,
858                    msg.sequence(),
859                    rmsgs.borrow_mut(),
860                )?;
861            }
862            for msg in msgs {
863                remove_from_selected_msgs(&msg.from, pending, msg.sequence, rmsgs.borrow_mut())?;
864            }
865        }
866    }
867    Ok(())
868}
869
870#[cfg(test)]
871mod test_selection {
872    use std::sync::Arc;
873
874    use crate::db::MemoryDB;
875    use crate::key_management::{KeyStore, KeyStoreConfig, Wallet};
876    use crate::message::Message;
877    use crate::shim::crypto::SignatureType;
878    use crate::shim::econ::BLOCK_GAS_LIMIT;
879    use tokio::task::JoinSet;
880
881    use super::*;
882    use crate::message_pool::{
883        head_change,
884        msgpool::{
885            test_provider::{TestApi, mock_block},
886            tests::{create_fake_smsg, create_smsg},
887        },
888    };
889
890    const TEST_GAS_LIMIT: i64 = 6955002;
891
892    fn make_test_mpool(joinset: &mut JoinSet<anyhow::Result<()>>) -> MessagePool<TestApi> {
893        let tma = TestApi::default();
894        let (tx, _rx) = flume::bounded(50);
895        MessagePool::new(tma, tx, Default::default(), Arc::default(), joinset).unwrap()
896    }
897
898    /// Creates a tipset with a mocked block and performs a head change to setup the
899    /// [`MessagePool`] for testing.
900    async fn mock_tipset(mpool: &mut MessagePool<TestApi>) -> Tipset {
901        let b1 = mock_block(1, 1);
902        let ts = Tipset::from(&b1);
903        let api = mpool.api.clone();
904        let bls_sig_cache = mpool.bls_sig_cache.clone();
905        let pending = mpool.pending.clone();
906        let cur_tipset = mpool.cur_tipset.clone();
907        let repub_trigger = mpool.repub_trigger.clone();
908        let republished = mpool.republished.clone();
909
910        head_change(
911            api.as_ref(),
912            bls_sig_cache.as_ref(),
913            repub_trigger.clone(),
914            republished.as_ref(),
915            pending.as_ref(),
916            cur_tipset.as_ref(),
917            Vec::new(),
918            vec![Tipset::from(b1)],
919        )
920        .await
921        .unwrap();
922
923        ts
924    }
925
926    #[tokio::test]
927    async fn basic_message_selection() {
928        let mut joinset = JoinSet::new();
929        let mpool = make_test_mpool(&mut joinset);
930
931        let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
932        let mut w1 = Wallet::new(ks1);
933        let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
934
935        let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
936        let mut w2 = Wallet::new(ks2);
937        let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
938
939        let b1 = mock_block(1, 1);
940        let ts = Tipset::from(&b1);
941        let api = mpool.api.clone();
942        let bls_sig_cache = mpool.bls_sig_cache.clone();
943        let pending = mpool.pending.clone();
944        let cur_tipset = mpool.cur_tipset.clone();
945        let repub_trigger = mpool.repub_trigger.clone();
946        let republished = mpool.republished.clone();
947
948        head_change(
949            api.as_ref(),
950            bls_sig_cache.as_ref(),
951            repub_trigger.clone(),
952            republished.as_ref(),
953            pending.as_ref(),
954            cur_tipset.as_ref(),
955            Vec::new(),
956            vec![Tipset::from(b1)],
957        )
958        .await
959        .unwrap();
960
961        api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
962        api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
963
964        // we create 10 messages from each actor to another, with the first actor paying
965        // higher gas prices than the second; we expect message selection to
966        // order his messages first
967        for i in 0..10 {
968            let m = create_smsg(&a2, &a1, &mut w1, i, TEST_GAS_LIMIT, 2 * i + 1);
969            mpool.add(m).unwrap();
970        }
971        for i in 0..10 {
972            let m = create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1);
973            mpool.add(m).unwrap();
974        }
975
976        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
977
978        assert_eq!(msgs.len(), 20, "Expected 20 messages, got {}", msgs.len());
979
980        let mut next_nonce = 0;
981        for (i, msg) in msgs.iter().enumerate().take(10) {
982            assert_eq!(
983                msg.from(),
984                a1,
985                "first 10 returned messages should be from actor a1 {i}",
986            );
987            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
988            next_nonce += 1;
989        }
990
991        next_nonce = 0;
992        for (i, msg) in msgs.iter().enumerate().take(20).skip(10) {
993            assert_eq!(
994                msg.from(),
995                a2,
996                "next 10 returned messages should be from actor a2 {i}",
997            );
998            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
999            next_nonce += 1;
1000        }
1001
1002        // now we make a block with all the messages and advance the chain
1003        let b2 = mpool.api.next_block();
1004        mpool.api.set_block_messages(&b2, msgs);
1005        head_change(
1006            api.as_ref(),
1007            bls_sig_cache.as_ref(),
1008            repub_trigger.clone(),
1009            republished.as_ref(),
1010            pending.as_ref(),
1011            cur_tipset.as_ref(),
1012            Vec::new(),
1013            vec![Tipset::from(b2)],
1014        )
1015        .await
1016        .unwrap();
1017
1018        // we should now have no pending messages in the MessagePool
1019        // let pending = mpool.pending.read().await;
1020        assert!(
1021            mpool.pending.read().is_empty(),
1022            "Expected no pending messages, but got {}",
1023            mpool.pending.read().len()
1024        );
1025
1026        // create a block and advance the chain without applying to the mpool
1027        let mut msgs = Vec::with_capacity(20);
1028        for i in 10..20 {
1029            msgs.push(create_smsg(&a2, &a1, &mut w1, i, TEST_GAS_LIMIT, 2 * i + 1));
1030            msgs.push(create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1));
1031        }
1032        let b3 = mpool.api.next_block();
1033        let ts3 = Tipset::from(&b3);
1034        mpool.api.set_block_messages(&b3, msgs);
1035
1036        // now create another set of messages and add them to the mpool
1037        for i in 20..30 {
1038            mpool
1039                .add(create_smsg(
1040                    &a2,
1041                    &a1,
1042                    &mut w1,
1043                    i,
1044                    TEST_GAS_LIMIT,
1045                    2 * i + 200,
1046                ))
1047                .unwrap();
1048            mpool
1049                .add(create_smsg(&a1, &a2, &mut w2, i, TEST_GAS_LIMIT, i + 1))
1050                .unwrap();
1051        }
1052        // select messages in the last tipset; this should include the missed messages
1053        // as well as the last messages we added, with the first actor's
1054        // messages first first we need to update the nonce on the api
1055        mpool.api.set_state_sequence(&a1, 10);
1056        mpool.api.set_state_sequence(&a2, 10);
1057        let msgs = mpool.select_messages(&ts3, 1.0).unwrap();
1058
1059        assert_eq!(
1060            msgs.len(),
1061            20,
1062            "Expected 20 messages, but got {}",
1063            msgs.len()
1064        );
1065
1066        let mut next_nonce = 20;
1067        for msg in msgs.iter().take(10) {
1068            assert_eq!(
1069                msg.from(),
1070                a1,
1071                "first 10 returned messages should be from actor a1"
1072            );
1073            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1074            next_nonce += 1;
1075        }
1076        next_nonce = 20;
1077        for msg in msgs.iter().take(20).skip(10) {
1078            assert_eq!(
1079                msg.from(),
1080                a2,
1081                "next 10 returned messages should be from actor a2"
1082            );
1083            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1084            next_nonce += 1;
1085        }
1086    }
1087
1088    #[tokio::test]
1089    async fn message_selection_trimming_gas() {
1090        let mut joinset = JoinSet::new();
1091        let mut mpool = make_test_mpool(&mut joinset);
1092        let ts = mock_tipset(&mut mpool).await;
1093        let api = mpool.api.clone();
1094
1095        let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1096        let mut w1 = Wallet::new(ks1);
1097        let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1098
1099        let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1100        let mut w2 = Wallet::new(ks2);
1101        let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1102
1103        api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1104        api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1105
1106        let nmsgs = (crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT) + 1;
1107
1108        // make many small chains for the two actors
1109        for i in 0..nmsgs {
1110            let bias = (nmsgs - i) / 3;
1111            let m = create_fake_smsg(
1112                &mpool,
1113                &a2,
1114                &a1,
1115                i as u64,
1116                TEST_GAS_LIMIT,
1117                (1 + i % 3 + bias) as u64,
1118            );
1119            mpool.add(m).unwrap();
1120            let m = create_fake_smsg(
1121                &mpool,
1122                &a1,
1123                &a2,
1124                i as u64,
1125                TEST_GAS_LIMIT,
1126                (1 + i % 3 + bias) as u64,
1127            );
1128            mpool.add(m).unwrap();
1129        }
1130
1131        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1132
1133        let expected = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1134        assert_eq!(msgs.len(), expected as usize);
1135        let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1136        assert!(m_gas_limit <= crate::shim::econ::BLOCK_GAS_LIMIT);
1137    }
1138
1139    #[tokio::test]
1140    async fn message_selection_trimming_msgs_basic() {
1141        let mut joinset = JoinSet::new();
1142        let mut mpool = make_test_mpool(&mut joinset);
1143        let ts = mock_tipset(&mut mpool).await;
1144        let api = mpool.api.clone();
1145
1146        let keystore = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1147        let mut wallet = Wallet::new(keystore);
1148        let address = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
1149
1150        api.set_state_balance_raw(&address, TokenAmount::from_whole(1));
1151
1152        // create a larger than selectable chain
1153        for i in 0..BLOCK_MESSAGE_LIMIT {
1154            let msg = create_fake_smsg(&mpool, &address, &address, i as u64, 200_000, 100);
1155            mpool.add(msg).unwrap();
1156        }
1157
1158        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1159        assert_eq!(
1160            msgs.len(),
1161            CBOR_GEN_LIMIT,
1162            "Expected {CBOR_GEN_LIMIT} messages, got {}",
1163            msgs.len()
1164        );
1165
1166        // check that the gas limit is not exceeded
1167        let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1168        assert!(
1169            m_gas_limit <= BLOCK_GAS_LIMIT,
1170            "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1171        );
1172    }
1173
1174    #[tokio::test]
1175    async fn message_selection_trimming_msgs_two_senders() {
1176        let mut joinset = JoinSet::new();
1177        let mut mpool = make_test_mpool(&mut joinset);
1178        let ts = mock_tipset(&mut mpool).await;
1179        let api = mpool.api.clone();
1180
1181        let keystore_1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1182        let mut wallet_1 = Wallet::new(keystore_1);
1183        let address_1 = wallet_1.generate_addr(SignatureType::Secp256k1).unwrap();
1184
1185        let keystore_2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1186        let mut wallet_2 = Wallet::new(keystore_2);
1187        let address_2 = wallet_2.generate_addr(SignatureType::Bls).unwrap();
1188
1189        api.set_state_balance_raw(&address_1, TokenAmount::from_whole(1));
1190        api.set_state_balance_raw(&address_2, TokenAmount::from_whole(1));
1191
1192        // create 2 larger than selectable chains
1193        for i in 0..BLOCK_MESSAGE_LIMIT {
1194            let msg = create_smsg(
1195                &address_2,
1196                &address_1,
1197                &mut wallet_1,
1198                i as u64,
1199                300_000,
1200                100,
1201            );
1202            mpool.add(msg).unwrap();
1203            // higher has price, those should be preferred and fill the block up to
1204            // the [`CBOR_GEN_LIMIT`] messages.
1205            let msg = create_smsg(
1206                &address_1,
1207                &address_2,
1208                &mut wallet_2,
1209                i as u64,
1210                300_000,
1211                1000,
1212            );
1213            mpool.add(msg).unwrap();
1214        }
1215        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1216        // check that the gas limit is not exceeded
1217        let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1218        assert!(
1219            m_gas_limit <= BLOCK_GAS_LIMIT,
1220            "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1221        );
1222        let bls_msgs = msgs.iter().filter(|m| m.is_bls()).count();
1223        assert_eq!(
1224            CBOR_GEN_LIMIT, bls_msgs,
1225            "Expected {CBOR_GEN_LIMIT} bls messages, got {bls_msgs}."
1226        );
1227        assert_eq!(
1228            msgs.len(),
1229            BLOCK_MESSAGE_LIMIT,
1230            "Expected {BLOCK_MESSAGE_LIMIT} messages, got {}",
1231            msgs.len()
1232        );
1233    }
1234
1235    #[tokio::test]
1236    async fn message_selection_trimming_msgs_two_senders_complex() {
1237        let mut joinset = JoinSet::new();
1238        let mut mpool = make_test_mpool(&mut joinset);
1239        let ts = mock_tipset(&mut mpool).await;
1240        let api = mpool.api.clone();
1241
1242        let keystore_1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1243        let mut wallet_1 = Wallet::new(keystore_1);
1244        let address_1 = wallet_1.generate_addr(SignatureType::Secp256k1).unwrap();
1245
1246        let keystore_2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1247        let mut wallet_2 = Wallet::new(keystore_2);
1248        let address_2 = wallet_2.generate_addr(SignatureType::Bls).unwrap();
1249
1250        api.set_state_balance_raw(&address_1, TokenAmount::from_whole(1));
1251        api.set_state_balance_raw(&address_2, TokenAmount::from_whole(1));
1252
1253        // create two almost max-length chains of equal value
1254        let mut counter = 0;
1255        for i in 0..CBOR_GEN_LIMIT {
1256            counter += 1;
1257            let msg = create_smsg(
1258                &address_2,
1259                &address_1,
1260                &mut wallet_1,
1261                i as u64,
1262                300_000,
1263                100,
1264            );
1265            mpool.add(msg).unwrap();
1266            // higher has price, those should be preferred and fill the block up to
1267            // the [`CBOR_GEN_LIMIT`] messages.
1268            let msg = create_smsg(
1269                &address_1,
1270                &address_2,
1271                &mut wallet_2,
1272                i as u64,
1273                300_000,
1274                100,
1275            );
1276            mpool.add(msg).unwrap();
1277        }
1278
1279        // address_1 8192th message is worth more than address_2 8192th message
1280        let msg = create_smsg(
1281            &address_2,
1282            &address_1,
1283            &mut wallet_1,
1284            counter as u64,
1285            300_000,
1286            1000,
1287        );
1288        mpool.add(msg).unwrap();
1289
1290        let msg = create_smsg(
1291            &address_1,
1292            &address_2,
1293            &mut wallet_2,
1294            counter as u64,
1295            300_000,
1296            100,
1297        );
1298        mpool.add(msg).unwrap();
1299
1300        counter += 1;
1301
1302        // address 2 (uneselectable) message is worth so much!
1303        let msg = create_smsg(
1304            &address_2,
1305            &address_1,
1306            &mut wallet_1,
1307            counter as u64,
1308            400_000,
1309            1_000_000,
1310        );
1311        mpool.add(msg).unwrap();
1312
1313        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1314        // check that the gas limit is not exceeded
1315        let m_gas_limit = msgs.iter().map(|m| m.gas_limit()).sum::<u64>();
1316        assert!(
1317            m_gas_limit <= BLOCK_GAS_LIMIT,
1318            "Selected messages gas limit {m_gas_limit} exceeds block gas limit {BLOCK_GAS_LIMIT}",
1319        );
1320        // We should have taken the SECP chain from address_1.
1321        let secps_len = msgs.iter().filter(|m| m.is_secp256k1()).count();
1322        assert_eq!(
1323            CBOR_GEN_LIMIT, secps_len,
1324            "Expected {CBOR_GEN_LIMIT} secp messages, got {secps_len}."
1325        );
1326        // The remaining messages should be BLS messages.
1327        assert_eq!(
1328            msgs.len(),
1329            BLOCK_MESSAGE_LIMIT,
1330            "Expected {BLOCK_MESSAGE_LIMIT} messages, got {}",
1331            msgs.len()
1332        );
1333    }
1334
1335    #[tokio::test]
1336    async fn message_selection_priority() {
1337        let db = MemoryDB::default();
1338
1339        let mut joinset = JoinSet::new();
1340        let mut mpool = make_test_mpool(&mut joinset);
1341        let ts = mock_tipset(&mut mpool).await;
1342        let api = mpool.api.clone();
1343
1344        let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1345        let mut w1 = Wallet::new(ks1);
1346        let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1347
1348        let ks2 = KeyStore::new(KeyStoreConfig::Memory).unwrap();
1349        let mut w2 = Wallet::new(ks2);
1350        let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1351
1352        // set priority addrs to a1
1353        let mut mpool_cfg = mpool.get_config().clone();
1354        mpool_cfg.priority_addrs.push(a1);
1355        mpool.set_config(&db, mpool_cfg).unwrap();
1356
1357        // let gas_limit = 6955002;
1358        api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1359        api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1360
1361        let nmsgs = 10;
1362
1363        // make many small chains for the two actors
1364        for i in 0..nmsgs {
1365            let bias = (nmsgs - i) / 3;
1366            let m = create_smsg(
1367                &a2,
1368                &a1,
1369                &mut w1,
1370                i as u64,
1371                TEST_GAS_LIMIT,
1372                (1 + i % 3 + bias) as u64,
1373            );
1374            mpool.add(m).unwrap();
1375            let m = create_smsg(
1376                &a1,
1377                &a2,
1378                &mut w2,
1379                i as u64,
1380                TEST_GAS_LIMIT,
1381                (1 + i % 3 + bias) as u64,
1382            );
1383            mpool.add(m).unwrap();
1384        }
1385
1386        let msgs = mpool.select_messages(&ts, 1.0).unwrap();
1387
1388        assert_eq!(msgs.len(), 20);
1389
1390        let mut next_nonce = 0;
1391        for msg in msgs.iter().take(10) {
1392            assert_eq!(
1393                msg.from(),
1394                a1,
1395                "first 10 returned messages should be from actor a1"
1396            );
1397            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1398            next_nonce += 1;
1399        }
1400        next_nonce = 0;
1401        for msg in msgs.iter().take(20).skip(10) {
1402            assert_eq!(
1403                msg.from(),
1404                a2,
1405                "next 10 returned messages should be from actor a2"
1406            );
1407            assert_eq!(msg.sequence(), next_nonce, "nonce should be in order");
1408            next_nonce += 1;
1409        }
1410    }
1411
1412    #[tokio::test]
1413    async fn test_optimal_msg_selection1() {
1414        // this test uses just a single actor sending messages with a low tq
1415        // the chain dependent merging algorithm should pick messages from the actor
1416        // from the start
1417        let mut joinset = JoinSet::new();
1418        let mut mpool = make_test_mpool(&mut joinset);
1419        let ts = mock_tipset(&mut mpool).await;
1420        let api = mpool.api.clone();
1421
1422        // create two actors
1423        let mut w1 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1424        let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1425        let mut w2 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1426        let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1427
1428        api.set_state_balance_raw(&a1, TokenAmount::from_whole(1));
1429        api.set_state_balance_raw(&a2, TokenAmount::from_whole(1));
1430
1431        let n_msgs = 10 * crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1432
1433        // we create n_msgs messages from each actor to another, with the first actor paying
1434        // higher gas prices than the second; we expect message selection to
1435        // order his messages first
1436        for i in 0..(n_msgs as usize) {
1437            let bias = (n_msgs as usize - i) / 3;
1438            let m = create_fake_smsg(
1439                &mpool,
1440                &a2,
1441                &a1,
1442                i as u64,
1443                TEST_GAS_LIMIT,
1444                (1 + i % 3 + bias) as u64,
1445            );
1446            mpool.add(m).unwrap();
1447        }
1448
1449        let msgs = mpool.select_messages(&ts, 0.25).unwrap();
1450
1451        let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1452
1453        assert_eq!(msgs.len(), expected_msgs as usize);
1454
1455        for (next_nonce, m) in msgs.into_iter().enumerate() {
1456            assert_eq!(m.from(), a1, "Expected message from a1");
1457            assert_eq!(
1458                m.message().sequence,
1459                next_nonce as u64,
1460                "expected nonce {} but got {}",
1461                next_nonce,
1462                m.message().sequence
1463            );
1464        }
1465    }
1466
1467    #[tokio::test]
1468    async fn test_optimal_msg_selection2() {
1469        let mut joinset = JoinSet::new();
1470        // this test uses two actors sending messages to each other, with the first
1471        // actor paying (much) higher gas premium than the second.
1472        // We select with a low ticket quality; the chain dependent merging algorithm
1473        // should pick messages from the second actor from the start
1474        let mut mpool = make_test_mpool(&mut joinset);
1475        let ts = mock_tipset(&mut mpool).await;
1476        let api = mpool.api.clone();
1477
1478        // create two actors
1479        let mut w1 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1480        let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap();
1481        let mut w2 = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1482        let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap();
1483
1484        api.set_state_balance_raw(&a1, TokenAmount::from_whole(1)); // in FIL
1485        api.set_state_balance_raw(&a2, TokenAmount::from_whole(1)); // in FIL
1486
1487        let n_msgs = 5 * crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1488        for i in 0..n_msgs as usize {
1489            let bias = (n_msgs as usize - i) / 3;
1490            let m = create_fake_smsg(
1491                &mpool,
1492                &a2,
1493                &a1,
1494                i as u64,
1495                TEST_GAS_LIMIT,
1496                (200000 + i % 3 + bias) as u64,
1497            );
1498            mpool.add(m).unwrap();
1499            let m = create_fake_smsg(
1500                &mpool,
1501                &a1,
1502                &a2,
1503                i as u64,
1504                TEST_GAS_LIMIT,
1505                (190000 + i % 3 + bias) as u64,
1506            );
1507            mpool.add(m).unwrap();
1508        }
1509
1510        let msgs = mpool.select_messages(&ts, 0.1).unwrap();
1511
1512        let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1513        assert_eq!(
1514            msgs.len(),
1515            expected_msgs as usize,
1516            "Expected {} messages, but got {}",
1517            expected_msgs,
1518            msgs.len()
1519        );
1520
1521        let mut n_from1 = 0;
1522        let mut n_from2 = 0;
1523        let mut next_nonce1 = 0;
1524        let mut next_nonce2 = 0;
1525
1526        for m in msgs {
1527            if m.from() == a1 {
1528                if m.message.sequence != next_nonce1 {
1529                    panic!(
1530                        "Expected nonce {}, but got {}",
1531                        next_nonce1, m.message.sequence
1532                    );
1533                }
1534                next_nonce1 += 1;
1535                n_from1 += 1;
1536            } else {
1537                if m.message.sequence != next_nonce2 {
1538                    panic!(
1539                        "Expected nonce {}, but got {}",
1540                        next_nonce2, m.message.sequence
1541                    );
1542                }
1543                next_nonce2 += 1;
1544                n_from2 += 1;
1545            }
1546        }
1547
1548        if n_from1 > n_from2 {
1549            panic!("Expected more msgs from a2 than a1");
1550        }
1551    }
1552
1553    #[tokio::test]
1554    async fn test_optimal_msg_selection3() {
1555        let mut joinset = JoinSet::new();
1556        // this test uses 10 actors sending a block of messages to each other, with the
1557        // the first actors paying higher gas premium than the subsequent
1558        // actors. We select with a low ticket quality; the chain dependent
1559        // merging algorithm should pick messages from the median actor from the
1560        // start
1561        let mut mpool = make_test_mpool(&mut joinset);
1562        let ts = mock_tipset(&mut mpool).await;
1563        let api = mpool.api.clone();
1564
1565        let n_actors = 10;
1566
1567        let mut actors = vec![];
1568        let mut wallets = vec![];
1569
1570        for _ in 0..n_actors {
1571            let mut wallet = Wallet::new(KeyStore::new(KeyStoreConfig::Memory).unwrap());
1572            let actor = wallet.generate_addr(SignatureType::Secp256k1).unwrap();
1573
1574            actors.push(actor);
1575            wallets.push(wallet);
1576        }
1577
1578        for a in &mut actors {
1579            api.set_state_balance_raw(a, TokenAmount::from_whole(1));
1580        }
1581
1582        let n_msgs = 1 + crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1583        for i in 0..n_msgs {
1584            for j in 0..n_actors {
1585                let premium =
1586                    500000 + 10000 * (n_actors - j) + (n_msgs + 2 - i) / (30 * n_actors) + i % 3;
1587                let m = create_fake_smsg(
1588                    &mpool,
1589                    &actors[j as usize],
1590                    &actors[j as usize],
1591                    i as u64,
1592                    TEST_GAS_LIMIT,
1593                    premium as u64,
1594                );
1595                mpool.add(m).unwrap();
1596            }
1597        }
1598
1599        let msgs = mpool.select_messages(&ts, 0.1).unwrap();
1600        let expected_msgs = crate::shim::econ::BLOCK_GAS_LIMIT as i64 / TEST_GAS_LIMIT;
1601
1602        assert_eq!(
1603            msgs.len(),
1604            expected_msgs as usize,
1605            "Expected {} messages, but got {}",
1606            expected_msgs,
1607            msgs.len()
1608        );
1609
1610        let who_is = |addr| -> usize {
1611            for (i, a) in actors.iter().enumerate() {
1612                if a == &addr {
1613                    return i;
1614                }
1615            }
1616            // Lotus has -1, but since we don't have -ve indexes, set it some unrealistic
1617            // number
1618            9999999
1619        };
1620
1621        let mut nonces = vec![0; n_actors as usize];
1622        for m in &msgs {
1623            let who = who_is(m.from());
1624            if who < 3 {
1625                panic!("got message from {who}th actor",);
1626            }
1627
1628            let next_nonce: u64 = nonces[who];
1629            if m.message.sequence != next_nonce {
1630                panic!(
1631                    "expected nonce {} but got {}",
1632                    next_nonce, m.message.sequence
1633                );
1634            }
1635            nonces[who] += 1;
1636        }
1637    }
1638}