forest/rpc/methods/
mpool.rs1use super::gas::estimate_message_gas;
5use crate::lotus_json::NotNullVec;
6use crate::message::SignedMessage;
7use crate::rpc::error::ServerError;
8use crate::rpc::types::{ApiTipsetKey, MessageSendSpec};
9use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod};
10use crate::shim::{
11 address::{Address, Protocol},
12 message::Message,
13};
14use ahash::{HashSet, HashSetExt as _};
15use cid::Cid;
16use enumflags2::BitFlags;
17use fvm_ipld_blockstore::Blockstore;
18
19pub enum MpoolGetNonce {}
21impl RpcMethod<1> for MpoolGetNonce {
22 const NAME: &'static str = "Filecoin.MpoolGetNonce";
23 const PARAM_NAMES: [&'static str; 1] = ["address"];
24 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
25 const PERMISSION: Permission = Permission::Read;
26 const DESCRIPTION: Option<&'static str> =
27 Some("Returns the current nonce for the specified address.");
28
29 type Params = (Address,);
30 type Ok = u64;
31
32 async fn handle(
33 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
34 (address,): Self::Params,
35 _: &http::Extensions,
36 ) -> Result<Self::Ok, ServerError> {
37 Ok(ctx.mpool.get_sequence(&address)?)
38 }
39}
40
41pub enum MpoolPending {}
43impl RpcMethod<1> for MpoolPending {
44 const NAME: &'static str = "Filecoin.MpoolPending";
45 const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
46 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
47 const PERMISSION: Permission = Permission::Read;
48 const DESCRIPTION: Option<&'static str> =
49 Some("Returns the pending messages for a given tipset.");
50
51 type Params = (ApiTipsetKey,);
52 type Ok = NotNullVec<SignedMessage>;
53
54 async fn handle(
55 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
56 (ApiTipsetKey(tipset_key),): Self::Params,
57 _: &http::Extensions,
58 ) -> Result<Self::Ok, ServerError> {
59 let mut ts = ctx
60 .chain_store()
61 .load_required_tipset_or_heaviest(&tipset_key)?;
62
63 let (mut pending, mpts) = ctx.mpool.pending()?;
64
65 let mut have_cids = HashSet::new();
66 for item in pending.iter() {
67 have_cids.insert(item.cid());
68 }
69
70 if mpts.epoch() > ts.epoch() {
71 return Ok(NotNullVec(pending.into_iter().collect()));
72 }
73
74 loop {
75 if mpts.epoch() == ts.epoch() {
76 if mpts == ts {
77 break;
78 }
79
80 let have = ctx
82 .mpool
83 .as_ref()
84 .messages_for_blocks(ts.block_headers().iter())?;
85
86 for sm in have {
87 have_cids.insert(sm.cid());
88 }
89 }
90
91 let msgs = ctx
92 .mpool
93 .as_ref()
94 .messages_for_blocks(ts.block_headers().iter())?;
95
96 for m in msgs {
97 if have_cids.contains(&m.cid()) {
98 continue;
99 }
100
101 have_cids.insert(m.cid());
102 pending.push(m);
103 }
104
105 if mpts.epoch() >= ts.epoch() {
106 break;
107 }
108
109 ts = ctx.chain_index().load_required_tipset(ts.parents())?;
110 }
111 Ok(NotNullVec(pending.into_iter().collect()))
112 }
113}
114
115pub enum MpoolSelect {}
117impl RpcMethod<2> for MpoolSelect {
118 const NAME: &'static str = "Filecoin.MpoolSelect";
119 const PARAM_NAMES: [&'static str; 2] = ["tipsetKey", "ticketQuality"];
120 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
121 const PERMISSION: Permission = Permission::Read;
122 const DESCRIPTION: Option<&'static str> =
123 Some("Returns a list of pending messages for inclusion in the next block.");
124
125 type Params = (ApiTipsetKey, f64);
126 type Ok = Vec<SignedMessage>;
127
128 async fn handle(
129 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
130 (ApiTipsetKey(tipset_key), ticket_quality): Self::Params,
131 _: &http::Extensions,
132 ) -> Result<Self::Ok, ServerError> {
133 let ts = ctx
134 .chain_store()
135 .load_required_tipset_or_heaviest(&tipset_key)?;
136 Ok(ctx.mpool.select_messages(&ts, ticket_quality)?)
137 }
138}
139
140pub enum MpoolPush {}
142impl RpcMethod<1> for MpoolPush {
143 const NAME: &'static str = "Filecoin.MpoolPush";
144 const PARAM_NAMES: [&'static str; 1] = ["message"];
145 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
146 const PERMISSION: Permission = Permission::Write;
147 const DESCRIPTION: Option<&'static str> = Some("Adds a signed message to the message pool.");
148
149 type Params = (SignedMessage,);
150 type Ok = Cid;
151
152 async fn handle(
153 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
154 (message,): Self::Params,
155 _: &http::Extensions,
156 ) -> Result<Self::Ok, ServerError> {
157 let cid = ctx.mpool.as_ref().push(message).await?;
158 Ok(cid)
159 }
160}
161
162pub enum MpoolBatchPush {}
164impl RpcMethod<1> for MpoolBatchPush {
165 const NAME: &'static str = "Filecoin.MpoolBatchPush";
166 const PARAM_NAMES: [&'static str; 1] = ["messages"];
167 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
168 const PERMISSION: Permission = Permission::Write;
169 const DESCRIPTION: Option<&'static str> =
170 Some("Adds a set of signed messages to the message pool.");
171
172 type Params = (Vec<SignedMessage>,);
173 type Ok = Vec<Cid>;
174
175 async fn handle(
176 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
177 (messages,): Self::Params,
178 _: &http::Extensions,
179 ) -> Result<Self::Ok, ServerError> {
180 let mut cids = vec![];
181 for msg in messages {
182 cids.push(ctx.mpool.as_ref().push(msg).await?);
183 }
184 Ok(cids)
185 }
186}
187
188pub enum MpoolPushUntrusted {}
190impl RpcMethod<1> for MpoolPushUntrusted {
191 const NAME: &'static str = "Filecoin.MpoolPushUntrusted";
192 const PARAM_NAMES: [&'static str; 1] = ["message"];
193 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
194 const PERMISSION: Permission = Permission::Write;
195 const DESCRIPTION: Option<&'static str> =
196 Some("Adds a message to the message pool with verification checks.");
197
198 type Params = (SignedMessage,);
199 type Ok = Cid;
200
201 async fn handle(
202 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
203 (message,): Self::Params,
204 _: &http::Extensions,
205 ) -> Result<Self::Ok, ServerError> {
206 let cid = ctx.mpool.as_ref().push_untrusted(message).await?;
210 Ok(cid)
211 }
212}
213
214pub enum MpoolBatchPushUntrusted {}
216impl RpcMethod<1> for MpoolBatchPushUntrusted {
217 const NAME: &'static str = "Filecoin.MpoolBatchPushUntrusted";
218 const PARAM_NAMES: [&'static str; 1] = ["messages"];
219 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
220 const PERMISSION: Permission = Permission::Write;
221 const DESCRIPTION: Option<&'static str> =
222 Some("Adds a set of messages to the message pool with additional verification checks.");
223
224 type Params = (Vec<SignedMessage>,);
225 type Ok = Vec<Cid>;
226
227 async fn handle(
228 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
229 (messages,): Self::Params,
230 ext: &http::Extensions,
231 ) -> Result<Self::Ok, ServerError> {
232 MpoolBatchPush::handle(ctx, (messages,), ext).await
234 }
235}
236
237pub enum MpoolPushMessage {}
239impl RpcMethod<2> for MpoolPushMessage {
240 const NAME: &'static str = "Filecoin.MpoolPushMessage";
241 const PARAM_NAMES: [&'static str; 2] = ["message", "sendSpec"];
242 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
243 const PERMISSION: Permission = Permission::Sign;
244 const DESCRIPTION: Option<&'static str> =
245 Some("Assigns a nonce, signs, and pushes a message to the mempool.");
246
247 type Params = (Message, Option<MessageSendSpec>);
248 type Ok = SignedMessage;
249
250 async fn handle(
251 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
252 (message, send_spec): Self::Params,
253 _: &http::Extensions,
254 ) -> Result<Self::Ok, ServerError> {
255 let from = message.from;
256
257 let heaviest_tipset = ctx.chain_store().heaviest_tipset();
258 let key_addr = ctx
259 .state_manager
260 .resolve_to_key_addr(&from, &heaviest_tipset)
261 .await?;
262
263 if message.sequence != 0 {
264 return Err(anyhow::anyhow!(
265 "Expected nonce for MpoolPushMessage is 0, and will be calculated for you"
266 )
267 .into());
268 }
269 let mut message =
270 estimate_message_gas(&ctx, message, send_spec, Default::default()).await?;
271 if message.gas_premium > message.gas_fee_cap {
272 return Err(anyhow::anyhow!(
273 "After estimation, gas premium is greater than gas fee cap"
274 )
275 .into());
276 }
277
278 if from.protocol() == Protocol::ID {
279 message.from = key_addr;
280 }
281 let nonce = ctx.mpool.get_sequence(&from)?;
282 message.sequence = nonce;
283 let key = crate::key_management::Key::try_from(crate::key_management::try_find(
284 &key_addr,
285 &mut ctx.keystore.as_ref().write(),
286 )?)?;
287 let sig = crate::key_management::sign(
288 *key.key_info.key_type(),
289 key.key_info.private_key(),
290 message.cid().to_bytes().as_slice(),
291 )?;
292
293 let smsg = SignedMessage::new_from_parts(message, sig)?;
294
295 ctx.mpool.as_ref().push(smsg.clone()).await?;
296
297 Ok(smsg)
298 }
299}