Skip to main content

forest/rpc/methods/
mpool.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use super::gas::estimate_message_gas;
5use crate::lotus_json::NotNullVec;
6use crate::message::SignedMessage;
7use crate::rpc::error::ServerError;
8use crate::rpc::types::{ApiTipsetKey, MessageSendSpec};
9use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod};
10use crate::shim::{
11    address::{Address, Protocol},
12    message::Message,
13};
14use ahash::{HashSet, HashSetExt as _};
15use cid::Cid;
16use enumflags2::BitFlags;
17use fvm_ipld_blockstore::Blockstore;
18
19/// Gets next nonce for the specified sender.
20pub enum MpoolGetNonce {}
21impl RpcMethod<1> for MpoolGetNonce {
22    const NAME: &'static str = "Filecoin.MpoolGetNonce";
23    const PARAM_NAMES: [&'static str; 1] = ["address"];
24    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
25    const PERMISSION: Permission = Permission::Read;
26    const DESCRIPTION: Option<&'static str> =
27        Some("Returns the current nonce for the specified address.");
28
29    type Params = (Address,);
30    type Ok = u64;
31
32    async fn handle(
33        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
34        (address,): Self::Params,
35        _: &http::Extensions,
36    ) -> Result<Self::Ok, ServerError> {
37        Ok(ctx.mpool.get_sequence(&address)?)
38    }
39}
40
41/// Return `Vec` of pending messages in `mpool`
42pub enum MpoolPending {}
43impl RpcMethod<1> for MpoolPending {
44    const NAME: &'static str = "Filecoin.MpoolPending";
45    const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
46    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
47    const PERMISSION: Permission = Permission::Read;
48    const DESCRIPTION: Option<&'static str> =
49        Some("Returns the pending messages for a given tipset.");
50
51    type Params = (ApiTipsetKey,);
52    type Ok = NotNullVec<SignedMessage>;
53
54    async fn handle(
55        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
56        (ApiTipsetKey(tipset_key),): Self::Params,
57        _: &http::Extensions,
58    ) -> Result<Self::Ok, ServerError> {
59        let mut ts = ctx
60            .chain_store()
61            .load_required_tipset_or_heaviest(&tipset_key)?;
62
63        let (mut pending, mpts) = ctx.mpool.pending()?;
64
65        let mut have_cids = HashSet::new();
66        for item in pending.iter() {
67            have_cids.insert(item.cid());
68        }
69
70        if mpts.epoch() > ts.epoch() {
71            return Ok(NotNullVec(pending.into_iter().collect()));
72        }
73
74        loop {
75            if mpts.epoch() == ts.epoch() {
76                if mpts == ts {
77                    break;
78                }
79
80                // mpts has different blocks than ts
81                let have = ctx
82                    .mpool
83                    .as_ref()
84                    .messages_for_blocks(ts.block_headers().iter())?;
85
86                for sm in have {
87                    have_cids.insert(sm.cid());
88                }
89            }
90
91            let msgs = ctx
92                .mpool
93                .as_ref()
94                .messages_for_blocks(ts.block_headers().iter())?;
95
96            for m in msgs {
97                if have_cids.contains(&m.cid()) {
98                    continue;
99                }
100
101                have_cids.insert(m.cid());
102                pending.push(m);
103            }
104
105            if mpts.epoch() >= ts.epoch() {
106                break;
107            }
108
109            ts = ctx.chain_index().load_required_tipset(ts.parents())?;
110        }
111        Ok(NotNullVec(pending.into_iter().collect()))
112    }
113}
114
115/// Return `Vec` of pending messages for inclusion in the next block
116pub enum MpoolSelect {}
117impl RpcMethod<2> for MpoolSelect {
118    const NAME: &'static str = "Filecoin.MpoolSelect";
119    const PARAM_NAMES: [&'static str; 2] = ["tipsetKey", "ticketQuality"];
120    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
121    const PERMISSION: Permission = Permission::Read;
122    const DESCRIPTION: Option<&'static str> =
123        Some("Returns a list of pending messages for inclusion in the next block.");
124
125    type Params = (ApiTipsetKey, f64);
126    type Ok = Vec<SignedMessage>;
127
128    async fn handle(
129        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
130        (ApiTipsetKey(tipset_key), ticket_quality): Self::Params,
131        _: &http::Extensions,
132    ) -> Result<Self::Ok, ServerError> {
133        let ts = ctx
134            .chain_store()
135            .load_required_tipset_or_heaviest(&tipset_key)?;
136        Ok(ctx.mpool.select_messages(&ts, ticket_quality)?)
137    }
138}
139
140/// Add `SignedMessage` to `mpool`, return message CID
141pub enum MpoolPush {}
142impl RpcMethod<1> for MpoolPush {
143    const NAME: &'static str = "Filecoin.MpoolPush";
144    const PARAM_NAMES: [&'static str; 1] = ["message"];
145    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
146    const PERMISSION: Permission = Permission::Write;
147    const DESCRIPTION: Option<&'static str> = Some("Adds a signed message to the message pool.");
148
149    type Params = (SignedMessage,);
150    type Ok = Cid;
151
152    async fn handle(
153        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
154        (message,): Self::Params,
155        _: &http::Extensions,
156    ) -> Result<Self::Ok, ServerError> {
157        let cid = ctx.mpool.as_ref().push(message).await?;
158        Ok(cid)
159    }
160}
161
162/// Add a batch of `SignedMessage`s to `mpool`, return message CIDs
163pub enum MpoolBatchPush {}
164impl RpcMethod<1> for MpoolBatchPush {
165    const NAME: &'static str = "Filecoin.MpoolBatchPush";
166    const PARAM_NAMES: [&'static str; 1] = ["messages"];
167    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
168    const PERMISSION: Permission = Permission::Write;
169    const DESCRIPTION: Option<&'static str> =
170        Some("Adds a set of signed messages to the message pool.");
171
172    type Params = (Vec<SignedMessage>,);
173    type Ok = Vec<Cid>;
174
175    async fn handle(
176        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
177        (messages,): Self::Params,
178        _: &http::Extensions,
179    ) -> Result<Self::Ok, ServerError> {
180        let mut cids = vec![];
181        for msg in messages {
182            cids.push(ctx.mpool.as_ref().push(msg).await?);
183        }
184        Ok(cids)
185    }
186}
187
188/// Add `SignedMessage` from untrusted source to `mpool`, return message CID
189pub enum MpoolPushUntrusted {}
190impl RpcMethod<1> for MpoolPushUntrusted {
191    const NAME: &'static str = "Filecoin.MpoolPushUntrusted";
192    const PARAM_NAMES: [&'static str; 1] = ["message"];
193    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
194    const PERMISSION: Permission = Permission::Write;
195    const DESCRIPTION: Option<&'static str> =
196        Some("Adds a message to the message pool with verification checks.");
197
198    type Params = (SignedMessage,);
199    type Ok = Cid;
200
201    async fn handle(
202        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
203        (message,): Self::Params,
204        _: &http::Extensions,
205    ) -> Result<Self::Ok, ServerError> {
206        // Lotus implements a few extra sanity checks that we skip. We skip them
207        // because those checks aren't used for messages received from peers and
208        // therefore aren't safety critical.
209        let cid = ctx.mpool.as_ref().push_untrusted(message).await?;
210        Ok(cid)
211    }
212}
213
214/// Add a batch of `SignedMessage`s to `mpool`, return message CIDs
215pub enum MpoolBatchPushUntrusted {}
216impl RpcMethod<1> for MpoolBatchPushUntrusted {
217    const NAME: &'static str = "Filecoin.MpoolBatchPushUntrusted";
218    const PARAM_NAMES: [&'static str; 1] = ["messages"];
219    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
220    const PERMISSION: Permission = Permission::Write;
221    const DESCRIPTION: Option<&'static str> =
222        Some("Adds a set of messages to the message pool with additional verification checks.");
223
224    type Params = (Vec<SignedMessage>,);
225    type Ok = Vec<Cid>;
226
227    async fn handle(
228        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
229        (messages,): Self::Params,
230        ext: &http::Extensions,
231    ) -> Result<Self::Ok, ServerError> {
232        // Alias of MpoolBatchPush.
233        MpoolBatchPush::handle(ctx, (messages,), ext).await
234    }
235}
236
237/// Sign given `UnsignedMessage` and add it to `mpool`, return `SignedMessage`
238pub enum MpoolPushMessage {}
239impl RpcMethod<2> for MpoolPushMessage {
240    const NAME: &'static str = "Filecoin.MpoolPushMessage";
241    const PARAM_NAMES: [&'static str; 2] = ["message", "sendSpec"];
242    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
243    const PERMISSION: Permission = Permission::Sign;
244    const DESCRIPTION: Option<&'static str> =
245        Some("Assigns a nonce, signs, and pushes a message to the mempool.");
246
247    type Params = (Message, Option<MessageSendSpec>);
248    type Ok = SignedMessage;
249
250    async fn handle(
251        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
252        (message, send_spec): Self::Params,
253        _: &http::Extensions,
254    ) -> Result<Self::Ok, ServerError> {
255        let from = message.from;
256
257        let heaviest_tipset = ctx.chain_store().heaviest_tipset();
258        let key_addr = ctx
259            .state_manager
260            .resolve_to_key_addr(&from, &heaviest_tipset)
261            .await?;
262
263        if message.sequence != 0 {
264            return Err(anyhow::anyhow!(
265                "Expected nonce for MpoolPushMessage is 0, and will be calculated for you"
266            )
267            .into());
268        }
269        let mut message =
270            estimate_message_gas(&ctx, message, send_spec, Default::default()).await?;
271        if message.gas_premium > message.gas_fee_cap {
272            return Err(anyhow::anyhow!(
273                "After estimation, gas premium is greater than gas fee cap"
274            )
275            .into());
276        }
277
278        if from.protocol() == Protocol::ID {
279            message.from = key_addr;
280        }
281        let nonce = ctx.mpool.get_sequence(&from)?;
282        message.sequence = nonce;
283        let key = crate::key_management::Key::try_from(crate::key_management::try_find(
284            &key_addr,
285            &mut ctx.keystore.as_ref().write(),
286        )?)?;
287        let sig = crate::key_management::sign(
288            *key.key_info.key_type(),
289            key.key_info.private_key(),
290            message.cid().to_bytes().as_slice(),
291        )?;
292
293        let smsg = SignedMessage::new_from_parts(message, sig)?;
294
295        ctx.mpool.as_ref().push(smsg.clone()).await?;
296
297        Ok(smsg)
298    }
299}