forest/message_pool/msgpool/
provider.rs1use std::sync::Arc;
5
6use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey};
7use crate::chain::HeadChange;
8use crate::message::{ChainMessage, SignedMessage};
9use crate::message_pool::msg_pool::{
10 MAX_ACTOR_PENDING_MESSAGES, MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES,
11};
12use crate::networks::Height;
13use crate::shim::{
14 address::Address,
15 econ::TokenAmount,
16 message::Message,
17 state_tree::{ActorState, StateTree},
18};
19use crate::state_manager::StateManager;
20use crate::utils::db::CborStoreExt;
21use async_trait::async_trait;
22use cid::Cid;
23use fvm_ipld_blockstore::Blockstore;
24use tokio::sync::broadcast::{Receiver as Subscriber, Sender as Publisher};
25
26use crate::message_pool::errors::Error;
27
28#[async_trait]
32pub trait Provider {
33 fn subscribe_head_changes(&self) -> Subscriber<HeadChange>;
35 fn get_heaviest_tipset(&self) -> Tipset;
37 fn put_message(&self, msg: &ChainMessage) -> Result<Cid, Error>;
40 fn get_actor_after(&self, addr: &Address, ts: &Tipset) -> Result<ActorState, Error>;
44 fn messages_for_block(
46 &self,
47 h: &CachingBlockHeader,
48 ) -> Result<(Vec<Message>, Vec<SignedMessage>), Error>;
49 fn load_tipset(&self, tsk: &TipsetKey) -> Result<Tipset, Error>;
51 fn chain_compute_base_fee(&self, ts: &Tipset) -> Result<TokenAmount, Error>;
53 fn max_actor_pending_messages(&self) -> u64 {
55 MAX_ACTOR_PENDING_MESSAGES
56 }
57 fn max_untrusted_actor_pending_messages(&self) -> u64 {
59 MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES
60 }
61}
62
63#[derive(derive_more::Constructor)]
66pub struct MpoolRpcProvider<DB> {
67 subscriber: Publisher<HeadChange>,
68 sm: Arc<StateManager<DB>>,
69}
70
71#[async_trait]
72impl<DB> Provider for MpoolRpcProvider<DB>
73where
74 DB: Blockstore + Sync + Send + 'static,
75{
76 fn subscribe_head_changes(&self) -> Subscriber<HeadChange> {
77 self.subscriber.subscribe()
78 }
79
80 fn get_heaviest_tipset(&self) -> Tipset {
81 self.sm.chain_store().heaviest_tipset()
82 }
83
84 fn put_message(&self, msg: &ChainMessage) -> Result<Cid, Error> {
85 let cid = self
86 .sm
87 .blockstore()
88 .put_cbor_default(msg)
89 .map_err(|err| Error::Other(err.to_string()))?;
90 Ok(cid)
91 }
92
93 fn get_actor_after(&self, addr: &Address, ts: &Tipset) -> Result<ActorState, Error> {
94 let state = StateTree::new_from_root(self.sm.blockstore_owned(), ts.parent_state())
95 .map_err(|e| Error::Other(e.to_string()))?;
96 Ok(state.get_required_actor(addr)?)
97 }
98
99 fn messages_for_block(
100 &self,
101 h: &CachingBlockHeader,
102 ) -> Result<(Vec<Message>, Vec<SignedMessage>), Error> {
103 crate::chain::block_messages(self.sm.blockstore(), h).map_err(|err| err.into())
104 }
105
106 fn load_tipset(&self, tsk: &TipsetKey) -> Result<Tipset, Error> {
107 Ok(self
108 .sm
109 .chain_store()
110 .chain_index()
111 .load_required_tipset(tsk)?)
112 }
113
114 fn chain_compute_base_fee(&self, ts: &Tipset) -> Result<TokenAmount, Error> {
115 let smoke_height = self.sm.chain_config().epoch(Height::Smoke);
116 crate::chain::compute_base_fee(self.sm.blockstore(), ts, smoke_height)
117 .map_err(|err| err.into())
118 }
119}