Skip to main content

forest/rpc/methods/
mpool.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use super::gas::estimate_message_gas;
5use crate::lotus_json::NotNullVec;
6use crate::message::SignedMessage;
7use crate::rpc::error::ServerError;
8use crate::rpc::types::{ApiTipsetKey, MessageSendSpec};
9use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod};
10use crate::shim::{
11    address::{Address, Protocol},
12    message::Message,
13};
14use ahash::{HashSet, HashSetExt as _};
15use cid::Cid;
16use enumflags2::BitFlags;
17use fvm_ipld_blockstore::Blockstore;
18
19/// Gets next nonce for the specified sender.
20pub enum MpoolGetNonce {}
21impl RpcMethod<1> for MpoolGetNonce {
22    const NAME: &'static str = "Filecoin.MpoolGetNonce";
23    const PARAM_NAMES: [&'static str; 1] = ["address"];
24    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
25    const PERMISSION: Permission = Permission::Read;
26    const DESCRIPTION: Option<&'static str> =
27        Some("Returns the current nonce for the specified address.");
28
29    type Params = (Address,);
30    type Ok = u64;
31
32    async fn handle(
33        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
34        (address,): Self::Params,
35    ) -> Result<Self::Ok, ServerError> {
36        Ok(ctx.mpool.get_sequence(&address)?)
37    }
38}
39
40/// Return `Vec` of pending messages in `mpool`
41pub enum MpoolPending {}
42impl RpcMethod<1> for MpoolPending {
43    const NAME: &'static str = "Filecoin.MpoolPending";
44    const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
45    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
46    const PERMISSION: Permission = Permission::Read;
47    const DESCRIPTION: Option<&'static str> =
48        Some("Returns the pending messages for a given tipset.");
49
50    type Params = (ApiTipsetKey,);
51    type Ok = NotNullVec<SignedMessage>;
52
53    async fn handle(
54        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
55        (ApiTipsetKey(tipset_key),): Self::Params,
56    ) -> Result<Self::Ok, ServerError> {
57        let mut ts = ctx
58            .chain_store()
59            .load_required_tipset_or_heaviest(&tipset_key)?;
60
61        let (mut pending, mpts) = ctx.mpool.pending()?;
62
63        let mut have_cids = HashSet::new();
64        for item in pending.iter() {
65            have_cids.insert(item.cid());
66        }
67
68        if mpts.epoch() > ts.epoch() {
69            return Ok(NotNullVec(pending.into_iter().collect()));
70        }
71
72        loop {
73            if mpts.epoch() == ts.epoch() {
74                if mpts == ts {
75                    break;
76                }
77
78                // mpts has different blocks than ts
79                let have = ctx
80                    .mpool
81                    .as_ref()
82                    .messages_for_blocks(ts.block_headers().iter())?;
83
84                for sm in have {
85                    have_cids.insert(sm.cid());
86                }
87            }
88
89            let msgs = ctx
90                .mpool
91                .as_ref()
92                .messages_for_blocks(ts.block_headers().iter())?;
93
94            for m in msgs {
95                if have_cids.contains(&m.cid()) {
96                    continue;
97                }
98
99                have_cids.insert(m.cid());
100                pending.push(m);
101            }
102
103            if mpts.epoch() >= ts.epoch() {
104                break;
105            }
106
107            ts = ctx.chain_index().load_required_tipset(ts.parents())?;
108        }
109        Ok(NotNullVec(pending.into_iter().collect()))
110    }
111}
112
113/// Return `Vec` of pending messages for inclusion in the next block
114pub enum MpoolSelect {}
115impl RpcMethod<2> for MpoolSelect {
116    const NAME: &'static str = "Filecoin.MpoolSelect";
117    const PARAM_NAMES: [&'static str; 2] = ["tipsetKey", "ticketQuality"];
118    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
119    const PERMISSION: Permission = Permission::Read;
120    const DESCRIPTION: Option<&'static str> =
121        Some("Returns a list of pending messages for inclusion in the next block.");
122
123    type Params = (ApiTipsetKey, f64);
124    type Ok = Vec<SignedMessage>;
125
126    async fn handle(
127        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
128        (ApiTipsetKey(tipset_key), ticket_quality): Self::Params,
129    ) -> Result<Self::Ok, ServerError> {
130        let ts = ctx
131            .chain_store()
132            .load_required_tipset_or_heaviest(&tipset_key)?;
133        Ok(ctx.mpool.select_messages(&ts, ticket_quality)?)
134    }
135}
136
137/// Add `SignedMessage` to `mpool`, return message CID
138pub enum MpoolPush {}
139impl RpcMethod<1> for MpoolPush {
140    const NAME: &'static str = "Filecoin.MpoolPush";
141    const PARAM_NAMES: [&'static str; 1] = ["message"];
142    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
143    const PERMISSION: Permission = Permission::Write;
144    const DESCRIPTION: Option<&'static str> = Some("Adds a signed message to the message pool.");
145
146    type Params = (SignedMessage,);
147    type Ok = Cid;
148
149    async fn handle(
150        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
151        (message,): Self::Params,
152    ) -> Result<Self::Ok, ServerError> {
153        let cid = ctx.mpool.as_ref().push(message).await?;
154        Ok(cid)
155    }
156}
157
158/// Add a batch of `SignedMessage`s to `mpool`, return message CIDs
159pub enum MpoolBatchPush {}
160impl RpcMethod<1> for MpoolBatchPush {
161    const NAME: &'static str = "Filecoin.MpoolBatchPush";
162    const PARAM_NAMES: [&'static str; 1] = ["messages"];
163    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
164    const PERMISSION: Permission = Permission::Write;
165    const DESCRIPTION: Option<&'static str> =
166        Some("Adds a set of signed messages to the message pool.");
167
168    type Params = (Vec<SignedMessage>,);
169    type Ok = Vec<Cid>;
170
171    async fn handle(
172        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
173        (messages,): Self::Params,
174    ) -> Result<Self::Ok, ServerError> {
175        let mut cids = vec![];
176        for msg in messages {
177            cids.push(ctx.mpool.as_ref().push(msg).await?);
178        }
179        Ok(cids)
180    }
181}
182
183/// Add `SignedMessage` from untrusted source to `mpool`, return message CID
184pub enum MpoolPushUntrusted {}
185impl RpcMethod<1> for MpoolPushUntrusted {
186    const NAME: &'static str = "Filecoin.MpoolPushUntrusted";
187    const PARAM_NAMES: [&'static str; 1] = ["message"];
188    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
189    const PERMISSION: Permission = Permission::Write;
190    const DESCRIPTION: Option<&'static str> =
191        Some("Adds a message to the message pool with verification checks.");
192
193    type Params = (SignedMessage,);
194    type Ok = Cid;
195
196    async fn handle(
197        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
198        (message,): Self::Params,
199    ) -> Result<Self::Ok, ServerError> {
200        // Lotus implements a few extra sanity checks that we skip. We skip them
201        // because those checks aren't used for messages received from peers and
202        // therefore aren't safety critical.
203        let cid = ctx.mpool.as_ref().push_untrusted(message).await?;
204        Ok(cid)
205    }
206}
207
208/// Add a batch of `SignedMessage`s to `mpool`, return message CIDs
209pub enum MpoolBatchPushUntrusted {}
210impl RpcMethod<1> for MpoolBatchPushUntrusted {
211    const NAME: &'static str = "Filecoin.MpoolBatchPushUntrusted";
212    const PARAM_NAMES: [&'static str; 1] = ["messages"];
213    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
214    const PERMISSION: Permission = Permission::Write;
215    const DESCRIPTION: Option<&'static str> =
216        Some("Adds a set of messages to the message pool with additional verification checks.");
217
218    type Params = (Vec<SignedMessage>,);
219    type Ok = Vec<Cid>;
220
221    async fn handle(
222        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
223        (messages,): Self::Params,
224    ) -> Result<Self::Ok, ServerError> {
225        // Alias of MpoolBatchPush.
226        MpoolBatchPush::handle(ctx, (messages,)).await
227    }
228}
229
230/// Sign given `UnsignedMessage` and add it to `mpool`, return `SignedMessage`
231pub enum MpoolPushMessage {}
232impl RpcMethod<2> for MpoolPushMessage {
233    const NAME: &'static str = "Filecoin.MpoolPushMessage";
234    const PARAM_NAMES: [&'static str; 2] = ["message", "sendSpec"];
235    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
236    const PERMISSION: Permission = Permission::Sign;
237    const DESCRIPTION: Option<&'static str> =
238        Some("Assigns a nonce, signs, and pushes a message to the mempool.");
239
240    type Params = (Message, Option<MessageSendSpec>);
241    type Ok = SignedMessage;
242
243    async fn handle(
244        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
245        (message, send_spec): Self::Params,
246    ) -> Result<Self::Ok, ServerError> {
247        let from = message.from;
248
249        let heaviest_tipset = ctx.chain_store().heaviest_tipset();
250        let key_addr = ctx
251            .state_manager
252            .resolve_to_key_addr(&from, &heaviest_tipset)
253            .await?;
254
255        if message.sequence != 0 {
256            return Err(anyhow::anyhow!(
257                "Expected nonce for MpoolPushMessage is 0, and will be calculated for you"
258            )
259            .into());
260        }
261        let mut message =
262            estimate_message_gas(&ctx, message, send_spec, Default::default()).await?;
263        if message.gas_premium > message.gas_fee_cap {
264            return Err(anyhow::anyhow!(
265                "After estimation, gas premium is greater than gas fee cap"
266            )
267            .into());
268        }
269
270        if from.protocol() == Protocol::ID {
271            message.from = key_addr;
272        }
273        let nonce = ctx.mpool.get_sequence(&from)?;
274        message.sequence = nonce;
275        let key = crate::key_management::Key::try_from(crate::key_management::try_find(
276            &key_addr,
277            &mut ctx.keystore.as_ref().write(),
278        )?)?;
279        let sig = crate::key_management::sign(
280            *key.key_info.key_type(),
281            key.key_info.private_key(),
282            message.cid().to_bytes().as_slice(),
283        )?;
284
285        let smsg = SignedMessage::new_from_parts(message, sig)?;
286
287        ctx.mpool.as_ref().push(smsg.clone()).await?;
288
289        Ok(smsg)
290    }
291}