1#[cfg(test)]
5mod tests;
6
7mod cache;
8pub mod chain_rand;
9pub mod circulating_supply;
10mod errors;
11pub mod utils;
12
13pub use self::errors::*;
14use self::utils::structured;
15
16use crate::beacon::{BeaconEntry, BeaconSchedule};
17use crate::blocks::{Tipset, TipsetKey};
18use crate::chain::{
19 ChainStore, HeadChange,
20 index::{ChainIndex, ResolveNullTipset},
21};
22use crate::interpreter::{
23 ApplyResult, BlockMessages, CalledAt, ExecutionContext, IMPLICIT_MESSAGE_GAS_LIMIT, VM,
24 resolve_to_key_addr,
25};
26use crate::interpreter::{MessageCallbackCtx, VMTrace};
27use crate::lotus_json::{LotusJson, lotus_json_with_self};
28use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage};
29use crate::networks::ChainConfig;
30use crate::rpc::state::{ApiInvocResult, InvocResult, MessageGasCost};
31use crate::rpc::types::{MiningBaseInfo, SectorOnChainInfo};
32use crate::shim::actors::init::{self, State};
33use crate::shim::actors::miner::{MinerInfo, MinerPower, Partition};
34use crate::shim::actors::verifreg::{Allocation, AllocationID, Claim};
35use crate::shim::actors::*;
36use crate::shim::crypto::{Signature, SignatureType};
37use crate::shim::{
38 actors::{
39 LoadActorStateFromBlockstore, miner::ext::MinerStateExt as _,
40 verifreg::ext::VerifiedRegistryStateExt as _,
41 },
42 executor::{ApplyRet, Receipt, StampedEvent},
43};
44use crate::shim::{
45 address::{Address, Payload, Protocol},
46 clock::ChainEpoch,
47 econ::TokenAmount,
48 machine::{GLOBAL_MULTI_ENGINE, MultiEngine},
49 message::Message,
50 randomness::Randomness,
51 runtime::Policy,
52 state_tree::{ActorState, StateTree},
53 version::NetworkVersion,
54};
55use crate::state_manager::cache::{
56 DisabledTipsetDataCache, EnabledTipsetDataCache, TipsetReceiptEventCacheHandler,
57 TipsetStateCache,
58};
59use crate::state_manager::chain_rand::draw_randomness;
60use crate::state_migration::run_state_migrations;
61use crate::utils::get_size::{
62 GetSize, vec_heap_size_helper, vec_with_stack_only_item_heap_size_helper,
63};
64use ahash::{HashMap, HashMapExt};
65use anyhow::{Context as _, bail, ensure};
66use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _};
67use chain_rand::ChainRand;
68use cid::Cid;
69pub use circulating_supply::GenesisInfo;
70use fil_actor_verifreg_state::v12::DataCap;
71use fil_actor_verifreg_state::v13::ClaimID;
72use fil_actors_shared::fvm_ipld_amt::{Amt, Amtv0};
73use fil_actors_shared::fvm_ipld_bitfield::BitField;
74use fil_actors_shared::v12::runtime::DomainSeparationTag;
75use futures::{FutureExt, channel::oneshot, select};
76use fvm_ipld_blockstore::Blockstore;
77use fvm_ipld_encoding::to_vec;
78use fvm_shared4::crypto::signature::SECP_SIG_LEN;
79use itertools::Itertools as _;
80use nonzero_ext::nonzero;
81use num::BigInt;
82use num_traits::identities::Zero;
83use rayon::prelude::ParallelBridge;
84use schemars::JsonSchema;
85use serde::{Deserialize, Serialize};
86use std::ops::RangeInclusive;
87use std::time::Duration;
88use std::{num::NonZeroUsize, sync::Arc};
89use tokio::sync::{RwLock, broadcast::error::RecvError};
90use tracing::{error, info, instrument, trace, warn};
91
/// Upper bound on the number of (child, parent) tipset pairs that
/// `StateManager::populate_cache` walks back from the heaviest tipset.
const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
/// Presumably the bit width of the AMT used to store events — TODO confirm
/// against the code that builds the events AMT.
pub const EVENTS_AMT_BITWIDTH: u32 = 5;

