#[cfg(test)]
mod tests;
mod cache;
pub mod chain_rand;
pub mod circulating_supply;
mod errors;
pub mod utils;
pub use self::errors::*;
use self::utils::structured;
use crate::beacon::{BeaconEntry, BeaconSchedule};
use crate::blocks::{Tipset, TipsetKey};
use crate::chain::{
ChainStore,
index::{ChainIndex, ResolveNullTipset},
};
use crate::interpreter::{
BlockMessages, CalledAt, ExecutionContext, IMPLICIT_MESSAGE_GAS_LIMIT, VM, resolve_to_key_addr,
};
use crate::interpreter::{MessageCallbackCtx, VMTrace};
use crate::lotus_json::{LotusJson, lotus_json_with_self};
use crate::message::{ChainMessage, MessageRead as _, MessageReadWrite as _, SignedMessage};
use crate::networks::ChainConfig;
use crate::rpc::state::{ApiInvocResult, InvocResult, MessageGasCost};
use crate::rpc::types::{MiningBaseInfo, SectorOnChainInfo};
use crate::shim::actors::init::{self, State};
use crate::shim::actors::miner::{MinerInfo, MinerPower, Partition};
use crate::shim::actors::verifreg::{Allocation, AllocationID, Claim};
use crate::shim::actors::*;
use crate::shim::address::AddressId;
use crate::shim::crypto::{Signature, SignatureType};
use crate::shim::{
actors::{
LoadActorStateFromBlockstore, miner::ext::MinerStateExt as _,
verifreg::ext::VerifiedRegistryStateExt as _,
},
executor::{ApplyRet, Receipt, StampedEvent},
};
use crate::shim::{
address::{Address, Payload, Protocol},
clock::ChainEpoch,
econ::TokenAmount,
machine::{GLOBAL_MULTI_ENGINE, MultiEngine},
message::Message,
randomness::Randomness,
runtime::Policy,
state_tree::{ActorState, StateTree},
version::NetworkVersion,
};
use crate::state_manager::cache::TipsetStateCache;
use crate::state_manager::chain_rand::draw_randomness;
use crate::state_migration::run_state_migrations;
use crate::utils::ShallowClone as _;
use crate::utils::cache::SizeTrackingLruCache;
use crate::utils::get_size::{GetSize, vec_heap_size_helper};
use ahash::{HashMap, HashMapExt};
use anyhow::{Context as _, bail, ensure};
use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _};
use chain_rand::ChainRand;
use cid::Cid;
pub use circulating_supply::GenesisInfo;
use fil_actor_verifreg_state::v12::DataCap;
use fil_actor_verifreg_state::v13::ClaimID;
use fil_actors_shared::fvm_ipld_amt::{Amt, Amtv0};
use fil_actors_shared::fvm_ipld_bitfield::BitField;
use fil_actors_shared::v12::runtime::DomainSeparationTag;
use futures::{FutureExt, channel::oneshot, select};
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::to_vec;
use fvm_shared4::crypto::signature::SECP_SIG_LEN;
use itertools::Itertools as _;
use nonzero_ext::nonzero;
use num::BigInt;
use num_traits::identities::Zero;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::ops::RangeInclusive;
use std::time::Duration;
use std::{num::NonZeroUsize, sync::Arc};
use tokio::sync::{RwLock, broadcast::error::RecvError};
use tracing::{error, info, instrument, warn};
const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
const DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
pub const EVENTS_AMT_BITWIDTH: u32 = 5;
pub type IdToAddressCache = SizeTrackingLruCache<AddressId, Address>;
#[derive(Debug, Clone)]
pub struct ExecutedMessage {
pub message: ChainMessage,
pub receipt: Receipt,
pub events: Option<Vec<StampedEvent>>,
}
impl GetSize for ExecutedMessage {
fn get_heap_size(&self) -> usize {
self.message.get_heap_size()
+ self.receipt.get_heap_size()
+ self
.events
.as_ref()
.map(vec_heap_size_helper)
.unwrap_or_default()
}
}
#[derive(Debug, Clone, GetSize)]
pub struct ExecutedTipset {
#[get_size(ignore)]
pub state_root: Cid,
#[get_size(ignore)]
pub receipt_root: Cid,
pub executed_messages: Arc<Vec<ExecutedMessage>>,
}
#[derive(Debug, Clone, GetSize)]
pub struct TipsetState {
#[get_size(ignore)]
pub state_root: Cid,
#[allow(dead_code)]
#[get_size(ignore)]
pub receipt_root: Cid,
}
impl From<ExecutedTipset> for TipsetState {
fn from(
ExecutedTipset {
state_root,
receipt_root,
..
}: ExecutedTipset,
) -> Self {
Self {
state_root,
receipt_root,
}
}
}
impl From<&ExecutedTipset> for TipsetState {
fn from(
ExecutedTipset {
state_root,
receipt_root,
..
}: &ExecutedTipset,
) -> Self {
Self {
state_root: *state_root,
receipt_root: *receipt_root,
}
}
}
#[derive(
Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema,
)]
#[serde(rename_all = "PascalCase")]
pub struct MarketBalance {
#[schemars(with = "LotusJson<TokenAmount>")]
#[serde(with = "crate::lotus_json")]
pub escrow: TokenAmount,
#[schemars(with = "LotusJson<TokenAmount>")]
#[serde(with = "crate::lotus_json")]
pub locked: TokenAmount,
}
lotus_json_with_self!(MarketBalance);
pub struct StateManager<DB> {
cs: Arc<ChainStore<DB>>,
cache: TipsetStateCache<ExecutedTipset>,
id_to_deterministic_address_cache: IdToAddressCache,
beacon: Arc<crate::beacon::BeaconSchedule>,
engine: Arc<MultiEngine>,
}
#[allow(clippy::type_complexity)]
pub const NO_CALLBACK: Option<fn(MessageCallbackCtx<'_>) -> anyhow::Result<()>> = None;
impl<DB> StateManager<DB>
where
DB: Blockstore,
{
pub fn new(cs: Arc<ChainStore<DB>>) -> anyhow::Result<Self> {
Self::new_with_engine(cs, GLOBAL_MULTI_ENGINE.clone())
}
pub fn new_with_engine(
cs: Arc<ChainStore<DB>>,
engine: Arc<MultiEngine>,
) -> anyhow::Result<Self> {
let genesis = cs.genesis_block_header();
let beacon = Arc::new(cs.chain_config().get_beacon_schedule(genesis.timestamp));
Ok(Self {
cs,
cache: TipsetStateCache::new("executed_tipset"), beacon,
engine,
id_to_deterministic_address_cache: SizeTrackingLruCache::new_with_metrics(
"id_to_deterministic_address".into(),
DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE,
),
})
}
pub fn heaviest_tipset(&self) -> Tipset {
self.chain_store().heaviest_tipset()
}
pub fn maybe_rewind_heaviest_tipset(&self) -> anyhow::Result<()> {
while self.maybe_rewind_heaviest_tipset_once()? {}
Ok(())
}
fn maybe_rewind_heaviest_tipset_once(&self) -> anyhow::Result<bool> {
let head = self.heaviest_tipset();
if let Some(info) = self
.chain_config()
.network_height_with_actor_bundle(head.epoch())
{
let expected_height_info = info.info;
let expected_bundle = info.manifest(self.blockstore())?;
let expected_bundle_metadata = expected_bundle.metadata()?;
let state = self.get_state_tree(head.parent_state())?;
let bundle_metadata = state.get_actor_bundle_metadata()?;
if expected_bundle_metadata != bundle_metadata {
let current_epoch = head.epoch();
let target_head = self.chain_index().tipset_by_height(
(expected_height_info.epoch - 1).max(0),
head,
ResolveNullTipset::TakeOlder,
)?;
let target_epoch = target_head.epoch();
let bundle_version = &bundle_metadata.version;
let expected_bundle_version = &expected_bundle_metadata.version;
if target_epoch < current_epoch {
tracing::warn!(
"rewinding chain head from {current_epoch} to {target_epoch}, actor bundle: {bundle_version}, expected: {expected_bundle_version}"
);
if self.blockstore().has(target_head.parent_state())? {
self.chain_store().set_heaviest_tipset(target_head)?;
return Ok(true);
} else {
anyhow::bail!(
"failed to rewind, state tree @ {target_epoch} is missing from blockstore: {}",
target_head.parent_state()
);
}
}
}
}
Ok(false)
}
pub fn beacon_schedule(&self) -> &Arc<BeaconSchedule> {
&self.beacon
}
pub fn get_network_version(&self, epoch: ChainEpoch) -> NetworkVersion {
self.chain_config().network_version(epoch)
}
pub fn get_state_tree(&self, state_cid: &Cid) -> anyhow::Result<StateTree<DB>> {
StateTree::new_from_root(self.blockstore_owned(), state_cid)
}
pub fn get_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<Option<ActorState>> {
let state = self.get_state_tree(&state_cid)?;
state.get_actor(addr)
}
pub fn get_actor_state<S: LoadActorStateFromBlockstore>(
&self,
ts: &Tipset,
) -> anyhow::Result<S> {
let state_tree = self.get_state_tree(ts.parent_state())?;
state_tree.get_actor_state()
}
pub fn get_actor_state_from_address<S: LoadActorStateFromBlockstore>(
&self,
ts: &Tipset,
actor_address: &Address,
) -> anyhow::Result<S> {
let state_tree = self.get_state_tree(ts.parent_state())?;
state_tree.get_actor_state_from_address(actor_address)
}
pub fn get_required_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<ActorState> {
let state = self.get_state_tree(&state_cid)?;
state.get_actor(addr)?.with_context(|| {
format!("Failed to load actor with addr={addr}, state_cid={state_cid}")
})
}
pub fn blockstore(&self) -> &Arc<DB> {
self.cs.blockstore()
}
pub fn blockstore_owned(&self) -> Arc<DB> {
self.blockstore().clone()
}
pub fn chain_store(&self) -> &Arc<ChainStore<DB>> {
&self.cs
}
pub fn chain_index(&self) -> &ChainIndex<DB> {
self.cs.chain_index()
}
pub fn chain_config(&self) -> &Arc<ChainConfig> {
self.cs.chain_config()
}
pub fn chain_rand(&self, tipset: Tipset) -> ChainRand<DB> {
ChainRand::new(
self.chain_config().shallow_clone(),
tipset,
self.chain_index().shallow_clone(),
self.beacon.shallow_clone(),
)
}
pub fn get_network_state_name(
&self,
state_cid: Cid,
) -> anyhow::Result<crate::networks::StateNetworkName> {
let init_act = self
.get_actor(&init::ADDRESS.into(), state_cid)?
.ok_or_else(|| Error::state("Init actor address could not be resolved"))?;
Ok(
State::load(self.blockstore(), init_act.code, init_act.state)?
.into_network_name()
.into(),
)
}
pub fn is_miner_slashed(&self, addr: &Address, state_cid: &Cid) -> anyhow::Result<bool, Error> {
let actor = self
.get_actor(&Address::POWER_ACTOR, *state_cid)?
.ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
Ok(spas.miner_power(self.blockstore(), addr)?.is_none())
}
pub fn get_miner_work_addr(&self, state_cid: Cid, addr: &Address) -> Result<Address, Error> {
let state =
StateTree::new_from_root(self.blockstore_owned(), &state_cid).map_err(Error::other)?;
let ms: miner::State = state.get_actor_state_from_address(addr)?;
let info = ms.info(self.blockstore()).map_err(|e| e.to_string())?;
let addr = resolve_to_key_addr(&state, self.blockstore(), &info.worker())?;
Ok(addr)
}
pub fn get_power(
&self,
state_cid: &Cid,
addr: Option<&Address>,
) -> anyhow::Result<Option<(power::Claim, power::Claim)>, Error> {
let actor = self
.get_actor(&Address::POWER_ACTOR, *state_cid)?
.ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
let t_pow = spas.total_power();
if let Some(maddr) = addr {
let m_pow = spas
.miner_power(self.blockstore(), maddr)?
.ok_or_else(|| Error::state(format!("Miner for address {maddr} not found")))?;
let min_pow = spas.miner_nominal_power_meets_consensus_minimum(
&self.chain_config().policy,
self.blockstore(),
maddr,
)?;
if min_pow {
return Ok(Some((m_pow, t_pow)));
}
}
Ok(None)
}
pub fn get_all_sectors(
&self,
addr: &Address,
ts: &Tipset,
) -> anyhow::Result<Vec<SectorOnChainInfo>> {
let actor = self
.get_actor(addr, *ts.parent_state())?
.ok_or_else(|| Error::state("Miner actor not found"))?;
let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
state.load_sectors_ext(self.blockstore(), None)
}
}
impl<DB> StateManager<DB>
where
DB: Blockstore + Send + Sync + 'static,
{
pub async fn load_tipset_state(self: &Arc<Self>, ts: &Tipset) -> anyhow::Result<TipsetState> {
if let Some(state) = self.cache.get_map(ts.key(), |et| et.into()) {
Ok(state)
} else if let Ok(receipt_ts) = self.chain_store().load_child_tipset(ts) {
Ok(TipsetState {
state_root: *receipt_ts.parent_state(),
receipt_root: *receipt_ts.parent_message_receipts(),
})
} else {
Ok(self.load_executed_tipset(ts).await?.into())
}
}
pub async fn load_executed_tipset(
self: &Arc<Self>,
ts: &Tipset,
) -> anyhow::Result<ExecutedTipset> {
if ts.epoch() >= self.heaviest_tipset().epoch()
&& let Some(cached) = self.cache.get(ts.key())
{
if StateTree::new_from_root(self.blockstore_owned(), &cached.state_root).is_ok() {
return Ok(cached);
} else {
self.cache.remove(ts.key());
}
}
self.cache
.get_or_else(ts.key(), || async move {
let receipt_ts = self.chain_store().load_child_tipset(ts).ok();
self.load_executed_tipset_inner(ts, receipt_ts.as_ref())
.await
})
.await
}
async fn load_executed_tipset_inner(
self: &Arc<Self>,
msg_ts: &Tipset,
receipt_ts: Option<&Tipset>,
) -> anyhow::Result<ExecutedTipset> {
if let Some(receipt_ts) = receipt_ts {
anyhow::ensure!(
msg_ts.key() == receipt_ts.parents(),
"message tipset should be the parent of message receipt tipset"
);
}
let mut recomputed = false;
let (state_root, receipt_root, receipts) = match receipt_ts.and_then(|ts| {
let receipt_root = *ts.parent_message_receipts();
Receipt::get_receipts(self.cs.blockstore(), receipt_root)
.ok()
.map(|r| (*ts.parent_state(), receipt_root, r))
}) {
Some((state_root, receipt_root, receipts)) => (state_root, receipt_root, receipts),
None => {
let state_output = self
.compute_tipset_state(msg_ts.shallow_clone(), NO_CALLBACK, VMTrace::NotTraced)
.await?;
recomputed = true;
(
state_output.state_root,
state_output.receipt_root,
Receipt::get_receipts(self.cs.blockstore(), state_output.receipt_root)?,
)
}
};
let messages = self.chain_store().messages_for_tipset(msg_ts)?;
anyhow::ensure!(
messages.len() == receipts.len(),
"mismatching message and receipt counts ({} messages, {} receipts)",
messages.len(),
receipts.len()
);
let mut executed_messages = Vec::with_capacity(messages.len());
for (message, receipt) in messages.iter().cloned().zip(receipts) {
let events = if let Some(events_root) = receipt.events_root() {
Some(
match StampedEvent::get_events(self.cs.blockstore(), &events_root) {
Ok(events) => events,
Err(e) if recomputed => return Err(e),
Err(_) => {
self.compute_tipset_state(
msg_ts.shallow_clone(),
NO_CALLBACK,
VMTrace::NotTraced,
)
.await?;
recomputed = true;
StampedEvent::get_events(self.cs.blockstore(), &events_root)?
}
},
)
} else {
None
};
executed_messages.push(ExecutedMessage {
message,
receipt,
events,
});
}
Ok(ExecutedTipset {
state_root,
receipt_root,
executed_messages: Arc::new(executed_messages),
})
}
#[instrument(skip(self, rand))]
fn call_raw(
&self,
state_cid: Option<Cid>,
msg: &Message,
rand: ChainRand<DB>,
tipset: &Tipset,
) -> Result<ApiInvocResult, Error> {
let mut msg = msg.clone();
let state_cid = state_cid.unwrap_or(*tipset.parent_state());
let tipset_messages = self
.chain_store()
.messages_for_tipset(tipset)
.map_err(|err| Error::Other(err.to_string()))?;
let prior_messsages = tipset_messages
.iter()
.filter(|ts_msg| ts_msg.message().from() == msg.from());
let height = tipset.epoch();
let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
let mut vm = VM::new(
ExecutionContext {
heaviest_tipset: tipset.shallow_clone(),
state_tree_root: state_cid,
epoch: height,
rand: Box::new(rand),
base_fee: tipset.block_headers().first().parent_base_fee.clone(),
circ_supply: genesis_info.get_vm_circulating_supply(
height,
self.blockstore(),
&state_cid,
)?,
chain_config: self.chain_config().shallow_clone(),
chain_index: self.chain_index().shallow_clone(),
timestamp: tipset.min_timestamp(),
},
&self.engine,
VMTrace::Traced,
)?;
for m in prior_messsages {
vm.apply_message(m)?;
}
let state_cid = vm.flush()?;
let state = StateTree::new_from_root(self.blockstore_owned(), &state_cid)?;
let from_actor = state
.get_actor(&msg.from())?
.ok_or_else(|| anyhow::anyhow!("actor not found"))?;
msg.set_sequence(from_actor.sequence);
let mut msg = msg.clone();
msg.gas_limit = IMPLICIT_MESSAGE_GAS_LIMIT as u64;
let (apply_ret, duration) = vm.apply_implicit_message(&msg)?;
Ok(ApiInvocResult {
msg: msg.clone(),
msg_rct: Some(apply_ret.msg_receipt()),
msg_cid: msg.cid(),
error: apply_ret.failure_info().unwrap_or_default(),
duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
gas_cost: MessageGasCost::default(),
execution_trace: structured::parse_events(apply_ret.exec_trace()).unwrap_or_default(),
})
}
pub fn call(&self, message: &Message, tipset: Option<Tipset>) -> Result<ApiInvocResult, Error> {
let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
let chain_rand = self.chain_rand(ts.shallow_clone());
self.call_raw(None, message, chain_rand, &ts)
}
pub fn call_on_state(
&self,
state_cid: Cid,
message: &Message,
tipset: Option<Tipset>,
) -> Result<ApiInvocResult, Error> {
let ts = tipset.unwrap_or_else(|| self.cs.heaviest_tipset());
let chain_rand = self.chain_rand(ts.shallow_clone());
self.call_raw(Some(state_cid), message, chain_rand, &ts)
}
pub async fn apply_on_state_with_gas(
self: &Arc<Self>,
tipset: Option<Tipset>,
msg: Message,
vm_flush: VMFlush,
) -> anyhow::Result<(ApiInvocResult, Option<Cid>)> {
let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
let from_a = self.resolve_to_key_addr(&msg.from, &ts).await?;
let mut chain_msg = match from_a.protocol() {
Protocol::Secp256k1 => SignedMessage::new_unchecked(
msg.clone(),
Signature::new_secp256k1(vec![0; SECP_SIG_LEN]),
)
.into(),
Protocol::Delegated => SignedMessage::new_unchecked(
msg.clone(),
Signature::new(SignatureType::Delegated, vec![0; SECP_SIG_LEN]),
)
.into(),
_ => msg.clone().into(),
};
let (_invoc_res, apply_ret, duration, state_root) = self
.call_with_gas(&mut chain_msg, &[], Some(ts), vm_flush)
.await?;
Ok((
ApiInvocResult {
msg_cid: msg.cid(),
msg,
msg_rct: Some(apply_ret.msg_receipt()),
error: apply_ret.failure_info().unwrap_or_default(),
duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
gas_cost: MessageGasCost::default(),
execution_trace: structured::parse_events(apply_ret.exec_trace())
.unwrap_or_default(),
},
state_root,
))
}
pub async fn call_with_gas(
self: &Arc<Self>,
message: &mut ChainMessage,
prior_messages: &[ChainMessage],
tipset: Option<Tipset>,
vm_flush: VMFlush,
) -> Result<(InvocResult, ApplyRet, Duration, Option<Cid>), Error> {
let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
let TipsetState { state_root, .. } = self
.load_tipset_state(&ts)
.await
.map_err(|e| Error::Other(format!("Could not load tipset state: {e:#}")))?;
let chain_rand = self.chain_rand(ts.clone());
let epoch = ts.epoch() + 1;
let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
let (ret, duration, state_cid) = stacker::grow(64 << 20, || -> anyhow::Result<_> {
let mut vm = VM::new(
ExecutionContext {
heaviest_tipset: ts.clone(),
state_tree_root: state_root,
epoch,
rand: Box::new(chain_rand),
base_fee: ts.block_headers().first().parent_base_fee.clone(),
circ_supply: genesis_info.get_vm_circulating_supply(
epoch,
self.blockstore(),
&state_root,
)?,
chain_config: self.chain_config().shallow_clone(),
chain_index: self.chain_index().shallow_clone(),
timestamp: ts.min_timestamp(),
},
&self.engine,
VMTrace::NotTraced,
)?;
for msg in prior_messages {
vm.apply_message(msg)?;
}
let from_actor = vm
.get_actor(&message.from())
.map_err(|e| Error::Other(format!("Could not get actor from state: {e:#}")))?
.ok_or_else(|| Error::Other("cant find actor in state tree".to_string()))?;
message.set_sequence(from_actor.sequence);
let (ret, duration) = vm.apply_message(message)?;
let state_root = match vm_flush {
VMFlush::Flush => Some(vm.flush()?),
VMFlush::Skip => None,
};
Ok((ret, duration, state_root))
})?;
Ok((
InvocResult::new(message.message().clone(), &ret),
ret,
duration,
state_cid,
))
}
pub async fn replay(self: &Arc<Self>, ts: Tipset, mcid: Cid) -> Result<ApiInvocResult, Error> {
let this = Arc::clone(self);
tokio::task::spawn_blocking(move || this.replay_blocking(ts, mcid)).await?
}
pub fn replay_blocking(
self: &Arc<Self>,
ts: Tipset,
mcid: Cid,
) -> Result<ApiInvocResult, Error> {
const REPLAY_HALT: &str = "replay_halt";
let mut api_invoc_result = None;
let callback = |ctx: MessageCallbackCtx<'_>| {
match ctx.at {
CalledAt::Applied | CalledAt::Reward
if api_invoc_result.is_none() && ctx.cid == mcid =>
{
api_invoc_result = Some(ApiInvocResult {
msg_cid: ctx.message.cid(),
msg: ctx.message.message().clone(),
msg_rct: Some(ctx.apply_ret.msg_receipt()),
error: ctx.apply_ret.failure_info().unwrap_or_default(),
duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
.unwrap_or_default(),
});
anyhow::bail!(REPLAY_HALT);
}
_ => Ok(()), }
};
let result = self.compute_tipset_state_blocking(ts, Some(callback), VMTrace::Traced);
if let Err(error_message) = result
&& error_message.to_string() != REPLAY_HALT
{
return Err(Error::Other(format!(
"unexpected error during execution : {error_message:}"
)));
}
api_invoc_result.ok_or_else(|| Error::Other("failed to replay".into()))
}
pub async fn replay_for_prestate(
self: &Arc<Self>,
ts: Tipset,
target_message_cid: Cid,
) -> Result<(Cid, ApiInvocResult, Cid), Error> {
let this = Arc::clone(self);
tokio::task::spawn_blocking(move || {
this.replay_for_prestate_blocking(ts, target_message_cid)
})
.await
.map_err(|e| Error::Other(format!("{e}")))?
}
fn replay_for_prestate_blocking(
self: &Arc<Self>,
ts: Tipset,
target_msg_cid: Cid,
) -> Result<(Cid, ApiInvocResult, Cid), Error> {
if ts.epoch() == 0 {
return Err(Error::Other(
"cannot trace messages in the genesis block".into(),
));
}
let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
let exec = TipsetExecutor::new(
self.chain_index().shallow_clone(),
self.chain_config().shallow_clone(),
self.beacon_schedule().shallow_clone(),
&self.engine,
ts.shallow_clone(),
);
let mut no_cb = NO_CALLBACK;
let (parent_state, epoch, block_messages) =
exec.prepare_parent_state(genesis_timestamp, VMTrace::NotTraced, &mut no_cb)?;
Ok(stacker::grow(64 << 20, || {
let mut vm =
exec.create_vm(parent_state, epoch, ts.min_timestamp(), VMTrace::NotTraced)?;
let mut processed = ahash::HashSet::default();
for block in block_messages.iter() {
let mut penalty = TokenAmount::zero();
let mut gas_reward = TokenAmount::zero();
for msg in block.messages.iter() {
let cid = msg.cid();
if processed.contains(&cid) {
continue;
}
processed.insert(cid);
if cid == target_msg_cid {
let pre_root = vm.flush()?;
let mut traced_vm =
exec.create_vm(pre_root, epoch, ts.min_timestamp(), VMTrace::Traced)?;
let (ret, duration) = traced_vm.apply_message(msg)?;
let post_root = traced_vm.flush()?;
return Ok((
pre_root,
ApiInvocResult {
msg_cid: cid,
msg: msg.message().clone(),
msg_rct: Some(ret.msg_receipt()),
error: ret.failure_info().unwrap_or_default(),
duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
gas_cost: MessageGasCost::default(),
execution_trace: structured::parse_events(ret.exec_trace())
.unwrap_or_default(),
},
post_root,
));
}
let (ret, _) = vm.apply_message(msg)?;
gas_reward += ret.miner_tip();
penalty += ret.penalty();
}
if let Some(rew_msg) =
vm.reward_message(epoch, block.miner, block.win_count, penalty, gas_reward)?
{
let (ret, _) = vm.apply_implicit_message(&rew_msg)?;
if let Some(err) = ret.failure_info() {
bail!(
"failed to apply reward message for miner {}: {err}",
block.miner
);
}
if !ret.msg_receipt().exit_code().is_success() {
bail!(
"reward application message failed (exit: {:?})",
ret.msg_receipt().exit_code()
);
}
}
}
bail!("message {target_msg_cid} not found in tipset")
})?)
}
pub fn eligible_to_mine(
&self,
address: &Address,
base_tipset: &Tipset,
lookback_tipset: &Tipset,
) -> anyhow::Result<bool, Error> {
let hmp =
self.miner_has_min_power(&self.chain_config().policy, address, lookback_tipset)?;
let version = self.get_network_version(base_tipset.epoch());
if version <= NetworkVersion::V3 {
return Ok(hmp);
}
if !hmp {
return Ok(false);
}
let actor = self
.get_actor(&Address::POWER_ACTOR, *base_tipset.parent_state())?
.ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
let power_state = power::State::load(self.blockstore(), actor.code, actor.state)?;
let actor = self
.get_actor(address, *base_tipset.parent_state())?
.ok_or_else(|| Error::state("Miner actor address could not be resolved"))?;
let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
let claim = power_state
.miner_power(self.blockstore(), address)?
.ok_or_else(|| Error::Other("Could not get claim".to_string()))?;
if claim.quality_adj_power <= BigInt::zero() {
return Ok(false);
}
if !miner_state.fee_debt().is_zero() {
return Ok(false);
}
let info = miner_state.info(self.blockstore())?;
if base_tipset.epoch() <= info.consensus_fault_elapsed {
return Ok(false);
}
Ok(true)
}
pub async fn compute_tipset_state(
self: &Arc<Self>,
tipset: Tipset,
callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
enable_tracing: VMTrace,
) -> Result<ExecutedTipset, Error> {
let this = Arc::clone(self);
tokio::task::spawn_blocking(move || {
this.compute_tipset_state_blocking(tipset, callback, enable_tracing)
})
.await?
}
pub fn compute_tipset_state_blocking(
&self,
tipset: Tipset,
callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
enable_tracing: VMTrace,
) -> Result<ExecutedTipset, Error> {
let epoch = tipset.epoch();
let has_callback = callback.is_some();
info!(
"Evaluating tipset: EPOCH={epoch}, blocks={}, tsk={}",
tipset.len(),
tipset.key(),
);
Ok(apply_block_messages(
self.chain_store().genesis_block_header().timestamp,
self.chain_index().shallow_clone(),
self.chain_config().shallow_clone(),
self.beacon_schedule().shallow_clone(),
&self.engine,
tipset,
callback,
enable_tracing,
)
.map_err(|e| {
if has_callback {
e
} else {
e.context(format!("Failed to compute tipset state@{epoch}"))
}
})?)
}
#[instrument(skip_all)]
pub async fn compute_state(
self: &Arc<Self>,
height: ChainEpoch,
messages: Vec<Message>,
tipset: Tipset,
callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
enable_tracing: VMTrace,
) -> Result<ExecutedTipset, Error> {
let this = Arc::clone(self);
tokio::task::spawn_blocking(move || {
this.compute_state_blocking(height, messages, tipset, callback, enable_tracing)
})
.await?
}
#[tracing::instrument(skip_all)]
pub fn compute_state_blocking(
&self,
height: ChainEpoch,
messages: Vec<Message>,
tipset: Tipset,
callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
enable_tracing: VMTrace,
) -> Result<ExecutedTipset, Error> {
Ok(compute_state(
height,
messages,
tipset,
self.chain_store().genesis_block_header().timestamp,
self.chain_index().shallow_clone(),
self.chain_config().shallow_clone(),
self.beacon_schedule().shallow_clone(),
&self.engine,
callback,
enable_tracing,
)?)
}
fn tipset_executed_message(
&self,
tipset: &Tipset,
message: &ChainMessage,
allow_replaced: bool,
) -> Result<Option<Receipt>, Error> {
if tipset.epoch() == 0 {
return Ok(None);
}
let message_from_address = message.from();
let message_sequence = message.sequence();
let pts = self
.chain_index()
.load_required_tipset(tipset.parents())
.map_err(|err| Error::Other(format!("Failed to load tipset: {err}")))?;
let messages = self
.cs
.messages_for_tipset(&pts)
.map_err(|err| Error::Other(format!("Failed to load messages for tipset: {err}")))?;
messages
.iter()
.enumerate()
.rev()
.filter(|(_, s)| {
s.sequence() == message_sequence
&& s.from() == message_from_address
&& s.equal_call(message)
})
.map(|(index, m)| {
if !allow_replaced && message.cid() != m.cid(){
Err(Error::Other(format!(
"found message with equal nonce and call params but different CID. wanted {}, found: {}, nonce: {}, from: {}",
message.cid(),
m.cid(),
message.sequence(),
message.from(),
)))
} else {
let block_header = tipset.block_headers().first();
crate::chain::get_parent_receipt(
self.blockstore(),
block_header,
index,
)
.map_err(|err| Error::Other(format!("Failed to get parent receipt (message_receipts={}, index={index}, error={err})", block_header.message_receipts)))
}
})
.next()
.unwrap_or(Ok(None))
}
fn check_search(
&self,
mut current: Tipset,
message: &ChainMessage,
lookback_max_epoch: ChainEpoch,
allow_replaced: bool,
) -> Result<Option<(Tipset, Receipt)>, Error> {
let message_from_address = message.from();
let message_sequence = message.sequence();
let mut current_actor_state = self
.get_required_actor(&message_from_address, *current.parent_state())
.map_err(Error::state)?;
let message_from_id = self.lookup_required_id(&message_from_address, ¤t)?;
while current.epoch() >= lookback_max_epoch {
let parent_tipset = self
.chain_index()
.load_required_tipset(current.parents())
.map_err(|err| {
Error::Other(format!(
"failed to load tipset during msg wait searchback: {err:}"
))
})?;
let parent_actor_state = self
.get_actor(&message_from_id, *parent_tipset.parent_state())
.map_err(|e| Error::State(e.to_string()))?;
if parent_actor_state.is_none()
|| (current_actor_state.sequence > message_sequence
&& parent_actor_state.as_ref().unwrap().sequence <= message_sequence)
{
let receipt = self
.tipset_executed_message(¤t, message, allow_replaced)?
.context("Failed to get receipt with tipset_executed_message")?;
return Ok(Some((current, receipt)));
}
if let Some(parent_actor_state) = parent_actor_state {
current = parent_tipset;
current_actor_state = parent_actor_state;
} else {
break;
}
}
Ok(None)
}
fn search_back_for_message(
&self,
current: Tipset,
message: &ChainMessage,
look_back_limit: Option<i64>,
allow_replaced: Option<bool>,
) -> Result<Option<(Tipset, Receipt)>, Error> {
let current_epoch = current.epoch();
let allow_replaced = allow_replaced.unwrap_or(true);
let lookback_max_epoch = match look_back_limit {
Some(0) => return Ok(None),
Some(limit) if limit > 0 => (current_epoch - limit + 1).max(0),
_ => 0,
};
self.check_search(current, message, lookback_max_epoch, allow_replaced)
}
pub fn get_receipt(&self, tipset: Tipset, msg: Cid) -> Result<Receipt, Error> {
let m = crate::chain::get_chain_message(self.blockstore(), &msg)
.map_err(|e| Error::Other(e.to_string()))?;
let message_receipt = self.tipset_executed_message(&tipset, &m, true)?;
if let Some(receipt) = message_receipt {
return Ok(receipt);
}
let maybe_tuple = self.search_back_for_message(tipset, &m, None, None)?;
let message_receipt = maybe_tuple
.ok_or_else(|| {
Error::Other("Could not get receipt from search back message".to_string())
})?
.1;
Ok(message_receipt)
}
pub async fn wait_for_message(
self: &Arc<Self>,
msg_cid: Cid,
confidence: i64,
look_back_limit: Option<ChainEpoch>,
allow_replaced: Option<bool>,
) -> Result<(Option<Tipset>, Option<Receipt>), Error> {
let mut head_changes_rx = self.cs.subscribe_head_changes();
let (sender, mut receiver) = oneshot::channel::<()>();
let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
.map_err(|err| Error::Other(format!("failed to load message {err:}")))?;
let current_tipset = self.heaviest_tipset();
let maybe_message_receipt =
self.tipset_executed_message(¤t_tipset, &message, true)?;
if let Some(r) = maybe_message_receipt {
return Ok((Some(current_tipset.shallow_clone()), Some(r)));
}
let mut candidate_tipset: Option<Tipset> = None;
let mut candidate_receipt: Option<Receipt> = None;
let sm_cloned = self.shallow_clone();
let message_for_task = message.clone();
let height_of_head = current_tipset.epoch();
let task = tokio::task::spawn(async move {
let back_tuple = sm_cloned.search_back_for_message(
current_tipset,
&message_for_task,
look_back_limit,
allow_replaced,
)?;
sender
.send(())
.map_err(|e| Error::Other(format!("Could not send to channel {e:?}")))?;
Ok::<_, Error>(back_tuple)
});
let reverts: Arc<RwLock<HashMap<TipsetKey, bool>>> = Arc::new(RwLock::new(HashMap::new()));
let block_revert = reverts.clone();
let sm_cloned = Arc::clone(self);
let mut subscriber_poll = tokio::task::spawn(async move {
loop {
match head_changes_rx.recv().await {
Ok(head_changes) => {
for tipset in head_changes.reverts {
if candidate_tipset
.as_ref()
.is_some_and(|candidate| candidate.key() == tipset.key())
{
candidate_tipset = None;
candidate_receipt = None;
}
}
for tipset in head_changes.applies {
if candidate_tipset
.as_ref()
.map(|s| tipset.epoch() >= s.epoch() + confidence)
.unwrap_or_default()
{
return Ok((candidate_tipset, candidate_receipt));
}
let poll_receiver = receiver.try_recv();
if let Ok(Some(_)) = poll_receiver {
block_revert
.write()
.await
.insert(tipset.key().to_owned(), true);
}
let maybe_receipt =
sm_cloned.tipset_executed_message(&tipset, &message, true)?;
if let Some(receipt) = maybe_receipt {
if confidence == 0 {
return Ok((Some(tipset), Some(receipt)));
}
candidate_tipset = Some(tipset);
candidate_receipt = Some(receipt)
}
}
}
Err(RecvError::Lagged(i)) => {
warn!(
"wait for message head change subscriber lagged, skipped {} events",
i
);
}
Err(RecvError::Closed) => break,
}
}
Ok((None, None))
})
.fuse();
let mut search_back_poll = tokio::task::spawn(async move {
let back_tuple = task.await.map_err(|e| {
Error::Other(format!("Could not search backwards for message {e}"))
})??;
if let Some((back_tipset, back_receipt)) = back_tuple {
let should_revert = *reverts
.read()
.await
.get(back_tipset.key())
.unwrap_or(&false);
let larger_height_of_head = height_of_head >= back_tipset.epoch() + confidence;
if !should_revert && larger_height_of_head {
return Ok::<_, Error>((Some(back_tipset), Some(back_receipt)));
}
return Ok((None, None));
}
Ok((None, None))
})
.fuse();
loop {
select! {
res = subscriber_poll => {
return res?
}
res = search_back_poll => {
if let Ok((Some(ts), Some(rct))) = res? {
return Ok((Some(ts), Some(rct)));
}
}
}
}
}
pub async fn search_for_message(
&self,
from: Option<Tipset>,
msg_cid: Cid,
look_back_limit: Option<i64>,
allow_replaced: Option<bool>,
) -> Result<Option<(Tipset, Receipt)>, Error> {
let from = from.unwrap_or_else(|| self.heaviest_tipset());
let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
.map_err(|err| Error::Other(format!("failed to load message {err}")))?;
let current_tipset = self.heaviest_tipset();
let maybe_message_receipt =
self.tipset_executed_message(&from, &message, allow_replaced.unwrap_or(true))?;
if let Some(r) = maybe_message_receipt {
Ok(Some((from, r)))
} else {
self.search_back_for_message(current_tipset, &message, look_back_limit, allow_replaced)
}
}
pub fn get_bls_public_key(
db: &Arc<DB>,
addr: &Address,
state_cid: Cid,
) -> Result<BlsPublicKey, Error> {
let state = StateTree::new_from_root(Arc::clone(db), &state_cid)
.map_err(|e| Error::Other(e.to_string()))?;
let kaddr =
resolve_to_key_addr(&state, db, addr).context("Failed to resolve key address")?;
match kaddr.into_payload() {
Payload::BLS(key) => BlsPublicKey::from_bytes(&key)
.context("Failed to construct bls public key")
.map_err(Error::from),
_ => Err(Error::state(
"Address must be BLS address to load bls public key",
)),
}
}
pub fn lookup_id(&self, addr: &Address, ts: &Tipset) -> Result<Option<Address>, Error> {
let state_tree = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
.map_err(|e| format!("{e:?}"))?;
Ok(state_tree
.lookup_id(addr)
.map_err(|e| Error::Other(e.to_string()))?
.map(Address::new_id))
}
pub fn lookup_required_id(&self, addr: &Address, ts: &Tipset) -> Result<Address, Error> {
self.lookup_id(addr, ts)?
.ok_or_else(|| Error::Other(format!("Failed to lookup the id address {addr}")))
}
pub fn market_state(&self, ts: &Tipset) -> Result<market::State, Error> {
let actor = self.get_required_actor(&Address::MARKET_ACTOR, *ts.parent_state())?;
let market_state = market::State::load(self.blockstore(), actor.code, actor.state)?;
Ok(market_state)
}
pub fn market_balance(&self, addr: &Address, ts: &Tipset) -> Result<MarketBalance, Error> {
let market_state = self.market_state(ts)?;
let new_addr = self.lookup_required_id(addr, ts)?;
let out = MarketBalance {
escrow: {
market_state
.escrow_table(self.blockstore())?
.get(&new_addr)?
},
locked: {
market_state
.locked_table(self.blockstore())?
.get(&new_addr)?
},
};
Ok(out)
}
pub fn miner_info(&self, addr: &Address, ts: &Tipset) -> Result<MinerInfo, Error> {
let actor = self
.get_actor(addr, *ts.parent_state())?
.ok_or_else(|| Error::state("Miner actor not found"))?;
let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
Ok(state.info(self.blockstore())?)
}
pub fn miner_faults(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
self.all_partition_sectors(addr, ts, |partition| partition.faulty_sectors().clone())
}
pub fn miner_recoveries(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
self.all_partition_sectors(addr, ts, |partition| partition.recovering_sectors().clone())
}
fn all_partition_sectors(
&self,
addr: &Address,
ts: &Tipset,
get_sector: impl Fn(Partition<'_>) -> BitField,
) -> Result<BitField, Error> {
let actor = self
.get_actor(addr, *ts.parent_state())?
.ok_or_else(|| Error::state("Miner actor not found"))?;
let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
let mut partitions = Vec::new();
state.for_each_deadline(
&self.chain_config().policy,
self.blockstore(),
|_, deadline| {
deadline.for_each(self.blockstore(), |_, partition| {
partitions.push(get_sector(partition));
Ok(())
})
},
)?;
Ok(BitField::union(partitions.iter()))
}
pub fn miner_power(&self, addr: &Address, ts: &Tipset) -> Result<MinerPower, Error> {
if let Some((miner_power, total_power)) = self.get_power(ts.parent_state(), Some(addr))? {
return Ok(MinerPower {
miner_power,
total_power,
has_min_power: true,
});
}
Ok(MinerPower {
has_min_power: false,
miner_power: Default::default(),
total_power: Default::default(),
})
}
pub async fn resolve_to_key_addr(
self: &Arc<Self>,
addr: &Address,
ts: &Tipset,
) -> anyhow::Result<Address> {
match addr.protocol() {
Protocol::BLS | Protocol::Secp256k1 | Protocol::Delegated => return Ok(*addr),
Protocol::Actor => {
return Err(Error::Other(
"cannot resolve actor address to key address".to_string(),
)
.into());
}
_ => {}
};
let state = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())?;
if let Ok(addr) = resolve_to_key_addr(&state, self.blockstore(), addr) {
return Ok(addr);
}
let TipsetState { state_root, .. } = self.load_tipset_state(ts).await?;
let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?;
resolve_to_key_addr(&state, self.blockstore(), addr)
}
pub async fn miner_get_base_info(
self: &Arc<Self>,
beacon_schedule: &BeaconSchedule,
tipset: Tipset,
addr: Address,
epoch: ChainEpoch,
) -> anyhow::Result<Option<MiningBaseInfo>> {
let prev_beacon = self
.chain_store()
.chain_index()
.latest_beacon_entry(tipset.clone())?;
let entries: Vec<BeaconEntry> = beacon_schedule
.beacon_entries_for_block(
self.chain_config().network_version(epoch),
epoch,
tipset.epoch(),
&prev_beacon,
)
.await?;
let base = entries.last().unwrap_or(&prev_beacon);
let (lb_tipset, lb_state_root) = ChainStore::get_lookback_tipset_for_round(
self.chain_index(),
self.chain_config(),
&tipset,
epoch,
)?;
let actor = self.get_required_actor(&addr, *tipset.parent_state())?;
if self.get_actor(&addr, lb_state_root)?.is_none() {
return Ok(None);
}
let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
let addr_buf = to_vec(&addr)?;
let rand = draw_randomness(
base.signature(),
DomainSeparationTag::WinningPoStChallengeSeed as i64,
epoch,
&addr_buf,
)?;
let network_version = self.chain_config().network_version(tipset.epoch());
let sectors = self.get_sectors_for_winning_post(
&lb_state_root,
network_version,
&addr,
Randomness::new(rand.to_vec()),
)?;
if sectors.is_empty() {
return Ok(None);
}
let (miner_power, total_power) = self
.get_power(&lb_state_root, Some(&addr))?
.context("failed to get power")?;
let info = miner_state.info(self.blockstore())?;
let worker_key = self
.resolve_to_deterministic_address(info.worker, &tipset)
.await?;
let eligible = self.eligible_to_mine(&addr, &tipset, &lb_tipset)?;
Ok(Some(MiningBaseInfo {
miner_power: miner_power.quality_adj_power,
network_power: total_power.quality_adj_power,
sectors,
worker_key,
sector_size: info.sector_size,
prev_beacon_entry: prev_beacon,
beacon_entries: entries,
eligible_for_mining: eligible,
}))
}
pub fn miner_has_min_power(
&self,
policy: &Policy,
addr: &Address,
ts: &Tipset,
) -> anyhow::Result<bool> {
let actor = self
.get_actor(&Address::POWER_ACTOR, *ts.parent_state())?
.ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
let ps = power::State::load(self.blockstore(), actor.code, actor.state)?;
ps.miner_nominal_power_meets_consensus_minimum(policy, self.blockstore(), addr)
}
#[tracing::instrument(skip(self))]
pub fn validate_range(&self, epochs: RangeInclusive<i64>) -> anyhow::Result<()> {
let heaviest = self.heaviest_tipset();
let heaviest_epoch = heaviest.epoch();
let end = self
.chain_index()
.tipset_by_height(*epochs.end(), heaviest, ResolveNullTipset::TakeOlder)
.with_context(|| {
format!(
"couldn't get a tipset at height {} behind heaviest tipset at height {heaviest_epoch}",
*epochs.end(),
)
})?;
let tipsets = end
.chain(self.blockstore())
.take_while(|ts| ts.epoch() >= *epochs.start());
self.validate_tipsets(tipsets)
}
pub fn validate_tipsets<T>(&self, tipsets: T) -> anyhow::Result<()>
where
T: Iterator<Item = Tipset> + Send,
{
let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
validate_tipsets(
genesis_timestamp,
self.chain_index(),
self.chain_config(),
self.beacon_schedule(),
&self.engine,
tipsets,
)
}
pub fn get_verified_registry_actor_state(
&self,
ts: &Tipset,
) -> anyhow::Result<verifreg::State> {
let act = self
.get_actor(&Address::VERIFIED_REGISTRY_ACTOR, *ts.parent_state())
.map_err(Error::state)?
.ok_or_else(|| Error::state("actor not found"))?;
verifreg::State::load(self.blockstore(), act.code, act.state)
}
pub fn get_claim(
&self,
addr: &Address,
ts: &Tipset,
claim_id: ClaimID,
) -> anyhow::Result<Option<Claim>> {
let id_address = self.lookup_required_id(addr, ts)?;
let state = self.get_verified_registry_actor_state(ts)?;
state.get_claim(self.blockstore(), id_address, claim_id)
}
pub fn get_all_claims(&self, ts: &Tipset) -> anyhow::Result<HashMap<ClaimID, Claim>> {
let state = self.get_verified_registry_actor_state(ts)?;
state.get_all_claims(self.blockstore())
}
pub fn get_allocation(
&self,
addr: &Address,
ts: &Tipset,
allocation_id: AllocationID,
) -> anyhow::Result<Option<Allocation>> {
let id_address = self.lookup_required_id(addr, ts)?;
let state = self.get_verified_registry_actor_state(ts)?;
state.get_allocation(self.blockstore(), id_address.id()?, allocation_id)
}
pub fn get_all_allocations(
&self,
ts: &Tipset,
) -> anyhow::Result<HashMap<AllocationID, Allocation>> {
let state = self.get_verified_registry_actor_state(ts)?;
state.get_all_allocations(self.blockstore())
}
pub fn verified_client_status(
&self,
addr: &Address,
ts: &Tipset,
) -> anyhow::Result<Option<DataCap>> {
let id = self.lookup_required_id(addr, ts)?;
let network_version = self.get_network_version(ts.epoch());
if (u32::from(network_version.0)) < 17 {
let state = self.get_verified_registry_actor_state(ts)?;
return state.verified_client_data_cap(self.blockstore(), id);
}
let act = self
.get_actor(&Address::DATACAP_TOKEN_ACTOR, *ts.parent_state())
.map_err(Error::state)?
.ok_or_else(|| Error::state("Miner actor not found"))?;
let state = datacap::State::load(self.blockstore(), act.code, act.state)?;
state.verified_client_data_cap(self.blockstore(), id)
}
pub async fn resolve_to_deterministic_address(
self: &Arc<Self>,
address: Address,
ts: &Tipset,
) -> anyhow::Result<Address> {
use crate::shim::address::Protocol::*;
match address.protocol() {
BLS | Secp256k1 | Delegated => Ok(address),
Actor => anyhow::bail!("cannot resolve actor address to key address"),
ID => {
let id = address.id()?;
if let Some(cached) = self.id_to_deterministic_address_cache.get_cloned(&id) {
return Ok(cached);
}
let resolved = if let Ok(state) =
StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
&& let Ok(address) = state
.resolve_to_deterministic_addr(self.chain_store().blockstore(), address)
{
address
} else {
let TipsetState { state_root, .. } = self.load_tipset_state(ts).await?;
let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?;
state.resolve_to_deterministic_addr(self.chain_store().blockstore(), address)?
};
self.id_to_deterministic_address_cache.push(id, resolved);
Ok(resolved)
}
}
}
pub fn execution_trace(&self, tipset: &Tipset) -> anyhow::Result<(Cid, Vec<ApiInvocResult>)> {
let mut invoc_trace = vec![];
let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
let callback = |ctx: MessageCallbackCtx<'_>| {
match ctx.at {
CalledAt::Applied | CalledAt::Reward => {
invoc_trace.push(ApiInvocResult {
msg_cid: ctx.message.cid(),
msg: ctx.message.message().clone(),
msg_rct: Some(ctx.apply_ret.msg_receipt()),
error: ctx.apply_ret.failure_info().unwrap_or_default(),
duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
.unwrap_or_default(),
});
Ok(())
}
_ => Ok(()), }
};
let ExecutedTipset { state_root, .. } = apply_block_messages(
genesis_timestamp,
self.chain_index().shallow_clone(),
self.chain_config().shallow_clone(),
self.beacon_schedule().shallow_clone(),
&self.engine,
tipset.shallow_clone(),
Some(callback),
VMTrace::Traced,
)?;
Ok((state_root, invoc_trace))
}
}
pub fn validate_tipsets<DB, T>(
genesis_timestamp: u64,
chain_index: &ChainIndex<DB>,
chain_config: &Arc<ChainConfig>,
beacon: &Arc<BeaconSchedule>,
engine: &MultiEngine,
tipsets: T,
) -> anyhow::Result<()>
where
DB: Blockstore + Send + Sync + 'static,
T: Iterator<Item = Tipset> + Send,
{
for (child, parent) in tipsets.tuple_windows() {
info!(height = parent.epoch(), "compute parent state");
let ExecutedTipset {
state_root: actual_state,
receipt_root: actual_receipt,
..
} = apply_block_messages(
genesis_timestamp,
chain_index.shallow_clone(),
chain_config.shallow_clone(),
beacon.shallow_clone(),
engine,
parent,
NO_CALLBACK,
VMTrace::NotTraced,
)
.context("couldn't compute tipset state")?;
let expected_receipt = child.min_ticket_block().message_receipts;
let expected_state = child.parent_state();
if (expected_state, expected_receipt) != (&actual_state, actual_receipt) {
error!(
height = child.epoch(),
?expected_state,
?expected_receipt,
?actual_state,
?actual_receipt,
"state mismatch"
);
bail!("state mismatch");
}
}
Ok(())
}
struct TipsetExecutor<'a, DB: Blockstore + Send + Sync + 'static> {
tipset: Tipset,
rand: ChainRand<DB>,
chain_config: Arc<ChainConfig>,
chain_index: ChainIndex<DB>,
genesis_info: GenesisInfo,
engine: &'a MultiEngine,
}
impl<'a, DB: Blockstore + Send + Sync + 'static> TipsetExecutor<'a, DB> {
fn new(
chain_index: ChainIndex<DB>,
chain_config: Arc<ChainConfig>,
beacon: Arc<BeaconSchedule>,
engine: &'a MultiEngine,
tipset: Tipset,
) -> Self {
let rand = ChainRand::new(
chain_config.shallow_clone(),
tipset.shallow_clone(),
chain_index.shallow_clone(),
beacon,
);
let genesis_info = GenesisInfo::from_chain_config(chain_config.shallow_clone());
Self {
tipset,
rand,
chain_config,
chain_index,
genesis_info,
engine,
}
}
fn create_vm(
&self,
state_root: Cid,
epoch: ChainEpoch,
timestamp: u64,
trace: VMTrace,
) -> anyhow::Result<VM<DB>> {
let circ_supply = self.genesis_info.get_vm_circulating_supply(
epoch,
self.chain_index.db(),
&state_root,
)?;
VM::new(
ExecutionContext {
heaviest_tipset: self.tipset.shallow_clone(),
state_tree_root: state_root,
epoch,
rand: Box::new(self.rand.shallow_clone()),
base_fee: self.tipset.min_ticket_block().parent_base_fee.clone(),
circ_supply,
chain_config: self.chain_config.shallow_clone(),
chain_index: self.chain_index.shallow_clone(),
timestamp,
},
self.engine,
trace,
)
}
fn prepare_parent_state<F>(
&self,
genesis_timestamp: u64,
null_epoch_trace: VMTrace,
cron_callback: &mut Option<F>,
) -> anyhow::Result<(Cid, ChainEpoch, Vec<BlockMessages>)>
where
F: FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>,
{
use crate::shim::clock::EPOCH_DURATION_SECONDS;
let mut parent_state = *self.tipset.parent_state();
let parent_epoch = self
.chain_index
.load_required_tipset(self.tipset.parents())?
.epoch();
let epoch = self.tipset.epoch();
for epoch_i in parent_epoch..epoch {
if epoch_i > parent_epoch {
let timestamp = genesis_timestamp + ((EPOCH_DURATION_SECONDS * epoch_i) as u64);
parent_state = stacker::grow(64 << 20, || -> anyhow::Result<Cid> {
let mut vm =
self.create_vm(parent_state, epoch_i, timestamp, null_epoch_trace)?;
if let Err(e) = vm.run_cron(epoch_i, cron_callback.as_mut()) {
error!("Beginning of epoch cron failed to run: {e:#}");
return Err(e);
}
vm.flush()
})?;
}
if let Some(new_state) = run_state_migrations(
epoch_i,
&self.chain_config,
self.chain_index.db(),
&parent_state,
)? {
parent_state = new_state;
}
}
let block_messages = BlockMessages::for_tipset(self.chain_index.db(), &self.tipset)?;
Ok((parent_state, epoch, block_messages))
}
}
#[allow(clippy::too_many_arguments)]
pub fn apply_block_messages<DB>(
genesis_timestamp: u64,
chain_index: ChainIndex<DB>,
chain_config: Arc<ChainConfig>,
beacon: Arc<BeaconSchedule>,
engine: &MultiEngine,
tipset: Tipset,
mut callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
enable_tracing: VMTrace,
) -> anyhow::Result<ExecutedTipset>
where
DB: Blockstore + Send + Sync + 'static,
{
if tipset.epoch() == 0 {
let message_receipts = tipset.min_ticket_block().message_receipts;
return Ok(ExecutedTipset {
state_root: *tipset.parent_state(),
receipt_root: message_receipts,
executed_messages: vec![].into(),
});
}
let exec = TipsetExecutor::new(
chain_index.shallow_clone(),
chain_config,
beacon,
engine,
tipset.shallow_clone(),
);
let (parent_state, epoch, block_messages) =
exec.prepare_parent_state(genesis_timestamp, enable_tracing, &mut callback)?;
stacker::grow(64 << 20, || -> anyhow::Result<ExecutedTipset> {
let mut vm = exec.create_vm(parent_state, epoch, tipset.min_timestamp(), enable_tracing)?;
let (receipts, events, events_roots) =
vm.apply_block_messages(&block_messages, epoch, callback)?;
let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts.iter())?;
for (events, events_root) in events.iter().zip(events_roots.iter()) {
if let Some(events) = events {
let event_root =
events_root.context("events root should be present when events present")?;
let derived_event_root = Amt::new_from_iter_with_bit_width(
chain_index.db(),
EVENTS_AMT_BITWIDTH,
events.iter(),
)
.map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?;
ensure!(
derived_event_root == event_root,
"Events AMT root mismatch: derived={derived_event_root}, actual={event_root}."
);
}
}
let state_root = vm.flush()?;
let messages: Vec<ChainMessage> = block_messages
.into_iter()
.flat_map(|bm| bm.messages)
.collect_vec();
anyhow::ensure!(
messages.len() == receipts.len() && messages.len() == events.len(),
"length of messages, receipts, and events should match",
);
Ok(ExecutedTipset {
state_root,
receipt_root,
executed_messages: messages
.into_iter()
.zip(receipts)
.zip(events)
.map(|((message, receipt), events)| ExecutedMessage {
message,
receipt,
events,
})
.collect_vec()
.into(),
})
})
}
#[allow(clippy::too_many_arguments)]
pub fn compute_state<DB>(
_height: ChainEpoch,
messages: Vec<Message>,
tipset: Tipset,
genesis_timestamp: u64,
chain_index: ChainIndex<DB>,
chain_config: Arc<ChainConfig>,
beacon: Arc<BeaconSchedule>,
engine: &MultiEngine,
callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
enable_tracing: VMTrace,
) -> anyhow::Result<ExecutedTipset>
where
DB: Blockstore + Send + Sync + 'static,
{
if !messages.is_empty() {
anyhow::bail!("Applying messages is not yet implemented.");
}
let output = apply_block_messages(
genesis_timestamp,
chain_index,
chain_config,
beacon,
engine,
tipset,
callback,
enable_tracing,
)?;
Ok(output)
}
#[derive(Debug, Copy, Clone, Default)]
pub enum VMFlush {
Flush,
#[default]
Skip,
}