use super::gas::estimate_message_gas;
use crate::lotus_json::NotNullVec;
use crate::message::SignedMessage;
use crate::rpc::error::ServerError;
use crate::rpc::types::{ApiTipsetKey, MessageSendSpec};
use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod};
use crate::shim::{
address::{Address, Protocol},
message::Message,
};
use ahash::{HashSet, HashSetExt as _};
use cid::Cid;
use enumflags2::BitFlags;
use fvm_ipld_blockstore::Blockstore;
pub enum MpoolGetNonce {}
impl RpcMethod<1> for MpoolGetNonce {
const NAME: &'static str = "Filecoin.MpoolGetNonce";
const PARAM_NAMES: [&'static str; 1] = ["address"];
const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
const PERMISSION: Permission = Permission::Read;
const DESCRIPTION: Option<&'static str> =
Some("Returns the current nonce for the specified address.");
type Params = (Address,);
type Ok = u64;
async fn handle(
ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
(address,): Self::Params,
_: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
Ok(ctx.mpool.get_sequence(&address)?)
}
}
pub enum MpoolPending {}
impl RpcMethod<1> for MpoolPending {
const NAME: &'static str = "Filecoin.MpoolPending";
const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
const PERMISSION: Permission = Permission::Read;
const DESCRIPTION: Option<&'static str> =
Some("Returns the pending messages for a given tipset.");
type Params = (ApiTipsetKey,);
type Ok = NotNullVec<SignedMessage>;
async fn handle(
ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
(ApiTipsetKey(tipset_key),): Self::Params,
_: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
let mut ts = ctx
.chain_store()
.load_required_tipset_or_heaviest(&tipset_key)?;
let (mut pending, mpts) = ctx.mpool.pending();
let mut have_cids = HashSet::new();
for item in pending.iter() {
have_cids.insert(item.cid());
}
if mpts.epoch() > ts.epoch() {
return Ok(NotNullVec(pending.into_iter().collect()));
}
loop {
if mpts.epoch() == ts.epoch() {
if mpts == ts {
break;
}
let have = ctx
.mpool
.as_ref()
.messages_for_blocks(ts.block_headers().iter())?;
for sm in have {
have_cids.insert(sm.cid());
}
}
let msgs = ctx
.mpool
.as_ref()
.messages_for_blocks(ts.block_headers().iter())?;
for m in msgs {
if have_cids.contains(&m.cid()) {
continue;
}
have_cids.insert(m.cid());
pending.push(m);
}
if mpts.epoch() >= ts.epoch() {
break;
}
ts = ctx.chain_index().load_required_tipset(ts.parents())?;
}
Ok(NotNullVec(pending.into_iter().collect()))
}
}
pub enum MpoolSelect {}
impl RpcMethod<2> for MpoolSelect {
const NAME: &'static str = "Filecoin.MpoolSelect";
const PARAM_NAMES: [&'static str; 2] = ["tipsetKey", "ticketQuality"];
const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
const PERMISSION: Permission = Permission::Read;
const DESCRIPTION: Option<&'static str> =
Some("Returns a list of pending messages for inclusion in the next block.");
type Params = (ApiTipsetKey, f64);
type Ok = Vec<SignedMessage>;
async fn handle(
ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
(ApiTipsetKey(tipset_key), ticket_quality): Self::Params,
_: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
let ts = ctx
.chain_store()
.load_required_tipset_or_heaviest(&tipset_key)?;
Ok(ctx.mpool.select_messages(&ts, ticket_quality)?)
}
}
pub enum MpoolPush {}
impl RpcMethod<1> for MpoolPush {
const NAME: &'static str = "Filecoin.MpoolPush";
const PARAM_NAMES: [&'static str; 1] = ["message"];
const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
const PERMISSION: Permission = Permission::Write;
const DESCRIPTION: Option<&'static str> = Some("Adds a signed message to the message pool.");
type Params = (SignedMessage,);
type Ok = Cid;
async fn handle(
ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
(message,): Self::Params,
_: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
let cid = ctx.mpool.as_ref().push(message).await?;
Ok(cid)
}
}
pub enum MpoolBatchPush {}
impl RpcMethod<1> for MpoolBatchPush {
const NAME: &'static str = "Filecoin.MpoolBatchPush";
const PARAM_NAMES: [&'static str; 1] = ["messages"];
const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
const PERMISSION: Permission = Permission::Write;
const DESCRIPTION: Option<&'static str> =
Some("Adds a set of signed messages to the message pool.");
type Params = (Vec<SignedMessage>,);
type Ok = Vec<Cid>;
async fn handle(
ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
(messages,): Self::Params,
_: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
let mut cids = vec![];
for msg in messages {
cids.push(ctx.mpool.as_ref().push(msg).await?);
}
Ok(cids)
}
}
pub enum MpoolPushUntrusted {}
impl RpcMethod<1> for MpoolPushUntrusted {
const NAME: &'static str = "Filecoin.MpoolPushUntrusted";
const PARAM_NAMES: [&'static str; 1] = ["message"];
const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
const PERMISSION: Permission = Permission::Write;
const DESCRIPTION: Option<&'static str> =
Some("Adds a message to the message pool with verification checks.");
type Params = (SignedMessage,);
type Ok = Cid;
async fn handle(
ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
(message,): Self::Params,
_: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
let cid = ctx.mpool.as_ref().push_untrusted(message).await?;
Ok(cid)
}
}
pub enum MpoolBatchPushUntrusted {}
impl RpcMethod<1> for MpoolBatchPushUntrusted {
const NAME: &'static str = "Filecoin.MpoolBatchPushUntrusted";
const PARAM_NAMES: [&'static str; 1] = ["messages"];
const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
const PERMISSION: Permission = Permission::Write;
const DESCRIPTION: Option<&'static str> =
Some("Adds a set of messages to the message pool with additional verification checks.");
type Params = (Vec<SignedMessage>,);
type Ok = Vec<Cid>;
async fn handle(
ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
(messages,): Self::Params,
ext: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
MpoolBatchPush::handle(ctx, (messages,), ext).await
}
}
pub enum MpoolPushMessage {}
impl RpcMethod<2> for MpoolPushMessage {
const NAME: &'static str = "Filecoin.MpoolPushMessage";
const PARAM_NAMES: [&'static str; 2] = ["message", "sendSpec"];
const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
const PERMISSION: Permission = Permission::Sign;
const DESCRIPTION: Option<&'static str> =
Some("Assigns a nonce, signs, and pushes a message to the mempool.");
type Params = (Message, Option<MessageSendSpec>);
type Ok = SignedMessage;
async fn handle(
ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
(message, send_spec): Self::Params,
extensions: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
let from = message.from;
let heaviest_tipset = ctx.chain_store().heaviest_tipset();
let key_addr = ctx
.state_manager
.resolve_to_key_addr(&from, &heaviest_tipset)
.await?;
if message.sequence != 0 {
return Err(anyhow::anyhow!(
"Expected nonce for MpoolPushMessage is 0, and will be calculated for you"
)
.into());
}
let _sender_guard = ctx.mpool_locker.take_lock(key_addr).await;
let mut message =
estimate_message_gas(&ctx, message, send_spec, Default::default()).await?;
if message.gas_premium > message.gas_fee_cap {
return Err(anyhow::anyhow!(
"After estimation, gas premium is greater than gas fee cap"
)
.into());
}
if from.protocol() == Protocol::ID {
message.from = key_addr;
}
let balance =
super::wallet::WalletBalance::handle(ctx.clone(), (message.from,), extensions).await?;
let required_funds = &message.value + &message.gas_fee_cap * message.gas_limit;
if balance < required_funds {
return Err(anyhow::anyhow!(
"mpool push: not enough funds: {balance} < {required_funds}",
)
.into());
}
let key = crate::key_management::Key::try_from(crate::key_management::try_find(
&key_addr,
&ctx.keystore.as_ref().read(),
)?)?;
let eth_chain_id = ctx.chain_config().eth_chain_id;
let smsg = ctx
.nonce_tracker
.sign_and_push(ctx.mpool.as_ref(), message, &key, eth_chain_id)
.await?;
Ok(smsg)
}
}