1#[cfg(test)]
5mod tests;
6
7mod cache;
8pub mod chain_rand;
9pub mod circulating_supply;
10mod errors;
11pub mod utils;
12
13pub use self::errors::*;
14use self::utils::structured;
15
16use crate::beacon::{BeaconEntry, BeaconSchedule};
17use crate::blocks::{Tipset, TipsetKey};
18use crate::chain::{
19 ChainStore,
20 index::{ChainIndex, ResolveNullTipset},
21};
22use crate::interpreter::{
23 BlockMessages, CalledAt, ExecutionContext, IMPLICIT_MESSAGE_GAS_LIMIT, VM, resolve_to_key_addr,
24};
25use crate::interpreter::{MessageCallbackCtx, VMTrace};
26use crate::lotus_json::{LotusJson, lotus_json_with_self};
27use crate::message::{ChainMessage, MessageRead as _, MessageReadWrite as _, SignedMessage};
28use crate::networks::ChainConfig;
29use crate::rpc::state::{ApiInvocResult, InvocResult, MessageGasCost};
30use crate::rpc::types::{MiningBaseInfo, SectorOnChainInfo};
31use crate::shim::actors::init::{self, State};
32use crate::shim::actors::miner::{MinerInfo, MinerPower, Partition};
33use crate::shim::actors::verifreg::{Allocation, AllocationID, Claim};
34use crate::shim::actors::*;
35use crate::shim::address::AddressId;
36use crate::shim::crypto::{Signature, SignatureType};
37use crate::shim::{
38 actors::{
39 LoadActorStateFromBlockstore, miner::ext::MinerStateExt as _,
40 verifreg::ext::VerifiedRegistryStateExt as _,
41 },
42 executor::{ApplyRet, Receipt, StampedEvent},
43};
44use crate::shim::{
45 address::{Address, Payload, Protocol},
46 clock::ChainEpoch,
47 econ::TokenAmount,
48 machine::{GLOBAL_MULTI_ENGINE, MultiEngine},
49 message::Message,
50 randomness::Randomness,
51 runtime::Policy,
52 state_tree::{ActorState, StateTree},
53 version::NetworkVersion,
54};
55use crate::state_manager::cache::TipsetStateCache;
56use crate::state_manager::chain_rand::draw_randomness;
57use crate::state_migration::run_state_migrations;
58use crate::utils::ShallowClone as _;
59use crate::utils::cache::SizeTrackingLruCache;
60use crate::utils::get_size::{GetSize, vec_heap_size_helper};
61use ahash::{HashMap, HashMapExt};
62use anyhow::{Context as _, bail, ensure};
63use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _};
64use chain_rand::ChainRand;
65use cid::Cid;
66pub use circulating_supply::GenesisInfo;
67use fil_actor_verifreg_state::v12::DataCap;
68use fil_actor_verifreg_state::v13::ClaimID;
69use fil_actors_shared::fvm_ipld_amt::{Amt, Amtv0};
70use fil_actors_shared::fvm_ipld_bitfield::BitField;
71use fil_actors_shared::v12::runtime::DomainSeparationTag;
72use futures::{FutureExt, channel::oneshot, select};
73use fvm_ipld_blockstore::Blockstore;
74use fvm_ipld_encoding::to_vec;
75use fvm_shared4::crypto::signature::SECP_SIG_LEN;
76use itertools::Itertools as _;
77use nonzero_ext::nonzero;
78use num::BigInt;
79use num_traits::identities::Zero;
80use schemars::JsonSchema;
81use serde::{Deserialize, Serialize};
82use std::ops::RangeInclusive;
83use std::time::Duration;
84use std::{num::NonZeroUsize, sync::Arc};
85use tokio::sync::{RwLock, broadcast::error::RecvError};
86use tracing::{error, info, instrument, warn};
87
/// Default number of entries for a tipset cache.
// NOTE(review): not referenced in the visible part of this file — confirm where it is consumed.
const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
/// Capacity handed to the `id_to_deterministic_address` cache created in
/// `StateManager::new_with_engine`.
const DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
/// Bit width constant for the events AMT.
// NOTE(review): presumably the bit width used when (de)serializing event AMTs — confirm against callers.
pub const EVENTS_AMT_BITWIDTH: u32 = 5;
/// Size-tracking LRU cache keyed by [`AddressId`] and holding [`Address`] values.
pub type IdToAddressCache = SizeTrackingLruCache<AddressId, Address>;
92
/// A chain message bundled with its receipt and, when available, its stamped events.
#[derive(Debug, Clone)]
pub struct ExecutedMessage {
    /// The message itself.
    pub message: ChainMessage,
    /// The receipt paired with `message`.
    pub receipt: Receipt,
    /// Events loaded from the receipt's events root; `None` when the receipt has no events root.
    pub events: Option<Vec<StampedEvent>>,
}
103
104impl GetSize for ExecutedMessage {
105 fn get_heap_size(&self) -> usize {
106 self.message.get_heap_size()
107 + self.receipt.get_heap_size()
108 + self
109 .events
110 .as_ref()
111 .map(vec_heap_size_helper)
112 .unwrap_or_default()
113 }
114}
115
/// Outcome of executing a tipset: the resulting roots plus every executed message.
#[derive(Debug, Clone, GetSize)]
pub struct ExecutedTipset {
    /// Root CID of the state tree after execution (excluded from size tracking).
    #[get_size(ignore)]
    pub state_root: Cid,
    /// Root CID of the message receipts (excluded from size tracking).
    #[get_size(ignore)]
    pub receipt_root: Cid,
    /// Messages of the tipset, each paired with its receipt and optional events.
    pub executed_messages: Arc<Vec<ExecutedMessage>>,
}
129
/// The state root and receipt root of a tipset, without the executed messages.
#[derive(Debug, Clone, GetSize)]
pub struct TipsetState {
    /// Root CID of the state tree (excluded from size tracking).
    #[get_size(ignore)]
    pub state_root: Cid,
    /// Root CID of the message receipts (excluded from size tracking).
    #[allow(dead_code)]
    #[get_size(ignore)]
    pub receipt_root: Cid,
}
141
142impl From<ExecutedTipset> for TipsetState {
143 fn from(
144 ExecutedTipset {
145 state_root,
146 receipt_root,
147 ..
148 }: ExecutedTipset,
149 ) -> Self {
150 Self {
151 state_root,
152 receipt_root,
153 }
154 }
155}
156
157impl From<&ExecutedTipset> for TipsetState {
158 fn from(
159 ExecutedTipset {
160 state_root,
161 receipt_root,
162 ..
163 }: &ExecutedTipset,
164 ) -> Self {
165 Self {
166 state_root: *state_root,
167 receipt_root: *receipt_root,
168 }
169 }
170}
171
/// Escrow and locked token amounts, (de)serialized with `PascalCase` field names through
/// the `lotus_json` adapters.
// NOTE(review): presumably mirrors a storage-market balance entry — confirm against the RPC users.
#[derive(
    Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema,
)]
#[serde(rename_all = "PascalCase")]
pub struct MarketBalance {
    /// Escrow amount.
    #[schemars(with = "LotusJson<TokenAmount>")]
    #[serde(with = "crate::lotus_json")]
    pub escrow: TokenAmount,
    /// Locked amount.
    #[schemars(with = "LotusJson<TokenAmount>")]
    #[serde(with = "crate::lotus_json")]
    pub locked: TokenAmount,
}
lotus_json_with_self!(MarketBalance);
186
/// Entry point for state queries and message execution on top of a [`ChainStore`].
pub struct StateManager<DB> {
    /// Chain store providing the blockstore, chain index and chain configuration.
    cs: Arc<ChainStore<DB>>,
    /// Cache of executed tipsets, looked up by tipset key.
    cache: TipsetStateCache<ExecutedTipset>,
    /// LRU cache from ID addresses to addresses.
    // NOTE(review): populated outside the visible part of this file — confirm the exact mapping.
    id_to_deterministic_address_cache: IdToAddressCache,
    /// Beacon schedule derived from the chain configuration and the genesis timestamp.
    beacon: Arc<crate::beacon::BeaconSchedule>,
    /// Engine handle passed to every VM created by this manager.
    engine: Arc<MultiEngine>,
}
201
/// A `None` message callback with a concrete function-pointer type, for call sites that
/// execute messages without observing them.
// The allow is needed because the `Option<fn(..) -> ..>` type trips `clippy::type_complexity`.
#[allow(clippy::type_complexity)]
pub const NO_CALLBACK: Option<fn(MessageCallbackCtx<'_>) -> anyhow::Result<()>> = None;
204
205impl<DB> StateManager<DB>
206where
207 DB: Blockstore,
208{
209 pub fn new(cs: Arc<ChainStore<DB>>) -> anyhow::Result<Self> {
210 Self::new_with_engine(cs, GLOBAL_MULTI_ENGINE.clone())
211 }
212
213 pub fn new_with_engine(
214 cs: Arc<ChainStore<DB>>,
215 engine: Arc<MultiEngine>,
216 ) -> anyhow::Result<Self> {
217 let genesis = cs.genesis_block_header();
218 let beacon = Arc::new(cs.chain_config().get_beacon_schedule(genesis.timestamp));
219
220 Ok(Self {
221 cs,
222 cache: TipsetStateCache::new("executed_tipset"), beacon,
224 engine,
225 id_to_deterministic_address_cache: SizeTrackingLruCache::new_with_metrics(
226 "id_to_deterministic_address".into(),
227 DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE,
228 ),
229 })
230 }
231
232 pub fn heaviest_tipset(&self) -> Tipset {
234 self.chain_store().heaviest_tipset()
235 }
236
237 pub fn maybe_rewind_heaviest_tipset(&self) -> anyhow::Result<()> {
242 while self.maybe_rewind_heaviest_tipset_once()? {}
243 Ok(())
244 }
245
246 fn maybe_rewind_heaviest_tipset_once(&self) -> anyhow::Result<bool> {
247 let head = self.heaviest_tipset();
248 if let Some(info) = self
249 .chain_config()
250 .network_height_with_actor_bundle(head.epoch())
251 {
252 let expected_height_info = info.info;
253 let expected_bundle = info.manifest(self.blockstore())?;
254 let expected_bundle_metadata = expected_bundle.metadata()?;
255 let state = self.get_state_tree(head.parent_state())?;
256 let bundle_metadata = state.get_actor_bundle_metadata()?;
257 if expected_bundle_metadata != bundle_metadata {
258 let current_epoch = head.epoch();
259 let target_head = self.chain_index().tipset_by_height(
260 (expected_height_info.epoch - 1).max(0),
261 head,
262 ResolveNullTipset::TakeOlder,
263 )?;
264 let target_epoch = target_head.epoch();
265 let bundle_version = &bundle_metadata.version;
266 let expected_bundle_version = &expected_bundle_metadata.version;
267 if target_epoch < current_epoch {
268 tracing::warn!(
269 "rewinding chain head from {current_epoch} to {target_epoch}, actor bundle: {bundle_version}, expected: {expected_bundle_version}"
270 );
271 if self.blockstore().has(target_head.parent_state())? {
272 self.chain_store().set_heaviest_tipset(target_head)?;
273 return Ok(true);
274 } else {
275 anyhow::bail!(
276 "failed to rewind, state tree @ {target_epoch} is missing from blockstore: {}",
277 target_head.parent_state()
278 );
279 }
280 }
281 }
282 }
283 Ok(false)
284 }
285
    /// Returns the beacon schedule held by this state manager.
    pub fn beacon_schedule(&self) -> &Arc<BeaconSchedule> {
        &self.beacon
    }
289
290 pub fn get_network_version(&self, epoch: ChainEpoch) -> NetworkVersion {
292 self.chain_config().network_version(epoch)
293 }
294
295 pub fn get_state_tree(&self, state_cid: &Cid) -> anyhow::Result<StateTree<DB>> {
297 StateTree::new_from_root(self.blockstore_owned(), state_cid)
298 }
299
300 pub fn get_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<Option<ActorState>> {
302 let state = self.get_state_tree(&state_cid)?;
303 state.get_actor(addr)
304 }
305
306 pub fn get_actor_state<S: LoadActorStateFromBlockstore>(
308 &self,
309 ts: &Tipset,
310 ) -> anyhow::Result<S> {
311 let state_tree = self.get_state_tree(ts.parent_state())?;
312 state_tree.get_actor_state()
313 }
314
315 pub fn get_actor_state_from_address<S: LoadActorStateFromBlockstore>(
317 &self,
318 ts: &Tipset,
319 actor_address: &Address,
320 ) -> anyhow::Result<S> {
321 let state_tree = self.get_state_tree(ts.parent_state())?;
322 state_tree.get_actor_state_from_address(actor_address)
323 }
324
325 pub fn get_required_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<ActorState> {
327 let state = self.get_state_tree(&state_cid)?;
328 state.get_actor(addr)?.with_context(|| {
329 format!("Failed to load actor with addr={addr}, state_cid={state_cid}")
330 })
331 }
332
333 pub fn blockstore(&self) -> &Arc<DB> {
335 self.cs.blockstore()
336 }
337
338 pub fn blockstore_owned(&self) -> Arc<DB> {
339 self.blockstore().clone()
340 }
341
    /// Returns the chain store this state manager was built on.
    pub fn chain_store(&self) -> &Arc<ChainStore<DB>> {
        &self.cs
    }
346
347 pub fn chain_index(&self) -> &ChainIndex<DB> {
349 self.cs.chain_index()
350 }
351
352 pub fn chain_config(&self) -> &Arc<ChainConfig> {
354 self.cs.chain_config()
355 }
356
357 pub fn chain_rand(&self, tipset: Tipset) -> ChainRand<DB> {
358 ChainRand::new(
359 self.chain_config().shallow_clone(),
360 tipset,
361 self.chain_index().shallow_clone(),
362 self.beacon.shallow_clone(),
363 )
364 }
365
366 pub fn get_network_state_name(
368 &self,
369 state_cid: Cid,
370 ) -> anyhow::Result<crate::networks::StateNetworkName> {
371 let init_act = self
372 .get_actor(&init::ADDRESS.into(), state_cid)?
373 .ok_or_else(|| Error::state("Init actor address could not be resolved"))?;
374 Ok(
375 State::load(self.blockstore(), init_act.code, init_act.state)?
376 .into_network_name()
377 .into(),
378 )
379 }
380
381 pub fn is_miner_slashed(&self, addr: &Address, state_cid: &Cid) -> anyhow::Result<bool, Error> {
383 let actor = self
384 .get_actor(&Address::POWER_ACTOR, *state_cid)?
385 .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
386
387 let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
388
389 Ok(spas.miner_power(self.blockstore(), addr)?.is_none())
390 }
391
392 pub fn get_miner_work_addr(&self, state_cid: Cid, addr: &Address) -> Result<Address, Error> {
394 let state =
395 StateTree::new_from_root(self.blockstore_owned(), &state_cid).map_err(Error::other)?;
396 let ms: miner::State = state.get_actor_state_from_address(addr)?;
397 let info = ms.info(self.blockstore()).map_err(|e| e.to_string())?;
398 let addr = resolve_to_key_addr(&state, self.blockstore(), &info.worker())?;
399 Ok(addr)
400 }
401
402 pub fn get_power(
405 &self,
406 state_cid: &Cid,
407 addr: Option<&Address>,
408 ) -> anyhow::Result<Option<(power::Claim, power::Claim)>, Error> {
409 let actor = self
410 .get_actor(&Address::POWER_ACTOR, *state_cid)?
411 .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
412
413 let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
414
415 let t_pow = spas.total_power();
416
417 if let Some(maddr) = addr {
418 let m_pow = spas
419 .miner_power(self.blockstore(), maddr)?
420 .ok_or_else(|| Error::state(format!("Miner for address {maddr} not found")))?;
421
422 let min_pow = spas.miner_nominal_power_meets_consensus_minimum(
423 &self.chain_config().policy,
424 self.blockstore(),
425 maddr,
426 )?;
427 if min_pow {
428 return Ok(Some((m_pow, t_pow)));
429 }
430 }
431
432 Ok(None)
433 }
434
435 pub fn get_all_sectors(
437 &self,
438 addr: &Address,
439 ts: &Tipset,
440 ) -> anyhow::Result<Vec<SectorOnChainInfo>> {
441 let actor = self
442 .get_actor(addr, *ts.parent_state())?
443 .ok_or_else(|| Error::state("Miner actor not found"))?;
444 let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
445 state.load_sectors_ext(self.blockstore(), None)
446 }
447}
448
449impl<DB> StateManager<DB>
450where
451 DB: Blockstore + Send + Sync + 'static,
452{
453 pub async fn load_tipset_state(self: &Arc<Self>, ts: &Tipset) -> anyhow::Result<TipsetState> {
455 if let Some(state) = self.cache.get_map(ts.key(), |et| et.into()) {
456 Ok(state)
457 } else if let Ok(receipt_ts) = self.chain_store().load_child_tipset(ts) {
458 Ok(TipsetState {
459 state_root: *receipt_ts.parent_state(),
460 receipt_root: *receipt_ts.parent_message_receipts(),
461 })
462 } else {
463 Ok(self.load_executed_tipset(ts).await?.into())
464 }
465 }
466
467 pub async fn load_executed_tipset(
469 self: &Arc<Self>,
470 ts: &Tipset,
471 ) -> anyhow::Result<ExecutedTipset> {
472 if ts.epoch() >= self.heaviest_tipset().epoch()
474 && let Some(cached) = self.cache.get(ts.key())
475 {
476 if StateTree::new_from_root(self.blockstore_owned(), &cached.state_root).is_ok() {
477 return Ok(cached);
478 } else {
479 self.cache.remove(ts.key());
480 }
481 }
482 self.cache
483 .get_or_else(ts.key(), || async move {
484 let receipt_ts = self.chain_store().load_child_tipset(ts).ok();
485 self.load_executed_tipset_inner(ts, receipt_ts.as_ref())
486 .await
487 })
488 .await
489 }
490
491 async fn load_executed_tipset_inner(
492 self: &Arc<Self>,
493 msg_ts: &Tipset,
494 receipt_ts: Option<&Tipset>,
496 ) -> anyhow::Result<ExecutedTipset> {
497 if let Some(receipt_ts) = receipt_ts {
498 anyhow::ensure!(
499 msg_ts.key() == receipt_ts.parents(),
500 "message tipset should be the parent of message receipt tipset"
501 );
502 }
503 let mut recomputed = false;
504 let (state_root, receipt_root, receipts) = match receipt_ts.and_then(|ts| {
505 let receipt_root = *ts.parent_message_receipts();
506 Receipt::get_receipts(self.cs.blockstore(), receipt_root)
507 .ok()
508 .map(|r| (*ts.parent_state(), receipt_root, r))
509 }) {
510 Some((state_root, receipt_root, receipts)) => (state_root, receipt_root, receipts),
511 None => {
512 let state_output = self
513 .compute_tipset_state(msg_ts.shallow_clone(), NO_CALLBACK, VMTrace::NotTraced)
514 .await?;
515 recomputed = true;
516 (
517 state_output.state_root,
518 state_output.receipt_root,
519 Receipt::get_receipts(self.cs.blockstore(), state_output.receipt_root)?,
520 )
521 }
522 };
523
524 let messages = self.chain_store().messages_for_tipset(msg_ts)?;
525 anyhow::ensure!(
526 messages.len() == receipts.len(),
527 "mismatching message and receipt counts ({} messages, {} receipts)",
528 messages.len(),
529 receipts.len()
530 );
531 let mut executed_messages = Vec::with_capacity(messages.len());
532 for (message, receipt) in messages.iter().cloned().zip(receipts) {
533 let events = if let Some(events_root) = receipt.events_root() {
534 Some(
535 match StampedEvent::get_events(self.cs.blockstore(), &events_root) {
536 Ok(events) => events,
537 Err(e) if recomputed => return Err(e),
538 Err(_) => {
539 self.compute_tipset_state(
540 msg_ts.shallow_clone(),
541 NO_CALLBACK,
542 VMTrace::NotTraced,
543 )
544 .await?;
545 recomputed = true;
546 StampedEvent::get_events(self.cs.blockstore(), &events_root)?
547 }
548 },
549 )
550 } else {
551 None
552 };
553 executed_messages.push(ExecutedMessage {
554 message,
555 receipt,
556 events,
557 });
558 }
559 Ok(ExecutedTipset {
560 state_root,
561 receipt_root,
562 executed_messages: Arc::new(executed_messages),
563 })
564 }
565
566 #[instrument(skip(self, rand))]
567 fn call_raw(
568 &self,
569 state_cid: Option<Cid>,
570 msg: &Message,
571 rand: ChainRand<DB>,
572 tipset: &Tipset,
573 ) -> Result<ApiInvocResult, Error> {
574 let mut msg = msg.clone();
575
576 let state_cid = state_cid.unwrap_or(*tipset.parent_state());
577
578 let tipset_messages = self
579 .chain_store()
580 .messages_for_tipset(tipset)
581 .map_err(|err| Error::Other(err.to_string()))?;
582
583 let prior_messsages = tipset_messages
584 .iter()
585 .filter(|ts_msg| ts_msg.message().from() == msg.from());
586
587 let height = tipset.epoch();
590 let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
591 let mut vm = VM::new(
592 ExecutionContext {
593 heaviest_tipset: tipset.shallow_clone(),
594 state_tree_root: state_cid,
595 epoch: height,
596 rand: Box::new(rand),
597 base_fee: tipset.block_headers().first().parent_base_fee.clone(),
598 circ_supply: genesis_info.get_vm_circulating_supply(
599 height,
600 self.blockstore(),
601 &state_cid,
602 )?,
603 chain_config: self.chain_config().shallow_clone(),
604 chain_index: self.chain_index().shallow_clone(),
605 timestamp: tipset.min_timestamp(),
606 },
607 &self.engine,
608 VMTrace::Traced,
609 )?;
610
611 for m in prior_messsages {
612 vm.apply_message(m)?;
613 }
614
615 let state_cid = vm.flush()?;
618
619 let state = StateTree::new_from_root(self.blockstore_owned(), &state_cid)?;
620
621 let from_actor = state
622 .get_actor(&msg.from())?
623 .ok_or_else(|| anyhow::anyhow!("actor not found"))?;
624 msg.set_sequence(from_actor.sequence);
625
626 let mut msg = msg.clone();
628 msg.gas_limit = IMPLICIT_MESSAGE_GAS_LIMIT as u64;
629
630 let (apply_ret, duration) = vm.apply_implicit_message(&msg)?;
631
632 Ok(ApiInvocResult {
633 msg: msg.clone(),
634 msg_rct: Some(apply_ret.msg_receipt()),
635 msg_cid: msg.cid(),
636 error: apply_ret.failure_info().unwrap_or_default(),
637 duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
638 gas_cost: MessageGasCost::default(),
639 execution_trace: structured::parse_events(apply_ret.exec_trace()).unwrap_or_default(),
640 })
641 }
642
643 pub fn call(&self, message: &Message, tipset: Option<Tipset>) -> Result<ApiInvocResult, Error> {
646 let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
647 let chain_rand = self.chain_rand(ts.shallow_clone());
648 self.call_raw(None, message, chain_rand, &ts)
649 }
650
651 pub fn call_on_state(
654 &self,
655 state_cid: Cid,
656 message: &Message,
657 tipset: Option<Tipset>,
658 ) -> Result<ApiInvocResult, Error> {
659 let ts = tipset.unwrap_or_else(|| self.cs.heaviest_tipset());
660 let chain_rand = self.chain_rand(ts.shallow_clone());
661 self.call_raw(Some(state_cid), message, chain_rand, &ts)
662 }
663
664 pub async fn apply_on_state_with_gas(
665 self: &Arc<Self>,
666 tipset: Option<Tipset>,
667 msg: Message,
668 vm_flush: VMFlush,
669 ) -> anyhow::Result<(ApiInvocResult, Option<Cid>)> {
670 let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
671
672 let from_a = self.resolve_to_key_addr(&msg.from, &ts).await?;
673
674 let mut chain_msg = match from_a.protocol() {
678 Protocol::Secp256k1 => SignedMessage::new_unchecked(
679 msg.clone(),
680 Signature::new_secp256k1(vec![0; SECP_SIG_LEN]),
681 )
682 .into(),
683 Protocol::Delegated => SignedMessage::new_unchecked(
684 msg.clone(),
685 Signature::new(SignatureType::Delegated, vec![0; SECP_SIG_LEN]),
688 )
689 .into(),
690 _ => msg.clone().into(),
691 };
692
693 let (_invoc_res, apply_ret, duration, state_root) = self
694 .call_with_gas(&mut chain_msg, &[], Some(ts), vm_flush)
695 .await?;
696
697 Ok((
698 ApiInvocResult {
699 msg_cid: msg.cid(),
700 msg,
701 msg_rct: Some(apply_ret.msg_receipt()),
702 error: apply_ret.failure_info().unwrap_or_default(),
703 duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
704 gas_cost: MessageGasCost::default(),
705 execution_trace: structured::parse_events(apply_ret.exec_trace())
706 .unwrap_or_default(),
707 },
708 state_root,
709 ))
710 }
711
712 pub async fn call_with_gas(
715 self: &Arc<Self>,
716 message: &mut ChainMessage,
717 prior_messages: &[ChainMessage],
718 tipset: Option<Tipset>,
719 vm_flush: VMFlush,
720 ) -> Result<(InvocResult, ApplyRet, Duration, Option<Cid>), Error> {
721 let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
722 let TipsetState { state_root, .. } = self
723 .load_tipset_state(&ts)
724 .await
725 .map_err(|e| Error::Other(format!("Could not load tipset state: {e:#}")))?;
726 let chain_rand = self.chain_rand(ts.clone());
727
728 let epoch = ts.epoch() + 1;
731 let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
732 let (ret, duration, state_cid) = stacker::grow(64 << 20, || -> anyhow::Result<_> {
735 let mut vm = VM::new(
736 ExecutionContext {
737 heaviest_tipset: ts.clone(),
738 state_tree_root: state_root,
739 epoch,
740 rand: Box::new(chain_rand),
741 base_fee: ts.block_headers().first().parent_base_fee.clone(),
742 circ_supply: genesis_info.get_vm_circulating_supply(
743 epoch,
744 self.blockstore(),
745 &state_root,
746 )?,
747 chain_config: self.chain_config().shallow_clone(),
748 chain_index: self.chain_index().shallow_clone(),
749 timestamp: ts.min_timestamp(),
750 },
751 &self.engine,
752 VMTrace::NotTraced,
753 )?;
754
755 for msg in prior_messages {
756 vm.apply_message(msg)?;
757 }
758 let from_actor = vm
759 .get_actor(&message.from())
760 .map_err(|e| Error::Other(format!("Could not get actor from state: {e:#}")))?
761 .ok_or_else(|| Error::Other("cant find actor in state tree".to_string()))?;
762
763 message.set_sequence(from_actor.sequence);
764 let (ret, duration) = vm.apply_message(message)?;
765 let state_root = match vm_flush {
766 VMFlush::Flush => Some(vm.flush()?),
767 VMFlush::Skip => None,
768 };
769 Ok((ret, duration, state_root))
770 })?;
771
772 Ok((
773 InvocResult::new(message.message().clone(), &ret),
774 ret,
775 duration,
776 state_cid,
777 ))
778 }
779
780 pub async fn replay(self: &Arc<Self>, ts: Tipset, mcid: Cid) -> Result<ApiInvocResult, Error> {
783 let this = Arc::clone(self);
784 tokio::task::spawn_blocking(move || this.replay_blocking(ts, mcid)).await?
785 }
786
787 pub fn replay_blocking(
789 self: &Arc<Self>,
790 ts: Tipset,
791 mcid: Cid,
792 ) -> Result<ApiInvocResult, Error> {
793 const REPLAY_HALT: &str = "replay_halt";
794
795 let mut api_invoc_result = None;
796 let callback = |ctx: MessageCallbackCtx<'_>| {
797 match ctx.at {
798 CalledAt::Applied | CalledAt::Reward
799 if api_invoc_result.is_none() && ctx.cid == mcid =>
800 {
801 api_invoc_result = Some(ApiInvocResult {
802 msg_cid: ctx.message.cid(),
803 msg: ctx.message.message().clone(),
804 msg_rct: Some(ctx.apply_ret.msg_receipt()),
805 error: ctx.apply_ret.failure_info().unwrap_or_default(),
806 duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
807 gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
808 execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
809 .unwrap_or_default(),
810 });
811 anyhow::bail!(REPLAY_HALT);
812 }
813 _ => Ok(()), }
815 };
816 let result = self.compute_tipset_state_blocking(ts, Some(callback), VMTrace::Traced);
817 if let Err(error_message) = result
818 && error_message.to_string() != REPLAY_HALT
819 {
820 return Err(Error::Other(format!(
821 "unexpected error during execution : {error_message:}"
822 )));
823 }
824 api_invoc_result.ok_or_else(|| Error::Other("failed to replay".into()))
825 }
826
827 pub async fn replay_for_prestate(
830 self: &Arc<Self>,
831 ts: Tipset,
832 target_message_cid: Cid,
833 ) -> Result<(Cid, ApiInvocResult, Cid), Error> {
834 let this = Arc::clone(self);
835 tokio::task::spawn_blocking(move || {
836 this.replay_for_prestate_blocking(ts, target_message_cid)
837 })
838 .await
839 .map_err(|e| Error::Other(format!("{e}")))?
840 }
841
842 fn replay_for_prestate_blocking(
843 self: &Arc<Self>,
844 ts: Tipset,
845 target_msg_cid: Cid,
846 ) -> Result<(Cid, ApiInvocResult, Cid), Error> {
847 if ts.epoch() == 0 {
848 return Err(Error::Other(
849 "cannot trace messages in the genesis block".into(),
850 ));
851 }
852
853 let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
854 let exec = TipsetExecutor::new(
855 self.chain_index().shallow_clone(),
856 self.chain_config().shallow_clone(),
857 self.beacon_schedule().shallow_clone(),
858 &self.engine,
859 ts.shallow_clone(),
860 );
861 let mut no_cb = NO_CALLBACK;
862 let (parent_state, epoch, block_messages) =
863 exec.prepare_parent_state(genesis_timestamp, VMTrace::NotTraced, &mut no_cb)?;
864
865 Ok(stacker::grow(64 << 20, || {
866 let mut vm =
867 exec.create_vm(parent_state, epoch, ts.min_timestamp(), VMTrace::NotTraced)?;
868 let mut processed = ahash::HashSet::default();
869
870 for block in block_messages.iter() {
871 let mut penalty = TokenAmount::zero();
872 let mut gas_reward = TokenAmount::zero();
873
874 for msg in block.messages.iter() {
875 let cid = msg.cid();
876 if processed.contains(&cid) {
877 continue;
878 }
879
880 processed.insert(cid);
881
882 if cid == target_msg_cid {
883 let pre_root = vm.flush()?;
884 let mut traced_vm =
885 exec.create_vm(pre_root, epoch, ts.min_timestamp(), VMTrace::Traced)?;
886 let (ret, duration) = traced_vm.apply_message(msg)?;
887 let post_root = traced_vm.flush()?;
888
889 return Ok((
890 pre_root,
891 ApiInvocResult {
892 msg_cid: cid,
893 msg: msg.message().clone(),
894 msg_rct: Some(ret.msg_receipt()),
895 error: ret.failure_info().unwrap_or_default(),
896 duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
897 gas_cost: MessageGasCost::default(),
898 execution_trace: structured::parse_events(ret.exec_trace())
899 .unwrap_or_default(),
900 },
901 post_root,
902 ));
903 }
904
905 let (ret, _) = vm.apply_message(msg)?;
906 gas_reward += ret.miner_tip();
907 penalty += ret.penalty();
908 }
909
910 if let Some(rew_msg) =
911 vm.reward_message(epoch, block.miner, block.win_count, penalty, gas_reward)?
912 {
913 let (ret, _) = vm.apply_implicit_message(&rew_msg)?;
914 if let Some(err) = ret.failure_info() {
915 bail!(
916 "failed to apply reward message for miner {}: {err}",
917 block.miner
918 );
919 }
920
921 if !ret.msg_receipt().exit_code().is_success() {
923 bail!(
924 "reward application message failed (exit: {:?})",
925 ret.msg_receipt().exit_code()
926 );
927 }
928 }
929 }
930
931 bail!("message {target_msg_cid} not found in tipset")
932 })?)
933 }
934
935 pub fn eligible_to_mine(
938 &self,
939 address: &Address,
940 base_tipset: &Tipset,
941 lookback_tipset: &Tipset,
942 ) -> anyhow::Result<bool, Error> {
943 let hmp =
944 self.miner_has_min_power(&self.chain_config().policy, address, lookback_tipset)?;
945 let version = self.get_network_version(base_tipset.epoch());
946
947 if version <= NetworkVersion::V3 {
948 return Ok(hmp);
949 }
950
951 if !hmp {
952 return Ok(false);
953 }
954
955 let actor = self
956 .get_actor(&Address::POWER_ACTOR, *base_tipset.parent_state())?
957 .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
958
959 let power_state = power::State::load(self.blockstore(), actor.code, actor.state)?;
960
961 let actor = self
962 .get_actor(address, *base_tipset.parent_state())?
963 .ok_or_else(|| Error::state("Miner actor address could not be resolved"))?;
964
965 let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
966
967 let claim = power_state
969 .miner_power(self.blockstore(), address)?
970 .ok_or_else(|| Error::Other("Could not get claim".to_string()))?;
971 if claim.quality_adj_power <= BigInt::zero() {
972 return Ok(false);
973 }
974
975 if !miner_state.fee_debt().is_zero() {
977 return Ok(false);
978 }
979
980 let info = miner_state.info(self.blockstore())?;
982 if base_tipset.epoch() <= info.consensus_fault_elapsed {
983 return Ok(false);
984 }
985
986 Ok(true)
987 }
988
989 pub async fn compute_tipset_state(
1011 self: &Arc<Self>,
1012 tipset: Tipset,
1013 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
1014 enable_tracing: VMTrace,
1015 ) -> Result<ExecutedTipset, Error> {
1016 let this = Arc::clone(self);
1017 tokio::task::spawn_blocking(move || {
1018 this.compute_tipset_state_blocking(tipset, callback, enable_tracing)
1019 })
1020 .await?
1021 }
1022
1023 pub fn compute_tipset_state_blocking(
1025 &self,
1026 tipset: Tipset,
1027 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
1028 enable_tracing: VMTrace,
1029 ) -> Result<ExecutedTipset, Error> {
1030 let epoch = tipset.epoch();
1031 let has_callback = callback.is_some();
1032 info!(
1033 "Evaluating tipset: EPOCH={epoch}, blocks={}, tsk={}",
1034 tipset.len(),
1035 tipset.key(),
1036 );
1037 Ok(apply_block_messages(
1038 self.chain_store().genesis_block_header().timestamp,
1039 self.chain_index().shallow_clone(),
1040 self.chain_config().shallow_clone(),
1041 self.beacon_schedule().shallow_clone(),
1042 &self.engine,
1043 tipset,
1044 callback,
1045 enable_tracing,
1046 )
1047 .map_err(|e| {
1048 if has_callback {
1049 e
1050 } else {
1051 e.context(format!("Failed to compute tipset state@{epoch}"))
1052 }
1053 })?)
1054 }
1055
1056 #[instrument(skip_all)]
1057 pub async fn compute_state(
1058 self: &Arc<Self>,
1059 height: ChainEpoch,
1060 messages: Vec<Message>,
1061 tipset: Tipset,
1062 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
1063 enable_tracing: VMTrace,
1064 ) -> Result<ExecutedTipset, Error> {
1065 let this = Arc::clone(self);
1066 tokio::task::spawn_blocking(move || {
1067 this.compute_state_blocking(height, messages, tipset, callback, enable_tracing)
1068 })
1069 .await?
1070 }
1071
1072 #[tracing::instrument(skip_all)]
1074 pub fn compute_state_blocking(
1075 &self,
1076 height: ChainEpoch,
1077 messages: Vec<Message>,
1078 tipset: Tipset,
1079 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
1080 enable_tracing: VMTrace,
1081 ) -> Result<ExecutedTipset, Error> {
1082 Ok(compute_state(
1083 height,
1084 messages,
1085 tipset,
1086 self.chain_store().genesis_block_header().timestamp,
1087 self.chain_index().shallow_clone(),
1088 self.chain_config().shallow_clone(),
1089 self.beacon_schedule().shallow_clone(),
1090 &self.engine,
1091 callback,
1092 enable_tracing,
1093 )?)
1094 }
1095
1096 fn tipset_executed_message(
1099 &self,
1100 tipset: &Tipset,
1101 message: &ChainMessage,
1102 allow_replaced: bool,
1103 ) -> Result<Option<Receipt>, Error> {
1104 if tipset.epoch() == 0 {
1105 return Ok(None);
1106 }
1107 let message_from_address = message.from();
1108 let message_sequence = message.sequence();
1109 let pts = self
1111 .chain_index()
1112 .load_required_tipset(tipset.parents())
1113 .map_err(|err| Error::Other(format!("Failed to load tipset: {err}")))?;
1114 let messages = self
1115 .cs
1116 .messages_for_tipset(&pts)
1117 .map_err(|err| Error::Other(format!("Failed to load messages for tipset: {err}")))?;
1118 messages
1119 .iter()
1120 .enumerate()
1121 .rev()
1123 .filter(|(_, s)| {
1124 s.sequence() == message_sequence
1125 && s.from() == message_from_address
1126 && s.equal_call(message)
1127 })
1128 .map(|(index, m)| {
1129 if !allow_replaced && message.cid() != m.cid(){
1133 Err(Error::Other(format!(
1134 "found message with equal nonce and call params but different CID. wanted {}, found: {}, nonce: {}, from: {}",
1135 message.cid(),
1136 m.cid(),
1137 message.sequence(),
1138 message.from(),
1139 )))
1140 } else {
1141 let block_header = tipset.block_headers().first();
1142 crate::chain::get_parent_receipt(
1143 self.blockstore(),
1144 block_header,
1145 index,
1146 )
1147 .map_err(|err| Error::Other(format!("Failed to get parent receipt (message_receipts={}, index={index}, error={err})", block_header.message_receipts)))
1148 }
1149 })
1150 .next()
1151 .unwrap_or(Ok(None))
1152 }
1153
1154 fn check_search(
1155 &self,
1156 mut current: Tipset,
1157 message: &ChainMessage,
1158 lookback_max_epoch: ChainEpoch,
1159 allow_replaced: bool,
1160 ) -> Result<Option<(Tipset, Receipt)>, Error> {
1161 let message_from_address = message.from();
1162 let message_sequence = message.sequence();
1163 let mut current_actor_state = self
1164 .get_required_actor(&message_from_address, *current.parent_state())
1165 .map_err(Error::state)?;
1166 let message_from_id = self.lookup_required_id(&message_from_address, ¤t)?;
1167
1168 while current.epoch() >= lookback_max_epoch {
1169 let parent_tipset = self
1170 .chain_index()
1171 .load_required_tipset(current.parents())
1172 .map_err(|err| {
1173 Error::Other(format!(
1174 "failed to load tipset during msg wait searchback: {err:}"
1175 ))
1176 })?;
1177
1178 let parent_actor_state = self
1179 .get_actor(&message_from_id, *parent_tipset.parent_state())
1180 .map_err(|e| Error::State(e.to_string()))?;
1181
1182 if parent_actor_state.is_none()
1183 || (current_actor_state.sequence > message_sequence
1184 && parent_actor_state.as_ref().unwrap().sequence <= message_sequence)
1185 {
1186 let receipt = self
1187 .tipset_executed_message(¤t, message, allow_replaced)?
1188 .context("Failed to get receipt with tipset_executed_message")?;
1189 return Ok(Some((current, receipt)));
1190 }
1191
1192 if let Some(parent_actor_state) = parent_actor_state {
1193 current = parent_tipset;
1194 current_actor_state = parent_actor_state;
1195 } else {
1196 break;
1197 }
1198 }
1199
1200 Ok(None)
1201 }
1202
1203 fn search_back_for_message(
1205 &self,
1206 current: Tipset,
1207 message: &ChainMessage,
1208 look_back_limit: Option<i64>,
1209 allow_replaced: Option<bool>,
1210 ) -> Result<Option<(Tipset, Receipt)>, Error> {
1211 let current_epoch = current.epoch();
1212 let allow_replaced = allow_replaced.unwrap_or(true);
1213
1214 let lookback_max_epoch = match look_back_limit {
1216 Some(0) => return Ok(None),
1218 Some(limit) if limit > 0 => (current_epoch - limit + 1).max(0),
1222 _ => 0,
1224 };
1225
1226 self.check_search(current, message, lookback_max_epoch, allow_replaced)
1227 }
1228
1229 pub fn get_receipt(&self, tipset: Tipset, msg: Cid) -> Result<Receipt, Error> {
1231 let m = crate::chain::get_chain_message(self.blockstore(), &msg)
1232 .map_err(|e| Error::Other(e.to_string()))?;
1233 let message_receipt = self.tipset_executed_message(&tipset, &m, true)?;
1234 if let Some(receipt) = message_receipt {
1235 return Ok(receipt);
1236 }
1237
1238 let maybe_tuple = self.search_back_for_message(tipset, &m, None, None)?;
1239 let message_receipt = maybe_tuple
1240 .ok_or_else(|| {
1241 Error::Other("Could not get receipt from search back message".to_string())
1242 })?
1243 .1;
1244 Ok(message_receipt)
1245 }
1246
1247 pub async fn wait_for_message(
1252 self: &Arc<Self>,
1253 msg_cid: Cid,
1254 confidence: i64,
1255 look_back_limit: Option<ChainEpoch>,
1256 allow_replaced: Option<bool>,
1257 ) -> Result<(Option<Tipset>, Option<Receipt>), Error> {
1258 let mut head_changes_rx = self.cs.subscribe_head_changes();
1259 let (sender, mut receiver) = oneshot::channel::<()>();
1260 let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
1261 .map_err(|err| Error::Other(format!("failed to load message {err:}")))?;
1262 let current_tipset = self.heaviest_tipset();
1263 let maybe_message_receipt =
1264 self.tipset_executed_message(¤t_tipset, &message, true)?;
1265 if let Some(r) = maybe_message_receipt {
1266 return Ok((Some(current_tipset.shallow_clone()), Some(r)));
1267 }
1268
1269 let mut candidate_tipset: Option<Tipset> = None;
1270 let mut candidate_receipt: Option<Receipt> = None;
1271
1272 let sm_cloned = self.shallow_clone();
1273
1274 let message_for_task = message.clone();
1275 let height_of_head = current_tipset.epoch();
1276 let task = tokio::task::spawn(async move {
1277 let back_tuple = sm_cloned.search_back_for_message(
1278 current_tipset,
1279 &message_for_task,
1280 look_back_limit,
1281 allow_replaced,
1282 )?;
1283 sender
1284 .send(())
1285 .map_err(|e| Error::Other(format!("Could not send to channel {e:?}")))?;
1286 Ok::<_, Error>(back_tuple)
1287 });
1288
1289 let reverts: Arc<RwLock<HashMap<TipsetKey, bool>>> = Arc::new(RwLock::new(HashMap::new()));
1290 let block_revert = reverts.clone();
1291 let sm_cloned = Arc::clone(self);
1292
1293 let mut subscriber_poll = tokio::task::spawn(async move {
1295 loop {
1296 match head_changes_rx.recv().await {
1297 Ok(head_changes) => {
1298 for tipset in head_changes.reverts {
1299 if candidate_tipset
1300 .as_ref()
1301 .is_some_and(|candidate| candidate.key() == tipset.key())
1302 {
1303 candidate_tipset = None;
1304 candidate_receipt = None;
1305 }
1306 }
1307 for tipset in head_changes.applies {
1308 if candidate_tipset
1309 .as_ref()
1310 .map(|s| tipset.epoch() >= s.epoch() + confidence)
1311 .unwrap_or_default()
1312 {
1313 return Ok((candidate_tipset, candidate_receipt));
1314 }
1315 let poll_receiver = receiver.try_recv();
1316 if let Ok(Some(_)) = poll_receiver {
1317 block_revert
1318 .write()
1319 .await
1320 .insert(tipset.key().to_owned(), true);
1321 }
1322
1323 let maybe_receipt =
1324 sm_cloned.tipset_executed_message(&tipset, &message, true)?;
1325 if let Some(receipt) = maybe_receipt {
1326 if confidence == 0 {
1327 return Ok((Some(tipset), Some(receipt)));
1328 }
1329 candidate_tipset = Some(tipset);
1330 candidate_receipt = Some(receipt)
1331 }
1332 }
1333 }
1334 Err(RecvError::Lagged(i)) => {
1335 warn!(
1336 "wait for message head change subscriber lagged, skipped {} events",
1337 i
1338 );
1339 }
1340 Err(RecvError::Closed) => break,
1341 }
1342 }
1343 Ok((None, None))
1344 })
1345 .fuse();
1346
1347 let mut search_back_poll = tokio::task::spawn(async move {
1349 let back_tuple = task.await.map_err(|e| {
1350 Error::Other(format!("Could not search backwards for message {e}"))
1351 })??;
1352 if let Some((back_tipset, back_receipt)) = back_tuple {
1353 let should_revert = *reverts
1354 .read()
1355 .await
1356 .get(back_tipset.key())
1357 .unwrap_or(&false);
1358 let larger_height_of_head = height_of_head >= back_tipset.epoch() + confidence;
1359 if !should_revert && larger_height_of_head {
1360 return Ok::<_, Error>((Some(back_tipset), Some(back_receipt)));
1361 }
1362 return Ok((None, None));
1363 }
1364 Ok((None, None))
1365 })
1366 .fuse();
1367
1368 loop {
1370 select! {
1371 res = subscriber_poll => {
1372 return res?
1373 }
1374 res = search_back_poll => {
1375 if let Ok((Some(ts), Some(rct))) = res? {
1376 return Ok((Some(ts), Some(rct)));
1377 }
1378 }
1379 }
1380 }
1381 }
1382
1383 pub async fn search_for_message(
1384 &self,
1385 from: Option<Tipset>,
1386 msg_cid: Cid,
1387 look_back_limit: Option<i64>,
1388 allow_replaced: Option<bool>,
1389 ) -> Result<Option<(Tipset, Receipt)>, Error> {
1390 let from = from.unwrap_or_else(|| self.heaviest_tipset());
1391 let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
1392 .map_err(|err| Error::Other(format!("failed to load message {err}")))?;
1393 let current_tipset = self.heaviest_tipset();
1394 let maybe_message_receipt =
1395 self.tipset_executed_message(&from, &message, allow_replaced.unwrap_or(true))?;
1396 if let Some(r) = maybe_message_receipt {
1397 Ok(Some((from, r)))
1398 } else {
1399 self.search_back_for_message(current_tipset, &message, look_back_limit, allow_replaced)
1400 }
1401 }
1402
1403 pub fn get_bls_public_key(
1405 db: &Arc<DB>,
1406 addr: &Address,
1407 state_cid: Cid,
1408 ) -> Result<BlsPublicKey, Error> {
1409 let state = StateTree::new_from_root(Arc::clone(db), &state_cid)
1410 .map_err(|e| Error::Other(e.to_string()))?;
1411 let kaddr =
1412 resolve_to_key_addr(&state, db, addr).context("Failed to resolve key address")?;
1413
1414 match kaddr.into_payload() {
1415 Payload::BLS(key) => BlsPublicKey::from_bytes(&key)
1416 .context("Failed to construct bls public key")
1417 .map_err(Error::from),
1418 _ => Err(Error::state(
1419 "Address must be BLS address to load bls public key",
1420 )),
1421 }
1422 }
1423
1424 pub fn lookup_id(&self, addr: &Address, ts: &Tipset) -> Result<Option<Address>, Error> {
1426 let state_tree = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
1427 .map_err(|e| format!("{e:?}"))?;
1428 Ok(state_tree
1429 .lookup_id(addr)
1430 .map_err(|e| Error::Other(e.to_string()))?
1431 .map(Address::new_id))
1432 }
1433
1434 pub fn lookup_required_id(&self, addr: &Address, ts: &Tipset) -> Result<Address, Error> {
1436 self.lookup_id(addr, ts)?
1437 .ok_or_else(|| Error::Other(format!("Failed to lookup the id address {addr}")))
1438 }
1439
1440 pub fn market_state(&self, ts: &Tipset) -> Result<market::State, Error> {
1442 let actor = self.get_required_actor(&Address::MARKET_ACTOR, *ts.parent_state())?;
1443 let market_state = market::State::load(self.blockstore(), actor.code, actor.state)?;
1444 Ok(market_state)
1445 }
1446
1447 pub fn market_balance(&self, addr: &Address, ts: &Tipset) -> Result<MarketBalance, Error> {
1449 let market_state = self.market_state(ts)?;
1450 let new_addr = self.lookup_required_id(addr, ts)?;
1451 let out = MarketBalance {
1452 escrow: {
1453 market_state
1454 .escrow_table(self.blockstore())?
1455 .get(&new_addr)?
1456 },
1457 locked: {
1458 market_state
1459 .locked_table(self.blockstore())?
1460 .get(&new_addr)?
1461 },
1462 };
1463
1464 Ok(out)
1465 }
1466
1467 pub fn miner_info(&self, addr: &Address, ts: &Tipset) -> Result<MinerInfo, Error> {
1469 let actor = self
1470 .get_actor(addr, *ts.parent_state())?
1471 .ok_or_else(|| Error::state("Miner actor not found"))?;
1472 let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1473
1474 Ok(state.info(self.blockstore())?)
1475 }
1476
1477 pub fn miner_faults(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
1479 self.all_partition_sectors(addr, ts, |partition| partition.faulty_sectors().clone())
1480 }
1481
1482 pub fn miner_recoveries(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
1484 self.all_partition_sectors(addr, ts, |partition| partition.recovering_sectors().clone())
1485 }
1486
1487 fn all_partition_sectors(
1488 &self,
1489 addr: &Address,
1490 ts: &Tipset,
1491 get_sector: impl Fn(Partition<'_>) -> BitField,
1492 ) -> Result<BitField, Error> {
1493 let actor = self
1494 .get_actor(addr, *ts.parent_state())?
1495 .ok_or_else(|| Error::state("Miner actor not found"))?;
1496
1497 let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1498
1499 let mut partitions = Vec::new();
1500
1501 state.for_each_deadline(
1502 &self.chain_config().policy,
1503 self.blockstore(),
1504 |_, deadline| {
1505 deadline.for_each(self.blockstore(), |_, partition| {
1506 partitions.push(get_sector(partition));
1507 Ok(())
1508 })
1509 },
1510 )?;
1511
1512 Ok(BitField::union(partitions.iter()))
1513 }
1514
1515 pub fn miner_power(&self, addr: &Address, ts: &Tipset) -> Result<MinerPower, Error> {
1517 if let Some((miner_power, total_power)) = self.get_power(ts.parent_state(), Some(addr))? {
1518 return Ok(MinerPower {
1519 miner_power,
1520 total_power,
1521 has_min_power: true,
1522 });
1523 }
1524
1525 Ok(MinerPower {
1526 has_min_power: false,
1527 miner_power: Default::default(),
1528 total_power: Default::default(),
1529 })
1530 }
1531
1532 pub async fn resolve_to_key_addr(
1535 self: &Arc<Self>,
1536 addr: &Address,
1537 ts: &Tipset,
1538 ) -> anyhow::Result<Address> {
1539 match addr.protocol() {
1540 Protocol::BLS | Protocol::Secp256k1 | Protocol::Delegated => return Ok(*addr),
1541 Protocol::Actor => {
1542 return Err(Error::Other(
1543 "cannot resolve actor address to key address".to_string(),
1544 )
1545 .into());
1546 }
1547 _ => {}
1548 };
1549
1550 let state = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())?;
1553 if let Ok(addr) = resolve_to_key_addr(&state, self.blockstore(), addr) {
1554 return Ok(addr);
1555 }
1556
1557 let TipsetState { state_root, .. } = self.load_tipset_state(ts).await?;
1559 let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?;
1560
1561 resolve_to_key_addr(&state, self.blockstore(), addr)
1562 }
1563
1564 pub async fn miner_get_base_info(
1565 self: &Arc<Self>,
1566 beacon_schedule: &BeaconSchedule,
1567 tipset: Tipset,
1568 addr: Address,
1569 epoch: ChainEpoch,
1570 ) -> anyhow::Result<Option<MiningBaseInfo>> {
1571 let prev_beacon = self
1572 .chain_store()
1573 .chain_index()
1574 .latest_beacon_entry(tipset.clone())?;
1575
1576 let entries: Vec<BeaconEntry> = beacon_schedule
1577 .beacon_entries_for_block(
1578 self.chain_config().network_version(epoch),
1579 epoch,
1580 tipset.epoch(),
1581 &prev_beacon,
1582 )
1583 .await?;
1584
1585 let base = entries.last().unwrap_or(&prev_beacon);
1586
1587 let (lb_tipset, lb_state_root) = ChainStore::get_lookback_tipset_for_round(
1588 self.chain_index(),
1589 self.chain_config(),
1590 &tipset,
1591 epoch,
1592 )?;
1593
1594 let actor = self.get_required_actor(&addr, *tipset.parent_state())?;
1599 if self.get_actor(&addr, lb_state_root)?.is_none() {
1600 return Ok(None);
1601 }
1602
1603 let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1604
1605 let addr_buf = to_vec(&addr)?;
1606 let rand = draw_randomness(
1607 base.signature(),
1608 DomainSeparationTag::WinningPoStChallengeSeed as i64,
1609 epoch,
1610 &addr_buf,
1611 )?;
1612
1613 let network_version = self.chain_config().network_version(tipset.epoch());
1614 let sectors = self.get_sectors_for_winning_post(
1615 &lb_state_root,
1616 network_version,
1617 &addr,
1618 Randomness::new(rand.to_vec()),
1619 )?;
1620
1621 if sectors.is_empty() {
1622 return Ok(None);
1623 }
1624
1625 let (miner_power, total_power) = self
1626 .get_power(&lb_state_root, Some(&addr))?
1627 .context("failed to get power")?;
1628
1629 let info = miner_state.info(self.blockstore())?;
1630
1631 let worker_key = self
1632 .resolve_to_deterministic_address(info.worker, &tipset)
1633 .await?;
1634 let eligible = self.eligible_to_mine(&addr, &tipset, &lb_tipset)?;
1635
1636 Ok(Some(MiningBaseInfo {
1637 miner_power: miner_power.quality_adj_power,
1638 network_power: total_power.quality_adj_power,
1639 sectors,
1640 worker_key,
1641 sector_size: info.sector_size,
1642 prev_beacon_entry: prev_beacon,
1643 beacon_entries: entries,
1644 eligible_for_mining: eligible,
1645 }))
1646 }
1647
1648 pub fn miner_has_min_power(
1651 &self,
1652 policy: &Policy,
1653 addr: &Address,
1654 ts: &Tipset,
1655 ) -> anyhow::Result<bool> {
1656 let actor = self
1657 .get_actor(&Address::POWER_ACTOR, *ts.parent_state())?
1658 .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
1659 let ps = power::State::load(self.blockstore(), actor.code, actor.state)?;
1660
1661 ps.miner_nominal_power_meets_consensus_minimum(policy, self.blockstore(), addr)
1662 }
1663
1664 #[tracing::instrument(skip(self))]
1686 pub fn validate_range(&self, epochs: RangeInclusive<i64>) -> anyhow::Result<()> {
1687 let heaviest = self.heaviest_tipset();
1688 let heaviest_epoch = heaviest.epoch();
1689 let end = self
1690 .chain_index()
1691 .tipset_by_height(*epochs.end(), heaviest, ResolveNullTipset::TakeOlder)
1692 .with_context(|| {
1693 format!(
1694 "couldn't get a tipset at height {} behind heaviest tipset at height {heaviest_epoch}",
1695 *epochs.end(),
1696 )
1697 })?;
1698
1699 let tipsets = end
1701 .chain(self.blockstore())
1702 .take_while(|ts| ts.epoch() >= *epochs.start());
1703
1704 self.validate_tipsets(tipsets)
1705 }
1706
1707 pub fn validate_tipsets<T>(&self, tipsets: T) -> anyhow::Result<()>
1708 where
1709 T: Iterator<Item = Tipset> + Send,
1710 {
1711 let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
1712 validate_tipsets(
1713 genesis_timestamp,
1714 self.chain_index(),
1715 self.chain_config(),
1716 self.beacon_schedule(),
1717 &self.engine,
1718 tipsets,
1719 )
1720 }
1721
1722 pub fn get_verified_registry_actor_state(
1723 &self,
1724 ts: &Tipset,
1725 ) -> anyhow::Result<verifreg::State> {
1726 let act = self
1727 .get_actor(&Address::VERIFIED_REGISTRY_ACTOR, *ts.parent_state())
1728 .map_err(Error::state)?
1729 .ok_or_else(|| Error::state("actor not found"))?;
1730 verifreg::State::load(self.blockstore(), act.code, act.state)
1731 }
1732 pub fn get_claim(
1733 &self,
1734 addr: &Address,
1735 ts: &Tipset,
1736 claim_id: ClaimID,
1737 ) -> anyhow::Result<Option<Claim>> {
1738 let id_address = self.lookup_required_id(addr, ts)?;
1739 let state = self.get_verified_registry_actor_state(ts)?;
1740 state.get_claim(self.blockstore(), id_address, claim_id)
1741 }
1742
1743 pub fn get_all_claims(&self, ts: &Tipset) -> anyhow::Result<HashMap<ClaimID, Claim>> {
1744 let state = self.get_verified_registry_actor_state(ts)?;
1745 state.get_all_claims(self.blockstore())
1746 }
1747
1748 pub fn get_allocation(
1749 &self,
1750 addr: &Address,
1751 ts: &Tipset,
1752 allocation_id: AllocationID,
1753 ) -> anyhow::Result<Option<Allocation>> {
1754 let id_address = self.lookup_required_id(addr, ts)?;
1755 let state = self.get_verified_registry_actor_state(ts)?;
1756 state.get_allocation(self.blockstore(), id_address.id()?, allocation_id)
1757 }
1758
1759 pub fn get_all_allocations(
1760 &self,
1761 ts: &Tipset,
1762 ) -> anyhow::Result<HashMap<AllocationID, Allocation>> {
1763 let state = self.get_verified_registry_actor_state(ts)?;
1764 state.get_all_allocations(self.blockstore())
1765 }
1766
1767 pub fn verified_client_status(
1768 &self,
1769 addr: &Address,
1770 ts: &Tipset,
1771 ) -> anyhow::Result<Option<DataCap>> {
1772 let id = self.lookup_required_id(addr, ts)?;
1773 let network_version = self.get_network_version(ts.epoch());
1774
1775 if (u32::from(network_version.0)) < 17 {
1779 let state = self.get_verified_registry_actor_state(ts)?;
1780 return state.verified_client_data_cap(self.blockstore(), id);
1781 }
1782
1783 let act = self
1784 .get_actor(&Address::DATACAP_TOKEN_ACTOR, *ts.parent_state())
1785 .map_err(Error::state)?
1786 .ok_or_else(|| Error::state("Miner actor not found"))?;
1787
1788 let state = datacap::State::load(self.blockstore(), act.code, act.state)?;
1789
1790 state.verified_client_data_cap(self.blockstore(), id)
1791 }
1792
1793 pub async fn resolve_to_deterministic_address(
1796 self: &Arc<Self>,
1797 address: Address,
1798 ts: &Tipset,
1799 ) -> anyhow::Result<Address> {
1800 use crate::shim::address::Protocol::*;
1801 match address.protocol() {
1802 BLS | Secp256k1 | Delegated => Ok(address),
1803 Actor => anyhow::bail!("cannot resolve actor address to key address"),
1804 ID => {
1805 let id = address.id()?;
1806 if let Some(cached) = self.id_to_deterministic_address_cache.get_cloned(&id) {
1807 return Ok(cached);
1808 }
1809 let resolved = if let Ok(state) =
1811 StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
1812 && let Ok(address) = state
1813 .resolve_to_deterministic_addr(self.chain_store().blockstore(), address)
1814 {
1815 address
1816 } else {
1817 let TipsetState { state_root, .. } = self.load_tipset_state(ts).await?;
1819 let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?;
1820 state.resolve_to_deterministic_addr(self.chain_store().blockstore(), address)?
1821 };
1822 self.id_to_deterministic_address_cache.push(id, resolved);
1823 Ok(resolved)
1824 }
1825 }
1826 }
1827
1828 pub fn execution_trace(&self, tipset: &Tipset) -> anyhow::Result<(Cid, Vec<ApiInvocResult>)> {
1829 let mut invoc_trace = vec![];
1830
1831 let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
1832
1833 let callback = |ctx: MessageCallbackCtx<'_>| {
1834 match ctx.at {
1835 CalledAt::Applied | CalledAt::Reward => {
1836 invoc_trace.push(ApiInvocResult {
1837 msg_cid: ctx.message.cid(),
1838 msg: ctx.message.message().clone(),
1839 msg_rct: Some(ctx.apply_ret.msg_receipt()),
1840 error: ctx.apply_ret.failure_info().unwrap_or_default(),
1841 duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
1842 gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
1843 execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
1844 .unwrap_or_default(),
1845 });
1846 Ok(())
1847 }
1848 _ => Ok(()), }
1850 };
1851
1852 let ExecutedTipset { state_root, .. } = apply_block_messages(
1853 genesis_timestamp,
1854 self.chain_index().shallow_clone(),
1855 self.chain_config().shallow_clone(),
1856 self.beacon_schedule().shallow_clone(),
1857 &self.engine,
1858 tipset.shallow_clone(),
1859 Some(callback),
1860 VMTrace::Traced,
1861 )?;
1862
1863 Ok((state_root, invoc_trace))
1864 }
1865}
1866
1867pub fn validate_tipsets<DB, T>(
1868 genesis_timestamp: u64,
1869 chain_index: &ChainIndex<DB>,
1870 chain_config: &Arc<ChainConfig>,
1871 beacon: &Arc<BeaconSchedule>,
1872 engine: &MultiEngine,
1873 tipsets: T,
1874) -> anyhow::Result<()>
1875where
1876 DB: Blockstore + Send + Sync + 'static,
1877 T: Iterator<Item = Tipset> + Send,
1878{
1879 for (child, parent) in tipsets.tuple_windows() {
1884 info!(height = parent.epoch(), "compute parent state");
1885 let ExecutedTipset {
1886 state_root: actual_state,
1887 receipt_root: actual_receipt,
1888 ..
1889 } = apply_block_messages(
1890 genesis_timestamp,
1891 chain_index.shallow_clone(),
1892 chain_config.shallow_clone(),
1893 beacon.shallow_clone(),
1894 engine,
1895 parent,
1896 NO_CALLBACK,
1897 VMTrace::NotTraced,
1898 )
1899 .context("couldn't compute tipset state")?;
1900 let expected_receipt = child.min_ticket_block().message_receipts;
1901 let expected_state = child.parent_state();
1902 if (expected_state, expected_receipt) != (&actual_state, actual_receipt) {
1903 error!(
1904 height = child.epoch(),
1905 ?expected_state,
1906 ?expected_receipt,
1907 ?actual_state,
1908 ?actual_receipt,
1909 "state mismatch"
1910 );
1911 bail!("state mismatch");
1912 }
1913 }
1914 Ok(())
1915}
1916
/// Bundles what is needed to build VMs for executing one tipset.
struct TipsetExecutor<'a, DB: Blockstore + Send + Sync + 'static> {
    /// The tipset being executed; also passed to each VM as `heaviest_tipset`.
    tipset: Tipset,
    /// Randomness source built from the chain config, `tipset`, the chain
    /// index and the beacon schedule.
    rand: ChainRand<DB>,
    /// Chain configuration, passed to each VM and to the state migrations.
    chain_config: Arc<ChainConfig>,
    /// Chain index; provides tipset loading and the blockstore (`db()`).
    chain_index: ChainIndex<DB>,
    /// Derived from `chain_config`; used to compute the VM circulating supply.
    genesis_info: GenesisInfo,
    /// Engine handed to `VM::new`.
    engine: &'a MultiEngine,
}
1929
impl<'a, DB: Blockstore + Send + Sync + 'static> TipsetExecutor<'a, DB> {
    /// Creates an executor for `tipset`, deriving the randomness source and
    /// the genesis info from the given chain index, config and beacon schedule.
    fn new(
        chain_index: ChainIndex<DB>,
        chain_config: Arc<ChainConfig>,
        beacon: Arc<BeaconSchedule>,
        engine: &'a MultiEngine,
        tipset: Tipset,
    ) -> Self {
        let rand = ChainRand::new(
            chain_config.shallow_clone(),
            tipset.shallow_clone(),
            chain_index.shallow_clone(),
            beacon,
        );
        let genesis_info = GenesisInfo::from_chain_config(chain_config.shallow_clone());
        Self {
            tipset,
            rand,
            chain_config,
            chain_index,
            genesis_info,
            engine,
        }
    }

    /// Builds a VM on top of `state_root` for the given `epoch` and
    /// `timestamp`.
    ///
    /// The circulating supply is computed for `epoch` and `state_root`; the
    /// base fee is the `parent_base_fee` of the tipset's min-ticket block.
    fn create_vm(
        &self,
        state_root: Cid,
        epoch: ChainEpoch,
        timestamp: u64,
        trace: VMTrace,
    ) -> anyhow::Result<VM<DB>> {
        let circ_supply = self.genesis_info.get_vm_circulating_supply(
            epoch,
            self.chain_index.db(),
            &state_root,
        )?;
        VM::new(
            ExecutionContext {
                heaviest_tipset: self.tipset.shallow_clone(),
                state_tree_root: state_root,
                epoch,
                rand: Box::new(self.rand.shallow_clone()),
                base_fee: self.tipset.min_ticket_block().parent_base_fee.clone(),
                circ_supply,
                chain_config: self.chain_config.shallow_clone(),
                chain_index: self.chain_index.shallow_clone(),
                timestamp,
            },
            self.engine,
            trace,
        )
    }

    /// Brings the tipset's parent state up to the tipset's own epoch and loads
    /// the tipset's block messages.
    ///
    /// For every epoch from the parent tipset's epoch up to (but excluding) the
    /// tipset's epoch, state migrations are run; for every such epoch strictly
    /// after the parent's epoch, cron is run first in a fresh VM (traced
    /// according to `null_epoch_trace`, reporting to `cron_callback`) and the
    /// VM is flushed.
    ///
    /// Returns the resulting state root, the tipset's epoch and its block
    /// messages.
    fn prepare_parent_state<F>(
        &self,
        genesis_timestamp: u64,
        null_epoch_trace: VMTrace,
        cron_callback: &mut Option<F>,
    ) -> anyhow::Result<(Cid, ChainEpoch, Vec<BlockMessages>)>
    where
        F: FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>,
    {
        use crate::shim::clock::EPOCH_DURATION_SECONDS;

        let mut parent_state = *self.tipset.parent_state();
        let parent_epoch = self
            .chain_index
            .load_required_tipset(self.tipset.parents())?
            .epoch();
        let epoch = self.tipset.epoch();

        for epoch_i in parent_epoch..epoch {
            // Epochs strictly between the parent and this tipset: run cron.
            if epoch_i > parent_epoch {
                let timestamp = genesis_timestamp + ((EPOCH_DURATION_SECONDS * epoch_i) as u64);
                // `stacker::grow` is asked for 64 MiB (64 << 20 bytes) of stack
                // for the VM work inside the closure.
                parent_state = stacker::grow(64 << 20, || -> anyhow::Result<Cid> {
                    let mut vm =
                        self.create_vm(parent_state, epoch_i, timestamp, null_epoch_trace)?;
                    if let Err(e) = vm.run_cron(epoch_i, cron_callback.as_mut()) {
                        error!("Beginning of epoch cron failed to run: {e:#}");
                        return Err(e);
                    }
                    vm.flush()
                })?;
            }
            // Migrations are attempted for every epoch in the range, including
            // the parent's own epoch; a returned state replaces the current one.
            if let Some(new_state) = run_state_migrations(
                epoch_i,
                &self.chain_config,
                self.chain_index.db(),
                &parent_state,
            )? {
                parent_state = new_state;
            }
        }

        let block_messages = BlockMessages::for_tipset(self.chain_index.db(), &self.tipset)?;
        Ok((parent_state, epoch, block_messages))
    }
}
2031
/// Executes the messages of `tipset` and returns the resulting state root,
/// receipt root and the executed messages paired with their receipts and
/// events.
///
/// A tipset at epoch 0 is not executed: its parent state and the
/// `message_receipts` of its min-ticket block are returned with no executed
/// messages.
///
/// Otherwise the parent state is prepared with
/// `TipsetExecutor::prepare_parent_state`, the block messages are applied in a
/// VM (optionally traced, reporting to `callback`), the receipts are written as
/// an AMT to the blockstore, every present events list is written as an AMT
/// whose root must equal the root reported by the VM, and the VM is flushed.
///
/// # Errors
/// Fails when preparation, VM creation, message application, AMT construction
/// or flushing fails, when an events list has no root or a mismatching root, or
/// when the numbers of messages, receipts and events differ.
#[allow(clippy::too_many_arguments)]
pub fn apply_block_messages<DB>(
    genesis_timestamp: u64,
    chain_index: ChainIndex<DB>,
    chain_config: Arc<ChainConfig>,
    beacon: Arc<BeaconSchedule>,
    engine: &MultiEngine,
    tipset: Tipset,
    mut callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
    enable_tracing: VMTrace,
) -> anyhow::Result<ExecutedTipset>
where
    DB: Blockstore + Send + Sync + 'static,
{
    // Epoch 0 short circuit: nothing is executed.
    if tipset.epoch() == 0 {
        let message_receipts = tipset.min_ticket_block().message_receipts;
        return Ok(ExecutedTipset {
            state_root: *tipset.parent_state(),
            receipt_root: message_receipts,
            executed_messages: vec![].into(),
        });
    }

    let exec = TipsetExecutor::new(
        chain_index.shallow_clone(),
        chain_config,
        beacon,
        engine,
        tipset.shallow_clone(),
    );

    // Cron for skipped epochs and state migrations, plus the block messages.
    let (parent_state, epoch, block_messages) =
        exec.prepare_parent_state(genesis_timestamp, enable_tracing, &mut callback)?;

    // `stacker::grow` is asked for 64 MiB (64 << 20 bytes) of stack for the VM
    // work inside the closure.
    stacker::grow(64 << 20, || -> anyhow::Result<ExecutedTipset> {
        let mut vm = exec.create_vm(parent_state, epoch, tipset.min_timestamp(), enable_tracing)?;

        let (receipts, events, events_roots) =
            vm.apply_block_messages(&block_messages, epoch, callback)?;

        // Receipt root: AMT over all receipts, stored in the blockstore.
        let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts.iter())?;

        // Store each present events list and cross-check its root against the
        // root returned alongside it.
        for (events, events_root) in events.iter().zip(events_roots.iter()) {
            if let Some(events) = events {
                let event_root =
                    events_root.context("events root should be present when events present")?;
                let derived_event_root = Amt::new_from_iter_with_bit_width(
                    chain_index.db(),
                    EVENTS_AMT_BITWIDTH,
                    events.iter(),
                )
                .map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?;

                ensure!(
                    derived_event_root == event_root,
                    "Events AMT root mismatch: derived={derived_event_root}, actual={event_root}."
                );
            }
        }

        let state_root = vm.flush()?;

        // Flatten the per-block messages so they can be zipped with the
        // receipts and events below.
        let messages: Vec<ChainMessage> = block_messages
            .into_iter()
            .flat_map(|bm| bm.messages)
            .collect_vec();
        anyhow::ensure!(
            messages.len() == receipts.len() && messages.len() == events.len(),
            "length of messages, receipts, and events should match",
        );
        Ok(ExecutedTipset {
            state_root,
            receipt_root,
            executed_messages: messages
                .into_iter()
                .zip(receipts)
                .zip(events)
                .map(|((message, receipt), events)| ExecutedMessage {
                    message,
                    receipt,
                    events,
                })
                .collect_vec()
                .into(),
        })
    })
}
2217
2218#[allow(clippy::too_many_arguments)]
2219pub fn compute_state<DB>(
2220 _height: ChainEpoch,
2221 messages: Vec<Message>,
2222 tipset: Tipset,
2223 genesis_timestamp: u64,
2224 chain_index: ChainIndex<DB>,
2225 chain_config: Arc<ChainConfig>,
2226 beacon: Arc<BeaconSchedule>,
2227 engine: &MultiEngine,
2228 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
2229 enable_tracing: VMTrace,
2230) -> anyhow::Result<ExecutedTipset>
2231where
2232 DB: Blockstore + Send + Sync + 'static,
2233{
2234 if !messages.is_empty() {
2235 anyhow::bail!("Applying messages is not yet implemented.");
2236 }
2237
2238 let output = apply_block_messages(
2239 genesis_timestamp,
2240 chain_index,
2241 chain_config,
2242 beacon,
2243 engine,
2244 tipset,
2245 callback,
2246 enable_tracing,
2247 )?;
2248
2249 Ok(output)
2250}
2251
/// Two-state flag choosing between flushing and skipping; defaults to `Skip`.
///
/// NOTE(review): presumably selects whether VM state is flushed after an
/// execution — the users of this type are outside this section; confirm.
#[derive(Debug, Copy, Clone, Default)]
pub enum VMFlush {
    /// Flush.
    Flush,
    /// Skip (the default).
    #[default]
    Skip,
}