Skip to main content

forest/message_pool/msgpool/
provider.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey};
5use crate::chain::index::ResolveNullTipset;
6use crate::chain::{ChainStore, HeadChanges};
7use crate::message::{ChainMessage, SignedMessage};
8use crate::message_pool::errors::Error;
9use crate::message_pool::msg_pool::{
10    MAX_ACTOR_PENDING_MESSAGES, MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES,
11};
12use crate::networks::Height;
13use crate::shim::{
14    address::{Address, Protocol::*},
15    econ::TokenAmount,
16    message::Message,
17    state_tree::{ActorState, StateTree},
18};
19use crate::utils::db::CborStoreExt;
20use auto_impl::auto_impl;
21use cid::Cid;
22use fvm_ipld_blockstore::Blockstore;
23use std::sync::Arc;
24use tokio::sync::broadcast;
25
26/// Provider Trait. This trait will be used by the message pool to interact with
27/// some medium in order to do the operations that are listed below that are
28/// required for the message pool.
29#[auto_impl(Arc)]
30pub trait Provider {
31    /// Update `Mpool`'s `cur_tipset` whenever there is a change to the provider
32    fn subscribe_head_changes(&self) -> broadcast::Receiver<HeadChanges>;
33    /// Get the heaviest Tipset in the provider
34    fn get_heaviest_tipset(&self) -> Tipset;
35    /// Add a message to the `MpoolProvider`, return either Cid or Error
36    /// depending on successful put
37    fn put_message(&self, msg: &ChainMessage) -> Result<Cid, Error>;
38    /// Return state actor for given address given the tipset that the a temp
39    /// `StateTree` will be rooted at. Return `ActorState` or Error
40    /// depending on whether or not `ActorState` is found
41    fn get_actor_after(&self, addr: &Address, ts: &Tipset) -> Result<ActorState, Error>;
42    /// Return the signed messages for given block header
43    fn messages_for_block(
44        &self,
45        h: &CachingBlockHeader,
46    ) -> Result<(Vec<Message>, Vec<SignedMessage>), Error>;
47    /// Return a tipset given the tipset keys from the `ChainStore`
48    fn load_tipset(&self, tsk: &TipsetKey) -> Result<Tipset, Error>;
49    /// Computes the base fee
50    fn chain_compute_base_fee(&self, ts: &Tipset) -> Result<TokenAmount, Error>;
51    /// Similar to [`crate::state_manager::StateManager::resolve_to_deterministic_address`] but fails if the ID address being resolved isn't reorg-stable yet.
52    /// It should not be used for consensus-critical subsystems.
53    fn resolve_to_deterministic_address_at_finality(
54        &self,
55        addr: &Address,
56        ts: &Tipset,
57    ) -> Result<Address, Error>;
58    /// Return all messages included in the given tipset.
59    fn messages_for_tipset(&self, ts: &Tipset) -> Result<Arc<Vec<ChainMessage>>, Error>;
60    // Get max number of messages per actor in the pool
61    fn max_actor_pending_messages(&self) -> u64 {
62        MAX_ACTOR_PENDING_MESSAGES
63    }
64    // Get max number of messages per actor in the pool for untrusted sources
65    fn max_untrusted_actor_pending_messages(&self) -> u64 {
66        MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES
67    }
68}
69
70impl<DB: Blockstore> Provider for ChainStore<DB> {
71    fn subscribe_head_changes(&self) -> broadcast::Receiver<HeadChanges> {
72        self.subscribe_head_changes()
73    }
74
75    fn get_heaviest_tipset(&self) -> Tipset {
76        self.heaviest_tipset()
77    }
78
79    fn put_message(&self, msg: &ChainMessage) -> Result<Cid, Error> {
80        let cid = self
81            .blockstore()
82            .put_cbor_default(msg)
83            .map_err(|err| Error::Other(err.to_string()))?;
84        Ok(cid)
85    }
86
87    fn get_actor_after(&self, addr: &Address, ts: &Tipset) -> Result<ActorState, Error> {
88        let state = StateTree::new_from_root(self.blockstore().clone(), ts.parent_state())
89            .map_err(|e| Error::Other(e.to_string()))?;
90        Ok(state.get_required_actor(addr)?)
91    }
92
93    fn messages_for_block(
94        &self,
95        h: &CachingBlockHeader,
96    ) -> Result<(Vec<Message>, Vec<SignedMessage>), Error> {
97        crate::chain::block_messages(self.blockstore(), h).map_err(|err| err.into())
98    }
99
100    fn load_tipset(&self, tsk: &TipsetKey) -> Result<Tipset, Error> {
101        Ok(self.chain_index().load_required_tipset(tsk)?)
102    }
103
104    fn chain_compute_base_fee(&self, ts: &Tipset) -> Result<TokenAmount, Error> {
105        let smoke_height = self.chain_config().epoch(Height::Smoke);
106        let firehorse_height = self.chain_config().epoch(Height::FireHorse);
107        crate::chain::compute_base_fee(self.blockstore(), ts, smoke_height, firehorse_height)
108            .map_err(|err| err.into())
109    }
110
111    fn resolve_to_deterministic_address_at_finality(
112        &self,
113        addr: &Address,
114        ts: &Tipset,
115    ) -> Result<Address, Error> {
116        match addr.protocol() {
117            BLS | Secp256k1 | Delegated => Ok(*addr),
118            Actor => Err(Error::Other(
119                "Cannot resolve actor address to key address".into(),
120            )),
121            _ => {
122                let lookback_ts = if ts.epoch() > self.chain_config().policy.chain_finality {
123                    self.chain_index()
124                        .tipset_by_height(
125                            ts.epoch() - self.chain_config().policy.chain_finality,
126                            ts.clone(),
127                            ResolveNullTipset::TakeOlder,
128                        )
129                        .map_err(|e| Error::Other(e.to_string()))?
130                } else {
131                    // Matches the logic at <https://github.com/filecoin-project/lotus/blob/v1.35.1/chain/stmgr/stmgr.go#L361>
132                    ts.clone()
133                };
134
135                let state =
136                    StateTree::new_from_root(self.blockstore().clone(), lookback_ts.parent_state())
137                        .map_err(|e| Error::Other(e.to_string()))?;
138                state
139                    .resolve_to_deterministic_addr(self.blockstore(), *addr)
140                    .map_err(|e| Error::Other(e.to_string()))
141            }
142        }
143    }
144
145    fn messages_for_tipset(&self, ts: &Tipset) -> Result<Arc<Vec<ChainMessage>>, Error> {
146        ChainStore::messages_for_tipset(self, ts).map_err(Into::into)
147    }
148}