// forest/rpc/methods/mpool.rs
use super::gas::estimate_message_gas;
5use crate::lotus_json::NotNullVec;
6use crate::message::SignedMessage;
7use crate::rpc::error::ServerError;
8use crate::rpc::types::{ApiTipsetKey, MessageSendSpec};
9use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod};
10use crate::shim::{
11 address::{Address, Protocol},
12 message::Message,
13};
14use ahash::{HashSet, HashSetExt as _};
15use cid::Cid;
16use enumflags2::BitFlags;
17use fvm_ipld_blockstore::Blockstore;
18
19pub enum MpoolGetNonce {}
21impl RpcMethod<1> for MpoolGetNonce {
22 const NAME: &'static str = "Filecoin.MpoolGetNonce";
23 const PARAM_NAMES: [&'static str; 1] = ["address"];
24 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
25 const PERMISSION: Permission = Permission::Read;
26 const DESCRIPTION: Option<&'static str> =
27 Some("Returns the current nonce for the specified address.");
28
29 type Params = (Address,);
30 type Ok = u64;
31
32 async fn handle(
33 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
34 (address,): Self::Params,
35 ) -> Result<Self::Ok, ServerError> {
36 Ok(ctx.mpool.get_sequence(&address)?)
37 }
38}
39
40pub enum MpoolPending {}
42impl RpcMethod<1> for MpoolPending {
43 const NAME: &'static str = "Filecoin.MpoolPending";
44 const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
45 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
46 const PERMISSION: Permission = Permission::Read;
47 const DESCRIPTION: Option<&'static str> =
48 Some("Returns the pending messages for a given tipset.");
49
50 type Params = (ApiTipsetKey,);
51 type Ok = NotNullVec<SignedMessage>;
52
53 async fn handle(
54 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
55 (ApiTipsetKey(tipset_key),): Self::Params,
56 ) -> Result<Self::Ok, ServerError> {
57 let mut ts = ctx
58 .chain_store()
59 .load_required_tipset_or_heaviest(&tipset_key)?;
60
61 let (mut pending, mpts) = ctx.mpool.pending()?;
62
63 let mut have_cids = HashSet::new();
64 for item in pending.iter() {
65 have_cids.insert(item.cid());
66 }
67
68 if mpts.epoch() > ts.epoch() {
69 return Ok(NotNullVec(pending.into_iter().collect()));
70 }
71
72 loop {
73 if mpts.epoch() == ts.epoch() {
74 if mpts == ts {
75 break;
76 }
77
78 let have = ctx
80 .mpool
81 .as_ref()
82 .messages_for_blocks(ts.block_headers().iter())?;
83
84 for sm in have {
85 have_cids.insert(sm.cid());
86 }
87 }
88
89 let msgs = ctx
90 .mpool
91 .as_ref()
92 .messages_for_blocks(ts.block_headers().iter())?;
93
94 for m in msgs {
95 if have_cids.contains(&m.cid()) {
96 continue;
97 }
98
99 have_cids.insert(m.cid());
100 pending.push(m);
101 }
102
103 if mpts.epoch() >= ts.epoch() {
104 break;
105 }
106
107 ts = ctx.chain_index().load_required_tipset(ts.parents())?;
108 }
109 Ok(NotNullVec(pending.into_iter().collect()))
110 }
111}
112
113pub enum MpoolSelect {}
115impl RpcMethod<2> for MpoolSelect {
116 const NAME: &'static str = "Filecoin.MpoolSelect";
117 const PARAM_NAMES: [&'static str; 2] = ["tipsetKey", "ticketQuality"];
118 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
119 const PERMISSION: Permission = Permission::Read;
120 const DESCRIPTION: Option<&'static str> =
121 Some("Returns a list of pending messages for inclusion in the next block.");
122
123 type Params = (ApiTipsetKey, f64);
124 type Ok = Vec<SignedMessage>;
125
126 async fn handle(
127 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
128 (ApiTipsetKey(tipset_key), ticket_quality): Self::Params,
129 ) -> Result<Self::Ok, ServerError> {
130 let ts = ctx
131 .chain_store()
132 .load_required_tipset_or_heaviest(&tipset_key)?;
133 Ok(ctx.mpool.select_messages(&ts, ticket_quality)?)
134 }
135}
136
137pub enum MpoolPush {}
139impl RpcMethod<1> for MpoolPush {
140 const NAME: &'static str = "Filecoin.MpoolPush";
141 const PARAM_NAMES: [&'static str; 1] = ["message"];
142 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
143 const PERMISSION: Permission = Permission::Write;
144 const DESCRIPTION: Option<&'static str> = Some("Adds a signed message to the message pool.");
145
146 type Params = (SignedMessage,);
147 type Ok = Cid;
148
149 async fn handle(
150 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
151 (message,): Self::Params,
152 ) -> Result<Self::Ok, ServerError> {
153 let cid = ctx.mpool.as_ref().push(message).await?;
154 Ok(cid)
155 }
156}
157
158pub enum MpoolBatchPush {}
160impl RpcMethod<1> for MpoolBatchPush {
161 const NAME: &'static str = "Filecoin.MpoolBatchPush";
162 const PARAM_NAMES: [&'static str; 1] = ["messages"];
163 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
164 const PERMISSION: Permission = Permission::Write;
165 const DESCRIPTION: Option<&'static str> =
166 Some("Adds a set of signed messages to the message pool.");
167
168 type Params = (Vec<SignedMessage>,);
169 type Ok = Vec<Cid>;
170
171 async fn handle(
172 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
173 (messages,): Self::Params,
174 ) -> Result<Self::Ok, ServerError> {
175 let mut cids = vec![];
176 for msg in messages {
177 cids.push(ctx.mpool.as_ref().push(msg).await?);
178 }
179 Ok(cids)
180 }
181}
182
183pub enum MpoolPushUntrusted {}
185impl RpcMethod<1> for MpoolPushUntrusted {
186 const NAME: &'static str = "Filecoin.MpoolPushUntrusted";
187 const PARAM_NAMES: [&'static str; 1] = ["message"];
188 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
189 const PERMISSION: Permission = Permission::Write;
190 const DESCRIPTION: Option<&'static str> =
191 Some("Adds a message to the message pool with verification checks.");
192
193 type Params = (SignedMessage,);
194 type Ok = Cid;
195
196 async fn handle(
197 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
198 (message,): Self::Params,
199 ) -> Result<Self::Ok, ServerError> {
200 MpoolPush::handle(ctx, (message,)).await
204 }
205}
206
207pub enum MpoolBatchPushUntrusted {}
209impl RpcMethod<1> for MpoolBatchPushUntrusted {
210 const NAME: &'static str = "Filecoin.MpoolBatchPushUntrusted";
211 const PARAM_NAMES: [&'static str; 1] = ["messages"];
212 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
213 const PERMISSION: Permission = Permission::Write;
214 const DESCRIPTION: Option<&'static str> =
215 Some("Adds a set of messages to the message pool with additional verification checks.");
216
217 type Params = (Vec<SignedMessage>,);
218 type Ok = Vec<Cid>;
219
220 async fn handle(
221 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
222 (messages,): Self::Params,
223 ) -> Result<Self::Ok, ServerError> {
224 MpoolBatchPush::handle(ctx, (messages,)).await
226 }
227}
228
229pub enum MpoolPushMessage {}
231impl RpcMethod<2> for MpoolPushMessage {
232 const NAME: &'static str = "Filecoin.MpoolPushMessage";
233 const PARAM_NAMES: [&'static str; 2] = ["message", "sendSpec"];
234 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
235 const PERMISSION: Permission = Permission::Sign;
236 const DESCRIPTION: Option<&'static str> =
237 Some("Assigns a nonce, signs, and pushes a message to the mempool.");
238
239 type Params = (Message, Option<MessageSendSpec>);
240 type Ok = SignedMessage;
241
242 async fn handle(
243 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
244 (message, send_spec): Self::Params,
245 ) -> Result<Self::Ok, ServerError> {
246 let from = message.from;
247
248 let heaviest_tipset = ctx.chain_store().heaviest_tipset();
249 let key_addr = ctx
250 .state_manager
251 .resolve_to_key_addr(&from, &heaviest_tipset)
252 .await?;
253
254 if message.sequence != 0 {
255 return Err(anyhow::anyhow!(
256 "Expected nonce for MpoolPushMessage is 0, and will be calculated for you"
257 )
258 .into());
259 }
260 let mut message =
261 estimate_message_gas(&ctx, message, send_spec, Default::default()).await?;
262 if message.gas_premium > message.gas_fee_cap {
263 return Err(anyhow::anyhow!(
264 "After estimation, gas premium is greater than gas fee cap"
265 )
266 .into());
267 }
268
269 if from.protocol() == Protocol::ID {
270 message.from = key_addr;
271 }
272 let nonce = ctx.mpool.get_sequence(&from)?;
273 message.sequence = nonce;
274 let key = crate::key_management::Key::try_from(crate::key_management::try_find(
275 &key_addr,
276 &mut ctx.keystore.as_ref().write(),
277 )?)?;
278 let sig = crate::key_management::sign(
279 *key.key_info.key_type(),
280 key.key_info.private_key(),
281 message.cid().to_bytes().as_slice(),
282 )?;
283
284 let smsg = SignedMessage::new_from_parts(message, sig)?;
285
286 ctx.mpool.as_ref().push(smsg.clone()).await?;
287
288 Ok(smsg)
289 }
290}