Skip to main content

forest/rpc/methods/
mpool.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use super::gas::estimate_message_gas;
5use crate::db::EthMappingsStore;
6use crate::lotus_json::NotNullVec;
7use crate::message::SignedMessage;
8use crate::rpc::error::ServerError;
9use crate::rpc::types::{ApiTipsetKey, MessageSendSpec};
10use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod};
11use crate::shim::{
12    address::{Address, Protocol},
13    message::Message,
14};
15use ahash::{HashSet, HashSetExt as _};
16use cid::Cid;
17use enumflags2::BitFlags;
18use fvm_ipld_blockstore::Blockstore;
19
20/// Gets next nonce for the specified sender.
21pub enum MpoolGetNonce {}
22impl RpcMethod<1> for MpoolGetNonce {
23    const NAME: &'static str = "Filecoin.MpoolGetNonce";
24    const PARAM_NAMES: [&'static str; 1] = ["address"];
25    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
26    const PERMISSION: Permission = Permission::Read;
27    const DESCRIPTION: Option<&'static str> =
28        Some("Returns the current nonce for the specified address.");
29
30    type Params = (Address,);
31    type Ok = u64;
32
33    async fn handle(
34        ctx: Ctx<impl Blockstore + EthMappingsStore + Send + Sync + 'static>,
35        (address,): Self::Params,
36        _: &http::Extensions,
37    ) -> Result<Self::Ok, ServerError> {
38        Ok(ctx.mpool.get_sequence(&address)?)
39    }
40}
41
42/// Return `Vec` of pending messages in `mpool`
43pub enum MpoolPending {}
44impl RpcMethod<1> for MpoolPending {
45    const NAME: &'static str = "Filecoin.MpoolPending";
46    const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
47    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
48    const PERMISSION: Permission = Permission::Read;
49    const DESCRIPTION: Option<&'static str> =
50        Some("Returns the pending messages for a given tipset.");
51
52    type Params = (ApiTipsetKey,);
53    type Ok = NotNullVec<SignedMessage>;
54
55    async fn handle(
56        ctx: Ctx<impl Blockstore + EthMappingsStore + Send + Sync + 'static>,
57        (ApiTipsetKey(tipset_key),): Self::Params,
58        _: &http::Extensions,
59    ) -> Result<Self::Ok, ServerError> {
60        let mut ts = ctx
61            .chain_store()
62            .load_required_tipset_or_heaviest(&tipset_key)?;
63
64        let (mut pending, mpts) = ctx.mpool.pending();
65
66        let mut have_cids = HashSet::new();
67        for item in pending.iter() {
68            have_cids.insert(item.cid());
69        }
70
71        if mpts.epoch() > ts.epoch() {
72            return Ok(NotNullVec(pending.into_iter().collect()));
73        }
74
75        loop {
76            if mpts.epoch() == ts.epoch() {
77                if mpts == ts {
78                    break;
79                }
80
81                // mpts has different blocks than ts
82                let have = ctx
83                    .mpool
84                    .as_ref()
85                    .messages_for_blocks(ts.block_headers().iter())?;
86
87                for sm in have {
88                    have_cids.insert(sm.cid());
89                }
90            }
91
92            let msgs = ctx
93                .mpool
94                .as_ref()
95                .messages_for_blocks(ts.block_headers().iter())?;
96
97            for m in msgs {
98                if have_cids.contains(&m.cid()) {
99                    continue;
100                }
101
102                have_cids.insert(m.cid());
103                pending.push(m);
104            }
105
106            if mpts.epoch() >= ts.epoch() {
107                break;
108            }
109
110            ts = ctx.chain_index().load_required_tipset(ts.parents())?;
111        }
112        Ok(NotNullVec(pending.into_iter().collect()))
113    }
114}
115
116/// Return `Vec` of pending messages for inclusion in the next block
117pub enum MpoolSelect {}
118impl RpcMethod<2> for MpoolSelect {
119    const NAME: &'static str = "Filecoin.MpoolSelect";
120    const PARAM_NAMES: [&'static str; 2] = ["tipsetKey", "ticketQuality"];
121    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
122    const PERMISSION: Permission = Permission::Read;
123    const DESCRIPTION: Option<&'static str> =
124        Some("Returns a list of pending messages for inclusion in the next block.");
125
126    type Params = (ApiTipsetKey, f64);
127    type Ok = Vec<SignedMessage>;
128
129    async fn handle(
130        ctx: Ctx<impl Blockstore + EthMappingsStore + Send + Sync + 'static>,
131        (ApiTipsetKey(tipset_key), ticket_quality): Self::Params,
132        _: &http::Extensions,
133    ) -> Result<Self::Ok, ServerError> {
134        let ts = ctx
135            .chain_store()
136            .load_required_tipset_or_heaviest(&tipset_key)?;
137        Ok(ctx.mpool.select_messages(&ts, ticket_quality)?)
138    }
139}
140
141/// Add `SignedMessage` to `mpool`, return message CID
142pub enum MpoolPush {}
143impl RpcMethod<1> for MpoolPush {
144    const NAME: &'static str = "Filecoin.MpoolPush";
145    const PARAM_NAMES: [&'static str; 1] = ["message"];
146    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
147    const PERMISSION: Permission = Permission::Write;
148    const DESCRIPTION: Option<&'static str> = Some("Adds a signed message to the message pool.");
149
150    type Params = (SignedMessage,);
151    type Ok = Cid;
152
153    async fn handle(
154        ctx: Ctx<impl Blockstore + EthMappingsStore + Send + Sync + 'static>,
155        (message,): Self::Params,
156        _: &http::Extensions,
157    ) -> Result<Self::Ok, ServerError> {
158        let cid = ctx.mpool.as_ref().push(message).await?;
159        Ok(cid)
160    }
161}
162
163/// Add a batch of `SignedMessage`s to `mpool`, return message CIDs
164pub enum MpoolBatchPush {}
165impl RpcMethod<1> for MpoolBatchPush {
166    const NAME: &'static str = "Filecoin.MpoolBatchPush";
167    const PARAM_NAMES: [&'static str; 1] = ["messages"];
168    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
169    const PERMISSION: Permission = Permission::Write;
170    const DESCRIPTION: Option<&'static str> =
171        Some("Adds a set of signed messages to the message pool.");
172
173    type Params = (Vec<SignedMessage>,);
174    type Ok = Vec<Cid>;
175
176    async fn handle(
177        ctx: Ctx<impl Blockstore + EthMappingsStore + Send + Sync + 'static>,
178        (messages,): Self::Params,
179        _: &http::Extensions,
180    ) -> Result<Self::Ok, ServerError> {
181        let mut cids = vec![];
182        for msg in messages {
183            cids.push(ctx.mpool.as_ref().push(msg).await?);
184        }
185        Ok(cids)
186    }
187}
188
189/// Add `SignedMessage` from untrusted source to `mpool`, return message CID
190pub enum MpoolPushUntrusted {}
191impl RpcMethod<1> for MpoolPushUntrusted {
192    const NAME: &'static str = "Filecoin.MpoolPushUntrusted";
193    const PARAM_NAMES: [&'static str; 1] = ["message"];
194    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
195    const PERMISSION: Permission = Permission::Write;
196    const DESCRIPTION: Option<&'static str> =
197        Some("Adds a message to the message pool with verification checks.");
198
199    type Params = (SignedMessage,);
200    type Ok = Cid;
201
202    async fn handle(
203        ctx: Ctx<impl Blockstore + EthMappingsStore + Send + Sync + 'static>,
204        (message,): Self::Params,
205        _: &http::Extensions,
206    ) -> Result<Self::Ok, ServerError> {
207        // Lotus implements a few extra sanity checks that we skip. We skip them
208        // because those checks aren't used for messages received from peers and
209        // therefore aren't safety critical.
210        let cid = ctx.mpool.as_ref().push_untrusted(message).await?;
211        Ok(cid)
212    }
213}
214
215/// Add a batch of `SignedMessage`s to `mpool`, return message CIDs
216pub enum MpoolBatchPushUntrusted {}
217impl RpcMethod<1> for MpoolBatchPushUntrusted {
218    const NAME: &'static str = "Filecoin.MpoolBatchPushUntrusted";
219    const PARAM_NAMES: [&'static str; 1] = ["messages"];
220    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
221    const PERMISSION: Permission = Permission::Write;
222    const DESCRIPTION: Option<&'static str> =
223        Some("Adds a set of messages to the message pool with additional verification checks.");
224
225    type Params = (Vec<SignedMessage>,);
226    type Ok = Vec<Cid>;
227
228    async fn handle(
229        ctx: Ctx<impl Blockstore + EthMappingsStore + Send + Sync + 'static>,
230        (messages,): Self::Params,
231        ext: &http::Extensions,
232    ) -> Result<Self::Ok, ServerError> {
233        // Alias of MpoolBatchPush.
234        MpoolBatchPush::handle(ctx, (messages,), ext).await
235    }
236}
237
238/// Sign given `UnsignedMessage` and add it to `mpool`, return `SignedMessage`
239pub enum MpoolPushMessage {}
240impl RpcMethod<2> for MpoolPushMessage {
241    const NAME: &'static str = "Filecoin.MpoolPushMessage";
242    const PARAM_NAMES: [&'static str; 2] = ["message", "sendSpec"];
243    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
244    const PERMISSION: Permission = Permission::Sign;
245    const DESCRIPTION: Option<&'static str> =
246        Some("Assigns a nonce, signs, and pushes a message to the mempool.");
247
248    type Params = (Message, Option<MessageSendSpec>);
249    type Ok = SignedMessage;
250
251    async fn handle(
252        ctx: Ctx<impl Blockstore + EthMappingsStore + Send + Sync + 'static>,
253        (message, send_spec): Self::Params,
254        extensions: &http::Extensions,
255    ) -> Result<Self::Ok, ServerError> {
256        let from = message.from;
257
258        let heaviest_tipset = ctx.chain_store().heaviest_tipset();
259        let key_addr = ctx
260            .state_manager
261            .resolve_to_key_addr(&from, &heaviest_tipset)
262            .await?;
263
264        if message.sequence != 0 {
265            return Err(anyhow::anyhow!(
266                "Expected nonce for MpoolPushMessage is 0, and will be calculated for you"
267            )
268            .into());
269        }
270
271        let _sender_guard = ctx.mpool_locker.take_lock(key_addr).await;
272
273        let mut message =
274            estimate_message_gas(&ctx, message, send_spec, Default::default()).await?;
275        if message.gas_premium > message.gas_fee_cap {
276            return Err(anyhow::anyhow!(
277                "After estimation, gas premium is greater than gas fee cap"
278            )
279            .into());
280        }
281
282        if from.protocol() == Protocol::ID {
283            message.from = key_addr;
284        }
285
286        let balance =
287            super::wallet::WalletBalance::handle(ctx.clone(), (message.from,), extensions).await?;
288        let required_funds = &message.value + &message.gas_fee_cap * message.gas_limit;
289        if balance < required_funds {
290            return Err(anyhow::anyhow!(
291                "mpool push: not enough funds: {balance} < {required_funds}",
292            )
293            .into());
294        }
295
296        let key = crate::key_management::Key::try_from(crate::key_management::try_find(
297            &key_addr,
298            &ctx.keystore.as_ref().read(),
299        )?)?;
300        let eth_chain_id = ctx.chain_config().eth_chain_id;
301
302        let smsg = ctx
303            .nonce_tracker
304            .sign_and_push(ctx.mpool.as_ref(), message, &key, eth_chain_id)
305            .await?;
306
307        Ok(smsg)
308    }
309}