/// A `(state_root, receipt_root)` pair, as returned by `StateManager::tipset_state`.
type CidPair = (Cid, Cid);
97
/// Events and event roots taken from a [`StateOutput`], in the shape handed to
/// the receipt/event cache handler.
#[derive(Debug, Clone, GetSize)]
pub struct StateEvents {
    /// Copied from [`StateOutput::events`].
    #[get_size(size_fn = vec_heap_size_helper)]
    pub events: Vec<Vec<StampedEvent>>,
    /// Copied from [`StateOutput::events_roots`].
    #[get_size(size_fn = vec_with_stack_only_item_heap_size_helper)]
    pub roots: Vec<Option<Cid>>,
}
105
/// Full result of computing a tipset's state: the resulting roots plus the
/// events gathered along the way.
#[derive(Clone)]
pub struct StateOutput {
    /// Root CID of the resulting state tree.
    pub state_root: Cid,
    /// Root CID of the message receipts.
    pub receipt_root: Cid,
    /// Stamped events; presumably one inner vector per executed message — TODO confirm.
    pub events: Vec<Vec<StampedEvent>>,
    /// Optional event root CIDs; presumably parallel to `events` — TODO confirm.
    pub events_roots: Vec<Option<Cid>>,
}
113
/// Reduced form of [`StateOutput`] holding only the two roots. This is the
/// value type stored in the `StateManager` tipset state cache.
#[derive(Debug, Default, Clone, GetSize)]
pub struct StateOutputValue {
    /// Root CID of the resulting state tree (excluded from `GetSize` accounting).
    #[get_size(ignore)]
    pub state_root: Cid,
    /// Root CID of the message receipts (excluded from `GetSize` accounting).
    #[get_size(ignore)]
    pub receipt_root: Cid,
}
121
122impl From<StateOutputValue> for StateOutput {
123 fn from(value: StateOutputValue) -> Self {
124 Self {
125 state_root: value.state_root,
126 receipt_root: value.receipt_root,
127 events: vec![],
128 events_roots: vec![],
129 }
130 }
131}
132
133impl From<StateOutput> for StateOutputValue {
134 fn from(value: StateOutput) -> Self {
135 StateOutputValue {
136 state_root: value.state_root,
137 receipt_root: value.receipt_root,
138 }
139 }
140}
141
/// A pair of token balances (escrow and locked), (de)serialized with
/// PascalCase field names through the Lotus JSON adapters.
#[derive(
    Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema,
)]
#[serde(rename_all = "PascalCase")]
pub struct MarketBalance {
    /// Escrow balance.
    #[schemars(with = "LotusJson<TokenAmount>")]
    #[serde(with = "crate::lotus_json")]
    pub escrow: TokenAmount,
    /// Locked balance.
    #[schemars(with = "LotusJson<TokenAmount>")]
    #[serde(with = "crate::lotus_json")]
    pub locked: TokenAmount,
}
lotus_json_with_self!(MarketBalance);
156
/// Entry point for state queries and state computation on top of a
/// [`ChainStore`].
pub struct StateManager<DB> {
    /// Chain store; also the source of the blockstore, chain index and chain config.
    cs: Arc<ChainStore<DB>>,
    /// Cache of computed state/receipt roots, looked up by tipset key.
    cache: TipsetStateCache<StateOutputValue>,
    /// Beacon schedule derived from the chain config and the genesis timestamp.
    beacon: Arc<crate::beacon::BeaconSchedule>,
    /// Engine handed to every VM instance created by this manager.
    engine: Arc<MultiEngine>,
    /// Receipt/event cache; an enabled or disabled implementation is chosen at construction.
    receipt_event_cache_handler: Box<dyn TipsetReceiptEventCacheHandler>,
}
172
/// A typed `None` to pass wherever an optional per-message callback is
/// expected, so callers need not spell out the callback type themselves.
#[allow(clippy::type_complexity)]
pub const NO_CALLBACK: Option<fn(MessageCallbackCtx<'_>) -> anyhow::Result<()>> = None;
175
176impl<DB> StateManager<DB>
177where
178 DB: Blockstore,
179{
180 pub fn new(cs: Arc<ChainStore<DB>>) -> Result<Self, anyhow::Error> {
181 Self::new_with_engine(cs, GLOBAL_MULTI_ENGINE.clone())
182 }
183
184 pub fn new_with_engine(
185 cs: Arc<ChainStore<DB>>,
186 engine: Arc<MultiEngine>,
187 ) -> Result<Self, anyhow::Error> {
188 let genesis = cs.genesis_block_header();
189 let beacon = Arc::new(cs.chain_config().get_beacon_schedule(genesis.timestamp));
190
191 let cache_handler: Box<dyn TipsetReceiptEventCacheHandler> =
192 if cs.chain_config().enable_receipt_event_caching {
193 Box::new(EnabledTipsetDataCache::new())
194 } else {
195 Box::new(DisabledTipsetDataCache::new())
196 };
197
198 Ok(Self {
199 cs,
200 cache: TipsetStateCache::new("state_output"), beacon,
202 engine,
203 receipt_event_cache_handler: cache_handler,
204 })
205 }
206
207 pub fn heaviest_tipset(&self) -> Tipset {
209 self.chain_store().heaviest_tipset()
210 }
211
212 pub fn maybe_rewind_heaviest_tipset(&self) -> anyhow::Result<()> {
217 while self.maybe_rewind_heaviest_tipset_once()? {}
218 Ok(())
219 }
220
221 fn maybe_rewind_heaviest_tipset_once(&self) -> anyhow::Result<bool> {
222 let head = self.heaviest_tipset();
223 if let Some(info) = self
224 .chain_config()
225 .network_height_with_actor_bundle(head.epoch())
226 {
227 let expected_height_info = info.info;
228 let expected_bundle = info.manifest(self.blockstore())?;
229 let expected_bundle_metadata = expected_bundle.metadata()?;
230 let state = self.get_state_tree(head.parent_state())?;
231 let bundle_metadata = state.get_actor_bundle_metadata()?;
232 if expected_bundle_metadata != bundle_metadata {
233 let current_epoch = head.epoch();
234 let target_head = self.chain_index().tipset_by_height(
235 (expected_height_info.epoch - 1).max(0),
236 head,
237 ResolveNullTipset::TakeOlder,
238 )?;
239 let target_epoch = target_head.epoch();
240 let bundle_version = &bundle_metadata.version;
241 let expected_bundle_version = &expected_bundle_metadata.version;
242 if target_epoch < current_epoch {
243 tracing::warn!(
244 "rewinding chain head from {current_epoch} to {target_epoch}, actor bundle: {bundle_version}, expected: {expected_bundle_version}"
245 );
246 if self.blockstore().has(target_head.parent_state())? {
247 self.chain_store().set_heaviest_tipset(target_head)?;
248 return Ok(true);
249 } else {
250 anyhow::bail!(
251 "failed to rewind, state tree @ {target_epoch} is missing from blockstore: {}",
252 target_head.parent_state()
253 );
254 }
255 }
256 }
257 }
258 Ok(false)
259 }
260
261 pub fn populate_cache(&self) {
266 for (child, parent) in self
267 .chain_index()
268 .chain(self.heaviest_tipset())
269 .tuple_windows()
270 .take(DEFAULT_TIPSET_CACHE_SIZE.into())
271 {
272 let key = parent.key();
273 let state_root = child.min_ticket_block().state_root;
274 let receipt_root = child.min_ticket_block().message_receipts;
275 self.cache.insert(
276 key.clone(),
277 StateOutputValue {
278 state_root,
279 receipt_root,
280 },
281 );
282 if let Ok(receipts) = Receipt::get_receipts(self.blockstore(), receipt_root)
283 && !receipts.is_empty()
284 {
285 self.receipt_event_cache_handler
286 .insert_receipt(key, receipts);
287 }
288 }
289 }
290
    /// Returns the beacon schedule built when this manager was constructed.
    pub fn beacon_schedule(&self) -> &Arc<BeaconSchedule> {
        &self.beacon
    }
294
295 pub fn get_network_version(&self, epoch: ChainEpoch) -> NetworkVersion {
297 self.chain_config().network_version(epoch)
298 }
299
300 pub fn get_state_tree(&self, state_cid: &Cid) -> anyhow::Result<StateTree<DB>> {
302 StateTree::new_from_root(self.blockstore_owned(), state_cid)
303 }
304
305 pub fn get_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<Option<ActorState>> {
307 let state = self.get_state_tree(&state_cid)?;
308 state.get_actor(addr)
309 }
310
311 pub fn get_actor_state<S: LoadActorStateFromBlockstore>(
313 &self,
314 ts: &Tipset,
315 ) -> anyhow::Result<S> {
316 let state_tree = self.get_state_tree(ts.parent_state())?;
317 state_tree.get_actor_state()
318 }
319
320 pub fn get_actor_state_from_address<S: LoadActorStateFromBlockstore>(
322 &self,
323 ts: &Tipset,
324 actor_address: &Address,
325 ) -> anyhow::Result<S> {
326 let state_tree = self.get_state_tree(ts.parent_state())?;
327 state_tree.get_actor_state_from_address(actor_address)
328 }
329
330 pub fn get_required_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<ActorState> {
332 let state = self.get_state_tree(&state_cid)?;
333 state.get_actor(addr)?.with_context(|| {
334 format!("Failed to load actor with addr={addr}, state_cid={state_cid}")
335 })
336 }
337
338 pub fn blockstore(&self) -> &Arc<DB> {
340 self.cs.blockstore()
341 }
342
343 pub fn blockstore_owned(&self) -> Arc<DB> {
344 self.blockstore().clone()
345 }
346
    /// Returns a reference to the underlying chain store.
    pub fn chain_store(&self) -> &Arc<ChainStore<DB>> {
        &self.cs
    }
351
352 pub fn chain_index(&self) -> &Arc<ChainIndex<Arc<DB>>> {
354 self.cs.chain_index()
355 }
356
357 pub fn chain_config(&self) -> &Arc<ChainConfig> {
359 self.cs.chain_config()
360 }
361
362 pub fn chain_rand(&self, tipset: Tipset) -> ChainRand<DB> {
363 ChainRand::new(
364 self.chain_config().clone(),
365 tipset,
366 self.chain_index().clone(),
367 self.beacon.clone(),
368 )
369 }
370
371 pub fn get_network_state_name(
373 &self,
374 state_cid: Cid,
375 ) -> anyhow::Result<crate::networks::StateNetworkName> {
376 let init_act = self
377 .get_actor(&init::ADDRESS.into(), state_cid)?
378 .ok_or_else(|| Error::state("Init actor address could not be resolved"))?;
379 Ok(
380 State::load(self.blockstore(), init_act.code, init_act.state)?
381 .into_network_name()
382 .into(),
383 )
384 }
385
386 pub fn is_miner_slashed(&self, addr: &Address, state_cid: &Cid) -> anyhow::Result<bool, Error> {
388 let actor = self
389 .get_actor(&Address::POWER_ACTOR, *state_cid)?
390 .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
391
392 let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
393
394 Ok(spas.miner_power(self.blockstore(), addr)?.is_none())
395 }
396
397 pub fn get_miner_work_addr(&self, state_cid: Cid, addr: &Address) -> Result<Address, Error> {
399 let state =
400 StateTree::new_from_root(self.blockstore_owned(), &state_cid).map_err(Error::other)?;
401 let ms: miner::State = state.get_actor_state_from_address(addr)?;
402 let info = ms.info(self.blockstore()).map_err(|e| e.to_string())?;
403 let addr = resolve_to_key_addr(&state, self.blockstore(), &info.worker())?;
404 Ok(addr)
405 }
406
407 pub fn get_power(
410 &self,
411 state_cid: &Cid,
412 addr: Option<&Address>,
413 ) -> anyhow::Result<Option<(power::Claim, power::Claim)>, Error> {
414 let actor = self
415 .get_actor(&Address::POWER_ACTOR, *state_cid)?
416 .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
417
418 let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
419
420 let t_pow = spas.total_power();
421
422 if let Some(maddr) = addr {
423 let m_pow = spas
424 .miner_power(self.blockstore(), maddr)?
425 .ok_or_else(|| Error::state(format!("Miner for address {maddr} not found")))?;
426
427 let min_pow = spas.miner_nominal_power_meets_consensus_minimum(
428 &self.chain_config().policy,
429 self.blockstore(),
430 maddr,
431 )?;
432 if min_pow {
433 return Ok(Some((m_pow, t_pow)));
434 }
435 }
436
437 Ok(None)
438 }
439
440 pub fn get_all_sectors(
442 &self,
443 addr: &Address,
444 ts: &Tipset,
445 ) -> anyhow::Result<Vec<SectorOnChainInfo>> {
446 let actor = self
447 .get_actor(addr, *ts.parent_state())?
448 .ok_or_else(|| Error::state("Miner actor not found"))?;
449 let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
450 state.load_sectors_ext(self.blockstore(), None)
451 }
452}
453
454impl<DB> StateManager<DB>
455where
456 DB: Blockstore + Send + Sync + 'static,
457{
458 pub async fn tipset_state(
462 self: &Arc<Self>,
463 tipset: &Tipset,
464 state_lookup: StateLookupPolicy,
465 ) -> anyhow::Result<CidPair> {
466 let StateOutput {
467 state_root,
468 receipt_root,
469 ..
470 } = self.tipset_state_output(tipset, state_lookup).await?;
471 Ok((state_root, receipt_root))
472 }
473
474 pub async fn tipset_state_output(
475 self: &Arc<Self>,
476 tipset: &Tipset,
477 state_lookup: StateLookupPolicy,
478 ) -> anyhow::Result<StateOutput> {
479 let key = tipset.key();
480 self.cache
481 .get_or_else(key, || async move {
482 info!(
483 "Evaluating tipset: EPOCH={}, blocks={}, tsk={}",
484 tipset.epoch(),
485 tipset.len(),
486 tipset.key(),
487 );
488
489 if matches!(state_lookup, StateLookupPolicy::Enabled)
492 && let Some(state_from_child) = self.try_lookup_state_from_next_tipset(tipset)
493 {
494 return Ok(state_from_child);
495 }
496
497 trace!("Computing state for tipset at epoch {}", tipset.epoch());
498 let state_output = self
499 .compute_tipset_state(tipset.clone(), NO_CALLBACK, VMTrace::NotTraced)
500 .await?;
501
502 self.update_cache_with_state_output(key, &state_output);
503
504 let ts_state = state_output.into();
505
506 Ok(ts_state)
507 })
508 .await
509 .map(StateOutput::from)
510 }
511
512 fn update_cache_with_state_output(&self, key: &TipsetKey, state_output: &StateOutput) {
514 if !state_output.events.is_empty() || !state_output.events_roots.is_empty() {
515 let events_data = StateEvents {
516 events: state_output.events.clone(),
517 roots: state_output.events_roots.clone(),
518 };
519 self.receipt_event_cache_handler
520 .insert_events(key, events_data);
521 }
522
523 if let Ok(receipts) = Receipt::get_receipts(self.blockstore(), state_output.receipt_root)
524 && !receipts.is_empty()
525 {
526 self.receipt_event_cache_handler
527 .insert_receipt(key, receipts);
528 }
529 }
530
531 #[instrument(skip(self))]
532 pub async fn tipset_message_receipts(
533 self: &Arc<Self>,
534 tipset: &Tipset,
535 ) -> anyhow::Result<Vec<Receipt>> {
536 let key = tipset.key();
537 let ts = tipset.clone();
538 let this = Arc::clone(self);
539 self.receipt_event_cache_handler
540 .get_receipt_or_else(
541 key,
542 Box::new(move || {
543 Box::pin(async move {
544 let StateOutput { receipt_root, .. } = this
545 .compute_tipset_state(ts, NO_CALLBACK, VMTrace::NotTraced)
546 .await?;
547 trace!("Completed tipset state calculation");
548 Receipt::get_receipts(this.blockstore(), receipt_root)
549 })
550 }),
551 )
552 .await
553 }
554
555 #[instrument(skip(self))]
556 pub async fn tipset_state_events(
557 self: &Arc<Self>,
558 tipset: &Tipset,
559 ) -> anyhow::Result<StateEvents> {
560 let key = tipset.key();
561 let ts = tipset.clone();
562 let this = Arc::clone(self);
563 let cids = tipset.cids();
564 self.receipt_event_cache_handler
565 .get_events_or_else(
566 key,
567 Box::new(move || {
568 Box::pin(async move {
569 let state_out = this
571 .compute_tipset_state(ts, NO_CALLBACK, VMTrace::NotTraced)
572 .await?;
573 trace!("Completed tipset state calculation {:?}", cids);
574 Ok(StateEvents {
575 events: state_out.events,
576 roots: state_out.events_roots,
577 })
578 })
579 }),
580 )
581 .await
582 }
583
584 #[instrument(skip(self, rand))]
585 fn call_raw(
586 &self,
587 state_cid: Option<Cid>,
588 msg: &Message,
589 rand: ChainRand<DB>,
590 tipset: &Tipset,
591 ) -> Result<ApiInvocResult, Error> {
592 let mut msg = msg.clone();
593
594 let state_cid = state_cid.unwrap_or(*tipset.parent_state());
595
596 let tipset_messages = self
597 .chain_store()
598 .messages_for_tipset(tipset)
599 .map_err(|err| Error::Other(err.to_string()))?;
600
601 let prior_messsages = tipset_messages
602 .iter()
603 .filter(|ts_msg| ts_msg.message().from() == msg.from());
604
605 let height = tipset.epoch();
608 let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
609 let mut vm = VM::new(
610 ExecutionContext {
611 heaviest_tipset: tipset.clone(),
612 state_tree_root: state_cid,
613 epoch: height,
614 rand: Box::new(rand),
615 base_fee: tipset.block_headers().first().parent_base_fee.clone(),
616 circ_supply: genesis_info.get_vm_circulating_supply(
617 height,
618 self.blockstore(),
619 &state_cid,
620 )?,
621 chain_config: self.chain_config().clone(),
622 chain_index: self.chain_index().clone(),
623 timestamp: tipset.min_timestamp(),
624 },
625 &self.engine,
626 VMTrace::Traced,
627 )?;
628
629 for m in prior_messsages {
630 vm.apply_message(m)?;
631 }
632
633 let state_cid = vm.flush()?;
636
637 let state = StateTree::new_from_root(self.blockstore_owned(), &state_cid)?;
638
639 let from_actor = state
640 .get_actor(&msg.from())?
641 .ok_or_else(|| anyhow::anyhow!("actor not found"))?;
642 msg.set_sequence(from_actor.sequence);
643
644 let mut msg = msg.clone();
646 msg.gas_limit = IMPLICIT_MESSAGE_GAS_LIMIT as u64;
647
648 let (apply_ret, duration) = vm.apply_implicit_message(&msg)?;
649
650 Ok(ApiInvocResult {
651 msg: msg.clone(),
652 msg_rct: Some(apply_ret.msg_receipt()),
653 msg_cid: msg.cid(),
654 error: apply_ret.failure_info().unwrap_or_default(),
655 duration: duration.as_nanos().clamp(0, u64::MAX as u128) as u64,
656 gas_cost: MessageGasCost::default(),
657 execution_trace: structured::parse_events(apply_ret.exec_trace()).unwrap_or_default(),
658 })
659 }
660
661 pub fn call(&self, message: &Message, tipset: Option<Tipset>) -> Result<ApiInvocResult, Error> {
664 let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
665 let chain_rand = self.chain_rand(ts.clone());
666 self.call_raw(None, message, chain_rand, &ts)
667 }
668
669 pub fn call_on_state(
672 &self,
673 state_cid: Cid,
674 message: &Message,
675 tipset: Option<Tipset>,
676 ) -> Result<ApiInvocResult, Error> {
677 let ts = tipset.unwrap_or_else(|| self.cs.heaviest_tipset());
678 let chain_rand = self.chain_rand(ts.clone());
679 self.call_raw(Some(state_cid), message, chain_rand, &ts)
680 }
681
682 pub async fn apply_on_state_with_gas(
683 self: &Arc<Self>,
684 tipset: Option<Tipset>,
685 msg: Message,
686 state_lookup: StateLookupPolicy,
687 ) -> anyhow::Result<ApiInvocResult> {
688 let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
689
690 let from_a = self.resolve_to_key_addr(&msg.from, &ts).await?;
691
692 let mut chain_msg = match from_a.protocol() {
696 Protocol::Secp256k1 => ChainMessage::Signed(SignedMessage::new_unchecked(
697 msg.clone(),
698 Signature::new_secp256k1(vec![0; SECP_SIG_LEN]),
699 )),
700 Protocol::Delegated => ChainMessage::Signed(SignedMessage::new_unchecked(
701 msg.clone(),
702 Signature::new(SignatureType::Delegated, vec![0; SECP_SIG_LEN]),
705 )),
706 _ => ChainMessage::Unsigned(msg.clone()),
707 };
708
709 let (_invoc_res, apply_ret, duration) = self
710 .call_with_gas(&mut chain_msg, &[], Some(ts), VMTrace::Traced, state_lookup)
711 .await?;
712 Ok(ApiInvocResult {
713 msg_cid: msg.cid(),
714 msg,
715 msg_rct: Some(apply_ret.msg_receipt()),
716 error: apply_ret.failure_info().unwrap_or_default(),
717 duration: duration.as_nanos().clamp(0, u64::MAX as u128) as u64,
718 gas_cost: MessageGasCost::default(),
719 execution_trace: structured::parse_events(apply_ret.exec_trace()).unwrap_or_default(),
720 })
721 }
722
723 pub async fn call_with_gas(
726 self: &Arc<Self>,
727 message: &mut ChainMessage,
728 prior_messages: &[ChainMessage],
729 tipset: Option<Tipset>,
730 trace_config: VMTrace,
731 state_lookup: StateLookupPolicy,
732 ) -> Result<(InvocResult, ApplyRet, Duration), Error> {
733 let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
734 let (st, _) = self
735 .tipset_state(&ts, state_lookup)
736 .await
737 .map_err(|e| Error::Other(format!("Could not load tipset state: {e}")))?;
738 let chain_rand = self.chain_rand(ts.clone());
739
740 let epoch = ts.epoch() + 1;
743 let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
744 let (ret, duration) = stacker::grow(64 << 20, || -> ApplyResult {
747 let mut vm = VM::new(
748 ExecutionContext {
749 heaviest_tipset: ts.clone(),
750 state_tree_root: st,
751 epoch,
752 rand: Box::new(chain_rand),
753 base_fee: ts.block_headers().first().parent_base_fee.clone(),
754 circ_supply: genesis_info.get_vm_circulating_supply(
755 epoch,
756 self.blockstore(),
757 &st,
758 )?,
759 chain_config: self.chain_config().clone(),
760 chain_index: self.chain_index().clone(),
761 timestamp: ts.min_timestamp(),
762 },
763 &self.engine,
764 trace_config,
765 )?;
766
767 for msg in prior_messages {
768 vm.apply_message(msg)?;
769 }
770 let from_actor = vm
771 .get_actor(&message.from())
772 .map_err(|e| Error::Other(format!("Could not get actor from state: {e}")))?
773 .ok_or_else(|| Error::Other("cant find actor in state tree".to_string()))?;
774 message.set_sequence(from_actor.sequence);
775 vm.apply_message(message)
776 })?;
777
778 Ok((
779 InvocResult::new(message.message().clone(), &ret),
780 ret,
781 duration,
782 ))
783 }
784
785 pub async fn replay(self: &Arc<Self>, ts: Tipset, mcid: Cid) -> Result<ApiInvocResult, Error> {
788 let this = Arc::clone(self);
789 tokio::task::spawn_blocking(move || this.replay_blocking(ts, mcid))
790 .await
791 .map_err(|e| Error::Other(format!("{e}")))?
792 }
793
794 pub fn replay_blocking(
796 self: &Arc<Self>,
797 ts: Tipset,
798 mcid: Cid,
799 ) -> Result<ApiInvocResult, Error> {
800 const REPLAY_HALT: &str = "replay_halt";
801
802 let mut api_invoc_result = None;
803 let callback = |ctx: MessageCallbackCtx<'_>| {
804 match ctx.at {
805 CalledAt::Applied | CalledAt::Reward
806 if api_invoc_result.is_none() && ctx.cid == mcid =>
807 {
808 api_invoc_result = Some(ApiInvocResult {
809 msg_cid: ctx.message.cid(),
810 msg: ctx.message.message().clone(),
811 msg_rct: Some(ctx.apply_ret.msg_receipt()),
812 error: ctx.apply_ret.failure_info().unwrap_or_default(),
813 duration: ctx.duration.as_nanos().clamp(0, u64::MAX as u128) as u64,
814 gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
815 execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
816 .unwrap_or_default(),
817 });
818 anyhow::bail!(REPLAY_HALT);
819 }
820 _ => Ok(()), }
822 };
823 let result = self.compute_tipset_state_blocking(ts, Some(callback), VMTrace::Traced);
824 if let Err(error_message) = result
825 && error_message.to_string() != REPLAY_HALT
826 {
827 return Err(Error::Other(format!(
828 "unexpected error during execution : {error_message:}"
829 )));
830 }
831 api_invoc_result.ok_or_else(|| Error::Other("failed to replay".into()))
832 }
833
834 pub fn eligible_to_mine(
837 &self,
838 address: &Address,
839 base_tipset: &Tipset,
840 lookback_tipset: &Tipset,
841 ) -> anyhow::Result<bool, Error> {
842 let hmp =
843 self.miner_has_min_power(&self.chain_config().policy, address, lookback_tipset)?;
844 let version = self.get_network_version(base_tipset.epoch());
845
846 if version <= NetworkVersion::V3 {
847 return Ok(hmp);
848 }
849
850 if !hmp {
851 return Ok(false);
852 }
853
854 let actor = self
855 .get_actor(&Address::POWER_ACTOR, *base_tipset.parent_state())?
856 .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
857
858 let power_state = power::State::load(self.blockstore(), actor.code, actor.state)?;
859
860 let actor = self
861 .get_actor(address, *base_tipset.parent_state())?
862 .ok_or_else(|| Error::state("Miner actor address could not be resolved"))?;
863
864 let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
865
866 let claim = power_state
868 .miner_power(self.blockstore(), address)?
869 .ok_or_else(|| Error::Other("Could not get claim".to_string()))?;
870 if claim.quality_adj_power <= BigInt::zero() {
871 return Ok(false);
872 }
873
874 if !miner_state.fee_debt().is_zero() {
876 return Ok(false);
877 }
878
879 let info = miner_state.info(self.blockstore())?;
881 if base_tipset.epoch() <= info.consensus_fault_elapsed {
882 return Ok(false);
883 }
884
885 Ok(true)
886 }
887
888 #[instrument(skip_all)]
910 pub async fn compute_tipset_state(
911 self: &Arc<Self>,
912 tipset: Tipset,
913 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
914 enable_tracing: VMTrace,
915 ) -> Result<StateOutput, Error> {
916 let this = Arc::clone(self);
917 tokio::task::spawn_blocking(move || {
918 this.compute_tipset_state_blocking(tipset, callback, enable_tracing)
919 })
920 .await?
921 }
922
923 #[tracing::instrument(skip_all)]
925 pub fn compute_tipset_state_blocking(
926 &self,
927 tipset: Tipset,
928 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
929 enable_tracing: VMTrace,
930 ) -> Result<StateOutput, Error> {
931 let epoch = tipset.epoch();
932 let has_callback = callback.is_some();
933 Ok(apply_block_messages(
934 self.chain_store().genesis_block_header().timestamp,
935 Arc::clone(self.chain_index()),
936 Arc::clone(self.chain_config()),
937 self.beacon_schedule().clone(),
938 &self.engine,
939 tipset,
940 callback,
941 enable_tracing,
942 )
943 .map_err(|e| {
944 if has_callback {
945 e
946 } else {
947 anyhow::anyhow!("Failed to compute tipset state@{epoch}: {e}")
948 }
949 })?)
950 }
951
952 #[instrument(skip_all)]
953 pub async fn compute_state(
954 self: &Arc<Self>,
955 height: ChainEpoch,
956 messages: Vec<Message>,
957 tipset: Tipset,
958 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
959 enable_tracing: VMTrace,
960 ) -> Result<StateOutput, Error> {
961 let this = Arc::clone(self);
962 tokio::task::spawn_blocking(move || {
963 this.compute_state_blocking(height, messages, tipset, callback, enable_tracing)
964 })
965 .await?
966 }
967
968 #[tracing::instrument(skip_all)]
970 pub fn compute_state_blocking(
971 &self,
972 height: ChainEpoch,
973 messages: Vec<Message>,
974 tipset: Tipset,
975 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
976 enable_tracing: VMTrace,
977 ) -> Result<StateOutput, Error> {
978 Ok(compute_state(
979 height,
980 messages,
981 tipset,
982 self.chain_store().genesis_block_header().timestamp,
983 Arc::clone(self.chain_index()),
984 Arc::clone(self.chain_config()),
985 self.beacon_schedule().clone(),
986 &self.engine,
987 callback,
988 enable_tracing,
989 )?)
990 }
991
992 fn tipset_executed_message(
995 &self,
996 tipset: &Tipset,
997 message: &ChainMessage,
998 allow_replaced: bool,
999 ) -> Result<Option<Receipt>, Error> {
1000 if tipset.epoch() == 0 {
1001 return Ok(None);
1002 }
1003 let message_from_address = message.from();
1004 let message_sequence = message.sequence();
1005 let pts = self
1007 .chain_index()
1008 .load_required_tipset(tipset.parents())
1009 .map_err(|err| Error::Other(format!("Failed to load tipset: {err}")))?;
1010 let messages = self
1011 .cs
1012 .messages_for_tipset(&pts)
1013 .map_err(|err| Error::Other(format!("Failed to load messages for tipset: {err}")))?;
1014 messages
1015 .iter()
1016 .enumerate()
1017 .rev()
1019 .filter(|(_, s)| {
1020 s.sequence() == message_sequence
1021 && s.from() == message_from_address
1022 && s.equal_call(message)
1023 })
1024 .map(|(index, m)| {
1025 if !allow_replaced && message.cid() != m.cid(){
1029 Err(Error::Other(format!(
1030 "found message with equal nonce and call params but different CID. wanted {}, found: {}, nonce: {}, from: {}",
1031 message.cid(),
1032 m.cid(),
1033 message.sequence(),
1034 message.from(),
1035 )))
1036 } else {
1037 let block_header = tipset.block_headers().first();
1038 crate::chain::get_parent_receipt(
1039 self.blockstore(),
1040 block_header,
1041 index,
1042 )
1043 .map_err(|err| Error::Other(format!("Failed to get parent receipt (message_receipts={}, index={index}, error={err})", block_header.message_receipts)))
1044 }
1045 })
1046 .next()
1047 .unwrap_or(Ok(None))
1048 }
1049
1050 fn check_search(
1051 &self,
1052 mut current: Tipset,
1053 message: &ChainMessage,
1054 lookback_max_epoch: ChainEpoch,
1055 allow_replaced: bool,
1056 ) -> Result<Option<(Tipset, Receipt)>, Error> {
1057 let message_from_address = message.from();
1058 let message_sequence = message.sequence();
1059 let mut current_actor_state = self
1060 .get_required_actor(&message_from_address, *current.parent_state())
1061 .map_err(Error::state)?;
1062 let message_from_id = self.lookup_required_id(&message_from_address, ¤t)?;
1063
1064 while current.epoch() >= lookback_max_epoch {
1065 let parent_tipset = self
1066 .chain_index()
1067 .load_required_tipset(current.parents())
1068 .map_err(|err| {
1069 Error::Other(format!(
1070 "failed to load tipset during msg wait searchback: {err:}"
1071 ))
1072 })?;
1073
1074 let parent_actor_state = self
1075 .get_actor(&message_from_id, *parent_tipset.parent_state())
1076 .map_err(|e| Error::State(e.to_string()))?;
1077
1078 if parent_actor_state.is_none()
1079 || (current_actor_state.sequence > message_sequence
1080 && parent_actor_state.as_ref().unwrap().sequence <= message_sequence)
1081 {
1082 let receipt = self
1083 .tipset_executed_message(¤t, message, allow_replaced)?
1084 .context("Failed to get receipt with tipset_executed_message")?;
1085 return Ok(Some((current, receipt)));
1086 }
1087
1088 if let Some(parent_actor_state) = parent_actor_state {
1089 current = parent_tipset;
1090 current_actor_state = parent_actor_state;
1091 } else {
1092 break;
1093 }
1094 }
1095
1096 Ok(None)
1097 }
1098
1099 fn search_back_for_message(
1101 &self,
1102 current: Tipset,
1103 message: &ChainMessage,
1104 look_back_limit: Option<i64>,
1105 allow_replaced: Option<bool>,
1106 ) -> Result<Option<(Tipset, Receipt)>, Error> {
1107 let current_epoch = current.epoch();
1108 let allow_replaced = allow_replaced.unwrap_or(true);
1109
1110 let lookback_max_epoch = match look_back_limit {
1112 Some(0) => return Ok(None),
1114 Some(limit) if limit > 0 => (current_epoch - limit + 1).max(0),
1118 _ => 0,
1120 };
1121
1122 self.check_search(current, message, lookback_max_epoch, allow_replaced)
1123 }
1124
1125 pub fn get_receipt(&self, tipset: Tipset, msg: Cid) -> Result<Receipt, Error> {
1127 let m = crate::chain::get_chain_message(self.blockstore(), &msg)
1128 .map_err(|e| Error::Other(e.to_string()))?;
1129 let message_receipt = self.tipset_executed_message(&tipset, &m, true)?;
1130 if let Some(receipt) = message_receipt {
1131 return Ok(receipt);
1132 }
1133
1134 let maybe_tuple = self.search_back_for_message(tipset, &m, None, None)?;
1135 let message_receipt = maybe_tuple
1136 .ok_or_else(|| {
1137 Error::Other("Could not get receipt from search back message".to_string())
1138 })?
1139 .1;
1140 Ok(message_receipt)
1141 }
1142
1143 pub async fn wait_for_message(
1148 self: &Arc<Self>,
1149 msg_cid: Cid,
1150 confidence: i64,
1151 look_back_limit: Option<ChainEpoch>,
1152 allow_replaced: Option<bool>,
1153 ) -> Result<(Option<Tipset>, Option<Receipt>), Error> {
1154 let mut subscriber = self.cs.publisher().subscribe();
1155 let (sender, mut receiver) = oneshot::channel::<()>();
1156 let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
1157 .map_err(|err| Error::Other(format!("failed to load message {err:}")))?;
1158 let current_tipset = self.heaviest_tipset();
1159 let maybe_message_receipt =
1160 self.tipset_executed_message(¤t_tipset, &message, true)?;
1161 if let Some(r) = maybe_message_receipt {
1162 return Ok((Some(current_tipset.clone()), Some(r)));
1163 }
1164
1165 let mut candidate_tipset: Option<Tipset> = None;
1166 let mut candidate_receipt: Option<Receipt> = None;
1167
1168 let sm_cloned = Arc::clone(self);
1169
1170 let message_for_task = message.clone();
1171 let height_of_head = current_tipset.epoch();
1172 let task = tokio::task::spawn(async move {
1173 let back_tuple = sm_cloned.search_back_for_message(
1174 current_tipset,
1175 &message_for_task,
1176 look_back_limit,
1177 allow_replaced,
1178 )?;
1179 sender
1180 .send(())
1181 .map_err(|e| Error::Other(format!("Could not send to channel {e:?}")))?;
1182 Ok::<_, Error>(back_tuple)
1183 });
1184
1185 let reverts: Arc<RwLock<HashMap<TipsetKey, bool>>> = Arc::new(RwLock::new(HashMap::new()));
1186 let block_revert = reverts.clone();
1187 let sm_cloned = Arc::clone(self);
1188
1189 let mut subscriber_poll = tokio::task::spawn(async move {
1191 loop {
1192 match subscriber.recv().await {
1193 Ok(subscriber) => match subscriber {
1194 HeadChange::Apply(tipset) => {
1195 if candidate_tipset
1196 .as_ref()
1197 .map(|s| tipset.epoch() >= s.epoch() + confidence)
1198 .unwrap_or_default()
1199 {
1200 return Ok((candidate_tipset, candidate_receipt));
1201 }
1202 let poll_receiver = receiver.try_recv();
1203 if let Ok(Some(_)) = poll_receiver {
1204 block_revert
1205 .write()
1206 .await
1207 .insert(tipset.key().to_owned(), true);
1208 }
1209
1210 let maybe_receipt =
1211 sm_cloned.tipset_executed_message(&tipset, &message, true)?;
1212 if let Some(receipt) = maybe_receipt {
1213 if confidence == 0 {
1214 return Ok((Some(tipset), Some(receipt)));
1215 }
1216 candidate_tipset = Some(tipset);
1217 candidate_receipt = Some(receipt)
1218 }
1219 }
1220 },
1221 Err(RecvError::Lagged(i)) => {
1222 warn!(
1223 "wait for message head change subscriber lagged, skipped {} events",
1224 i
1225 );
1226 }
1227 Err(RecvError::Closed) => break,
1228 }
1229 }
1230 Ok((None, None))
1231 })
1232 .fuse();
1233
1234 let mut search_back_poll = tokio::task::spawn(async move {
1236 let back_tuple = task.await.map_err(|e| {
1237 Error::Other(format!("Could not search backwards for message {e}"))
1238 })??;
1239 if let Some((back_tipset, back_receipt)) = back_tuple {
1240 let should_revert = *reverts
1241 .read()
1242 .await
1243 .get(back_tipset.key())
1244 .unwrap_or(&false);
1245 let larger_height_of_head = height_of_head >= back_tipset.epoch() + confidence;
1246 if !should_revert && larger_height_of_head {
1247 return Ok::<_, Error>((Some(back_tipset), Some(back_receipt)));
1248 }
1249 return Ok((None, None));
1250 }
1251 Ok((None, None))
1252 })
1253 .fuse();
1254
1255 loop {
1257 select! {
1258 res = subscriber_poll => {
1259 return res?
1260 }
1261 res = search_back_poll => {
1262 if let Ok((Some(ts), Some(rct))) = res? {
1263 return Ok((Some(ts), Some(rct)));
1264 }
1265 }
1266 }
1267 }
1268 }
1269
1270 pub async fn search_for_message(
1271 &self,
1272 from: Option<Tipset>,
1273 msg_cid: Cid,
1274 look_back_limit: Option<i64>,
1275 allow_replaced: Option<bool>,
1276 ) -> Result<Option<(Tipset, Receipt)>, Error> {
1277 let from = from.unwrap_or_else(|| self.heaviest_tipset());
1278 let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
1279 .map_err(|err| Error::Other(format!("failed to load message {err}")))?;
1280 let current_tipset = self.heaviest_tipset();
1281 let maybe_message_receipt =
1282 self.tipset_executed_message(&from, &message, allow_replaced.unwrap_or(true))?;
1283 if let Some(r) = maybe_message_receipt {
1284 Ok(Some((from, r)))
1285 } else {
1286 self.search_back_for_message(current_tipset, &message, look_back_limit, allow_replaced)
1287 }
1288 }
1289
1290 pub fn get_bls_public_key(
1292 db: &Arc<DB>,
1293 addr: &Address,
1294 state_cid: Cid,
1295 ) -> Result<BlsPublicKey, Error> {
1296 let state = StateTree::new_from_root(Arc::clone(db), &state_cid)
1297 .map_err(|e| Error::Other(e.to_string()))?;
1298 let kaddr = resolve_to_key_addr(&state, db, addr)
1299 .map_err(|e| format!("Failed to resolve key address, error: {e}"))?;
1300
1301 match kaddr.into_payload() {
1302 Payload::BLS(key) => BlsPublicKey::from_bytes(&key)
1303 .map_err(|e| Error::Other(format!("Failed to construct bls public key: {e}"))),
1304 _ => Err(Error::state(
1305 "Address must be BLS address to load bls public key",
1306 )),
1307 }
1308 }
1309
1310 pub fn lookup_id(&self, addr: &Address, ts: &Tipset) -> Result<Option<Address>, Error> {
1312 let state_tree = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
1313 .map_err(|e| format!("{e:?}"))?;
1314 Ok(state_tree
1315 .lookup_id(addr)
1316 .map_err(|e| Error::Other(e.to_string()))?
1317 .map(Address::new_id))
1318 }
1319
1320 pub fn lookup_required_id(&self, addr: &Address, ts: &Tipset) -> Result<Address, Error> {
1322 self.lookup_id(addr, ts)?
1323 .ok_or_else(|| Error::Other(format!("Failed to lookup the id address {addr}")))
1324 }
1325
1326 pub fn market_state(&self, ts: &Tipset) -> Result<market::State, Error> {
1328 let actor = self.get_required_actor(&Address::MARKET_ACTOR, *ts.parent_state())?;
1329 let market_state = market::State::load(self.blockstore(), actor.code, actor.state)?;
1330 Ok(market_state)
1331 }
1332
1333 pub fn market_balance(&self, addr: &Address, ts: &Tipset) -> Result<MarketBalance, Error> {
1335 let market_state = self.market_state(ts)?;
1336 let new_addr = self.lookup_required_id(addr, ts)?;
1337 let out = MarketBalance {
1338 escrow: {
1339 market_state
1340 .escrow_table(self.blockstore())?
1341 .get(&new_addr)?
1342 },
1343 locked: {
1344 market_state
1345 .locked_table(self.blockstore())?
1346 .get(&new_addr)?
1347 },
1348 };
1349
1350 Ok(out)
1351 }
1352
1353 pub fn miner_info(&self, addr: &Address, ts: &Tipset) -> Result<MinerInfo, Error> {
1355 let actor = self
1356 .get_actor(addr, *ts.parent_state())?
1357 .ok_or_else(|| Error::state("Miner actor not found"))?;
1358 let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1359
1360 Ok(state.info(self.blockstore())?)
1361 }
1362
1363 pub fn miner_faults(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
1365 self.all_partition_sectors(addr, ts, |partition| partition.faulty_sectors().clone())
1366 }
1367
1368 pub fn miner_recoveries(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
1370 self.all_partition_sectors(addr, ts, |partition| partition.recovering_sectors().clone())
1371 }
1372
1373 fn all_partition_sectors(
1374 &self,
1375 addr: &Address,
1376 ts: &Tipset,
1377 get_sector: impl Fn(Partition<'_>) -> BitField,
1378 ) -> Result<BitField, Error> {
1379 let actor = self
1380 .get_actor(addr, *ts.parent_state())?
1381 .ok_or_else(|| Error::state("Miner actor not found"))?;
1382
1383 let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1384
1385 let mut partitions = Vec::new();
1386
1387 state.for_each_deadline(
1388 &self.chain_config().policy,
1389 self.blockstore(),
1390 |_, deadline| {
1391 deadline.for_each(self.blockstore(), |_, partition| {
1392 partitions.push(get_sector(partition));
1393 Ok(())
1394 })
1395 },
1396 )?;
1397
1398 Ok(BitField::union(partitions.iter()))
1399 }
1400
1401 pub fn miner_power(&self, addr: &Address, ts: &Tipset) -> Result<MinerPower, Error> {
1403 if let Some((miner_power, total_power)) = self.get_power(ts.parent_state(), Some(addr))? {
1404 return Ok(MinerPower {
1405 miner_power,
1406 total_power,
1407 has_min_power: true,
1408 });
1409 }
1410
1411 Ok(MinerPower {
1412 has_min_power: false,
1413 miner_power: Default::default(),
1414 total_power: Default::default(),
1415 })
1416 }
1417
1418 pub async fn resolve_to_key_addr(
1421 self: &Arc<Self>,
1422 addr: &Address,
1423 ts: &Tipset,
1424 ) -> Result<Address, anyhow::Error> {
1425 match addr.protocol() {
1426 Protocol::BLS | Protocol::Secp256k1 | Protocol::Delegated => return Ok(*addr),
1427 Protocol::Actor => {
1428 return Err(Error::Other(
1429 "cannot resolve actor address to key address".to_string(),
1430 )
1431 .into());
1432 }
1433 _ => {}
1434 };
1435
1436 let state = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())?;
1439 if let Ok(addr) = resolve_to_key_addr(&state, self.blockstore(), addr) {
1440 return Ok(addr);
1441 }
1442
1443 let (st, _) = self.tipset_state(ts, StateLookupPolicy::Enabled).await?;
1445 let state = StateTree::new_from_root(self.blockstore_owned(), &st)?;
1446
1447 resolve_to_key_addr(&state, self.blockstore(), addr)
1448 }
1449
    /// Gathers the information needed to mine a block at `epoch` on top of `tipset`
    /// for the miner `addr`.
    ///
    /// Returns `Ok(None)` when the miner actor is absent from the lookback state or
    /// when no sectors are selected for winning PoSt.
    ///
    /// # Errors
    ///
    /// Fails if the miner actor is absent from the parent state of `tipset`, or if any
    /// of the beacon, lookback, randomness, power or address-resolution steps fail.
    pub async fn miner_get_base_info(
        self: &Arc<Self>,
        beacon_schedule: &BeaconSchedule,
        tipset: Tipset,
        addr: Address,
        epoch: ChainEpoch,
    ) -> anyhow::Result<Option<MiningBaseInfo>> {
        let prev_beacon = self
            .chain_store()
            .chain_index()
            .latest_beacon_entry(tipset.clone())?;

        let entries: Vec<BeaconEntry> = beacon_schedule
            .beacon_entries_for_block(
                self.chain_config().network_version(epoch),
                epoch,
                tipset.epoch(),
                &prev_beacon,
            )
            .await?;

        // Randomness is drawn from the newest beacon entry for this block, falling back
        // to the previous entry when the block has none of its own.
        let base = entries.last().unwrap_or(&prev_beacon);

        let (lb_tipset, lb_state_root) = ChainStore::get_lookback_tipset_for_round(
            self.chain_index(),
            self.chain_config(),
            &tipset,
            epoch,
        )?;

        // A miner missing from the tipset's parent state is an error, whereas a miner
        // missing only from the lookback state yields `None`.
        let actor = self.get_required_actor(&addr, *tipset.parent_state())?;
        if self.get_actor(&addr, lb_state_root)?.is_none() {
            return Ok(None);
        }

        let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;

        // The serialized miner address is mixed into the randomness draw.
        let addr_buf = to_vec(&addr)?;
        let rand = draw_randomness(
            base.signature(),
            DomainSeparationTag::WinningPoStChallengeSeed as i64,
            epoch,
            &addr_buf,
        )?;

        // NOTE(review): the network version here is taken at `tipset.epoch()`, while the
        // beacon entries above use the version at `epoch` — confirm this is intended.
        let network_version = self.chain_config().network_version(tipset.epoch());
        let sectors = self.get_sectors_for_winning_post(
            &lb_state_root,
            network_version,
            &addr,
            Randomness::new(rand.to_vec()),
        )?;

        if sectors.is_empty() {
            return Ok(None);
        }

        // Power is read from the lookback state, not from the tipset's parent state.
        let (miner_power, total_power) = self
            .get_power(&lb_state_root, Some(&addr))?
            .context("failed to get power")?;

        let info = miner_state.info(self.blockstore())?;

        let worker_key = self
            .resolve_to_deterministic_address(info.worker, &tipset)
            .await?;
        let eligible = self.eligible_to_mine(&addr, &tipset, &lb_tipset)?;

        Ok(Some(MiningBaseInfo {
            miner_power: miner_power.quality_adj_power,
            network_power: total_power.quality_adj_power,
            sectors,
            worker_key,
            sector_size: info.sector_size,
            prev_beacon_entry: prev_beacon,
            beacon_entries: entries,
            eligible_for_mining: eligible,
        }))
    }
1533
1534 pub fn miner_has_min_power(
1537 &self,
1538 policy: &Policy,
1539 addr: &Address,
1540 ts: &Tipset,
1541 ) -> anyhow::Result<bool> {
1542 let actor = self
1543 .get_actor(&Address::POWER_ACTOR, *ts.parent_state())?
1544 .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
1545 let ps = power::State::load(self.blockstore(), actor.code, actor.state)?;
1546
1547 ps.miner_nominal_power_meets_consensus_minimum(policy, self.blockstore(), addr)
1548 }
1549
1550 #[tracing::instrument(skip(self))]
1574 pub fn validate_range(&self, epochs: RangeInclusive<i64>) -> anyhow::Result<()> {
1575 let heaviest = self.heaviest_tipset();
1576 let heaviest_epoch = heaviest.epoch();
1577 let end = self
1578 .chain_index()
1579 .tipset_by_height(*epochs.end(), heaviest, ResolveNullTipset::TakeOlder)
1580 .with_context(|| {
1581 format!(
1582 "couldn't get a tipset at height {} behind heaviest tipset at height {heaviest_epoch}",
1583 *epochs.end(),
1584 )
1585 })?;
1586
1587 let tipsets = self
1589 .chain_index()
1590 .chain(end)
1591 .take_while(|tipset| tipset.epoch() >= *epochs.start());
1592
1593 self.validate_tipsets(tipsets)
1594 }
1595
1596 pub fn validate_tipsets<T>(&self, tipsets: T) -> anyhow::Result<()>
1597 where
1598 T: Iterator<Item = Tipset> + Send,
1599 {
1600 let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
1601 validate_tipsets(
1602 genesis_timestamp,
1603 self.chain_index().clone(),
1604 self.chain_config().clone(),
1605 self.beacon_schedule().clone(),
1606 &self.engine,
1607 tipsets,
1608 )
1609 }
1610
1611 pub fn get_verified_registry_actor_state(
1612 &self,
1613 ts: &Tipset,
1614 ) -> anyhow::Result<verifreg::State> {
1615 let act = self
1616 .get_actor(&Address::VERIFIED_REGISTRY_ACTOR, *ts.parent_state())
1617 .map_err(Error::state)?
1618 .ok_or_else(|| Error::state("actor not found"))?;
1619 verifreg::State::load(self.blockstore(), act.code, act.state)
1620 }
1621 pub fn get_claim(
1622 &self,
1623 addr: &Address,
1624 ts: &Tipset,
1625 claim_id: ClaimID,
1626 ) -> anyhow::Result<Option<Claim>> {
1627 let id_address = self.lookup_required_id(addr, ts)?;
1628 let state = self.get_verified_registry_actor_state(ts)?;
1629 state.get_claim(self.blockstore(), id_address, claim_id)
1630 }
1631
1632 pub fn get_all_claims(&self, ts: &Tipset) -> anyhow::Result<HashMap<ClaimID, Claim>> {
1633 let state = self.get_verified_registry_actor_state(ts)?;
1634 state.get_all_claims(self.blockstore())
1635 }
1636
1637 pub fn get_allocation(
1638 &self,
1639 addr: &Address,
1640 ts: &Tipset,
1641 allocation_id: AllocationID,
1642 ) -> anyhow::Result<Option<Allocation>> {
1643 let id_address = self.lookup_required_id(addr, ts)?;
1644 let state = self.get_verified_registry_actor_state(ts)?;
1645 state.get_allocation(self.blockstore(), id_address.id()?, allocation_id)
1646 }
1647
1648 pub fn get_all_allocations(
1649 &self,
1650 ts: &Tipset,
1651 ) -> anyhow::Result<HashMap<AllocationID, Allocation>> {
1652 let state = self.get_verified_registry_actor_state(ts)?;
1653 state.get_all_allocations(self.blockstore())
1654 }
1655
1656 pub fn verified_client_status(
1657 &self,
1658 addr: &Address,
1659 ts: &Tipset,
1660 ) -> anyhow::Result<Option<DataCap>> {
1661 let id = self.lookup_required_id(addr, ts)?;
1662 let network_version = self.get_network_version(ts.epoch());
1663
1664 if (u32::from(network_version.0)) < 17 {
1668 let state = self.get_verified_registry_actor_state(ts)?;
1669 return state.verified_client_data_cap(self.blockstore(), id);
1670 }
1671
1672 let act = self
1673 .get_actor(&Address::DATACAP_TOKEN_ACTOR, *ts.parent_state())
1674 .map_err(Error::state)?
1675 .ok_or_else(|| Error::state("Miner actor not found"))?;
1676
1677 let state = datacap::State::load(self.blockstore(), act.code, act.state)?;
1678
1679 state.verified_client_data_cap(self.blockstore(), id)
1680 }
1681
1682 pub async fn resolve_to_deterministic_address(
1683 self: &Arc<Self>,
1684 address: Address,
1685 ts: &Tipset,
1686 ) -> anyhow::Result<Address> {
1687 use crate::shim::address::Protocol::*;
1688 match address.protocol() {
1689 BLS | Secp256k1 | Delegated => Ok(address),
1690 Actor => anyhow::bail!("cannot resolve actor address to key address"),
1691 _ => {
1692 if let Ok(state) =
1694 StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
1695 && let Ok(address) = state
1696 .resolve_to_deterministic_addr(self.chain_store().blockstore(), address)
1697 {
1698 return Ok(address);
1699 }
1700
1701 let (state_root, _) = self.tipset_state(ts, StateLookupPolicy::Enabled).await?;
1703 let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?;
1704 state.resolve_to_deterministic_addr(self.chain_store().blockstore(), address)
1705 }
1706 }
1707 }
1708
1709 pub fn execution_trace(&self, tipset: &Tipset) -> anyhow::Result<(Cid, Vec<ApiInvocResult>)> {
1710 let mut invoc_trace = vec![];
1711
1712 let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
1713
1714 let callback = |ctx: MessageCallbackCtx<'_>| {
1715 match ctx.at {
1716 CalledAt::Applied | CalledAt::Reward => {
1717 invoc_trace.push(ApiInvocResult {
1718 msg_cid: ctx.message.cid(),
1719 msg: ctx.message.message().clone(),
1720 msg_rct: Some(ctx.apply_ret.msg_receipt()),
1721 error: ctx.apply_ret.failure_info().unwrap_or_default(),
1722 duration: ctx.duration.as_nanos().clamp(0, u64::MAX as u128) as u64,
1723 gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
1724 execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
1725 .unwrap_or_default(),
1726 });
1727 Ok(())
1728 }
1729 _ => Ok(()), }
1731 };
1732
1733 let StateOutput { state_root, .. } = apply_block_messages(
1734 genesis_timestamp,
1735 self.chain_index().clone(),
1736 self.chain_config().clone(),
1737 self.beacon_schedule().clone(),
1738 &self.engine,
1739 tipset.clone(),
1740 Some(callback),
1741 VMTrace::Traced,
1742 )?;
1743
1744 Ok((state_root, invoc_trace))
1745 }
1746
1747 fn try_lookup_state_from_next_tipset(&self, tipset: &Tipset) -> Option<StateOutputValue> {
1751 let epoch = tipset.epoch();
1752 let next_epoch = epoch + 1;
1753
1754 let heaviest = self.heaviest_tipset();
1756 if next_epoch > heaviest.epoch() {
1757 return None;
1758 }
1759
1760 if let Ok(next_tipset) =
1762 self.chain_index()
1763 .tipset_by_height(next_epoch, heaviest, ResolveNullTipset::TakeNewer)
1764 {
1765 if !next_tipset.parents().eq(tipset.key()) {
1767 return None;
1768 }
1769
1770 let state_root = next_tipset.parent_state();
1771 let receipt_root = next_tipset.min_ticket_block().message_receipts;
1772
1773 if self.blockstore().has(state_root).unwrap_or(false)
1774 && self.blockstore().has(&receipt_root).unwrap_or(false)
1775 {
1776 return Some(StateOutputValue {
1777 state_root: state_root.into(),
1778 receipt_root,
1779 });
1780 }
1781 }
1782
1783 None
1784 }
1785}
1786
1787pub fn validate_tipsets<DB, T>(
1788 genesis_timestamp: u64,
1789 chain_index: Arc<ChainIndex<Arc<DB>>>,
1790 chain_config: Arc<ChainConfig>,
1791 beacon: Arc<BeaconSchedule>,
1792 engine: &MultiEngine,
1793 tipsets: T,
1794) -> anyhow::Result<()>
1795where
1796 DB: Blockstore + Send + Sync + 'static,
1797 T: Iterator<Item = Tipset> + Send,
1798{
1799 use rayon::iter::ParallelIterator as _;
1800 tipsets
1801 .tuple_windows()
1802 .par_bridge()
1803 .try_for_each(|(child, parent)| {
1804 info!(height = parent.epoch(), "compute parent state");
1805 let StateOutput {
1806 state_root: actual_state,
1807 receipt_root: actual_receipt,
1808 ..
1809 } = apply_block_messages(
1810 genesis_timestamp,
1811 chain_index.clone(),
1812 chain_config.clone(),
1813 beacon.clone(),
1814 engine,
1815 parent,
1816 NO_CALLBACK,
1817 VMTrace::NotTraced,
1818 )
1819 .map_err(|e| anyhow::anyhow!("couldn't compute tipset state: {e}"))?;
1820 let expected_receipt = child.min_ticket_block().message_receipts;
1821 let expected_state = child.parent_state();
1822 match (expected_state, expected_receipt) == (&actual_state, actual_receipt) {
1823 true => Ok(()),
1824 false => {
1825 error!(
1826 height = child.epoch(),
1827 ?expected_state,
1828 ?expected_receipt,
1829 ?actual_state,
1830 ?actual_receipt,
1831 "state mismatch"
1832 );
1833 bail!("state mismatch");
1834 }
1835 }
1836 })
1837}
1838
/// Executes the messages of `tipset` on top of its parent state and returns the
/// resulting state root, receipt root, events and event roots.
///
/// Steps, in order:
/// 1. A tipset at epoch `0` is not executed: its recorded parent state and message
///    receipts are returned with empty events.
/// 2. For every epoch strictly between the parent tipset's epoch and `tipset`'s epoch,
///    cron is run in a fresh VM; a cron failure is logged and otherwise ignored.
/// 3. State migrations are run for every epoch from the parent's epoch up to, but not
///    including, `tipset`'s epoch.
/// 4. The block messages are applied at `tipset`'s epoch, the receipts are stored as an
///    AMT, and every present event root is checked against a root derived from the
///    emitted events.
///
/// `callback`, when provided, is passed to the VM for both cron and message execution.
///
/// # Errors
///
/// Fails if the parent tipset or the block messages cannot be loaded, if a VM cannot
/// be created or flushed, if a migration or message application fails, or if a derived
/// event root differs from the reported one.
#[allow(clippy::too_many_arguments)]
pub fn apply_block_messages<DB>(
    genesis_timestamp: u64,
    chain_index: Arc<ChainIndex<Arc<DB>>>,
    chain_config: Arc<ChainConfig>,
    beacon: Arc<BeaconSchedule>,
    engine: &MultiEngine,
    tipset: Tipset,
    mut callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
    enable_tracing: VMTrace,
) -> anyhow::Result<StateOutput>
where
    DB: Blockstore + Send + Sync + 'static,
{
    // Epoch 0 is returned as recorded in the block header; nothing is executed.
    if tipset.epoch() == 0 {
        let message_receipts = tipset.min_ticket_block().message_receipts;
        return Ok(StateOutput {
            state_root: *tipset.parent_state(),
            receipt_root: message_receipts,
            events: vec![],
            events_roots: vec![],
        });
    }

    let rand = ChainRand::new(
        Arc::clone(&chain_config),
        tipset.clone(),
        Arc::clone(&chain_index),
        beacon,
    );

    let genesis_info = GenesisInfo::from_chain_config(chain_config.clone());
    // Builds a VM on top of `state_root` for the given epoch and timestamp. The base
    // fee always comes from `tipset`'s min-ticket block, also for intermediate epochs.
    let create_vm = |state_root: Cid, epoch, timestamp| {
        let circulating_supply =
            genesis_info.get_vm_circulating_supply(epoch, chain_index.db(), &state_root)?;
        VM::new(
            ExecutionContext {
                heaviest_tipset: tipset.clone(),
                state_tree_root: state_root,
                epoch,
                rand: Box::new(rand.clone()),
                base_fee: tipset.min_ticket_block().parent_base_fee.clone(),
                circ_supply: circulating_supply,
                chain_config: Arc::clone(&chain_config),
                chain_index: Arc::clone(&chain_index),
                timestamp,
            },
            engine,
            enable_tracing,
        )
    };

    let mut parent_state = *tipset.parent_state();

    let parent_epoch = Tipset::load_required(chain_index.db(), tipset.parents())?.epoch();
    let epoch = tipset.epoch();

    for epoch_i in parent_epoch..epoch {
        // Epochs strictly after the parent's have no tipset of their own (presumably
        // null rounds — confirm); cron still has to run for each of them.
        if epoch_i > parent_epoch {
            let timestamp = genesis_timestamp + ((EPOCH_DURATION_SECONDS * epoch_i) as u64);

            // NOTE(review): `stacker::grow` is given a 64 MiB stack size here — presumably
            // because VM execution can recurse deeply; confirm.
            parent_state = stacker::grow(64 << 20, || -> anyhow::Result<Cid> {
                let mut vm = create_vm(parent_state, epoch_i, timestamp)?;
                // A cron failure is logged but does not abort the computation.
                if let Err(e) = vm.run_cron(epoch_i, callback.as_mut()) {
                    error!("Beginning of epoch cron failed to run: {}", e);
                }
                vm.flush()
            })?;
        }

        // A migration, when one applies at this epoch, replaces the state root.
        if let Some(new_state) =
            run_state_migrations(epoch_i, &chain_config, chain_index.db(), &parent_state)?
        {
            parent_state = new_state;
        }
    }

    let block_messages = BlockMessages::for_tipset(chain_index.db(), &tipset)
        .map_err(|e| Error::Other(e.to_string()))?;

    stacker::grow(64 << 20, || -> anyhow::Result<StateOutput> {
        let mut vm = create_vm(parent_state, epoch, tipset.min_timestamp())?;

        let (receipts, events, events_roots) =
            vm.apply_block_messages(&block_messages, epoch, callback)?;

        // Storing the receipts as an AMT yields the receipt root.
        let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts)?;

        // Store the events of each message and verify that the derived root matches the
        // reported one. Entries without an event root are skipped.
        for (msg_events, events_root) in events.iter().zip(events_roots.iter()) {
            if let Some(event_root) = events_root {
                let derived_event_root = Amt::new_from_iter_with_bit_width(
                    chain_index.db(),
                    EVENTS_AMT_BITWIDTH,
                    msg_events.iter(),
                )
                .map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?;

                ensure!(
                    derived_event_root.eq(event_root),
                    "Events AMT root mismatch: derived={derived_event_root}, actual={event_root}."
                );
            }
        }

        let state_root = vm.flush()?;

        Ok(StateOutput {
            state_root,
            receipt_root,
            events,
            events_roots,
        })
    })
}
2053
2054#[allow(clippy::too_many_arguments)]
2055pub fn compute_state<DB>(
2056 _height: ChainEpoch,
2057 messages: Vec<Message>,
2058 tipset: Tipset,
2059 genesis_timestamp: u64,
2060 chain_index: Arc<ChainIndex<Arc<DB>>>,
2061 chain_config: Arc<ChainConfig>,
2062 beacon: Arc<BeaconSchedule>,
2063 engine: &MultiEngine,
2064 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
2065 enable_tracing: VMTrace,
2066) -> anyhow::Result<StateOutput>
2067where
2068 DB: Blockstore + Send + Sync + 'static,
2069{
2070 if !messages.is_empty() {
2071 anyhow::bail!("Applying messages is not yet implemented.");
2072 }
2073
2074 let output = apply_block_messages(
2075 genesis_timestamp,
2076 chain_index,
2077 chain_config,
2078 beacon,
2079 engine,
2080 tipset,
2081 callback,
2082 enable_tracing,
2083 )?;
2084
2085 Ok(output)
2086}
2087
/// Switch passed to state lookups such as `tipset_state`.
///
/// NOTE(review): presumably selects whether an already-known state (e.g. the one
/// recorded by the next tipset) may be looked up instead of being recomputed — confirm
/// against the consumer of this type.
#[derive(Clone, Copy, Debug, Default)]
pub enum StateLookupPolicy {
    /// Lookup is enabled; this is the default.
    #[default]
    Enabled,
    /// Lookup is disabled.
    Disabled,
}