1#[cfg(test)]
5mod tests;
6
7mod cache;
8pub mod chain_rand;
9pub mod circulating_supply;
10mod errors;
11pub mod utils;
12
13pub use self::errors::*;
14use self::utils::structured;
15
16use crate::beacon::{BeaconEntry, BeaconSchedule};
17use crate::blocks::{Tipset, TipsetKey};
18use crate::chain::{
19 ChainStore, HeadChange,
20 index::{ChainIndex, ResolveNullTipset},
21};
22use crate::interpreter::{
23 BlockMessages, CalledAt, ExecutionContext, IMPLICIT_MESSAGE_GAS_LIMIT, VM, resolve_to_key_addr,
24};
25use crate::interpreter::{MessageCallbackCtx, VMTrace};
26use crate::lotus_json::{LotusJson, lotus_json_with_self};
27use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage};
28use crate::networks::ChainConfig;
29use crate::rpc::state::{ApiInvocResult, InvocResult, MessageGasCost};
30use crate::rpc::types::{MiningBaseInfo, SectorOnChainInfo};
31use crate::shim::actors::init::{self, State};
32use crate::shim::actors::miner::{MinerInfo, MinerPower, Partition};
33use crate::shim::actors::verifreg::{Allocation, AllocationID, Claim};
34use crate::shim::actors::*;
35use crate::shim::crypto::{Signature, SignatureType};
36use crate::shim::{
37 actors::{
38 LoadActorStateFromBlockstore, miner::ext::MinerStateExt as _,
39 verifreg::ext::VerifiedRegistryStateExt as _,
40 },
41 executor::{ApplyRet, Receipt, StampedEvent},
42};
43use crate::shim::{
44 address::{Address, Payload, Protocol},
45 clock::ChainEpoch,
46 econ::TokenAmount,
47 machine::{GLOBAL_MULTI_ENGINE, MultiEngine},
48 message::Message,
49 randomness::Randomness,
50 runtime::Policy,
51 state_tree::{ActorState, StateTree},
52 version::NetworkVersion,
53};
54use crate::state_manager::cache::{
55 DisabledTipsetDataCache, EnabledTipsetDataCache, TipsetReceiptEventCacheHandler,
56 TipsetStateCache,
57};
58use crate::state_manager::chain_rand::draw_randomness;
59use crate::state_migration::run_state_migrations;
60use crate::utils::get_size::{
61 GetSize, vec_heap_size_helper, vec_with_stack_only_item_heap_size_helper,
62};
63use ahash::{HashMap, HashMapExt};
64use anyhow::{Context as _, bail, ensure};
65use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _};
66use chain_rand::ChainRand;
67use cid::Cid;
68pub use circulating_supply::GenesisInfo;
69use fil_actor_verifreg_state::v12::DataCap;
70use fil_actor_verifreg_state::v13::ClaimID;
71use fil_actors_shared::fvm_ipld_amt::{Amt, Amtv0};
72use fil_actors_shared::fvm_ipld_bitfield::BitField;
73use fil_actors_shared::v12::runtime::DomainSeparationTag;
74use futures::{FutureExt, channel::oneshot, select};
75use fvm_ipld_blockstore::Blockstore;
76use fvm_ipld_encoding::to_vec;
77use fvm_shared4::crypto::signature::SECP_SIG_LEN;
78use itertools::Itertools as _;
79use nonzero_ext::nonzero;
80use num::BigInt;
81use num_traits::identities::Zero;
82use rayon::prelude::ParallelBridge;
83use schemars::JsonSchema;
84use serde::{Deserialize, Serialize};
85use std::ops::RangeInclusive;
86use std::time::Duration;
87use std::{num::NonZeroUsize, sync::Arc};
88use tokio::sync::{RwLock, broadcast::error::RecvError};
89use tracing::{error, info, instrument, trace, warn};
90
/// Upper bound on the number of (child, parent) tipset pairs visited by
/// `StateManager::populate_cache` when warming the state cache.
const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
/// Bit width for events AMTs.
// NOTE(review): not referenced in the visible part of this file; confirm the
// exact meaning against the code that builds/loads the events AMT.
pub const EVENTS_AMT_BITWIDTH: u32 = 5;

