forest/message_pool/
msg_chain.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3#![allow(clippy::indexing_slicing)]
4use std::{
5    cmp::Ordering,
6    mem,
7    ops::{Index, IndexMut},
8};
9
10use crate::message::{Message, SignedMessage};
11use crate::networks::ChainConfig;
12use crate::shim::{
13    address::Address,
14    econ::TokenAmount,
15    gas::{Gas, price_list_by_network_version},
16};
17use crate::{
18    blocks::{BLOCK_MESSAGE_LIMIT, Tipset},
19    shim::crypto::SignatureType,
20};
21use ahash::HashMap;
22use num_traits::Zero;
23use slotmap::{SlotMap, new_key_type};
24use tracing::warn;
25
26use super::errors::Error;
27use crate::message_pool::{
28    provider::Provider,
29    utils::{get_gas_perf, get_gas_reward},
30};
31
32new_key_type! {
33    pub struct NodeKey;
34}
35
36/// Chains is an abstraction of a list of message chain nodes.
37/// It wraps a `SlotMap` instance. `key_vec` is an additional requirement in
38/// order to satisfy optimal `msg` selection use cases, such as iteration in
39/// insertion order. The `SlotMap` serves as a lookup table for nodes to get
40/// around the borrow checker rules. Each `MsgChainNode` contains only pointers
41/// as `NodeKey` to the entries in the map With this design, we get around the
42/// borrow checker rule issues when implementing the optimal selection
43/// algorithm.
44pub(in crate::message_pool) struct Chains {
45    pub map: SlotMap<NodeKey, MsgChainNode>,
46    pub key_vec: Vec<NodeKey>,
47}
48
49impl Chains {
50    // Sort by effective perf with cmp_effective
51    pub(in crate::message_pool) fn sort_effective(&mut self) {
52        let mut chains = mem::take(&mut self.key_vec);
53        chains.sort_by(|a, b| {
54            let a = self.map.get(*a).unwrap();
55            let b = self.map.get(*b).unwrap();
56            a.cmp_effective(b)
57        });
58        let _ = mem::replace(&mut self.key_vec, chains);
59    }
60
61    // Sort by effective `perf` on a range
62    pub(in crate::message_pool) fn sort_range_effective(
63        &mut self,
64        range: std::ops::RangeFrom<usize>,
65    ) {
66        let mut chains = mem::take(&mut self.key_vec);
67        chains[range].sort_by(|a, b| {
68            self.map
69                .get(*a)
70                .unwrap()
71                .cmp_effective(self.map.get(*b).unwrap())
72        });
73        let _ = mem::replace(&mut self.key_vec, chains);
74    }
75
76    /// Retrieves the `msg` chain node by the given `NodeKey` along with the
77    /// data required from previous chain (if exists) to set effective
78    /// performance of this node.
79    pub(in crate::message_pool) fn get_mut_with_prev_eff(
80        &mut self,
81        k: NodeKey,
82    ) -> (Option<&mut MsgChainNode>, Option<(f64, u64)>) {
83        let node = self.map.get(k);
84        let prev = if let Some(node) = node {
85            if let Some(prev_key) = node.prev {
86                let prev_node = self.map.get(prev_key).unwrap();
87                Some((prev_node.eff_perf, prev_node.gas_limit))
88            } else {
89                None
90            }
91        } else {
92            None
93        };
94
95        let node = self.map.get_mut(k);
96        (node, prev)
97    }
98
99    /// Retrieves the `msg` chain node by the given `NodeKey`
100    pub(in crate::message_pool) fn get(&self, k: NodeKey) -> Option<&MsgChainNode> {
101        self.map.get(k)
102    }
103}
104
105impl Chains {
106    pub(in crate::message_pool) fn new() -> Self {
107        Self {
108            map: SlotMap::with_key(),
109            key_vec: vec![],
110        }
111    }
112
113    /// Pushes a `msg` chain node into slot map and places the key in the
114    /// `node_vec` passed as parameter.
115    pub(in crate::message_pool) fn push_with(
116        &mut self,
117        cur_chain: MsgChainNode,
118        node_vec: &mut Vec<NodeKey>,
119    ) {
120        let key = self.map.insert(cur_chain);
121        node_vec.push(key);
122    }
123
124    /// Sorts the chains with `compare` method. If rev is true, sorts in
125    /// descending order.
126    pub(in crate::message_pool) fn sort(&mut self, rev: bool) {
127        // replace dance to get around borrow checker
128        let mut chains = mem::take(&mut self.key_vec);
129        chains.sort_by(|a, b| {
130            let a = self.map.get(*a).unwrap();
131            let b = self.map.get(*b).unwrap();
132            if rev { b.compare(a) } else { a.compare(b) }
133        });
134        let _ = mem::replace(&mut self.key_vec, chains);
135    }
136
137    /// Retrieves the `msg` chain node by the given `NodeKey`
138    pub(in crate::message_pool) fn get_mut(&mut self, k: NodeKey) -> Option<&mut MsgChainNode> {
139        self.map.get_mut(k)
140    }
141
142    /// Retrieves the `msg` chain node at the given index
143    pub(in crate::message_pool) fn get_mut_at(&mut self, i: usize) -> Option<&mut MsgChainNode> {
144        let key = self.key_vec.get(i)?;
145        self.get_mut(*key)
146    }
147
148    // Retrieves a msg chain node at the given index in the provided NodeKey vec
149    pub(in crate::message_pool) fn get_from(&self, i: usize, vec: &[NodeKey]) -> &MsgChainNode {
150        #[allow(clippy::indexing_slicing)]
151        self.map.get(vec[i]).unwrap()
152    }
153
154    // Retrieves a msg chain node at the given index in the provided NodeKey vec
155    pub(in crate::message_pool) fn get_mut_from(
156        &mut self,
157        i: usize,
158        vec: &[NodeKey],
159    ) -> &mut MsgChainNode {
160        #[allow(clippy::indexing_slicing)]
161        self.map.get_mut(vec[i]).unwrap()
162    }
163
164    // Retrieves the node key at the given index
165    pub(in crate::message_pool) fn get_key_at(&self, i: usize) -> Option<NodeKey> {
166        self.key_vec.get(i).copied()
167    }
168
169    /// Retrieves the `msg` chain node at the given index. Returns `None` if index is out-of-bounds.
170    pub(in crate::message_pool) fn get_at(&self, i: usize) -> Option<&MsgChainNode> {
171        self.map.get(self.get_key_at(i)?)
172    }
173
174    /// Retrieves the amount of items.
175    pub(in crate::message_pool) fn len(&self) -> usize {
176        self.map.len()
177    }
178
179    /// Returns true is the chain is empty and otherwise. We check the map as
180    /// the source of truth as `key_vec` can be extended time to time.
181    pub(in crate::message_pool) fn is_empty(&self) -> bool {
182        self.map.is_empty()
183    }
184
185    /// Removes messages from the given index and resets effective `perfs`
186    #[tracing::instrument(skip_all, level = "debug")]
187    pub(in crate::message_pool) fn trim_msgs_at(
188        &mut self,
189        idx: usize,
190        gas_limit: u64,
191        msg_limit: usize,
192        base_fee: &TokenAmount,
193    ) {
194        let prev = match idx {
195            0 => None,
196            _ => self
197                .get_at(idx - 1)
198                .map(|prev| (prev.eff_perf, prev.gas_limit)),
199        };
200        let chain_node = self.get_mut_at(idx).unwrap();
201        let mut i = chain_node.msgs.len() as i64 - 1;
202
203        while i >= 0
204            && (chain_node.gas_limit > gas_limit
205                || chain_node.gas_perf < 0.0
206                || i >= msg_limit as i64)
207        {
208            #[allow(clippy::indexing_slicing)]
209            let msg = &chain_node.msgs[i as usize];
210            let gas_reward = get_gas_reward(msg, base_fee);
211            chain_node.gas_reward -= gas_reward;
212            chain_node.gas_limit = chain_node.gas_limit.saturating_sub(msg.gas_limit());
213            if chain_node.gas_limit > 0 {
214                chain_node.gas_perf = get_gas_perf(&chain_node.gas_reward, chain_node.gas_limit);
215                if chain_node.bp != 0.0 {
216                    chain_node.set_eff_perf(prev);
217                }
218            } else {
219                chain_node.gas_perf = 0.0;
220                chain_node.eff_perf = 0.0;
221            }
222            i -= 1;
223        }
224
225        if i < 0 {
226            chain_node.msgs.clear();
227            chain_node.valid = false;
228        } else {
229            chain_node.msgs.truncate(i as usize + 1);
230        }
231
232        let next = chain_node.next;
233        if next.is_some() {
234            self.invalidate(next);
235        }
236    }
237
238    pub(in crate::message_pool) fn invalidate(&mut self, mut key: Option<NodeKey>) {
239        let mut next_keys = vec![];
240
241        while let Some(nk) = key {
242            let chain_node = self.map.get(nk).unwrap();
243            next_keys.push(nk);
244            key = chain_node.next;
245        }
246
247        for k in next_keys.iter().rev() {
248            if let Some(node) = self.map.get_mut(*k) {
249                node.valid = false;
250                node.msgs.clear();
251                node.next = None;
252            }
253        }
254    }
255
256    /// Drops nodes which are no longer valid after the merge step
257    pub(in crate::message_pool) fn drop_invalid(&mut self, key_vec: &mut Vec<NodeKey>) {
258        let mut valid_keys = vec![];
259        for k in key_vec.iter() {
260            if self.map.get(*k).map(|n| n.valid).unwrap() {
261                valid_keys.push(*k);
262            } else {
263                self.map.remove(*k);
264            }
265        }
266
267        *key_vec = valid_keys;
268    }
269}
270
271impl Index<usize> for Chains {
272    type Output = MsgChainNode;
273    fn index(&self, i: usize) -> &Self::Output {
274        self.get_at(i).unwrap()
275    }
276}
277
278impl IndexMut<usize> for Chains {
279    fn index_mut(&mut self, i: usize) -> &mut Self::Output {
280        #[allow(clippy::indexing_slicing)]
281        self.map.get_mut(self.key_vec[i]).unwrap()
282    }
283}
284
285/// Represents a node in the `MsgChain`.
286#[derive(Clone, Debug)]
287pub struct MsgChainNode {
288    pub msgs: Vec<SignedMessage>,
289    pub gas_reward: TokenAmount,
290    pub gas_limit: u64,
291    pub gas_perf: f64,
292    pub eff_perf: f64,
293    pub bp: f64,
294    pub parent_offset: f64,
295    pub valid: bool,
296    pub merged: bool,
297    pub next: Option<NodeKey>,
298    pub prev: Option<NodeKey>,
299    pub sig_type: Option<SignatureType>,
300}
301
302impl MsgChainNode {
303    pub fn compare(&self, other: &Self) -> Ordering {
304        if approx_cmp(self.gas_perf, other.gas_perf) == Ordering::Greater
305            || approx_cmp(self.gas_perf, other.gas_perf) == Ordering::Equal
306                && self.gas_reward.cmp(&other.gas_reward) == Ordering::Greater
307        {
308            return Ordering::Greater;
309        }
310
311        Ordering::Less
312    }
313
314    pub fn set_eff_perf(&mut self, prev: Option<(f64, u64)>) {
315        let mut eff_perf = self.gas_perf * self.bp;
316        if let Some(prev) = prev
317            && eff_perf > 0.0
318        {
319            let prev_eff_perf = prev.0;
320            let prev_gas_limit = prev.1;
321            let eff_perf_with_parent = (eff_perf * self.gas_limit as f64
322                + prev_eff_perf * prev_gas_limit as f64)
323                / (self.gas_limit + prev_gas_limit) as f64;
324            self.parent_offset = eff_perf - eff_perf_with_parent;
325            eff_perf = eff_perf_with_parent;
326        }
327        self.eff_perf = eff_perf;
328    }
329}
330
331impl MsgChainNode {
332    pub(in crate::message_pool) fn cmp_effective(&self, other: &Self) -> Ordering {
333        // Highest priority: merged
334        // Comment from Lotus: move merged chains to the front so we can discard them earlier
335        // Note: both cases need to be checked to ensure total ordering. Without it, the standard
336        // libraries' sorting methods may panic (since Rust 1.81).
337        match (self.merged, other.merged) {
338            (true, false) => return Ordering::Greater,
339            (false, true) => return Ordering::Less,
340            _ => {}
341        }
342
343        if self.gas_perf >= 0.0 && other.gas_perf < 0.0
344            || self.eff_perf > other.eff_perf
345            || (approx_cmp(self.eff_perf, other.eff_perf) == Ordering::Equal
346                && self.gas_perf > other.gas_perf)
347            || (approx_cmp(self.eff_perf, other.eff_perf) == Ordering::Equal
348                && approx_cmp(self.gas_perf, other.gas_perf) == Ordering::Equal
349                && self.gas_reward > other.gas_reward)
350        {
351            return Ordering::Greater;
352        }
353
354        Ordering::Less
355    }
356
357    pub fn set_null_effective_perf(&mut self) {
358        if self.gas_perf < 0.0 {
359            self.eff_perf = self.gas_perf;
360        } else {
361            self.eff_perf = 0.0;
362        }
363    }
364}
365
366impl std::default::Default for MsgChainNode {
367    fn default() -> Self {
368        Self {
369            msgs: vec![],
370            gas_reward: TokenAmount::zero(),
371            gas_limit: 0,
372            gas_perf: 0.0,
373            eff_perf: 0.0,
374            bp: 0.0,
375            parent_offset: 0.0,
376            valid: true,
377            merged: false,
378            next: None,
379            prev: None,
380            sig_type: None,
381        }
382    }
383}
384
385pub(in crate::message_pool) fn create_message_chains<T>(
386    api: &T,
387    actor: &Address,
388    mset: &HashMap<u64, SignedMessage>,
389    base_fee: &TokenAmount,
390    ts: &Tipset,
391    chains: &mut Chains,
392    chain_config: &ChainConfig,
393) -> Result<(), Error>
394where
395    T: Provider,
396{
397    // collect all messages and sort
398    let mut msgs: Vec<SignedMessage> = mset.values().cloned().collect();
399    msgs.sort_by_key(|v| v.sequence());
400
401    // sanity checks:
402    // - there can be no gaps in nonces, starting from the current actor nonce if
403    //   there is a gap, drop messages after the gap, we can't include them
404    // - all messages must have minimum gas and the total gas for the candidate
405    //   messages cannot exceed the block limit; drop all messages that exceed the
406    //   limit
407    // - the total gasReward cannot exceed the actor's balance; drop all messages
408    //   that exceed the balance
409    let Ok(actor_state) = api.get_actor_after(actor, ts) else {
410        tracing::warn!("failed to load actor state, not building chain for {actor}");
411        return Ok(());
412    };
413    let mut cur_seq = actor_state.sequence;
414    let mut balance: TokenAmount = TokenAmount::from(&actor_state.balance);
415
416    let mut gas_limit = 0;
417    let mut skip = 0;
418    let mut i = 0;
419    let mut rewards = Vec::with_capacity(msgs.len());
420
421    while let Some(m) = msgs.get(i) {
422        if m.sequence() < cur_seq {
423            warn!(
424                "encountered message from actor {} with nonce {} less than the current nonce {}",
425                actor,
426                m.sequence(),
427                cur_seq
428            );
429            skip += 1;
430            i += 1;
431            continue;
432        }
433
434        if m.sequence() != cur_seq {
435            break;
436        }
437        cur_seq += 1;
438
439        let network_version = chain_config.network_version(ts.epoch());
440
441        let min_gas = price_list_by_network_version(network_version)
442            .on_chain_message(m.chain_length()?)
443            .total();
444
445        if Gas::new(m.gas_limit()) < min_gas {
446            break;
447        }
448        gas_limit += m.gas_limit();
449        if gas_limit > crate::shim::econ::BLOCK_GAS_LIMIT {
450            break;
451        }
452
453        let required = m.required_funds();
454        if balance < required {
455            break;
456        }
457
458        balance -= required;
459        let value = m.value();
460        balance -= value;
461
462        let gas_reward = get_gas_reward(m, base_fee);
463        rewards.push(gas_reward);
464        i += 1;
465    }
466
467    // check we have a sane set of messages to construct the chains
468    let mut msgs = if i > skip {
469        #[allow(clippy::indexing_slicing)]
470        msgs[skip..i].to_vec()
471    } else {
472        return Ok(());
473    };
474
475    // if we have more messages from this sender than can fit in a block, drop the extra ones
476    if msgs.len() > BLOCK_MESSAGE_LIMIT {
477        warn!(
478            "dropping {} messages from {actor} as they exceed the block message limit of {BLOCK_MESSAGE_LIMIT}",
479            msgs.len() - BLOCK_MESSAGE_LIMIT,
480        );
481        msgs.truncate(BLOCK_MESSAGE_LIMIT);
482    };
483
484    let mut cur_chain = MsgChainNode::default();
485    let mut node_vec = vec![];
486
487    let new_chain = |m: SignedMessage, reward: &TokenAmount| -> MsgChainNode {
488        let gl = m.gas_limit();
489        let sig_type = Some(m.signature().sig_type);
490        MsgChainNode {
491            msgs: vec![m],
492            gas_reward: reward.clone(),
493            gas_limit: gl,
494            gas_perf: get_gas_perf(reward, gl),
495            eff_perf: 0.0,
496            bp: 0.0,
497            parent_offset: 0.0,
498            valid: true,
499            merged: false,
500            prev: None,
501            next: None,
502            sig_type,
503        }
504    };
505
506    // creates msg chain nodes in chunks based on gas_perf obtained from the current
507    // chain's gas limit.
508    for (i, (m, reward)) in msgs.into_iter().zip(rewards.iter()).enumerate() {
509        if i == 0 {
510            cur_chain = new_chain(m, reward);
511            continue;
512        }
513
514        let gas_reward = cur_chain.gas_reward.clone() + reward;
515        let gas_limit = cur_chain.gas_limit + m.gas_limit();
516        let gas_perf = get_gas_perf(&gas_reward, gas_limit);
517
518        // try to add the message to the current chain -- if it decreases the gasPerf,
519        // then make a new chain
520        if gas_perf < cur_chain.gas_perf {
521            chains.push_with(cur_chain, &mut node_vec);
522            cur_chain = new_chain(m, reward);
523        } else {
524            cur_chain.msgs.push(m);
525            cur_chain.gas_reward = gas_reward;
526            cur_chain.gas_limit = gas_limit;
527            cur_chain.gas_perf = gas_perf;
528        }
529    }
530
531    chains.push_with(cur_chain, &mut node_vec);
532
533    // merge chains to maintain the invariant: higher gas perf nodes on the front.
534    loop {
535        let mut merged = 0;
536        for i in (1..node_vec.len()).rev() {
537            if chains.get_from(i, &node_vec).gas_perf >= chains.get_from(i - 1, &node_vec).gas_perf
538            {
539                // copy messages
540                let chain_i_msg = chains.get_from(i, &node_vec).msgs.clone();
541                chains
542                    .get_mut_from(i - 1, &node_vec)
543                    .msgs
544                    .extend(chain_i_msg);
545
546                // set gas reward
547                let chain_i_gas_reward = chains.get_from(i, &node_vec).gas_reward.clone();
548                chains.get_mut_from(i - 1, &node_vec).gas_reward += chain_i_gas_reward;
549
550                // set gas limit
551                let chain_i_gas_limit = chains.get_from(i, &node_vec).gas_limit;
552                chains.get_mut_from(i - 1, &node_vec).gas_limit += chain_i_gas_limit;
553
554                // set gas perf
555                let chain_i_gas_perf = get_gas_perf(
556                    &chains.get_from(i - 1, &node_vec).gas_reward,
557                    chains.get_from(i - 1, &node_vec).gas_limit,
558                );
559                chains.get_mut_from(i - 1, &node_vec).gas_perf = chain_i_gas_perf;
560                // invalidate the current chain as it is merged with the prev chain
561                chains.get_mut_from(i, &node_vec).valid = false;
562                merged += 1;
563            }
564        }
565
566        if merged == 0 {
567            break;
568        }
569
570        chains.drop_invalid(&mut node_vec);
571    }
572
573    if node_vec.len() > 1 {
574        for (&k1, &k2) in node_vec.iter().zip(node_vec.iter().skip(1)) {
575            // link next pointers
576            let n1 = chains
577                .get_mut(k1)
578                .ok_or_else(|| Error::Other(format!("{k1:?} should present in `chains`")))?;
579            n1.next = Some(k2);
580            // Should we link or clear n1.prev as well?
581
582            // link prev pointers
583            let n2 = chains
584                .get_mut(k2)
585                .ok_or_else(|| Error::Other(format!("{k2:?} should present in `chains`")))?;
586            n2.prev = Some(k1);
587            // Should we link or clear n2.next as well?
588        }
589    }
590
591    // Update the main chain key_vec with this node_vec
592    chains.key_vec.extend(node_vec);
593
594    Ok(())
595}
596
597fn approx_cmp(a: f64, b: f64) -> Ordering {
598    if (a - b).abs() <= (a * f64::EPSILON).abs() {
599        Ordering::Equal
600    } else {
601        a.partial_cmp(&b).unwrap()
602    }
603}