forest/rpc/methods/
mpool.rs1use super::gas::estimate_message_gas;
5use crate::lotus_json::NotNullVec;
6use crate::message::SignedMessage;
7use crate::rpc::error::ServerError;
8use crate::rpc::types::{ApiTipsetKey, MessageSendSpec};
9use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod};
10use crate::shim::{
11 address::{Address, Protocol},
12 message::Message,
13};
14use ahash::{HashSet, HashSetExt as _};
15use cid::Cid;
16use enumflags2::BitFlags;
17use fvm_ipld_blockstore::Blockstore;
18
19pub enum MpoolGetNonce {}
21impl RpcMethod<1> for MpoolGetNonce {
22 const NAME: &'static str = "Filecoin.MpoolGetNonce";
23 const PARAM_NAMES: [&'static str; 1] = ["address"];
24 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
25 const PERMISSION: Permission = Permission::Read;
26 const DESCRIPTION: Option<&'static str> =
27 Some("Returns the current nonce for the specified address.");
28
29 type Params = (Address,);
30 type Ok = u64;
31
32 async fn handle(
33 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
34 (address,): Self::Params,
35 ) -> Result<Self::Ok, ServerError> {
36 Ok(ctx.mpool.get_sequence(&address)?)
37 }
38}
39
40pub enum MpoolPending {}
42impl RpcMethod<1> for MpoolPending {
43 const NAME: &'static str = "Filecoin.MpoolPending";
44 const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
45 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
46 const PERMISSION: Permission = Permission::Read;
47 const DESCRIPTION: Option<&'static str> =
48 Some("Returns the pending messages for a given tipset.");
49
50 type Params = (ApiTipsetKey,);
51 type Ok = NotNullVec<SignedMessage>;
52
53 async fn handle(
54 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
55 (ApiTipsetKey(tipset_key),): Self::Params,
56 ) -> Result<Self::Ok, ServerError> {
57 let mut ts = ctx
58 .chain_store()
59 .load_required_tipset_or_heaviest(&tipset_key)?;
60
61 let (mut pending, mpts) = ctx.mpool.pending()?;
62
63 let mut have_cids = HashSet::new();
64 for item in pending.iter() {
65 have_cids.insert(item.cid());
66 }
67
68 if mpts.epoch() > ts.epoch() {
69 return Ok(NotNullVec(pending.into_iter().collect()));
70 }
71
72 loop {
73 if mpts.epoch() == ts.epoch() {
74 if mpts == ts {
75 break;
76 }
77
78 let have = ctx
80 .mpool
81 .as_ref()
82 .messages_for_blocks(ts.block_headers().iter())?;
83
84 for sm in have {
85 have_cids.insert(sm.cid());
86 }
87 }
88
89 let msgs = ctx
90 .mpool
91 .as_ref()
92 .messages_for_blocks(ts.block_headers().iter())?;
93
94 for m in msgs {
95 if have_cids.contains(&m.cid()) {
96 continue;
97 }
98
99 have_cids.insert(m.cid());
100 pending.push(m);
101 }
102
103 if mpts.epoch() >= ts.epoch() {
104 break;
105 }
106
107 ts = ctx.chain_index().load_required_tipset(ts.parents())?;
108 }
109 Ok(NotNullVec(pending.into_iter().collect()))
110 }
111}
112
113pub enum MpoolSelect {}
115impl RpcMethod<2> for MpoolSelect {
116 const NAME: &'static str = "Filecoin.MpoolSelect";
117 const PARAM_NAMES: [&'static str; 2] = ["tipsetKey", "ticketQuality"];
118 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
119 const PERMISSION: Permission = Permission::Read;
120 const DESCRIPTION: Option<&'static str> =
121 Some("Returns a list of pending messages for inclusion in the next block.");
122
123 type Params = (ApiTipsetKey, f64);
124 type Ok = Vec<SignedMessage>;
125
126 async fn handle(
127 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
128 (ApiTipsetKey(tipset_key), ticket_quality): Self::Params,
129 ) -> Result<Self::Ok, ServerError> {
130 let ts = ctx
131 .chain_store()
132 .load_required_tipset_or_heaviest(&tipset_key)?;
133 Ok(ctx.mpool.select_messages(&ts, ticket_quality)?)
134 }
135}
136
137pub enum MpoolPush {}
139impl RpcMethod<1> for MpoolPush {
140 const NAME: &'static str = "Filecoin.MpoolPush";
141 const PARAM_NAMES: [&'static str; 1] = ["message"];
142 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
143 const PERMISSION: Permission = Permission::Write;
144 const DESCRIPTION: Option<&'static str> = Some("Adds a signed message to the message pool.");
145
146 type Params = (SignedMessage,);
147 type Ok = Cid;
148
149 async fn handle(
150 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
151 (message,): Self::Params,
152 ) -> Result<Self::Ok, ServerError> {
153 let cid = ctx.mpool.as_ref().push(message).await?;
154 Ok(cid)
155 }
156}
157
158pub enum MpoolBatchPush {}
160impl RpcMethod<1> for MpoolBatchPush {
161 const NAME: &'static str = "Filecoin.MpoolBatchPush";
162 const PARAM_NAMES: [&'static str; 1] = ["messages"];
163 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
164 const PERMISSION: Permission = Permission::Write;
165 const DESCRIPTION: Option<&'static str> =
166 Some("Adds a set of signed messages to the message pool.");
167
168 type Params = (Vec<SignedMessage>,);
169 type Ok = Vec<Cid>;
170
171 async fn handle(
172 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
173 (messages,): Self::Params,
174 ) -> Result<Self::Ok, ServerError> {
175 let mut cids = vec![];
176 for msg in messages {
177 cids.push(ctx.mpool.as_ref().push(msg).await?);
178 }
179 Ok(cids)
180 }
181}
182
183pub enum MpoolPushUntrusted {}
185impl RpcMethod<1> for MpoolPushUntrusted {
186 const NAME: &'static str = "Filecoin.MpoolPushUntrusted";
187 const PARAM_NAMES: [&'static str; 1] = ["message"];
188 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
189 const PERMISSION: Permission = Permission::Write;
190 const DESCRIPTION: Option<&'static str> =
191 Some("Adds a message to the message pool with verification checks.");
192
193 type Params = (SignedMessage,);
194 type Ok = Cid;
195
196 async fn handle(
197 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
198 (message,): Self::Params,
199 ) -> Result<Self::Ok, ServerError> {
200 let cid = ctx.mpool.as_ref().push_untrusted(message).await?;
204 Ok(cid)
205 }
206}
207
208pub enum MpoolBatchPushUntrusted {}
210impl RpcMethod<1> for MpoolBatchPushUntrusted {
211 const NAME: &'static str = "Filecoin.MpoolBatchPushUntrusted";
212 const PARAM_NAMES: [&'static str; 1] = ["messages"];
213 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
214 const PERMISSION: Permission = Permission::Write;
215 const DESCRIPTION: Option<&'static str> =
216 Some("Adds a set of messages to the message pool with additional verification checks.");
217
218 type Params = (Vec<SignedMessage>,);
219 type Ok = Vec<Cid>;
220
221 async fn handle(
222 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
223 (messages,): Self::Params,
224 ) -> Result<Self::Ok, ServerError> {
225 MpoolBatchPush::handle(ctx, (messages,)).await
227 }
228}
229
230pub enum MpoolPushMessage {}
232impl RpcMethod<2> for MpoolPushMessage {
233 const NAME: &'static str = "Filecoin.MpoolPushMessage";
234 const PARAM_NAMES: [&'static str; 2] = ["message", "sendSpec"];
235 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
236 const PERMISSION: Permission = Permission::Sign;
237 const DESCRIPTION: Option<&'static str> =
238 Some("Assigns a nonce, signs, and pushes a message to the mempool.");
239
240 type Params = (Message, Option<MessageSendSpec>);
241 type Ok = SignedMessage;
242
243 async fn handle(
244 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
245 (message, send_spec): Self::Params,
246 ) -> Result<Self::Ok, ServerError> {
247 let from = message.from;
248
249 let heaviest_tipset = ctx.chain_store().heaviest_tipset();
250 let key_addr = ctx
251 .state_manager
252 .resolve_to_key_addr(&from, &heaviest_tipset)
253 .await?;
254
255 if message.sequence != 0 {
256 return Err(anyhow::anyhow!(
257 "Expected nonce for MpoolPushMessage is 0, and will be calculated for you"
258 )
259 .into());
260 }
261 let mut message =
262 estimate_message_gas(&ctx, message, send_spec, Default::default()).await?;
263 if message.gas_premium > message.gas_fee_cap {
264 return Err(anyhow::anyhow!(
265 "After estimation, gas premium is greater than gas fee cap"
266 )
267 .into());
268 }
269
270 if from.protocol() == Protocol::ID {
271 message.from = key_addr;
272 }
273 let nonce = ctx.mpool.get_sequence(&from)?;
274 message.sequence = nonce;
275 let key = crate::key_management::Key::try_from(crate::key_management::try_find(
276 &key_addr,
277 &mut ctx.keystore.as_ref().write(),
278 )?)?;
279 let sig = crate::key_management::sign(
280 *key.key_info.key_type(),
281 key.key_info.private_key(),
282 message.cid().to_bytes().as_slice(),
283 )?;
284
285 let smsg = SignedMessage::new_from_parts(message, sig)?;
286
287 ctx.mpool.as_ref().push(smsg.clone()).await?;
288
289 Ok(smsg)
290 }
291}