/// A pair of CIDs; `StateManager::tipset_state` returns it as
/// `(state_root, receipt_root)`.
type CidPair = (Cid, Cid);
96
/// Events recorded for a tipset together with their optional root CIDs.
///
/// Built from [`StateOutput::events`] and [`StateOutput::events_roots`] and
/// stored in / served from the receipt-event cache handler.
#[derive(Debug, Clone, GetSize)]
pub struct StateEvents {
    /// Stamped events, copied from `StateOutput::events`.
    // NOTE(review): presumably one inner vector per executed message - confirm.
    #[get_size(size_fn = vec_heap_size_helper)]
    pub events: Vec<Vec<StampedEvent>>,
    /// Optional events root CIDs, copied from `StateOutput::events_roots`.
    #[get_size(size_fn = vec_with_stack_only_item_heap_size_helper)]
    pub roots: Vec<Option<Cid>>,
}
104
/// Result of computing the state of a tipset (see
/// `StateManager::compute_tipset_state`).
#[derive(Clone)]
pub struct StateOutput {
    /// Root CID of the resulting state tree.
    pub state_root: Cid,
    /// Root CID of the message receipts; can be resolved with `Receipt::get_receipts`.
    pub receipt_root: Cid,
    /// Stamped events produced during execution.
    pub events: Vec<Vec<StampedEvent>>,
    /// Optional root CIDs matching `events`.
    pub events_roots: Vec<Option<Cid>>,
}
112
/// Reduced form of [`StateOutput`] that keeps only the two root CIDs.
///
/// This is the value type stored in the `TipsetStateCache` of `StateManager`.
#[derive(Debug, Default, Clone, GetSize)]
pub struct StateOutputValue {
    /// Root CID of the state tree (skipped by the `GetSize` derive).
    #[get_size(ignore)]
    pub state_root: Cid,
    /// Root CID of the message receipts (skipped by the `GetSize` derive).
    #[get_size(ignore)]
    pub receipt_root: Cid,
}
120
121impl From<StateOutputValue> for StateOutput {
122 fn from(value: StateOutputValue) -> Self {
123 Self {
124 state_root: value.state_root,
125 receipt_root: value.receipt_root,
126 events: vec![],
127 events_roots: vec![],
128 }
129 }
130}
131
132impl From<StateOutput> for StateOutputValue {
133 fn from(value: StateOutput) -> Self {
134 StateOutputValue {
135 state_root: value.state_root,
136 receipt_root: value.receipt_root,
137 }
138 }
139}
140
/// Escrow and locked token amounts, (de)serialized with `PascalCase` field
/// names through the `lotus_json` adapters.
// NOTE(review): presumably the per-address balance reported by the market
// actor - confirm against the RPC method that returns it.
#[derive(
    Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema,
)]
#[serde(rename_all = "PascalCase")]
pub struct MarketBalance {
    /// Escrow amount.
    #[schemars(with = "LotusJson<TokenAmount>")]
    #[serde(with = "crate::lotus_json")]
    pub escrow: TokenAmount,
    /// Locked amount.
    #[schemars(with = "LotusJson<TokenAmount>")]
    #[serde(with = "crate::lotus_json")]
    pub locked: TokenAmount,
}
lotus_json_with_self!(MarketBalance);
155
/// Entry point for computing and querying chain state.
///
/// Bundles the chain store with a per-tipset state cache, the beacon schedule,
/// the VM engine and a receipt/event cache handler.
pub struct StateManager<DB> {
    /// Chain store; also the source of the blockstore, chain index and chain config.
    cs: Arc<ChainStore<DB>>,
    /// Cache of state/receipt roots keyed by tipset key.
    cache: TipsetStateCache<StateOutputValue>,
    /// Beacon schedule derived from the chain config and the genesis timestamp.
    beacon: Arc<crate::beacon::BeaconSchedule>,
    /// Engine handed to every `VM::new` call made by this manager.
    engine: Arc<MultiEngine>,
    /// Receipt/event cache; an enabled or disabled implementation is chosen in
    /// `new_with_engine` from `enable_receipt_event_caching`.
    receipt_event_cache_handler: Box<dyn TipsetReceiptEventCacheHandler>,
}
171
/// A typed `None` to pass where an optional per-message callback is expected
/// but not needed, e.g. `compute_tipset_state(ts, NO_CALLBACK, ..)`.
// The lint is silenced because the constant's type is a function pointer
// wrapped in an `Option`.
#[allow(clippy::type_complexity)]
pub const NO_CALLBACK: Option<fn(MessageCallbackCtx<'_>) -> anyhow::Result<()>> = None;
174
175impl<DB> StateManager<DB>
176where
177 DB: Blockstore,
178{
179 pub fn new(cs: Arc<ChainStore<DB>>) -> Result<Self, anyhow::Error> {
180 Self::new_with_engine(cs, GLOBAL_MULTI_ENGINE.clone())
181 }
182
183 pub fn new_with_engine(
184 cs: Arc<ChainStore<DB>>,
185 engine: Arc<MultiEngine>,
186 ) -> Result<Self, anyhow::Error> {
187 let genesis = cs.genesis_block_header();
188 let beacon = Arc::new(cs.chain_config().get_beacon_schedule(genesis.timestamp));
189
190 let cache_handler: Box<dyn TipsetReceiptEventCacheHandler> =
191 if cs.chain_config().enable_receipt_event_caching {
192 Box::new(EnabledTipsetDataCache::new())
193 } else {
194 Box::new(DisabledTipsetDataCache::new())
195 };
196
197 Ok(Self {
198 cs,
199 cache: TipsetStateCache::new("state_output"), beacon,
201 engine,
202 receipt_event_cache_handler: cache_handler,
203 })
204 }
205
206 pub fn heaviest_tipset(&self) -> Tipset {
208 self.chain_store().heaviest_tipset()
209 }
210
211 pub fn maybe_rewind_heaviest_tipset(&self) -> anyhow::Result<()> {
216 while self.maybe_rewind_heaviest_tipset_once()? {}
217 Ok(())
218 }
219
220 fn maybe_rewind_heaviest_tipset_once(&self) -> anyhow::Result<bool> {
221 let head = self.heaviest_tipset();
222 if let Some(info) = self
223 .chain_config()
224 .network_height_with_actor_bundle(head.epoch())
225 {
226 let expected_height_info = info.info;
227 let expected_bundle = info.manifest(self.blockstore())?;
228 let expected_bundle_metadata = expected_bundle.metadata()?;
229 let state = self.get_state_tree(head.parent_state())?;
230 let bundle_metadata = state.get_actor_bundle_metadata()?;
231 if expected_bundle_metadata != bundle_metadata {
232 let current_epoch = head.epoch();
233 let target_head = self.chain_index().tipset_by_height(
234 (expected_height_info.epoch - 1).max(0),
235 head,
236 ResolveNullTipset::TakeOlder,
237 )?;
238 let target_epoch = target_head.epoch();
239 let bundle_version = &bundle_metadata.version;
240 let expected_bundle_version = &expected_bundle_metadata.version;
241 if target_epoch < current_epoch {
242 tracing::warn!(
243 "rewinding chain head from {current_epoch} to {target_epoch}, actor bundle: {bundle_version}, expected: {expected_bundle_version}"
244 );
245 if self.blockstore().has(target_head.parent_state())? {
246 self.chain_store().set_heaviest_tipset(target_head)?;
247 return Ok(true);
248 } else {
249 anyhow::bail!(
250 "failed to rewind, state tree @ {target_epoch} is missing from blockstore: {}",
251 target_head.parent_state()
252 );
253 }
254 }
255 }
256 }
257 Ok(false)
258 }
259
260 pub fn populate_cache(&self) {
265 for (child, parent) in self
266 .chain_index()
267 .chain(self.heaviest_tipset())
268 .tuple_windows()
269 .take(DEFAULT_TIPSET_CACHE_SIZE.into())
270 {
271 let key = parent.key();
272 let state_root = child.min_ticket_block().state_root;
273 let receipt_root = child.min_ticket_block().message_receipts;
274 self.cache.insert(
275 key.clone(),
276 StateOutputValue {
277 state_root,
278 receipt_root,
279 },
280 );
281 if let Ok(receipts) = Receipt::get_receipts(self.blockstore(), receipt_root)
282 && !receipts.is_empty()
283 {
284 self.receipt_event_cache_handler
285 .insert_receipt(key, receipts);
286 }
287 }
288 }
289
    /// Returns the beacon schedule built when this manager was constructed.
    pub fn beacon_schedule(&self) -> &Arc<BeaconSchedule> {
        &self.beacon
    }
293
    /// Returns the network version that the chain config reports for `epoch`.
    pub fn get_network_version(&self, epoch: ChainEpoch) -> NetworkVersion {
        self.chain_config().network_version(epoch)
    }
298
    /// Opens the state tree rooted at `state_cid` on top of this manager's blockstore.
    ///
    /// # Errors
    /// Propagates the error from `StateTree::new_from_root`.
    pub fn get_state_tree(&self, state_cid: &Cid) -> anyhow::Result<StateTree<DB>> {
        StateTree::new_from_root(self.blockstore_owned(), state_cid)
    }
303
304 pub fn get_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<Option<ActorState>> {
306 let state = self.get_state_tree(&state_cid)?;
307 state.get_actor(addr)
308 }
309
310 pub fn get_actor_state<S: LoadActorStateFromBlockstore>(
312 &self,
313 ts: &Tipset,
314 ) -> anyhow::Result<S> {
315 let state_tree = self.get_state_tree(ts.parent_state())?;
316 state_tree.get_actor_state()
317 }
318
319 pub fn get_actor_state_from_address<S: LoadActorStateFromBlockstore>(
321 &self,
322 ts: &Tipset,
323 actor_address: &Address,
324 ) -> anyhow::Result<S> {
325 let state_tree = self.get_state_tree(ts.parent_state())?;
326 state_tree.get_actor_state_from_address(actor_address)
327 }
328
329 pub fn get_required_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<ActorState> {
331 let state = self.get_state_tree(&state_cid)?;
332 state.get_actor(addr)?.with_context(|| {
333 format!("Failed to load actor with addr={addr}, state_cid={state_cid}")
334 })
335 }
336
    /// Returns a reference to the chain store's blockstore.
    pub fn blockstore(&self) -> &Arc<DB> {
        self.cs.blockstore()
    }
341
342 pub fn blockstore_owned(&self) -> Arc<DB> {
343 self.blockstore().clone()
344 }
345
    /// Returns a reference to the underlying chain store.
    pub fn chain_store(&self) -> &Arc<ChainStore<DB>> {
        &self.cs
    }
350
    /// Returns a reference to the chain store's chain index.
    pub fn chain_index(&self) -> &Arc<ChainIndex<Arc<DB>>> {
        self.cs.chain_index()
    }
355
    /// Returns a reference to the chain store's chain configuration.
    pub fn chain_config(&self) -> &Arc<ChainConfig> {
        self.cs.chain_config()
    }
360
361 pub fn chain_rand(&self, tipset: Tipset) -> ChainRand<DB> {
362 ChainRand::new(
363 self.chain_config().clone(),
364 tipset,
365 self.chain_index().clone(),
366 self.beacon.clone(),
367 )
368 }
369
370 pub fn get_network_state_name(
372 &self,
373 state_cid: Cid,
374 ) -> anyhow::Result<crate::networks::StateNetworkName> {
375 let init_act = self
376 .get_actor(&init::ADDRESS.into(), state_cid)?
377 .ok_or_else(|| Error::state("Init actor address could not be resolved"))?;
378 Ok(
379 State::load(self.blockstore(), init_act.code, init_act.state)?
380 .into_network_name()
381 .into(),
382 )
383 }
384
385 pub fn is_miner_slashed(&self, addr: &Address, state_cid: &Cid) -> anyhow::Result<bool, Error> {
387 let actor = self
388 .get_actor(&Address::POWER_ACTOR, *state_cid)?
389 .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
390
391 let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
392
393 Ok(spas.miner_power(self.blockstore(), addr)?.is_none())
394 }
395
396 pub fn get_miner_work_addr(&self, state_cid: Cid, addr: &Address) -> Result<Address, Error> {
398 let state =
399 StateTree::new_from_root(self.blockstore_owned(), &state_cid).map_err(Error::other)?;
400 let ms: miner::State = state.get_actor_state_from_address(addr)?;
401 let info = ms.info(self.blockstore()).map_err(|e| e.to_string())?;
402 let addr = resolve_to_key_addr(&state, self.blockstore(), &info.worker())?;
403 Ok(addr)
404 }
405
406 pub fn get_power(
409 &self,
410 state_cid: &Cid,
411 addr: Option<&Address>,
412 ) -> anyhow::Result<Option<(power::Claim, power::Claim)>, Error> {
413 let actor = self
414 .get_actor(&Address::POWER_ACTOR, *state_cid)?
415 .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
416
417 let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
418
419 let t_pow = spas.total_power();
420
421 if let Some(maddr) = addr {
422 let m_pow = spas
423 .miner_power(self.blockstore(), maddr)?
424 .ok_or_else(|| Error::state(format!("Miner for address {maddr} not found")))?;
425
426 let min_pow = spas.miner_nominal_power_meets_consensus_minimum(
427 &self.chain_config().policy,
428 self.blockstore(),
429 maddr,
430 )?;
431 if min_pow {
432 return Ok(Some((m_pow, t_pow)));
433 }
434 }
435
436 Ok(None)
437 }
438
439 pub fn get_all_sectors(
441 &self,
442 addr: &Address,
443 ts: &Tipset,
444 ) -> anyhow::Result<Vec<SectorOnChainInfo>> {
445 let actor = self
446 .get_actor(addr, *ts.parent_state())?
447 .ok_or_else(|| Error::state("Miner actor not found"))?;
448 let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
449 state.load_sectors_ext(self.blockstore(), None)
450 }
451}
452
453impl<DB> StateManager<DB>
454where
455 DB: Blockstore + Send + Sync + 'static,
456{
457 pub async fn tipset_state(
461 self: &Arc<Self>,
462 tipset: &Tipset,
463 state_lookup: StateLookupPolicy,
464 ) -> anyhow::Result<CidPair> {
465 let StateOutput {
466 state_root,
467 receipt_root,
468 ..
469 } = self.tipset_state_output(tipset, state_lookup).await?;
470 Ok((state_root, receipt_root))
471 }
472
473 pub async fn tipset_state_output(
474 self: &Arc<Self>,
475 tipset: &Tipset,
476 state_lookup: StateLookupPolicy,
477 ) -> anyhow::Result<StateOutput> {
478 let key = tipset.key();
479 self.cache
480 .get_or_else(key, || async move {
481 info!(
482 "Evaluating tipset: EPOCH={}, blocks={}, tsk={}",
483 tipset.epoch(),
484 tipset.len(),
485 tipset.key(),
486 );
487
488 if matches!(state_lookup, StateLookupPolicy::Enabled)
491 && let Some(state_from_child) = self.try_lookup_state_from_next_tipset(tipset)
492 {
493 return Ok(state_from_child);
494 }
495
496 trace!("Computing state for tipset at epoch {}", tipset.epoch());
497 let state_output = self
498 .compute_tipset_state(tipset.clone(), NO_CALLBACK, VMTrace::NotTraced)
499 .await?;
500
501 self.update_cache_with_state_output(key, &state_output);
502
503 let ts_state = state_output.into();
504
505 Ok(ts_state)
506 })
507 .await
508 .map(StateOutput::from)
509 }
510
511 fn update_cache_with_state_output(&self, key: &TipsetKey, state_output: &StateOutput) {
513 if !state_output.events.is_empty() || !state_output.events_roots.is_empty() {
514 let events_data = StateEvents {
515 events: state_output.events.clone(),
516 roots: state_output.events_roots.clone(),
517 };
518 self.receipt_event_cache_handler
519 .insert_events(key, events_data);
520 }
521
522 if let Ok(receipts) = Receipt::get_receipts(self.blockstore(), state_output.receipt_root)
523 && !receipts.is_empty()
524 {
525 self.receipt_event_cache_handler
526 .insert_receipt(key, receipts);
527 }
528 }
529
530 #[instrument(skip(self))]
531 pub async fn tipset_message_receipts(
532 self: &Arc<Self>,
533 tipset: &Tipset,
534 ) -> anyhow::Result<Vec<Receipt>> {
535 let key = tipset.key();
536 let ts = tipset.clone();
537 let this = Arc::clone(self);
538 self.receipt_event_cache_handler
539 .get_receipt_or_else(
540 key,
541 Box::new(move || {
542 Box::pin(async move {
543 let StateOutput { receipt_root, .. } = this
544 .compute_tipset_state(ts, NO_CALLBACK, VMTrace::NotTraced)
545 .await?;
546 trace!("Completed tipset state calculation");
547 Receipt::get_receipts(this.blockstore(), receipt_root)
548 })
549 }),
550 )
551 .await
552 }
553
554 #[instrument(skip(self))]
555 pub async fn tipset_state_events(
556 self: &Arc<Self>,
557 tipset: &Tipset,
558 ) -> anyhow::Result<StateEvents> {
559 let key = tipset.key();
560 let ts = tipset.clone();
561 let this = Arc::clone(self);
562 let cids = tipset.cids();
563 self.receipt_event_cache_handler
564 .get_events_or_else(
565 key,
566 Box::new(move || {
567 Box::pin(async move {
568 let state_out = this
570 .compute_tipset_state(ts, NO_CALLBACK, VMTrace::NotTraced)
571 .await?;
572 trace!("Completed tipset state calculation {:?}", cids);
573 Ok(StateEvents {
574 events: state_out.events,
575 roots: state_out.events_roots,
576 })
577 })
578 }),
579 )
580 .await
581 }
582
583 #[instrument(skip(self, rand))]
584 fn call_raw(
585 &self,
586 state_cid: Option<Cid>,
587 msg: &Message,
588 rand: ChainRand<DB>,
589 tipset: &Tipset,
590 ) -> Result<ApiInvocResult, Error> {
591 let mut msg = msg.clone();
592
593 let state_cid = state_cid.unwrap_or(*tipset.parent_state());
594
595 let tipset_messages = self
596 .chain_store()
597 .messages_for_tipset(tipset)
598 .map_err(|err| Error::Other(err.to_string()))?;
599
600 let prior_messsages = tipset_messages
601 .iter()
602 .filter(|ts_msg| ts_msg.message().from() == msg.from());
603
604 let height = tipset.epoch();
607 let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
608 let mut vm = VM::new(
609 ExecutionContext {
610 heaviest_tipset: tipset.clone(),
611 state_tree_root: state_cid,
612 epoch: height,
613 rand: Box::new(rand),
614 base_fee: tipset.block_headers().first().parent_base_fee.clone(),
615 circ_supply: genesis_info.get_vm_circulating_supply(
616 height,
617 self.blockstore(),
618 &state_cid,
619 )?,
620 chain_config: self.chain_config().clone(),
621 chain_index: self.chain_index().clone(),
622 timestamp: tipset.min_timestamp(),
623 },
624 &self.engine,
625 VMTrace::Traced,
626 )?;
627
628 for m in prior_messsages {
629 vm.apply_message(m)?;
630 }
631
632 let state_cid = vm.flush()?;
635
636 let state = StateTree::new_from_root(self.blockstore_owned(), &state_cid)?;
637
638 let from_actor = state
639 .get_actor(&msg.from())?
640 .ok_or_else(|| anyhow::anyhow!("actor not found"))?;
641 msg.set_sequence(from_actor.sequence);
642
643 let mut msg = msg.clone();
645 msg.gas_limit = IMPLICIT_MESSAGE_GAS_LIMIT as u64;
646
647 let (apply_ret, duration) = vm.apply_implicit_message(&msg)?;
648
649 Ok(ApiInvocResult {
650 msg: msg.clone(),
651 msg_rct: Some(apply_ret.msg_receipt()),
652 msg_cid: msg.cid(),
653 error: apply_ret.failure_info().unwrap_or_default(),
654 duration: duration.as_nanos().clamp(0, u64::MAX as u128) as u64,
655 gas_cost: MessageGasCost::default(),
656 execution_trace: structured::parse_events(apply_ret.exec_trace()).unwrap_or_default(),
657 })
658 }
659
660 pub fn call(&self, message: &Message, tipset: Option<Tipset>) -> Result<ApiInvocResult, Error> {
663 let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
664 let chain_rand = self.chain_rand(ts.clone());
665 self.call_raw(None, message, chain_rand, &ts)
666 }
667
668 pub fn call_on_state(
671 &self,
672 state_cid: Cid,
673 message: &Message,
674 tipset: Option<Tipset>,
675 ) -> Result<ApiInvocResult, Error> {
676 let ts = tipset.unwrap_or_else(|| self.cs.heaviest_tipset());
677 let chain_rand = self.chain_rand(ts.clone());
678 self.call_raw(Some(state_cid), message, chain_rand, &ts)
679 }
680
681 pub async fn apply_on_state_with_gas(
682 self: &Arc<Self>,
683 tipset: Option<Tipset>,
684 msg: Message,
685 state_lookup: StateLookupPolicy,
686 vm_flush: VMFlush,
687 ) -> anyhow::Result<(ApiInvocResult, Option<Cid>)> {
688 let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
689
690 let from_a = self.resolve_to_key_addr(&msg.from, &ts).await?;
691
692 let mut chain_msg = match from_a.protocol() {
696 Protocol::Secp256k1 => ChainMessage::Signed(SignedMessage::new_unchecked(
697 msg.clone(),
698 Signature::new_secp256k1(vec![0; SECP_SIG_LEN]),
699 )),
700 Protocol::Delegated => ChainMessage::Signed(SignedMessage::new_unchecked(
701 msg.clone(),
702 Signature::new(SignatureType::Delegated, vec![0; SECP_SIG_LEN]),
705 )),
706 _ => ChainMessage::Unsigned(msg.clone()),
707 };
708
709 let (_invoc_res, apply_ret, duration, state_root) = self
710 .call_with_gas(
711 &mut chain_msg,
712 &[],
713 Some(ts),
714 VMTrace::Traced,
715 state_lookup,
716 vm_flush,
717 )
718 .await?;
719
720 Ok((
721 ApiInvocResult {
722 msg_cid: msg.cid(),
723 msg,
724 msg_rct: Some(apply_ret.msg_receipt()),
725 error: apply_ret.failure_info().unwrap_or_default(),
726 duration: duration.as_nanos().clamp(0, u64::MAX as u128) as u64,
727 gas_cost: MessageGasCost::default(),
728 execution_trace: structured::parse_events(apply_ret.exec_trace())
729 .unwrap_or_default(),
730 },
731 state_root,
732 ))
733 }
734
735 pub async fn call_with_gas(
738 self: &Arc<Self>,
739 message: &mut ChainMessage,
740 prior_messages: &[ChainMessage],
741 tipset: Option<Tipset>,
742 trace_config: VMTrace,
743 state_lookup: StateLookupPolicy,
744 vm_flush: VMFlush,
745 ) -> Result<(InvocResult, ApplyRet, Duration, Option<Cid>), Error> {
746 let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
747 let (st, _) = self
748 .tipset_state(&ts, state_lookup)
749 .await
750 .map_err(|e| Error::Other(format!("Could not load tipset state: {e}")))?;
751 let chain_rand = self.chain_rand(ts.clone());
752
753 let epoch = ts.epoch() + 1;
756 let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
757 let (ret, duration, state_cid) = stacker::grow(64 << 20, || -> anyhow::Result<_> {
760 let mut vm = VM::new(
761 ExecutionContext {
762 heaviest_tipset: ts.clone(),
763 state_tree_root: st,
764 epoch,
765 rand: Box::new(chain_rand),
766 base_fee: ts.block_headers().first().parent_base_fee.clone(),
767 circ_supply: genesis_info.get_vm_circulating_supply(
768 epoch,
769 self.blockstore(),
770 &st,
771 )?,
772 chain_config: self.chain_config().clone(),
773 chain_index: self.chain_index().clone(),
774 timestamp: ts.min_timestamp(),
775 },
776 &self.engine,
777 trace_config,
778 )?;
779
780 for msg in prior_messages {
781 vm.apply_message(msg)?;
782 }
783 let from_actor = vm
784 .get_actor(&message.from())
785 .map_err(|e| Error::Other(format!("Could not get actor from state: {e}")))?
786 .ok_or_else(|| Error::Other("cant find actor in state tree".to_string()))?;
787
788 message.set_sequence(from_actor.sequence);
789 let (ret, duration) = vm.apply_message(message)?;
790 let state_root = match vm_flush {
791 VMFlush::Flush => Some(vm.flush()?),
792 VMFlush::Skip => None,
793 };
794 Ok((ret, duration, state_root))
795 })?;
796
797 Ok((
798 InvocResult::new(message.message().clone(), &ret),
799 ret,
800 duration,
801 state_cid,
802 ))
803 }
804
805 pub async fn replay(self: &Arc<Self>, ts: Tipset, mcid: Cid) -> Result<ApiInvocResult, Error> {
808 let this = Arc::clone(self);
809 tokio::task::spawn_blocking(move || this.replay_blocking(ts, mcid))
810 .await
811 .map_err(|e| Error::Other(format!("{e}")))?
812 }
813
814 pub fn replay_blocking(
816 self: &Arc<Self>,
817 ts: Tipset,
818 mcid: Cid,
819 ) -> Result<ApiInvocResult, Error> {
820 const REPLAY_HALT: &str = "replay_halt";
821
822 let mut api_invoc_result = None;
823 let callback = |ctx: MessageCallbackCtx<'_>| {
824 match ctx.at {
825 CalledAt::Applied | CalledAt::Reward
826 if api_invoc_result.is_none() && ctx.cid == mcid =>
827 {
828 api_invoc_result = Some(ApiInvocResult {
829 msg_cid: ctx.message.cid(),
830 msg: ctx.message.message().clone(),
831 msg_rct: Some(ctx.apply_ret.msg_receipt()),
832 error: ctx.apply_ret.failure_info().unwrap_or_default(),
833 duration: ctx.duration.as_nanos().clamp(0, u64::MAX as u128) as u64,
834 gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
835 execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
836 .unwrap_or_default(),
837 });
838 anyhow::bail!(REPLAY_HALT);
839 }
840 _ => Ok(()), }
842 };
843 let result = self.compute_tipset_state_blocking(ts, Some(callback), VMTrace::Traced);
844 if let Err(error_message) = result
845 && error_message.to_string() != REPLAY_HALT
846 {
847 return Err(Error::Other(format!(
848 "unexpected error during execution : {error_message:}"
849 )));
850 }
851 api_invoc_result.ok_or_else(|| Error::Other("failed to replay".into()))
852 }
853
854 pub fn eligible_to_mine(
857 &self,
858 address: &Address,
859 base_tipset: &Tipset,
860 lookback_tipset: &Tipset,
861 ) -> anyhow::Result<bool, Error> {
862 let hmp =
863 self.miner_has_min_power(&self.chain_config().policy, address, lookback_tipset)?;
864 let version = self.get_network_version(base_tipset.epoch());
865
866 if version <= NetworkVersion::V3 {
867 return Ok(hmp);
868 }
869
870 if !hmp {
871 return Ok(false);
872 }
873
874 let actor = self
875 .get_actor(&Address::POWER_ACTOR, *base_tipset.parent_state())?
876 .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
877
878 let power_state = power::State::load(self.blockstore(), actor.code, actor.state)?;
879
880 let actor = self
881 .get_actor(address, *base_tipset.parent_state())?
882 .ok_or_else(|| Error::state("Miner actor address could not be resolved"))?;
883
884 let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
885
886 let claim = power_state
888 .miner_power(self.blockstore(), address)?
889 .ok_or_else(|| Error::Other("Could not get claim".to_string()))?;
890 if claim.quality_adj_power <= BigInt::zero() {
891 return Ok(false);
892 }
893
894 if !miner_state.fee_debt().is_zero() {
896 return Ok(false);
897 }
898
899 let info = miner_state.info(self.blockstore())?;
901 if base_tipset.epoch() <= info.consensus_fault_elapsed {
902 return Ok(false);
903 }
904
905 Ok(true)
906 }
907
908 #[instrument(skip_all)]
930 pub async fn compute_tipset_state(
931 self: &Arc<Self>,
932 tipset: Tipset,
933 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
934 enable_tracing: VMTrace,
935 ) -> Result<StateOutput, Error> {
936 let this = Arc::clone(self);
937 tokio::task::spawn_blocking(move || {
938 this.compute_tipset_state_blocking(tipset, callback, enable_tracing)
939 })
940 .await?
941 }
942
943 #[tracing::instrument(skip_all)]
945 pub fn compute_tipset_state_blocking(
946 &self,
947 tipset: Tipset,
948 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
949 enable_tracing: VMTrace,
950 ) -> Result<StateOutput, Error> {
951 let epoch = tipset.epoch();
952 let has_callback = callback.is_some();
953 Ok(apply_block_messages(
954 self.chain_store().genesis_block_header().timestamp,
955 Arc::clone(self.chain_index()),
956 Arc::clone(self.chain_config()),
957 self.beacon_schedule().clone(),
958 &self.engine,
959 tipset,
960 callback,
961 enable_tracing,
962 )
963 .map_err(|e| {
964 if has_callback {
965 e
966 } else {
967 anyhow::anyhow!("Failed to compute tipset state@{epoch}: {e}")
968 }
969 })?)
970 }
971
972 #[instrument(skip_all)]
973 pub async fn compute_state(
974 self: &Arc<Self>,
975 height: ChainEpoch,
976 messages: Vec<Message>,
977 tipset: Tipset,
978 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
979 enable_tracing: VMTrace,
980 ) -> Result<StateOutput, Error> {
981 let this = Arc::clone(self);
982 tokio::task::spawn_blocking(move || {
983 this.compute_state_blocking(height, messages, tipset, callback, enable_tracing)
984 })
985 .await?
986 }
987
988 #[tracing::instrument(skip_all)]
990 pub fn compute_state_blocking(
991 &self,
992 height: ChainEpoch,
993 messages: Vec<Message>,
994 tipset: Tipset,
995 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
996 enable_tracing: VMTrace,
997 ) -> Result<StateOutput, Error> {
998 Ok(compute_state(
999 height,
1000 messages,
1001 tipset,
1002 self.chain_store().genesis_block_header().timestamp,
1003 Arc::clone(self.chain_index()),
1004 Arc::clone(self.chain_config()),
1005 self.beacon_schedule().clone(),
1006 &self.engine,
1007 callback,
1008 enable_tracing,
1009 )?)
1010 }
1011
1012 fn tipset_executed_message(
1015 &self,
1016 tipset: &Tipset,
1017 message: &ChainMessage,
1018 allow_replaced: bool,
1019 ) -> Result<Option<Receipt>, Error> {
1020 if tipset.epoch() == 0 {
1021 return Ok(None);
1022 }
1023 let message_from_address = message.from();
1024 let message_sequence = message.sequence();
1025 let pts = self
1027 .chain_index()
1028 .load_required_tipset(tipset.parents())
1029 .map_err(|err| Error::Other(format!("Failed to load tipset: {err}")))?;
1030 let messages = self
1031 .cs
1032 .messages_for_tipset(&pts)
1033 .map_err(|err| Error::Other(format!("Failed to load messages for tipset: {err}")))?;
1034 messages
1035 .iter()
1036 .enumerate()
1037 .rev()
1039 .filter(|(_, s)| {
1040 s.sequence() == message_sequence
1041 && s.from() == message_from_address
1042 && s.equal_call(message)
1043 })
1044 .map(|(index, m)| {
1045 if !allow_replaced && message.cid() != m.cid(){
1049 Err(Error::Other(format!(
1050 "found message with equal nonce and call params but different CID. wanted {}, found: {}, nonce: {}, from: {}",
1051 message.cid(),
1052 m.cid(),
1053 message.sequence(),
1054 message.from(),
1055 )))
1056 } else {
1057 let block_header = tipset.block_headers().first();
1058 crate::chain::get_parent_receipt(
1059 self.blockstore(),
1060 block_header,
1061 index,
1062 )
1063 .map_err(|err| Error::Other(format!("Failed to get parent receipt (message_receipts={}, index={index}, error={err})", block_header.message_receipts)))
1064 }
1065 })
1066 .next()
1067 .unwrap_or(Ok(None))
1068 }
1069
1070 fn check_search(
1071 &self,
1072 mut current: Tipset,
1073 message: &ChainMessage,
1074 lookback_max_epoch: ChainEpoch,
1075 allow_replaced: bool,
1076 ) -> Result<Option<(Tipset, Receipt)>, Error> {
1077 let message_from_address = message.from();
1078 let message_sequence = message.sequence();
1079 let mut current_actor_state = self
1080 .get_required_actor(&message_from_address, *current.parent_state())
1081 .map_err(Error::state)?;
1082 let message_from_id = self.lookup_required_id(&message_from_address, ¤t)?;
1083
1084 while current.epoch() >= lookback_max_epoch {
1085 let parent_tipset = self
1086 .chain_index()
1087 .load_required_tipset(current.parents())
1088 .map_err(|err| {
1089 Error::Other(format!(
1090 "failed to load tipset during msg wait searchback: {err:}"
1091 ))
1092 })?;
1093
1094 let parent_actor_state = self
1095 .get_actor(&message_from_id, *parent_tipset.parent_state())
1096 .map_err(|e| Error::State(e.to_string()))?;
1097
1098 if parent_actor_state.is_none()
1099 || (current_actor_state.sequence > message_sequence
1100 && parent_actor_state.as_ref().unwrap().sequence <= message_sequence)
1101 {
1102 let receipt = self
1103 .tipset_executed_message(¤t, message, allow_replaced)?
1104 .context("Failed to get receipt with tipset_executed_message")?;
1105 return Ok(Some((current, receipt)));
1106 }
1107
1108 if let Some(parent_actor_state) = parent_actor_state {
1109 current = parent_tipset;
1110 current_actor_state = parent_actor_state;
1111 } else {
1112 break;
1113 }
1114 }
1115
1116 Ok(None)
1117 }
1118
1119 fn search_back_for_message(
1121 &self,
1122 current: Tipset,
1123 message: &ChainMessage,
1124 look_back_limit: Option<i64>,
1125 allow_replaced: Option<bool>,
1126 ) -> Result<Option<(Tipset, Receipt)>, Error> {
1127 let current_epoch = current.epoch();
1128 let allow_replaced = allow_replaced.unwrap_or(true);
1129
1130 let lookback_max_epoch = match look_back_limit {
1132 Some(0) => return Ok(None),
1134 Some(limit) if limit > 0 => (current_epoch - limit + 1).max(0),
1138 _ => 0,
1140 };
1141
1142 self.check_search(current, message, lookback_max_epoch, allow_replaced)
1143 }
1144
1145 pub fn get_receipt(&self, tipset: Tipset, msg: Cid) -> Result<Receipt, Error> {
1147 let m = crate::chain::get_chain_message(self.blockstore(), &msg)
1148 .map_err(|e| Error::Other(e.to_string()))?;
1149 let message_receipt = self.tipset_executed_message(&tipset, &m, true)?;
1150 if let Some(receipt) = message_receipt {
1151 return Ok(receipt);
1152 }
1153
1154 let maybe_tuple = self.search_back_for_message(tipset, &m, None, None)?;
1155 let message_receipt = maybe_tuple
1156 .ok_or_else(|| {
1157 Error::Other("Could not get receipt from search back message".to_string())
1158 })?
1159 .1;
1160 Ok(message_receipt)
1161 }
1162
1163 pub async fn wait_for_message(
1168 self: &Arc<Self>,
1169 msg_cid: Cid,
1170 confidence: i64,
1171 look_back_limit: Option<ChainEpoch>,
1172 allow_replaced: Option<bool>,
1173 ) -> Result<(Option<Tipset>, Option<Receipt>), Error> {
1174 let mut subscriber = self.cs.publisher().subscribe();
1175 let (sender, mut receiver) = oneshot::channel::<()>();
1176 let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
1177 .map_err(|err| Error::Other(format!("failed to load message {err:}")))?;
1178 let current_tipset = self.heaviest_tipset();
1179 let maybe_message_receipt =
1180 self.tipset_executed_message(¤t_tipset, &message, true)?;
1181 if let Some(r) = maybe_message_receipt {
1182 return Ok((Some(current_tipset.clone()), Some(r)));
1183 }
1184
1185 let mut candidate_tipset: Option<Tipset> = None;
1186 let mut candidate_receipt: Option<Receipt> = None;
1187
1188 let sm_cloned = Arc::clone(self);
1189
1190 let message_for_task = message.clone();
1191 let height_of_head = current_tipset.epoch();
1192 let task = tokio::task::spawn(async move {
1193 let back_tuple = sm_cloned.search_back_for_message(
1194 current_tipset,
1195 &message_for_task,
1196 look_back_limit,
1197 allow_replaced,
1198 )?;
1199 sender
1200 .send(())
1201 .map_err(|e| Error::Other(format!("Could not send to channel {e:?}")))?;
1202 Ok::<_, Error>(back_tuple)
1203 });
1204
1205 let reverts: Arc<RwLock<HashMap<TipsetKey, bool>>> = Arc::new(RwLock::new(HashMap::new()));
1206 let block_revert = reverts.clone();
1207 let sm_cloned = Arc::clone(self);
1208
1209 let mut subscriber_poll = tokio::task::spawn(async move {
1211 loop {
1212 match subscriber.recv().await {
1213 Ok(subscriber) => match subscriber {
1214 HeadChange::Apply(tipset) => {
1215 if candidate_tipset
1216 .as_ref()
1217 .map(|s| tipset.epoch() >= s.epoch() + confidence)
1218 .unwrap_or_default()
1219 {
1220 return Ok((candidate_tipset, candidate_receipt));
1221 }
1222 let poll_receiver = receiver.try_recv();
1223 if let Ok(Some(_)) = poll_receiver {
1224 block_revert
1225 .write()
1226 .await
1227 .insert(tipset.key().to_owned(), true);
1228 }
1229
1230 let maybe_receipt =
1231 sm_cloned.tipset_executed_message(&tipset, &message, true)?;
1232 if let Some(receipt) = maybe_receipt {
1233 if confidence == 0 {
1234 return Ok((Some(tipset), Some(receipt)));
1235 }
1236 candidate_tipset = Some(tipset);
1237 candidate_receipt = Some(receipt)
1238 }
1239 }
1240 },
1241 Err(RecvError::Lagged(i)) => {
1242 warn!(
1243 "wait for message head change subscriber lagged, skipped {} events",
1244 i
1245 );
1246 }
1247 Err(RecvError::Closed) => break,
1248 }
1249 }
1250 Ok((None, None))
1251 })
1252 .fuse();
1253
1254 let mut search_back_poll = tokio::task::spawn(async move {
1256 let back_tuple = task.await.map_err(|e| {
1257 Error::Other(format!("Could not search backwards for message {e}"))
1258 })??;
1259 if let Some((back_tipset, back_receipt)) = back_tuple {
1260 let should_revert = *reverts
1261 .read()
1262 .await
1263 .get(back_tipset.key())
1264 .unwrap_or(&false);
1265 let larger_height_of_head = height_of_head >= back_tipset.epoch() + confidence;
1266 if !should_revert && larger_height_of_head {
1267 return Ok::<_, Error>((Some(back_tipset), Some(back_receipt)));
1268 }
1269 return Ok((None, None));
1270 }
1271 Ok((None, None))
1272 })
1273 .fuse();
1274
1275 loop {
1277 select! {
1278 res = subscriber_poll => {
1279 return res?
1280 }
1281 res = search_back_poll => {
1282 if let Ok((Some(ts), Some(rct))) = res? {
1283 return Ok((Some(ts), Some(rct)));
1284 }
1285 }
1286 }
1287 }
1288 }
1289
1290 pub async fn search_for_message(
1291 &self,
1292 from: Option<Tipset>,
1293 msg_cid: Cid,
1294 look_back_limit: Option<i64>,
1295 allow_replaced: Option<bool>,
1296 ) -> Result<Option<(Tipset, Receipt)>, Error> {
1297 let from = from.unwrap_or_else(|| self.heaviest_tipset());
1298 let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
1299 .map_err(|err| Error::Other(format!("failed to load message {err}")))?;
1300 let current_tipset = self.heaviest_tipset();
1301 let maybe_message_receipt =
1302 self.tipset_executed_message(&from, &message, allow_replaced.unwrap_or(true))?;
1303 if let Some(r) = maybe_message_receipt {
1304 Ok(Some((from, r)))
1305 } else {
1306 self.search_back_for_message(current_tipset, &message, look_back_limit, allow_replaced)
1307 }
1308 }
1309
1310 pub fn get_bls_public_key(
1312 db: &Arc<DB>,
1313 addr: &Address,
1314 state_cid: Cid,
1315 ) -> Result<BlsPublicKey, Error> {
1316 let state = StateTree::new_from_root(Arc::clone(db), &state_cid)
1317 .map_err(|e| Error::Other(e.to_string()))?;
1318 let kaddr = resolve_to_key_addr(&state, db, addr)
1319 .map_err(|e| format!("Failed to resolve key address, error: {e}"))?;
1320
1321 match kaddr.into_payload() {
1322 Payload::BLS(key) => BlsPublicKey::from_bytes(&key)
1323 .map_err(|e| Error::Other(format!("Failed to construct bls public key: {e}"))),
1324 _ => Err(Error::state(
1325 "Address must be BLS address to load bls public key",
1326 )),
1327 }
1328 }
1329
1330 pub fn lookup_id(&self, addr: &Address, ts: &Tipset) -> Result<Option<Address>, Error> {
1332 let state_tree = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
1333 .map_err(|e| format!("{e:?}"))?;
1334 Ok(state_tree
1335 .lookup_id(addr)
1336 .map_err(|e| Error::Other(e.to_string()))?
1337 .map(Address::new_id))
1338 }
1339
1340 pub fn lookup_required_id(&self, addr: &Address, ts: &Tipset) -> Result<Address, Error> {
1342 self.lookup_id(addr, ts)?
1343 .ok_or_else(|| Error::Other(format!("Failed to lookup the id address {addr}")))
1344 }
1345
1346 pub fn market_state(&self, ts: &Tipset) -> Result<market::State, Error> {
1348 let actor = self.get_required_actor(&Address::MARKET_ACTOR, *ts.parent_state())?;
1349 let market_state = market::State::load(self.blockstore(), actor.code, actor.state)?;
1350 Ok(market_state)
1351 }
1352
1353 pub fn market_balance(&self, addr: &Address, ts: &Tipset) -> Result<MarketBalance, Error> {
1355 let market_state = self.market_state(ts)?;
1356 let new_addr = self.lookup_required_id(addr, ts)?;
1357 let out = MarketBalance {
1358 escrow: {
1359 market_state
1360 .escrow_table(self.blockstore())?
1361 .get(&new_addr)?
1362 },
1363 locked: {
1364 market_state
1365 .locked_table(self.blockstore())?
1366 .get(&new_addr)?
1367 },
1368 };
1369
1370 Ok(out)
1371 }
1372
1373 pub fn miner_info(&self, addr: &Address, ts: &Tipset) -> Result<MinerInfo, Error> {
1375 let actor = self
1376 .get_actor(addr, *ts.parent_state())?
1377 .ok_or_else(|| Error::state("Miner actor not found"))?;
1378 let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1379
1380 Ok(state.info(self.blockstore())?)
1381 }
1382
1383 pub fn miner_faults(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
1385 self.all_partition_sectors(addr, ts, |partition| partition.faulty_sectors().clone())
1386 }
1387
1388 pub fn miner_recoveries(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
1390 self.all_partition_sectors(addr, ts, |partition| partition.recovering_sectors().clone())
1391 }
1392
1393 fn all_partition_sectors(
1394 &self,
1395 addr: &Address,
1396 ts: &Tipset,
1397 get_sector: impl Fn(Partition<'_>) -> BitField,
1398 ) -> Result<BitField, Error> {
1399 let actor = self
1400 .get_actor(addr, *ts.parent_state())?
1401 .ok_or_else(|| Error::state("Miner actor not found"))?;
1402
1403 let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1404
1405 let mut partitions = Vec::new();
1406
1407 state.for_each_deadline(
1408 &self.chain_config().policy,
1409 self.blockstore(),
1410 |_, deadline| {
1411 deadline.for_each(self.blockstore(), |_, partition| {
1412 partitions.push(get_sector(partition));
1413 Ok(())
1414 })
1415 },
1416 )?;
1417
1418 Ok(BitField::union(partitions.iter()))
1419 }
1420
1421 pub fn miner_power(&self, addr: &Address, ts: &Tipset) -> Result<MinerPower, Error> {
1423 if let Some((miner_power, total_power)) = self.get_power(ts.parent_state(), Some(addr))? {
1424 return Ok(MinerPower {
1425 miner_power,
1426 total_power,
1427 has_min_power: true,
1428 });
1429 }
1430
1431 Ok(MinerPower {
1432 has_min_power: false,
1433 miner_power: Default::default(),
1434 total_power: Default::default(),
1435 })
1436 }
1437
1438 pub async fn resolve_to_key_addr(
1441 self: &Arc<Self>,
1442 addr: &Address,
1443 ts: &Tipset,
1444 ) -> Result<Address, anyhow::Error> {
1445 match addr.protocol() {
1446 Protocol::BLS | Protocol::Secp256k1 | Protocol::Delegated => return Ok(*addr),
1447 Protocol::Actor => {
1448 return Err(Error::Other(
1449 "cannot resolve actor address to key address".to_string(),
1450 )
1451 .into());
1452 }
1453 _ => {}
1454 };
1455
1456 let state = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())?;
1459 if let Ok(addr) = resolve_to_key_addr(&state, self.blockstore(), addr) {
1460 return Ok(addr);
1461 }
1462
1463 let (st, _) = self.tipset_state(ts, StateLookupPolicy::Enabled).await?;
1465 let state = StateTree::new_from_root(self.blockstore_owned(), &st)?;
1466
1467 resolve_to_key_addr(&state, self.blockstore(), addr)
1468 }
1469
1470 pub async fn miner_get_base_info(
1471 self: &Arc<Self>,
1472 beacon_schedule: &BeaconSchedule,
1473 tipset: Tipset,
1474 addr: Address,
1475 epoch: ChainEpoch,
1476 ) -> anyhow::Result<Option<MiningBaseInfo>> {
1477 let prev_beacon = self
1478 .chain_store()
1479 .chain_index()
1480 .latest_beacon_entry(tipset.clone())?;
1481
1482 let entries: Vec<BeaconEntry> = beacon_schedule
1483 .beacon_entries_for_block(
1484 self.chain_config().network_version(epoch),
1485 epoch,
1486 tipset.epoch(),
1487 &prev_beacon,
1488 )
1489 .await?;
1490
1491 let base = entries.last().unwrap_or(&prev_beacon);
1492
1493 let (lb_tipset, lb_state_root) = ChainStore::get_lookback_tipset_for_round(
1494 self.chain_index(),
1495 self.chain_config(),
1496 &tipset,
1497 epoch,
1498 )?;
1499
1500 let actor = self.get_required_actor(&addr, *tipset.parent_state())?;
1505 if self.get_actor(&addr, lb_state_root)?.is_none() {
1506 return Ok(None);
1507 }
1508
1509 let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1510
1511 let addr_buf = to_vec(&addr)?;
1512 let rand = draw_randomness(
1513 base.signature(),
1514 DomainSeparationTag::WinningPoStChallengeSeed as i64,
1515 epoch,
1516 &addr_buf,
1517 )?;
1518
1519 let network_version = self.chain_config().network_version(tipset.epoch());
1520 let sectors = self.get_sectors_for_winning_post(
1521 &lb_state_root,
1522 network_version,
1523 &addr,
1524 Randomness::new(rand.to_vec()),
1525 )?;
1526
1527 if sectors.is_empty() {
1528 return Ok(None);
1529 }
1530
1531 let (miner_power, total_power) = self
1532 .get_power(&lb_state_root, Some(&addr))?
1533 .context("failed to get power")?;
1534
1535 let info = miner_state.info(self.blockstore())?;
1536
1537 let worker_key = self
1538 .resolve_to_deterministic_address(info.worker, &tipset)
1539 .await?;
1540 let eligible = self.eligible_to_mine(&addr, &tipset, &lb_tipset)?;
1541
1542 Ok(Some(MiningBaseInfo {
1543 miner_power: miner_power.quality_adj_power,
1544 network_power: total_power.quality_adj_power,
1545 sectors,
1546 worker_key,
1547 sector_size: info.sector_size,
1548 prev_beacon_entry: prev_beacon,
1549 beacon_entries: entries,
1550 eligible_for_mining: eligible,
1551 }))
1552 }
1553
1554 pub fn miner_has_min_power(
1557 &self,
1558 policy: &Policy,
1559 addr: &Address,
1560 ts: &Tipset,
1561 ) -> anyhow::Result<bool> {
1562 let actor = self
1563 .get_actor(&Address::POWER_ACTOR, *ts.parent_state())?
1564 .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
1565 let ps = power::State::load(self.blockstore(), actor.code, actor.state)?;
1566
1567 ps.miner_nominal_power_meets_consensus_minimum(policy, self.blockstore(), addr)
1568 }
1569
1570 #[tracing::instrument(skip(self))]
1594 pub fn validate_range(&self, epochs: RangeInclusive<i64>) -> anyhow::Result<()> {
1595 let heaviest = self.heaviest_tipset();
1596 let heaviest_epoch = heaviest.epoch();
1597 let end = self
1598 .chain_index()
1599 .tipset_by_height(*epochs.end(), heaviest, ResolveNullTipset::TakeOlder)
1600 .with_context(|| {
1601 format!(
1602 "couldn't get a tipset at height {} behind heaviest tipset at height {heaviest_epoch}",
1603 *epochs.end(),
1604 )
1605 })?;
1606
1607 let tipsets = self
1609 .chain_index()
1610 .chain(end)
1611 .take_while(|tipset| tipset.epoch() >= *epochs.start());
1612
1613 self.validate_tipsets(tipsets)
1614 }
1615
1616 pub fn validate_tipsets<T>(&self, tipsets: T) -> anyhow::Result<()>
1617 where
1618 T: Iterator<Item = Tipset> + Send,
1619 {
1620 let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
1621 validate_tipsets(
1622 genesis_timestamp,
1623 self.chain_index().clone(),
1624 self.chain_config().clone(),
1625 self.beacon_schedule().clone(),
1626 &self.engine,
1627 tipsets,
1628 )
1629 }
1630
1631 pub fn get_verified_registry_actor_state(
1632 &self,
1633 ts: &Tipset,
1634 ) -> anyhow::Result<verifreg::State> {
1635 let act = self
1636 .get_actor(&Address::VERIFIED_REGISTRY_ACTOR, *ts.parent_state())
1637 .map_err(Error::state)?
1638 .ok_or_else(|| Error::state("actor not found"))?;
1639 verifreg::State::load(self.blockstore(), act.code, act.state)
1640 }
1641 pub fn get_claim(
1642 &self,
1643 addr: &Address,
1644 ts: &Tipset,
1645 claim_id: ClaimID,
1646 ) -> anyhow::Result<Option<Claim>> {
1647 let id_address = self.lookup_required_id(addr, ts)?;
1648 let state = self.get_verified_registry_actor_state(ts)?;
1649 state.get_claim(self.blockstore(), id_address, claim_id)
1650 }
1651
1652 pub fn get_all_claims(&self, ts: &Tipset) -> anyhow::Result<HashMap<ClaimID, Claim>> {
1653 let state = self.get_verified_registry_actor_state(ts)?;
1654 state.get_all_claims(self.blockstore())
1655 }
1656
1657 pub fn get_allocation(
1658 &self,
1659 addr: &Address,
1660 ts: &Tipset,
1661 allocation_id: AllocationID,
1662 ) -> anyhow::Result<Option<Allocation>> {
1663 let id_address = self.lookup_required_id(addr, ts)?;
1664 let state = self.get_verified_registry_actor_state(ts)?;
1665 state.get_allocation(self.blockstore(), id_address.id()?, allocation_id)
1666 }
1667
1668 pub fn get_all_allocations(
1669 &self,
1670 ts: &Tipset,
1671 ) -> anyhow::Result<HashMap<AllocationID, Allocation>> {
1672 let state = self.get_verified_registry_actor_state(ts)?;
1673 state.get_all_allocations(self.blockstore())
1674 }
1675
1676 pub fn verified_client_status(
1677 &self,
1678 addr: &Address,
1679 ts: &Tipset,
1680 ) -> anyhow::Result<Option<DataCap>> {
1681 let id = self.lookup_required_id(addr, ts)?;
1682 let network_version = self.get_network_version(ts.epoch());
1683
1684 if (u32::from(network_version.0)) < 17 {
1688 let state = self.get_verified_registry_actor_state(ts)?;
1689 return state.verified_client_data_cap(self.blockstore(), id);
1690 }
1691
1692 let act = self
1693 .get_actor(&Address::DATACAP_TOKEN_ACTOR, *ts.parent_state())
1694 .map_err(Error::state)?
1695 .ok_or_else(|| Error::state("Miner actor not found"))?;
1696
1697 let state = datacap::State::load(self.blockstore(), act.code, act.state)?;
1698
1699 state.verified_client_data_cap(self.blockstore(), id)
1700 }
1701
1702 pub async fn resolve_to_deterministic_address(
1703 self: &Arc<Self>,
1704 address: Address,
1705 ts: &Tipset,
1706 ) -> anyhow::Result<Address> {
1707 use crate::shim::address::Protocol::*;
1708 match address.protocol() {
1709 BLS | Secp256k1 | Delegated => Ok(address),
1710 Actor => anyhow::bail!("cannot resolve actor address to key address"),
1711 _ => {
1712 if let Ok(state) =
1714 StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
1715 && let Ok(address) = state
1716 .resolve_to_deterministic_addr(self.chain_store().blockstore(), address)
1717 {
1718 return Ok(address);
1719 }
1720
1721 let (state_root, _) = self.tipset_state(ts, StateLookupPolicy::Enabled).await?;
1723 let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?;
1724 state.resolve_to_deterministic_addr(self.chain_store().blockstore(), address)
1725 }
1726 }
1727 }
1728
1729 pub fn execution_trace(&self, tipset: &Tipset) -> anyhow::Result<(Cid, Vec<ApiInvocResult>)> {
1730 let mut invoc_trace = vec![];
1731
1732 let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
1733
1734 let callback = |ctx: MessageCallbackCtx<'_>| {
1735 match ctx.at {
1736 CalledAt::Applied | CalledAt::Reward => {
1737 invoc_trace.push(ApiInvocResult {
1738 msg_cid: ctx.message.cid(),
1739 msg: ctx.message.message().clone(),
1740 msg_rct: Some(ctx.apply_ret.msg_receipt()),
1741 error: ctx.apply_ret.failure_info().unwrap_or_default(),
1742 duration: ctx.duration.as_nanos().clamp(0, u64::MAX as u128) as u64,
1743 gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
1744 execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
1745 .unwrap_or_default(),
1746 });
1747 Ok(())
1748 }
1749 _ => Ok(()), }
1751 };
1752
1753 let StateOutput { state_root, .. } = apply_block_messages(
1754 genesis_timestamp,
1755 self.chain_index().clone(),
1756 self.chain_config().clone(),
1757 self.beacon_schedule().clone(),
1758 &self.engine,
1759 tipset.clone(),
1760 Some(callback),
1761 VMTrace::Traced,
1762 )?;
1763
1764 Ok((state_root, invoc_trace))
1765 }
1766
1767 fn try_lookup_state_from_next_tipset(&self, tipset: &Tipset) -> Option<StateOutputValue> {
1771 let epoch = tipset.epoch();
1772 let next_epoch = epoch + 1;
1773
1774 let heaviest = self.heaviest_tipset();
1776 if next_epoch > heaviest.epoch() {
1777 return None;
1778 }
1779
1780 if let Ok(next_tipset) =
1782 self.chain_index()
1783 .tipset_by_height(next_epoch, heaviest, ResolveNullTipset::TakeNewer)
1784 {
1785 if !next_tipset.parents().eq(tipset.key()) {
1787 return None;
1788 }
1789
1790 let state_root = next_tipset.parent_state();
1791 let receipt_root = next_tipset.min_ticket_block().message_receipts;
1792
1793 if self.blockstore().has(state_root).unwrap_or(false)
1794 && self.blockstore().has(&receipt_root).unwrap_or(false)
1795 {
1796 return Some(StateOutputValue {
1797 state_root: state_root.into(),
1798 receipt_root,
1799 });
1800 }
1801 }
1802
1803 None
1804 }
1805}
1806
1807pub fn validate_tipsets<DB, T>(
1808 genesis_timestamp: u64,
1809 chain_index: Arc<ChainIndex<Arc<DB>>>,
1810 chain_config: Arc<ChainConfig>,
1811 beacon: Arc<BeaconSchedule>,
1812 engine: &MultiEngine,
1813 tipsets: T,
1814) -> anyhow::Result<()>
1815where
1816 DB: Blockstore + Send + Sync + 'static,
1817 T: Iterator<Item = Tipset> + Send,
1818{
1819 use rayon::iter::ParallelIterator as _;
1820 tipsets
1821 .tuple_windows()
1822 .par_bridge()
1823 .try_for_each(|(child, parent)| {
1824 info!(height = parent.epoch(), "compute parent state");
1825 let StateOutput {
1826 state_root: actual_state,
1827 receipt_root: actual_receipt,
1828 ..
1829 } = apply_block_messages(
1830 genesis_timestamp,
1831 chain_index.clone(),
1832 chain_config.clone(),
1833 beacon.clone(),
1834 engine,
1835 parent,
1836 NO_CALLBACK,
1837 VMTrace::NotTraced,
1838 )
1839 .map_err(|e| anyhow::anyhow!("couldn't compute tipset state: {e}"))?;
1840 let expected_receipt = child.min_ticket_block().message_receipts;
1841 let expected_state = child.parent_state();
1842 match (expected_state, expected_receipt) == (&actual_state, actual_receipt) {
1843 true => Ok(()),
1844 false => {
1845 error!(
1846 height = child.epoch(),
1847 ?expected_state,
1848 ?expected_receipt,
1849 ?actual_state,
1850 ?actual_receipt,
1851 "state mismatch"
1852 );
1853 bail!("state mismatch");
1854 }
1855 }
1856 })
1857}
1858
/// Executes the messages of `tipset` on top of its parent state and returns
/// the resulting state root, receipt root, events and event roots.
///
/// Steps, in order:
/// 1. A tipset at epoch 0 is not executed; its recorded parent state and
///    receipts root are returned with no events.
/// 2. For every epoch from the parent tipset's epoch up to (but excluding)
///    `tipset`'s epoch, state migrations are run; for the epochs strictly
///    after the parent's epoch, cron is run first in a fresh VM.
/// 3. The tipset's block messages are applied, the receipts are stored as an
///    AMT, event roots are re-derived and checked, and the VM is flushed.
///
/// `callback`, when provided, is passed to the cron runs and to the message
/// application. `enable_tracing` is forwarded to every VM that is created.
///
/// # Errors
/// Fails when the parent tipset, circulating supply, VM, migrations, block
/// messages or AMTs cannot be loaded/built, or when a derived events root
/// does not match the one reported by the VM. A failing cron run is only
/// logged.
#[allow(clippy::too_many_arguments)]
pub fn apply_block_messages<DB>(
    genesis_timestamp: u64,
    chain_index: Arc<ChainIndex<Arc<DB>>>,
    chain_config: Arc<ChainConfig>,
    beacon: Arc<BeaconSchedule>,
    engine: &MultiEngine,
    tipset: Tipset,
    mut callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
    enable_tracing: VMTrace,
) -> anyhow::Result<StateOutput>
where
    DB: Blockstore + Send + Sync + 'static,
{
    // Epoch 0: nothing is executed, the recorded roots are returned as-is.
    if tipset.epoch() == 0 {
        let message_receipts = tipset.min_ticket_block().message_receipts;
        return Ok(StateOutput {
            state_root: *tipset.parent_state(),
            receipt_root: message_receipts,
            events: vec![],
            events_roots: vec![],
        });
    }

    let rand = ChainRand::new(
        Arc::clone(&chain_config),
        tipset.clone(),
        Arc::clone(&chain_index),
        beacon,
    );

    let genesis_info = GenesisInfo::from_chain_config(chain_config.clone());
    // Builds a VM on top of `state_root` for the given epoch and timestamp;
    // the circulating supply is computed for that same state.
    let create_vm = |state_root: Cid, epoch, timestamp| {
        let circulating_supply =
            genesis_info.get_vm_circulating_supply(epoch, chain_index.db(), &state_root)?;
        VM::new(
            ExecutionContext {
                heaviest_tipset: tipset.clone(),
                state_tree_root: state_root,
                epoch,
                rand: Box::new(rand.clone()),
                base_fee: tipset.min_ticket_block().parent_base_fee.clone(),
                circ_supply: circulating_supply,
                chain_config: Arc::clone(&chain_config),
                chain_index: Arc::clone(&chain_index),
                timestamp,
            },
            engine,
            enable_tracing,
        )
    };

    let mut parent_state = *tipset.parent_state();

    let parent_epoch = Tipset::load_required(chain_index.db(), tipset.parents())?.epoch();
    let epoch = tipset.epoch();

    for epoch_i in parent_epoch..epoch {
        // Epochs strictly between the parent and this tipset have no tipset
        // of their own (presumably null rounds — confirm); cron still runs.
        if epoch_i > parent_epoch {
            let timestamp = genesis_timestamp + ((EPOCH_DURATION_SECONDS * epoch_i) as u64);

            // NOTE(review): `stacker::grow` is asked for 64 << 20 bytes
            // (64 MiB) of stack; presumably VM execution recurses deeply.
            parent_state = stacker::grow(64 << 20, || -> anyhow::Result<Cid> {
                let mut vm = create_vm(parent_state, epoch_i, timestamp)?;
                // A cron failure is logged and execution continues.
                if let Err(e) = vm.run_cron(epoch_i, callback.as_mut()) {
                    error!("Beginning of epoch cron failed to run: {}", e);
                }
                vm.flush()
            })?;
        }

        // Migrations are attempted for every epoch in the range, including
        // the parent's epoch; a returned root replaces the running state.
        if let Some(new_state) =
            run_state_migrations(epoch_i, &chain_config, chain_index.db(), &parent_state)?
        {
            parent_state = new_state;
        }
    }

    let block_messages = BlockMessages::for_tipset(chain_index.db(), &tipset)
        .map_err(|e| Error::Other(e.to_string()))?;

    stacker::grow(64 << 20, || -> anyhow::Result<StateOutput> {
        let mut vm = create_vm(parent_state, epoch, tipset.min_timestamp())?;

        let (receipts, events, events_roots) =
            vm.apply_block_messages(&block_messages, epoch, callback)?;

        // Persist the receipts as an AMT; its root is the receipt root.
        let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts)?;

        // Rebuild each events AMT and make sure its root agrees with the root
        // reported alongside the events.
        for (msg_events, events_root) in events.iter().zip(events_roots.iter()) {
            if let Some(event_root) = events_root {
                let derived_event_root = Amt::new_from_iter_with_bit_width(
                    chain_index.db(),
                    EVENTS_AMT_BITWIDTH,
                    msg_events.iter(),
                )
                .map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?;

                ensure!(
                    derived_event_root.eq(event_root),
                    "Events AMT root mismatch: derived={derived_event_root}, actual={event_root}."
                );
            }
        }

        let state_root = vm.flush()?;

        Ok(StateOutput {
            state_root,
            receipt_root,
            events,
            events_roots,
        })
    })
}
2073
2074#[allow(clippy::too_many_arguments)]
2075pub fn compute_state<DB>(
2076 _height: ChainEpoch,
2077 messages: Vec<Message>,
2078 tipset: Tipset,
2079 genesis_timestamp: u64,
2080 chain_index: Arc<ChainIndex<Arc<DB>>>,
2081 chain_config: Arc<ChainConfig>,
2082 beacon: Arc<BeaconSchedule>,
2083 engine: &MultiEngine,
2084 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
2085 enable_tracing: VMTrace,
2086) -> anyhow::Result<StateOutput>
2087where
2088 DB: Blockstore + Send + Sync + 'static,
2089{
2090 if !messages.is_empty() {
2091 anyhow::bail!("Applying messages is not yet implemented.");
2092 }
2093
2094 let output = apply_block_messages(
2095 genesis_timestamp,
2096 chain_index,
2097 chain_config,
2098 beacon,
2099 engine,
2100 tipset,
2101 callback,
2102 enable_tracing,
2103 )?;
2104
2105 Ok(output)
2106}
2107
/// Two-state switch passed to `tipset_state`; defaults to `Enabled`.
///
/// NOTE(review): presumably controls whether an already-known state may be
/// looked up instead of being recomputed — confirm against `tipset_state`.
#[derive(Debug, Copy, Clone, Default)]
pub enum StateLookupPolicy {
    /// The default policy.
    #[default]
    Enabled,
    Disabled,
}
2115
/// Two-state switch; defaults to `Skip`.
///
/// NOTE(review): presumably tells a caller whether the VM should be flushed
/// after execution — no use is visible in this part of the file, confirm.
#[derive(Debug, Copy, Clone, Default)]
pub enum VMFlush {
    Flush,
    /// The default choice.
    #[default]
    Skip,
}