forest/rpc/methods/
mpool.rs1use super::gas::estimate_message_gas;
5use crate::db::EthMappingsStore;
6use crate::lotus_json::NotNullVec;
7use crate::message::SignedMessage;
8use crate::rpc::error::ServerError;
9use crate::rpc::types::{ApiTipsetKey, MessageSendSpec};
10use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod};
11use crate::shim::{
12 address::{Address, Protocol},
13 message::Message,
14};
15use ahash::{HashSet, HashSetExt as _};
16use cid::Cid;
17use enumflags2::BitFlags;
18use fvm_ipld_blockstore::Blockstore;
19
20pub enum MpoolGetNonce {}
22impl RpcMethod<1> for MpoolGetNonce {
23 const NAME: &'static str = "Filecoin.MpoolGetNonce";
24 const PARAM_NAMES: [&'static str; 1] = ["address"];
25 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
26 const PERMISSION: Permission = Permission::Read;
27 const DESCRIPTION: Option<&'static str> =
28 Some("Returns the current nonce for the specified address.");
29
30 type Params = (Address,);
31 type Ok = u64;
32
33 async fn handle(
34 ctx: Ctx<impl Blockstore + EthMappingsStore + Send + Sync + 'static>,
35 (address,): Self::Params,
36 _: &http::Extensions,
37 ) -> Result<Self::Ok, ServerError> {
38 Ok(ctx.mpool.get_sequence(&address)?)
39 }
40}
41
42pub enum MpoolPending {}
44impl RpcMethod<1> for MpoolPending {
45 const NAME: &'static str = "Filecoin.MpoolPending";
46 const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
47 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
48 const PERMISSION: Permission = Permission::Read;
49 const DESCRIPTION: Option<&'static str> =
50 Some("Returns the pending messages for a given tipset.");
51
52 type Params = (ApiTipsetKey,);
53 type Ok = NotNullVec<SignedMessage>;
54
55 async fn handle(
56 ctx: Ctx<impl Blockstore + EthMappingsStore + Send + Sync + 'static>,
57 (ApiTipsetKey(tipset_key),): Self::Params,
58 _: &http::Extensions,
59 ) -> Result<Self::Ok, ServerError> {
60 let mut ts = ctx
61 .chain_store()
62 .load_required_tipset_or_heaviest(&tipset_key)?;
63
64 let (mut pending, mpts) = ctx.mpool.pending();
65
66 let mut have_cids = HashSet::new();
67 for item in pending.iter() {
68 have_cids.insert(item.cid());
69 }
70
71 if mpts.epoch() > ts.epoch() {
72 return Ok(NotNullVec(pending.into_iter().collect()));
73 }
74
75 loop {
76 if mpts.epoch() == ts.epoch() {
77 if mpts == ts {
78 break;
79 }
80
81 let have = ctx
83 .mpool
84 .as_ref()
85 .messages_for_blocks(ts.block_headers().iter())?;
86
87 for sm in have {
88 have_cids.insert(sm.cid());
89 }
90 }
91
92 let msgs = ctx
93 .mpool
94 .as_ref()
95 .messages_for_blocks(ts.block_headers().iter())?;
96
97 for m in msgs {
98 if have_cids.contains(&m.cid()) {
99 continue;
100 }
101
102 have_cids.insert(m.cid());
103 pending.push(m);
104 }
105
106 if mpts.epoch() >= ts.epoch() {
107 break;
108 }
109
110 ts = ctx.chain_index().load_required_tipset(ts.parents())?;
111 }
112 Ok(NotNullVec(pending.into_iter().collect()))
113 }
114}
115
116pub enum MpoolSelect {}
118impl RpcMethod<2> for MpoolSelect {
119 const NAME: &'static str = "Filecoin.MpoolSelect";
120 const PARAM_NAMES: [&'static str; 2] = ["tipsetKey", "ticketQuality"];
121 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
122 const PERMISSION: Permission = Permission::Read;
123 const DESCRIPTION: Option<&'static str> =
124 Some("Returns a list of pending messages for inclusion in the next block.");
125
126 type Params = (ApiTipsetKey, f64);
127 type Ok = Vec<SignedMessage>;
128
129 async fn handle(
130 ctx: Ctx<impl Blockstore + EthMappingsStore + Send + Sync + 'static>,
131 (ApiTipsetKey(tipset_key), ticket_quality): Self::Params,
132 _: &http::Extensions,
133 ) -> Result<Self::Ok, ServerError> {
134 let ts = ctx
135 .chain_store()
136 .load_required_tipset_or_heaviest(&tipset_key)?;
137 Ok(ctx.mpool.select_messages(&ts, ticket_quality)?)
138 }
139}
140
141pub enum MpoolPush {}
143impl RpcMethod<1> for MpoolPush {
144 const NAME: &'static str = "Filecoin.MpoolPush";
145 const PARAM_NAMES: [&'static str; 1] = ["message"];
146 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
147 const PERMISSION: Permission = Permission::Write;
148 const DESCRIPTION: Option<&'static str> = Some("Adds a signed message to the message pool.");
149
150 type Params = (SignedMessage,);
151 type Ok = Cid;
152
153 async fn handle(
154 ctx: Ctx<impl Blockstore + EthMappingsStore + Send + Sync + 'static>,
155 (message,): Self::Params,
156 _: &http::Extensions,
157 ) -> Result<Self::Ok, ServerError> {
158 let cid = ctx.mpool.as_ref().push(message).await?;
159 Ok(cid)
160 }
161}
162
163pub enum MpoolBatchPush {}
165impl RpcMethod<1> for MpoolBatchPush {
166 const NAME: &'static str = "Filecoin.MpoolBatchPush";
167 const PARAM_NAMES: [&'static str; 1] = ["messages"];
168 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
169 const PERMISSION: Permission = Permission::Write;
170 const DESCRIPTION: Option<&'static str> =
171 Some("Adds a set of signed messages to the message pool.");
172
173 type Params = (Vec<SignedMessage>,);
174 type Ok = Vec<Cid>;
175
176 async fn handle(
177 ctx: Ctx<impl Blockstore + EthMappingsStore + Send + Sync + 'static>,
178 (messages,): Self::Params,
179 _: &http::Extensions,
180 ) -> Result<Self::Ok, ServerError> {
181 let mut cids = vec![];
182 for msg in messages {
183 cids.push(ctx.mpool.as_ref().push(msg).await?);
184 }
185 Ok(cids)
186 }
187}
188
189pub enum MpoolPushUntrusted {}
191impl RpcMethod<1> for MpoolPushUntrusted {
192 const NAME: &'static str = "Filecoin.MpoolPushUntrusted";
193 const PARAM_NAMES: [&'static str; 1] = ["message"];
194 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
195 const PERMISSION: Permission = Permission::Write;
196 const DESCRIPTION: Option<&'static str> =
197 Some("Adds a message to the message pool with verification checks.");
198
199 type Params = (SignedMessage,);
200 type Ok = Cid;
201
202 async fn handle(
203 ctx: Ctx<impl Blockstore + EthMappingsStore + Send + Sync + 'static>,
204 (message,): Self::Params,
205 _: &http::Extensions,
206 ) -> Result<Self::Ok, ServerError> {
207 let cid = ctx.mpool.as_ref().push_untrusted(message).await?;
211 Ok(cid)
212 }
213}
214
215pub enum MpoolBatchPushUntrusted {}
217impl RpcMethod<1> for MpoolBatchPushUntrusted {
218 const NAME: &'static str = "Filecoin.MpoolBatchPushUntrusted";
219 const PARAM_NAMES: [&'static str; 1] = ["messages"];
220 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
221 const PERMISSION: Permission = Permission::Write;
222 const DESCRIPTION: Option<&'static str> =
223 Some("Adds a set of messages to the message pool with additional verification checks.");
224
225 type Params = (Vec<SignedMessage>,);
226 type Ok = Vec<Cid>;
227
228 async fn handle(
229 ctx: Ctx<impl Blockstore + EthMappingsStore + Send + Sync + 'static>,
230 (messages,): Self::Params,
231 ext: &http::Extensions,
232 ) -> Result<Self::Ok, ServerError> {
233 MpoolBatchPush::handle(ctx, (messages,), ext).await
235 }
236}
237
238pub enum MpoolPushMessage {}
240impl RpcMethod<2> for MpoolPushMessage {
241 const NAME: &'static str = "Filecoin.MpoolPushMessage";
242 const PARAM_NAMES: [&'static str; 2] = ["message", "sendSpec"];
243 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
244 const PERMISSION: Permission = Permission::Sign;
245 const DESCRIPTION: Option<&'static str> =
246 Some("Assigns a nonce, signs, and pushes a message to the mempool.");
247
248 type Params = (Message, Option<MessageSendSpec>);
249 type Ok = SignedMessage;
250
251 async fn handle(
252 ctx: Ctx<impl Blockstore + EthMappingsStore + Send + Sync + 'static>,
253 (message, send_spec): Self::Params,
254 extensions: &http::Extensions,
255 ) -> Result<Self::Ok, ServerError> {
256 let from = message.from;
257
258 let heaviest_tipset = ctx.chain_store().heaviest_tipset();
259 let key_addr = ctx
260 .state_manager
261 .resolve_to_key_addr(&from, &heaviest_tipset)
262 .await?;
263
264 if message.sequence != 0 {
265 return Err(anyhow::anyhow!(
266 "Expected nonce for MpoolPushMessage is 0, and will be calculated for you"
267 )
268 .into());
269 }
270
271 let _sender_guard = ctx.mpool_locker.take_lock(key_addr).await;
272
273 let mut message =
274 estimate_message_gas(&ctx, message, send_spec, Default::default()).await?;
275 if message.gas_premium > message.gas_fee_cap {
276 return Err(anyhow::anyhow!(
277 "After estimation, gas premium is greater than gas fee cap"
278 )
279 .into());
280 }
281
282 if from.protocol() == Protocol::ID {
283 message.from = key_addr;
284 }
285
286 let balance =
287 super::wallet::WalletBalance::handle(ctx.clone(), (message.from,), extensions).await?;
288 let required_funds = &message.value + &message.gas_fee_cap * message.gas_limit;
289 if balance < required_funds {
290 return Err(anyhow::anyhow!(
291 "mpool push: not enough funds: {balance} < {required_funds}",
292 )
293 .into());
294 }
295
296 let key = crate::key_management::Key::try_from(crate::key_management::try_find(
297 &key_addr,
298 &ctx.keystore.as_ref().read(),
299 )?)?;
300 let eth_chain_id = ctx.chain_config().eth_chain_id;
301
302 let smsg = ctx
303 .nonce_tracker
304 .sign_and_push(ctx.mpool.as_ref(), message, &key, eth_chain_id)
305 .await?;
306
307 Ok(smsg)
308 }
309}