1#[cfg(test)]
5mod tests;
6
7mod cache;
8pub mod chain_rand;
9pub mod circulating_supply;
10mod errors;
11pub mod utils;
12
13pub use self::errors::*;
14use self::utils::structured;
15
16use crate::beacon::{BeaconEntry, BeaconSchedule};
17use crate::blocks::{Tipset, TipsetKey};
18use crate::chain::{
19 ChainStore,
20 index::{ChainIndex, ResolveNullTipset},
21};
22use crate::interpreter::{
23 BlockMessages, CalledAt, ExecutionContext, IMPLICIT_MESSAGE_GAS_LIMIT, VM, resolve_to_key_addr,
24};
25use crate::interpreter::{MessageCallbackCtx, VMTrace};
26use crate::lotus_json::{LotusJson, lotus_json_with_self};
27use crate::message::{ChainMessage, MessageRead as _, MessageReadWrite as _, SignedMessage};
28use crate::networks::ChainConfig;
29use crate::rpc::state::{ApiInvocResult, InvocResult, MessageGasCost};
30use crate::rpc::types::{MiningBaseInfo, SectorOnChainInfo};
31use crate::shim::actors::init::{self, State};
32use crate::shim::actors::miner::{MinerInfo, MinerPower, Partition};
33use crate::shim::actors::verifreg::{Allocation, AllocationID, Claim};
34use crate::shim::actors::*;
35use crate::shim::address::AddressId;
36use crate::shim::crypto::{Signature, SignatureType};
37use crate::shim::{
38 actors::{
39 LoadActorStateFromBlockstore, miner::ext::MinerStateExt as _,
40 verifreg::ext::VerifiedRegistryStateExt as _,
41 },
42 executor::{ApplyRet, Receipt, StampedEvent},
43};
44use crate::shim::{
45 address::{Address, Payload, Protocol},
46 clock::ChainEpoch,
47 econ::TokenAmount,
48 machine::{GLOBAL_MULTI_ENGINE, MultiEngine},
49 message::Message,
50 randomness::Randomness,
51 runtime::Policy,
52 state_tree::{ActorState, StateTree},
53 version::NetworkVersion,
54};
55use crate::state_manager::cache::TipsetStateCache;
56use crate::state_manager::chain_rand::draw_randomness;
57use crate::state_migration::run_state_migrations;
58use crate::utils::ShallowClone as _;
59use crate::utils::cache::SizeTrackingLruCache;
60use crate::utils::get_size::{GetSize, vec_heap_size_helper};
61use ahash::{HashMap, HashMapExt};
62use anyhow::{Context as _, bail, ensure};
63use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _};
64use chain_rand::ChainRand;
65use cid::Cid;
66pub use circulating_supply::GenesisInfo;
67use fil_actor_verifreg_state::v12::DataCap;
68use fil_actor_verifreg_state::v13::ClaimID;
69use fil_actors_shared::fvm_ipld_amt::{Amt, Amtv0};
70use fil_actors_shared::fvm_ipld_bitfield::BitField;
71use fil_actors_shared::v12::runtime::DomainSeparationTag;
72use futures::{FutureExt, channel::oneshot, select};
73use fvm_ipld_blockstore::Blockstore;
74use fvm_ipld_encoding::to_vec;
75use fvm_shared4::crypto::signature::SECP_SIG_LEN;
76use itertools::Itertools as _;
77use nonzero_ext::nonzero;
78use num::BigInt;
79use num_traits::identities::Zero;
80use schemars::JsonSchema;
81use serde::{Deserialize, Serialize};
82use std::ops::RangeInclusive;
83use std::time::Duration;
84use std::{num::NonZeroUsize, sync::Arc};
85use tokio::sync::{RwLock, broadcast::error::RecvError};
86use tracing::{error, info, instrument, warn};
87
/// Default capacity for tipset-keyed caches.
// NOTE(review): not referenced in the visible part of this file — confirm where it is consumed.
const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
/// Capacity of the `id_to_deterministic_address` LRU cache created in
/// `StateManager::new_with_engine`.
const DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
/// Bit width used for events AMTs.
// NOTE(review): presumably has to match the bit width used when events are stored — confirm.
pub const EVENTS_AMT_BITWIDTH: u32 = 5;
/// Size-tracking LRU cache keyed by [`AddressId`] and holding [`Address`] values.
pub type IdToAddressCache = SizeTrackingLruCache<AddressId, Address>;
92
/// A chain message paired with the outcome of executing it.
#[derive(Debug, Clone)]
pub struct ExecutedMessage {
    /// The message that was executed.
    pub message: ChainMessage,
    /// The receipt paired with `message`.
    pub receipt: Receipt,
    /// Events loaded from the receipt's events root; `None` when the receipt
    /// carries no events root.
    pub events: Option<Vec<StampedEvent>>,
}
103
104impl GetSize for ExecutedMessage {
105 fn get_heap_size(&self) -> usize {
106 self.message.get_heap_size()
107 + self.receipt.get_heap_size()
108 + self
109 .events
110 .as_ref()
111 .map(vec_heap_size_helper)
112 .unwrap_or_default()
113 }
114}
115
/// The result of executing a tipset: resulting roots plus the per-message outcomes.
#[derive(Debug, Clone, GetSize)]
pub struct ExecutedTipset {
    /// State root after executing the tipset (excluded from heap-size accounting).
    #[get_size(ignore)]
    pub state_root: Cid,
    /// Root of the message receipts produced by the tipset (excluded from
    /// heap-size accounting).
    #[get_size(ignore)]
    pub receipt_root: Cid,
    /// Executed messages with their receipts and events, shared behind an `Arc`.
    pub executed_messages: Arc<Vec<ExecutedMessage>>,
}
129
/// The state root and receipt root of a tipset, without the per-message details
/// carried by [`ExecutedTipset`].
#[derive(Debug, Clone, GetSize)]
pub struct TipsetState {
    /// State root after executing the tipset (excluded from heap-size accounting).
    #[get_size(ignore)]
    pub state_root: Cid,
    /// Root of the message receipts produced by the tipset (excluded from
    /// heap-size accounting).
    // NOTE(review): `dead_code` is allowed, presumably because no reader of this
    // field exists in every build configuration — confirm.
    #[allow(dead_code)]
    #[get_size(ignore)]
    pub receipt_root: Cid,
}
141
142impl From<ExecutedTipset> for TipsetState {
143 fn from(
144 ExecutedTipset {
145 state_root,
146 receipt_root,
147 ..
148 }: ExecutedTipset,
149 ) -> Self {
150 Self {
151 state_root,
152 receipt_root,
153 }
154 }
155}
156
157impl From<&ExecutedTipset> for TipsetState {
158 fn from(
159 ExecutedTipset {
160 state_root,
161 receipt_root,
162 ..
163 }: &ExecutedTipset,
164 ) -> Self {
165 Self {
166 state_root: *state_root,
167 receipt_root: *receipt_root,
168 }
169 }
170}
171
/// Escrow and locked token amounts, serialized with `PascalCase` field names
/// through the Lotus JSON representation of [`TokenAmount`].
// NOTE(review): presumably a market-actor balance for a single address — confirm against callers.
#[derive(
    Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema,
)]
#[serde(rename_all = "PascalCase")]
pub struct MarketBalance {
    /// Escrow amount.
    #[schemars(with = "LotusJson<TokenAmount>")]
    #[serde(with = "crate::lotus_json")]
    pub escrow: TokenAmount,
    /// Locked amount.
    #[schemars(with = "LotusJson<TokenAmount>")]
    #[serde(with = "crate::lotus_json")]
    pub locked: TokenAmount,
}
lotus_json_with_self!(MarketBalance);
186
/// Entry point for state queries and message execution on top of a [`ChainStore`].
pub struct StateManager<DB> {
    /// Chain store; also the source of the blockstore, chain index and chain config.
    cs: Arc<ChainStore<DB>>,
    /// Cache of executed tipsets, looked up by tipset key.
    cache: TipsetStateCache<ExecutedTipset>,
    /// LRU cache from actor ID to address.
    // NOTE(review): presumably caches ID -> deterministic (key) address resolution;
    // its readers are outside the visible part of the file.
    id_to_deterministic_address_cache: IdToAddressCache,
    /// Beacon schedule built from the chain config and the genesis timestamp.
    beacon: Arc<crate::beacon::BeaconSchedule>,
    /// Engine handed to the VMs and tipset executors created by this manager.
    engine: Arc<MultiEngine>,
}
201
/// A typed `None` to pass where an optional per-message callback is expected
/// and no callback is wanted; naming the function-pointer type lets the
/// generic callback parameter be inferred.
// The spelled-out function-pointer type trips `clippy::type_complexity`.
#[allow(clippy::type_complexity)]
pub const NO_CALLBACK: Option<fn(MessageCallbackCtx<'_>) -> anyhow::Result<()>> = None;
204
205impl<DB> StateManager<DB>
206where
207 DB: Blockstore,
208{
209 pub fn new(cs: Arc<ChainStore<DB>>) -> anyhow::Result<Self> {
210 Self::new_with_engine(cs, GLOBAL_MULTI_ENGINE.clone())
211 }
212
213 pub fn new_with_engine(
214 cs: Arc<ChainStore<DB>>,
215 engine: Arc<MultiEngine>,
216 ) -> anyhow::Result<Self> {
217 let genesis = cs.genesis_block_header();
218 let beacon = Arc::new(cs.chain_config().get_beacon_schedule(genesis.timestamp));
219
220 Ok(Self {
221 cs,
222 cache: TipsetStateCache::new("executed_tipset"), beacon,
224 engine,
225 id_to_deterministic_address_cache: SizeTrackingLruCache::new_with_metrics(
226 "id_to_deterministic_address".into(),
227 DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE,
228 ),
229 })
230 }
231
232 pub fn heaviest_tipset(&self) -> Tipset {
234 self.chain_store().heaviest_tipset()
235 }
236
237 pub fn maybe_rewind_heaviest_tipset(&self) -> anyhow::Result<()> {
242 while self.maybe_rewind_heaviest_tipset_once()? {}
243 Ok(())
244 }
245
246 fn maybe_rewind_heaviest_tipset_once(&self) -> anyhow::Result<bool> {
247 let head = self.heaviest_tipset();
248 if let Some(info) = self
249 .chain_config()
250 .network_height_with_actor_bundle(head.epoch())
251 {
252 let expected_height_info = info.info;
253 let expected_bundle = info.manifest(self.blockstore())?;
254 let expected_bundle_metadata = expected_bundle.metadata()?;
255 let state = self.get_state_tree(head.parent_state())?;
256 let bundle_metadata = state.get_actor_bundle_metadata()?;
257 if expected_bundle_metadata != bundle_metadata {
258 let current_epoch = head.epoch();
259 let target_head = self.chain_index().load_required_tipset_by_height(
260 (expected_height_info.epoch - 1).max(0),
261 head,
262 ResolveNullTipset::TakeOlder,
263 )?;
264 let target_epoch = target_head.epoch();
265 let bundle_version = &bundle_metadata.version;
266 let expected_bundle_version = &expected_bundle_metadata.version;
267 if target_epoch < current_epoch {
268 tracing::warn!(
269 "rewinding chain head from {current_epoch} to {target_epoch}, actor bundle: {bundle_version}, expected: {expected_bundle_version}"
270 );
271 if self.blockstore().has(target_head.parent_state())? {
272 self.chain_store().set_heaviest_tipset(target_head)?;
273 return Ok(true);
274 } else {
275 anyhow::bail!(
276 "failed to rewind, state tree @ {target_epoch} is missing from blockstore: {}",
277 target_head.parent_state()
278 );
279 }
280 }
281 }
282 }
283 Ok(false)
284 }
285
286 pub fn beacon_schedule(&self) -> &Arc<BeaconSchedule> {
287 &self.beacon
288 }
289
290 pub fn get_network_version(&self, epoch: ChainEpoch) -> NetworkVersion {
292 self.chain_config().network_version(epoch)
293 }
294
295 pub fn get_state_tree(&self, state_cid: &Cid) -> anyhow::Result<StateTree<DB>> {
297 StateTree::new_from_root(self.blockstore_owned(), state_cid)
298 }
299
300 pub fn get_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<Option<ActorState>> {
302 let state = self.get_state_tree(&state_cid)?;
303 state.get_actor(addr)
304 }
305
306 pub fn get_actor_state<S: LoadActorStateFromBlockstore>(
308 &self,
309 ts: &Tipset,
310 ) -> anyhow::Result<S> {
311 let state_tree = self.get_state_tree(ts.parent_state())?;
312 state_tree.get_actor_state()
313 }
314
315 pub fn get_actor_state_from_address<S: LoadActorStateFromBlockstore>(
317 &self,
318 ts: &Tipset,
319 actor_address: &Address,
320 ) -> anyhow::Result<S> {
321 let state_tree = self.get_state_tree(ts.parent_state())?;
322 state_tree.get_actor_state_from_address(actor_address)
323 }
324
325 pub fn get_required_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<ActorState> {
327 let state = self.get_state_tree(&state_cid)?;
328 state.get_actor(addr)?.with_context(|| {
329 format!("Failed to load actor with addr={addr}, state_cid={state_cid}")
330 })
331 }
332
333 pub fn blockstore(&self) -> &Arc<DB> {
335 self.cs.blockstore()
336 }
337
338 pub fn blockstore_owned(&self) -> Arc<DB> {
339 self.blockstore().clone()
340 }
341
342 pub fn chain_store(&self) -> &Arc<ChainStore<DB>> {
344 &self.cs
345 }
346
347 pub fn chain_index(&self) -> &ChainIndex<DB> {
349 self.cs.chain_index()
350 }
351
352 pub fn chain_config(&self) -> &Arc<ChainConfig> {
354 self.cs.chain_config()
355 }
356
357 pub fn chain_rand(&self, tipset: Tipset) -> ChainRand<DB> {
358 ChainRand::new(
359 self.chain_config().shallow_clone(),
360 tipset,
361 self.chain_index().shallow_clone(),
362 self.beacon.shallow_clone(),
363 )
364 }
365
366 pub fn get_network_state_name(
368 &self,
369 state_cid: Cid,
370 ) -> anyhow::Result<crate::networks::StateNetworkName> {
371 let init_act = self
372 .get_actor(&init::ADDRESS.into(), state_cid)?
373 .ok_or_else(|| Error::state("Init actor address could not be resolved"))?;
374 Ok(
375 State::load(self.blockstore(), init_act.code, init_act.state)?
376 .into_network_name()
377 .into(),
378 )
379 }
380
381 pub fn is_miner_slashed(&self, addr: &Address, state_cid: &Cid) -> anyhow::Result<bool, Error> {
383 let actor = self
384 .get_actor(&Address::POWER_ACTOR, *state_cid)?
385 .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
386
387 let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
388
389 Ok(spas.miner_power(self.blockstore(), addr)?.is_none())
390 }
391
392 pub fn get_miner_work_addr(&self, state_cid: Cid, addr: &Address) -> Result<Address, Error> {
394 let state =
395 StateTree::new_from_root(self.blockstore_owned(), &state_cid).map_err(Error::other)?;
396 let ms: miner::State = state.get_actor_state_from_address(addr)?;
397 let info = ms.info(self.blockstore()).map_err(|e| e.to_string())?;
398 let addr = resolve_to_key_addr(&state, self.blockstore(), &info.worker())?;
399 Ok(addr)
400 }
401
402 pub fn get_power(
405 &self,
406 state_cid: &Cid,
407 addr: Option<&Address>,
408 ) -> anyhow::Result<Option<(power::Claim, power::Claim)>, Error> {
409 let actor = self
410 .get_actor(&Address::POWER_ACTOR, *state_cid)?
411 .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
412
413 let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
414
415 let t_pow = spas.total_power();
416
417 if let Some(maddr) = addr {
418 let m_pow = spas
419 .miner_power(self.blockstore(), maddr)?
420 .ok_or_else(|| Error::state(format!("Miner for address {maddr} not found")))?;
421
422 let min_pow = spas.miner_nominal_power_meets_consensus_minimum(
423 &self.chain_config().policy,
424 self.blockstore(),
425 maddr,
426 )?;
427 if min_pow {
428 return Ok(Some((m_pow, t_pow)));
429 }
430 }
431
432 Ok(None)
433 }
434
435 pub fn get_all_sectors(
437 &self,
438 addr: &Address,
439 ts: &Tipset,
440 ) -> anyhow::Result<Vec<SectorOnChainInfo>> {
441 let actor = self
442 .get_actor(addr, *ts.parent_state())?
443 .ok_or_else(|| Error::state("Miner actor not found"))?;
444 let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
445 state.load_sectors_ext(self.blockstore(), None)
446 }
447}
448
449impl<DB> StateManager<DB>
450where
451 DB: Blockstore + Send + Sync + 'static,
452{
453 pub async fn load_tipset_state(self: &Arc<Self>, ts: &Tipset) -> anyhow::Result<TipsetState> {
455 if let Some(state) = self.cache.get_map(ts.key(), |et| et.into()) {
456 Ok(state)
457 } else {
458 match self.chain_store().load_child_tipset(ts)? {
459 Some(receipt_ts) => Ok(TipsetState {
460 state_root: *receipt_ts.parent_state(),
461 receipt_root: *receipt_ts.parent_message_receipts(),
462 }),
463 None => Ok(self.load_executed_tipset(ts).await?.into()),
464 }
465 }
466 }
467
468 pub async fn load_executed_tipset(
470 self: &Arc<Self>,
471 ts: &Tipset,
472 ) -> anyhow::Result<ExecutedTipset> {
473 if ts.epoch() >= self.heaviest_tipset().epoch()
475 && let Some(cached) = self.cache.get(ts.key())
476 {
477 if StateTree::new_from_root(self.blockstore_owned(), &cached.state_root).is_ok() {
478 return Ok(cached);
479 } else {
480 self.cache.remove(ts.key());
481 }
482 }
483 self.cache
484 .get_or_else(ts.key(), || async move {
485 let receipt_ts = self.chain_store().load_child_tipset(ts)?;
486 self.load_executed_tipset_inner(ts, receipt_ts.as_ref())
487 .await
488 })
489 .await
490 }
491
492 async fn load_executed_tipset_inner(
493 self: &Arc<Self>,
494 msg_ts: &Tipset,
495 receipt_ts: Option<&Tipset>,
497 ) -> anyhow::Result<ExecutedTipset> {
498 if let Some(receipt_ts) = receipt_ts {
499 anyhow::ensure!(
500 msg_ts.key() == receipt_ts.parents(),
501 "message tipset should be the parent of message receipt tipset"
502 );
503 }
504 let mut recomputed = false;
505 let (state_root, receipt_root, receipts) = match receipt_ts.and_then(|ts| {
506 let receipt_root = *ts.parent_message_receipts();
507 Receipt::get_receipts(self.cs.blockstore(), receipt_root)
508 .ok()
509 .map(|r| (*ts.parent_state(), receipt_root, r))
510 }) {
511 Some((state_root, receipt_root, receipts)) => (state_root, receipt_root, receipts),
512 None => {
513 let state_output = self
514 .compute_tipset_state(msg_ts.shallow_clone(), NO_CALLBACK, VMTrace::NotTraced)
515 .await?;
516 recomputed = true;
517 (
518 state_output.state_root,
519 state_output.receipt_root,
520 Receipt::get_receipts(self.cs.blockstore(), state_output.receipt_root)?,
521 )
522 }
523 };
524
525 let messages = self.chain_store().messages_for_tipset(msg_ts)?;
526 anyhow::ensure!(
527 messages.len() == receipts.len(),
528 "mismatching message and receipt counts ({} messages, {} receipts)",
529 messages.len(),
530 receipts.len()
531 );
532 let mut executed_messages = Vec::with_capacity(messages.len());
533 for (message, receipt) in messages.iter().cloned().zip(receipts) {
534 let events = if let Some(events_root) = receipt.events_root() {
535 Some(
536 match StampedEvent::get_events(self.cs.blockstore(), &events_root) {
537 Ok(events) => events,
538 Err(e) if recomputed => return Err(e),
539 Err(_) => {
540 self.compute_tipset_state(
541 msg_ts.shallow_clone(),
542 NO_CALLBACK,
543 VMTrace::NotTraced,
544 )
545 .await?;
546 recomputed = true;
547 StampedEvent::get_events(self.cs.blockstore(), &events_root)?
548 }
549 },
550 )
551 } else {
552 None
553 };
554 executed_messages.push(ExecutedMessage {
555 message,
556 receipt,
557 events,
558 });
559 }
560 Ok(ExecutedTipset {
561 state_root,
562 receipt_root,
563 executed_messages: Arc::new(executed_messages),
564 })
565 }
566
567 #[instrument(skip(self, rand))]
568 fn call_raw(
569 &self,
570 state_cid: Option<Cid>,
571 msg: &Message,
572 rand: ChainRand<DB>,
573 tipset: &Tipset,
574 ) -> Result<ApiInvocResult, Error> {
575 let mut msg = msg.clone();
576
577 let state_cid = state_cid.unwrap_or(*tipset.parent_state());
578
579 let tipset_messages = self
580 .chain_store()
581 .messages_for_tipset(tipset)
582 .map_err(|err| Error::Other(err.to_string()))?;
583
584 let prior_messsages = tipset_messages
585 .iter()
586 .filter(|ts_msg| ts_msg.message().from() == msg.from());
587
588 let height = tipset.epoch();
591 let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
592 let mut vm = VM::new(
593 ExecutionContext {
594 heaviest_tipset: tipset.shallow_clone(),
595 state_tree_root: state_cid,
596 epoch: height,
597 rand: Box::new(rand),
598 base_fee: tipset.block_headers().first().parent_base_fee.clone(),
599 circ_supply: genesis_info.get_vm_circulating_supply(
600 height,
601 self.blockstore(),
602 &state_cid,
603 )?,
604 chain_config: self.chain_config().shallow_clone(),
605 chain_index: self.chain_index().shallow_clone(),
606 timestamp: tipset.min_timestamp(),
607 },
608 &self.engine,
609 VMTrace::Traced,
610 )?;
611
612 for m in prior_messsages {
613 vm.apply_message(m)?;
614 }
615
616 let state_cid = vm.flush()?;
619
620 let state = StateTree::new_from_root(self.blockstore_owned(), &state_cid)?;
621
622 let from_actor = state
623 .get_actor(&msg.from())?
624 .ok_or_else(|| anyhow::anyhow!("actor not found"))?;
625 msg.set_sequence(from_actor.sequence);
626
627 let mut msg = msg.clone();
629 msg.gas_limit = IMPLICIT_MESSAGE_GAS_LIMIT as u64;
630
631 let (apply_ret, duration) = vm.apply_implicit_message(&msg)?;
632
633 Ok(ApiInvocResult {
634 msg: msg.clone(),
635 msg_rct: Some(apply_ret.msg_receipt()),
636 msg_cid: msg.cid(),
637 error: apply_ret.failure_info().unwrap_or_default(),
638 duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
639 gas_cost: MessageGasCost::default(),
640 execution_trace: structured::parse_events(apply_ret.exec_trace()).unwrap_or_default(),
641 })
642 }
643
644 pub fn call(&self, message: &Message, tipset: Option<Tipset>) -> Result<ApiInvocResult, Error> {
647 let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
648 let chain_rand = self.chain_rand(ts.shallow_clone());
649 self.call_raw(None, message, chain_rand, &ts)
650 }
651
652 pub fn call_on_state(
655 &self,
656 state_cid: Cid,
657 message: &Message,
658 tipset: Option<Tipset>,
659 ) -> Result<ApiInvocResult, Error> {
660 let ts = tipset.unwrap_or_else(|| self.cs.heaviest_tipset());
661 let chain_rand = self.chain_rand(ts.shallow_clone());
662 self.call_raw(Some(state_cid), message, chain_rand, &ts)
663 }
664
665 pub async fn apply_on_state_with_gas(
666 self: &Arc<Self>,
667 tipset: Option<Tipset>,
668 msg: Message,
669 vm_flush: VMFlush,
670 ) -> anyhow::Result<(ApiInvocResult, Option<Cid>)> {
671 let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
672
673 let from_a = self.resolve_to_key_addr(&msg.from, &ts).await?;
674
675 let mut chain_msg = match from_a.protocol() {
679 Protocol::Secp256k1 => SignedMessage::new_unchecked(
680 msg.clone(),
681 Signature::new_secp256k1(vec![0; SECP_SIG_LEN]),
682 )
683 .into(),
684 Protocol::Delegated => SignedMessage::new_unchecked(
685 msg.clone(),
686 Signature::new(SignatureType::Delegated, vec![0; SECP_SIG_LEN]),
689 )
690 .into(),
691 _ => msg.clone().into(),
692 };
693
694 let (_invoc_res, apply_ret, duration, state_root) = self
695 .call_with_gas(&mut chain_msg, &[], Some(ts), vm_flush)
696 .await?;
697
698 Ok((
699 ApiInvocResult {
700 msg_cid: msg.cid(),
701 msg,
702 msg_rct: Some(apply_ret.msg_receipt()),
703 error: apply_ret.failure_info().unwrap_or_default(),
704 duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
705 gas_cost: MessageGasCost::default(),
706 execution_trace: structured::parse_events(apply_ret.exec_trace())
707 .unwrap_or_default(),
708 },
709 state_root,
710 ))
711 }
712
713 pub async fn call_with_gas(
716 self: &Arc<Self>,
717 message: &mut ChainMessage,
718 prior_messages: &[ChainMessage],
719 tipset: Option<Tipset>,
720 vm_flush: VMFlush,
721 ) -> Result<(InvocResult, ApplyRet, Duration, Option<Cid>), Error> {
722 let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
723 let TipsetState { state_root, .. } = self
724 .load_tipset_state(&ts)
725 .await
726 .map_err(|e| Error::Other(format!("Could not load tipset state: {e:#}")))?;
727 let chain_rand = self.chain_rand(ts.clone());
728
729 let epoch = ts.epoch() + 1;
732 let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
733 let (ret, duration, state_cid) = stacker::grow(64 << 20, || -> anyhow::Result<_> {
736 let mut vm = VM::new(
737 ExecutionContext {
738 heaviest_tipset: ts.clone(),
739 state_tree_root: state_root,
740 epoch,
741 rand: Box::new(chain_rand),
742 base_fee: ts.block_headers().first().parent_base_fee.clone(),
743 circ_supply: genesis_info.get_vm_circulating_supply(
744 epoch,
745 self.blockstore(),
746 &state_root,
747 )?,
748 chain_config: self.chain_config().shallow_clone(),
749 chain_index: self.chain_index().shallow_clone(),
750 timestamp: ts.min_timestamp(),
751 },
752 &self.engine,
753 VMTrace::NotTraced,
754 )?;
755
756 for msg in prior_messages {
757 vm.apply_message(msg)?;
758 }
759 let from_actor = vm
760 .get_actor(&message.from())
761 .map_err(|e| Error::Other(format!("Could not get actor from state: {e:#}")))?
762 .ok_or_else(|| Error::Other("cant find actor in state tree".to_string()))?;
763
764 message.set_sequence(from_actor.sequence);
765 let (ret, duration) = vm.apply_message(message)?;
766 let state_root = match vm_flush {
767 VMFlush::Flush => Some(vm.flush()?),
768 VMFlush::Skip => None,
769 };
770 Ok((ret, duration, state_root))
771 })?;
772
773 Ok((
774 InvocResult::new(message.message().clone(), &ret),
775 ret,
776 duration,
777 state_cid,
778 ))
779 }
780
781 pub async fn replay(self: &Arc<Self>, ts: Tipset, mcid: Cid) -> Result<ApiInvocResult, Error> {
784 let this = Arc::clone(self);
785 tokio::task::spawn_blocking(move || this.replay_blocking(ts, mcid)).await?
786 }
787
788 pub fn replay_blocking(
790 self: &Arc<Self>,
791 ts: Tipset,
792 mcid: Cid,
793 ) -> Result<ApiInvocResult, Error> {
794 const REPLAY_HALT: &str = "replay_halt";
795
796 let mut api_invoc_result = None;
797 let callback = |ctx: MessageCallbackCtx<'_>| {
798 match ctx.at {
799 CalledAt::Applied | CalledAt::Reward
800 if api_invoc_result.is_none() && ctx.cid == mcid =>
801 {
802 api_invoc_result = Some(ApiInvocResult {
803 msg_cid: ctx.message.cid(),
804 msg: ctx.message.message().clone(),
805 msg_rct: Some(ctx.apply_ret.msg_receipt()),
806 error: ctx.apply_ret.failure_info().unwrap_or_default(),
807 duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
808 gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
809 execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
810 .unwrap_or_default(),
811 });
812 anyhow::bail!(REPLAY_HALT);
813 }
814 _ => Ok(()), }
816 };
817 let result = self.compute_tipset_state_blocking(ts, Some(callback), VMTrace::Traced);
818 if let Err(error_message) = result
819 && error_message.to_string() != REPLAY_HALT
820 {
821 return Err(Error::Other(format!(
822 "unexpected error during execution : {error_message:}"
823 )));
824 }
825 api_invoc_result.ok_or_else(|| Error::Other("failed to replay".into()))
826 }
827
828 pub async fn replay_for_prestate(
831 self: &Arc<Self>,
832 ts: Tipset,
833 target_message_cid: Cid,
834 ) -> Result<(Cid, ApiInvocResult, Cid), Error> {
835 let this = Arc::clone(self);
836 tokio::task::spawn_blocking(move || {
837 this.replay_for_prestate_blocking(ts, target_message_cid)
838 })
839 .await
840 .map_err(|e| Error::Other(format!("{e}")))?
841 }
842
843 fn replay_for_prestate_blocking(
844 self: &Arc<Self>,
845 ts: Tipset,
846 target_msg_cid: Cid,
847 ) -> Result<(Cid, ApiInvocResult, Cid), Error> {
848 if ts.epoch() == 0 {
849 return Err(Error::Other(
850 "cannot trace messages in the genesis block".into(),
851 ));
852 }
853
854 let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
855 let exec = TipsetExecutor::new(
856 self.chain_index().shallow_clone(),
857 self.chain_config().shallow_clone(),
858 self.beacon_schedule().shallow_clone(),
859 &self.engine,
860 ts.shallow_clone(),
861 );
862 let mut no_cb = NO_CALLBACK;
863 let (parent_state, epoch, block_messages) =
864 exec.prepare_parent_state(genesis_timestamp, VMTrace::NotTraced, &mut no_cb)?;
865
866 Ok(stacker::grow(64 << 20, || {
867 let mut vm =
868 exec.create_vm(parent_state, epoch, ts.min_timestamp(), VMTrace::NotTraced)?;
869 let mut processed = ahash::HashSet::default();
870
871 for block in block_messages.iter() {
872 let mut penalty = TokenAmount::zero();
873 let mut gas_reward = TokenAmount::zero();
874
875 for msg in block.messages.iter() {
876 let cid = msg.cid();
877 if processed.contains(&cid) {
878 continue;
879 }
880
881 processed.insert(cid);
882
883 if cid == target_msg_cid {
884 let pre_root = vm.flush()?;
885 let mut traced_vm =
886 exec.create_vm(pre_root, epoch, ts.min_timestamp(), VMTrace::Traced)?;
887 let (ret, duration) = traced_vm.apply_message(msg)?;
888 let post_root = traced_vm.flush()?;
889
890 return Ok((
891 pre_root,
892 ApiInvocResult {
893 msg_cid: cid,
894 msg: msg.message().clone(),
895 msg_rct: Some(ret.msg_receipt()),
896 error: ret.failure_info().unwrap_or_default(),
897 duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
898 gas_cost: MessageGasCost::default(),
899 execution_trace: structured::parse_events(ret.exec_trace())
900 .unwrap_or_default(),
901 },
902 post_root,
903 ));
904 }
905
906 let (ret, _) = vm.apply_message(msg)?;
907 gas_reward += ret.miner_tip();
908 penalty += ret.penalty();
909 }
910
911 if let Some(rew_msg) =
912 vm.reward_message(epoch, block.miner, block.win_count, penalty, gas_reward)?
913 {
914 let (ret, _) = vm.apply_implicit_message(&rew_msg)?;
915 if let Some(err) = ret.failure_info() {
916 bail!(
917 "failed to apply reward message for miner {}: {err}",
918 block.miner
919 );
920 }
921
922 if !ret.msg_receipt().exit_code().is_success() {
924 bail!(
925 "reward application message failed (exit: {:?})",
926 ret.msg_receipt().exit_code()
927 );
928 }
929 }
930 }
931
932 bail!("message {target_msg_cid} not found in tipset")
933 })?)
934 }
935
936 pub fn eligible_to_mine(
939 &self,
940 address: &Address,
941 base_tipset: &Tipset,
942 lookback_tipset: &Tipset,
943 ) -> anyhow::Result<bool, Error> {
944 let hmp =
945 self.miner_has_min_power(&self.chain_config().policy, address, lookback_tipset)?;
946 let version = self.get_network_version(base_tipset.epoch());
947
948 if version <= NetworkVersion::V3 {
949 return Ok(hmp);
950 }
951
952 if !hmp {
953 return Ok(false);
954 }
955
956 let actor = self
957 .get_actor(&Address::POWER_ACTOR, *base_tipset.parent_state())?
958 .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
959
960 let power_state = power::State::load(self.blockstore(), actor.code, actor.state)?;
961
962 let actor = self
963 .get_actor(address, *base_tipset.parent_state())?
964 .ok_or_else(|| Error::state("Miner actor address could not be resolved"))?;
965
966 let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
967
968 let claim = power_state
970 .miner_power(self.blockstore(), address)?
971 .ok_or_else(|| Error::Other("Could not get claim".to_string()))?;
972 if claim.quality_adj_power <= BigInt::zero() {
973 return Ok(false);
974 }
975
976 if !miner_state.fee_debt().is_zero() {
978 return Ok(false);
979 }
980
981 let info = miner_state.info(self.blockstore())?;
983 if base_tipset.epoch() <= info.consensus_fault_elapsed {
984 return Ok(false);
985 }
986
987 Ok(true)
988 }
989
990 pub async fn compute_tipset_state(
1012 self: &Arc<Self>,
1013 tipset: Tipset,
1014 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
1015 enable_tracing: VMTrace,
1016 ) -> Result<ExecutedTipset, Error> {
1017 let this = Arc::clone(self);
1018 tokio::task::spawn_blocking(move || {
1019 this.compute_tipset_state_blocking(tipset, callback, enable_tracing)
1020 })
1021 .await?
1022 }
1023
1024 pub fn compute_tipset_state_blocking(
1026 &self,
1027 tipset: Tipset,
1028 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
1029 enable_tracing: VMTrace,
1030 ) -> Result<ExecutedTipset, Error> {
1031 let epoch = tipset.epoch();
1032 let has_callback = callback.is_some();
1033 info!(
1034 "Evaluating tipset: EPOCH={epoch}, blocks={}, tsk={}",
1035 tipset.len(),
1036 tipset.key(),
1037 );
1038 Ok(apply_block_messages(
1039 self.chain_store().genesis_block_header().timestamp,
1040 self.chain_index().shallow_clone(),
1041 self.chain_config().shallow_clone(),
1042 self.beacon_schedule().shallow_clone(),
1043 &self.engine,
1044 tipset,
1045 callback,
1046 enable_tracing,
1047 )
1048 .map_err(|e| {
1049 if has_callback {
1050 e
1051 } else {
1052 e.context(format!("Failed to compute tipset state@{epoch}"))
1053 }
1054 })?)
1055 }
1056
1057 #[instrument(skip_all)]
1058 pub async fn compute_state(
1059 self: &Arc<Self>,
1060 height: ChainEpoch,
1061 messages: Vec<Message>,
1062 tipset: Tipset,
1063 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
1064 enable_tracing: VMTrace,
1065 ) -> Result<ExecutedTipset, Error> {
1066 let this = Arc::clone(self);
1067 tokio::task::spawn_blocking(move || {
1068 this.compute_state_blocking(height, messages, tipset, callback, enable_tracing)
1069 })
1070 .await?
1071 }
1072
1073 #[tracing::instrument(skip_all)]
1075 pub fn compute_state_blocking(
1076 &self,
1077 height: ChainEpoch,
1078 messages: Vec<Message>,
1079 tipset: Tipset,
1080 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
1081 enable_tracing: VMTrace,
1082 ) -> Result<ExecutedTipset, Error> {
1083 Ok(compute_state(
1084 height,
1085 messages,
1086 tipset,
1087 self.chain_store().genesis_block_header().timestamp,
1088 self.chain_index().shallow_clone(),
1089 self.chain_config().shallow_clone(),
1090 self.beacon_schedule().shallow_clone(),
1091 &self.engine,
1092 callback,
1093 enable_tracing,
1094 )?)
1095 }
1096
1097 fn tipset_executed_message(
1100 &self,
1101 tipset: &Tipset,
1102 message: &ChainMessage,
1103 allow_replaced: bool,
1104 ) -> Result<Option<Receipt>, Error> {
1105 if tipset.epoch() == 0 {
1106 return Ok(None);
1107 }
1108 let message_from_address = message.from();
1109 let message_sequence = message.sequence();
1110 let pts = self
1112 .chain_index()
1113 .load_required_tipset(tipset.parents())
1114 .map_err(|err| Error::Other(format!("Failed to load tipset: {err}")))?;
1115 let messages = self
1116 .cs
1117 .messages_for_tipset(&pts)
1118 .map_err(|err| Error::Other(format!("Failed to load messages for tipset: {err}")))?;
1119 messages
1120 .iter()
1121 .enumerate()
1122 .rev()
1124 .filter(|(_, s)| {
1125 s.sequence() == message_sequence
1126 && s.from() == message_from_address
1127 && s.equal_call(message)
1128 })
1129 .map(|(index, m)| {
1130 if !allow_replaced && message.cid() != m.cid(){
1134 Err(Error::Other(format!(
1135 "found message with equal nonce and call params but different CID. wanted {}, found: {}, nonce: {}, from: {}",
1136 message.cid(),
1137 m.cid(),
1138 message.sequence(),
1139 message.from(),
1140 )))
1141 } else {
1142 let block_header = tipset.block_headers().first();
1143 crate::chain::get_parent_receipt(
1144 self.blockstore(),
1145 block_header,
1146 index,
1147 )
1148 .map_err(|err| Error::Other(format!("Failed to get parent receipt (message_receipts={}, index={index}, error={err})", block_header.message_receipts)))
1149 }
1150 })
1151 .next()
1152 .unwrap_or(Ok(None))
1153 }
1154
1155 fn check_search(
1156 &self,
1157 mut current: Tipset,
1158 message: &ChainMessage,
1159 lookback_max_epoch: ChainEpoch,
1160 allow_replaced: bool,
1161 ) -> Result<Option<(Tipset, Receipt)>, Error> {
1162 let message_from_address = message.from();
1163 let message_sequence = message.sequence();
1164 let mut current_actor_state = self
1165 .get_required_actor(&message_from_address, *current.parent_state())
1166 .map_err(Error::state)?;
1167 let message_from_id = self.lookup_required_id(&message_from_address, ¤t)?;
1168
1169 while current.epoch() >= lookback_max_epoch {
1170 let parent_tipset = self
1171 .chain_index()
1172 .load_required_tipset(current.parents())
1173 .map_err(|err| {
1174 Error::Other(format!(
1175 "failed to load tipset during msg wait searchback: {err:}"
1176 ))
1177 })?;
1178
1179 let parent_actor_state = self
1180 .get_actor(&message_from_id, *parent_tipset.parent_state())
1181 .map_err(|e| Error::State(e.to_string()))?;
1182
1183 if parent_actor_state.is_none()
1184 || (current_actor_state.sequence > message_sequence
1185 && parent_actor_state.as_ref().unwrap().sequence <= message_sequence)
1186 {
1187 let receipt = self
1188 .tipset_executed_message(¤t, message, allow_replaced)?
1189 .context("Failed to get receipt with tipset_executed_message")?;
1190 return Ok(Some((current, receipt)));
1191 }
1192
1193 if let Some(parent_actor_state) = parent_actor_state {
1194 current = parent_tipset;
1195 current_actor_state = parent_actor_state;
1196 } else {
1197 break;
1198 }
1199 }
1200
1201 Ok(None)
1202 }
1203
1204 fn search_back_for_message(
1206 &self,
1207 current: Tipset,
1208 message: &ChainMessage,
1209 look_back_limit: Option<i64>,
1210 allow_replaced: Option<bool>,
1211 ) -> Result<Option<(Tipset, Receipt)>, Error> {
1212 let current_epoch = current.epoch();
1213 let allow_replaced = allow_replaced.unwrap_or(true);
1214
1215 let lookback_max_epoch = match look_back_limit {
1217 Some(0) => return Ok(None),
1219 Some(limit) if limit > 0 => (current_epoch - limit + 1).max(0),
1223 _ => 0,
1225 };
1226
1227 self.check_search(current, message, lookback_max_epoch, allow_replaced)
1228 }
1229
1230 pub fn get_receipt(&self, tipset: Tipset, msg: Cid) -> Result<Receipt, Error> {
1232 let m = crate::chain::get_chain_message(self.blockstore(), &msg)
1233 .map_err(|e| Error::Other(e.to_string()))?;
1234 let message_receipt = self.tipset_executed_message(&tipset, &m, true)?;
1235 if let Some(receipt) = message_receipt {
1236 return Ok(receipt);
1237 }
1238
1239 let maybe_tuple = self.search_back_for_message(tipset, &m, None, None)?;
1240 let message_receipt = maybe_tuple
1241 .ok_or_else(|| {
1242 Error::Other("Could not get receipt from search back message".to_string())
1243 })?
1244 .1;
1245 Ok(message_receipt)
1246 }
1247
1248 pub async fn wait_for_message(
1253 self: &Arc<Self>,
1254 msg_cid: Cid,
1255 confidence: i64,
1256 look_back_limit: Option<ChainEpoch>,
1257 allow_replaced: Option<bool>,
1258 ) -> Result<(Option<Tipset>, Option<Receipt>), Error> {
1259 let mut head_changes_rx = self.cs.subscribe_head_changes();
1260 let (sender, mut receiver) = oneshot::channel::<()>();
1261 let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
1262 .map_err(|err| Error::Other(format!("failed to load message {err:}")))?;
1263 let current_tipset = self.heaviest_tipset();
1264 let maybe_message_receipt =
1265 self.tipset_executed_message(¤t_tipset, &message, true)?;
1266 if let Some(r) = maybe_message_receipt {
1267 return Ok((Some(current_tipset.shallow_clone()), Some(r)));
1268 }
1269
1270 let mut candidate_tipset: Option<Tipset> = None;
1271 let mut candidate_receipt: Option<Receipt> = None;
1272
1273 let sm_cloned = self.shallow_clone();
1274
1275 let message_for_task = message.clone();
1276 let height_of_head = current_tipset.epoch();
1277 let task = tokio::task::spawn(async move {
1278 let back_tuple = sm_cloned.search_back_for_message(
1279 current_tipset,
1280 &message_for_task,
1281 look_back_limit,
1282 allow_replaced,
1283 )?;
1284 sender
1285 .send(())
1286 .map_err(|e| Error::Other(format!("Could not send to channel {e:?}")))?;
1287 Ok::<_, Error>(back_tuple)
1288 });
1289
1290 let reverts: Arc<RwLock<HashMap<TipsetKey, bool>>> = Arc::new(RwLock::new(HashMap::new()));
1291 let block_revert = reverts.clone();
1292 let sm_cloned = Arc::clone(self);
1293
1294 let mut subscriber_poll = tokio::task::spawn(async move {
1296 loop {
1297 match head_changes_rx.recv().await {
1298 Ok(head_changes) => {
1299 for tipset in head_changes.reverts {
1300 if candidate_tipset
1301 .as_ref()
1302 .is_some_and(|candidate| candidate.key() == tipset.key())
1303 {
1304 candidate_tipset = None;
1305 candidate_receipt = None;
1306 }
1307 }
1308 for tipset in head_changes.applies {
1309 if candidate_tipset
1310 .as_ref()
1311 .map(|s| tipset.epoch() >= s.epoch() + confidence)
1312 .unwrap_or_default()
1313 {
1314 return Ok((candidate_tipset, candidate_receipt));
1315 }
1316 let poll_receiver = receiver.try_recv();
1317 if let Ok(Some(_)) = poll_receiver {
1318 block_revert
1319 .write()
1320 .await
1321 .insert(tipset.key().to_owned(), true);
1322 }
1323
1324 let maybe_receipt =
1325 sm_cloned.tipset_executed_message(&tipset, &message, true)?;
1326 if let Some(receipt) = maybe_receipt {
1327 if confidence == 0 {
1328 return Ok((Some(tipset), Some(receipt)));
1329 }
1330 candidate_tipset = Some(tipset);
1331 candidate_receipt = Some(receipt)
1332 }
1333 }
1334 }
1335 Err(RecvError::Lagged(i)) => {
1336 warn!(
1337 "wait for message head change subscriber lagged, skipped {} events",
1338 i
1339 );
1340 }
1341 Err(RecvError::Closed) => break,
1342 }
1343 }
1344 Ok((None, None))
1345 })
1346 .fuse();
1347
1348 let mut search_back_poll = tokio::task::spawn(async move {
1350 let back_tuple = task.await.map_err(|e| {
1351 Error::Other(format!("Could not search backwards for message {e}"))
1352 })??;
1353 if let Some((back_tipset, back_receipt)) = back_tuple {
1354 let should_revert = *reverts
1355 .read()
1356 .await
1357 .get(back_tipset.key())
1358 .unwrap_or(&false);
1359 let larger_height_of_head = height_of_head >= back_tipset.epoch() + confidence;
1360 if !should_revert && larger_height_of_head {
1361 return Ok::<_, Error>((Some(back_tipset), Some(back_receipt)));
1362 }
1363 return Ok((None, None));
1364 }
1365 Ok((None, None))
1366 })
1367 .fuse();
1368
1369 loop {
1371 select! {
1372 res = subscriber_poll => {
1373 return res?
1374 }
1375 res = search_back_poll => {
1376 if let Ok((Some(ts), Some(rct))) = res? {
1377 return Ok((Some(ts), Some(rct)));
1378 }
1379 }
1380 }
1381 }
1382 }
1383
1384 pub async fn search_for_message(
1385 &self,
1386 from: Option<Tipset>,
1387 msg_cid: Cid,
1388 look_back_limit: Option<i64>,
1389 allow_replaced: Option<bool>,
1390 ) -> Result<Option<(Tipset, Receipt)>, Error> {
1391 let from = from.unwrap_or_else(|| self.heaviest_tipset());
1392 let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
1393 .map_err(|err| Error::Other(format!("failed to load message {err}")))?;
1394 let current_tipset = self.heaviest_tipset();
1395 let maybe_message_receipt =
1396 self.tipset_executed_message(&from, &message, allow_replaced.unwrap_or(true))?;
1397 if let Some(r) = maybe_message_receipt {
1398 Ok(Some((from, r)))
1399 } else {
1400 self.search_back_for_message(current_tipset, &message, look_back_limit, allow_replaced)
1401 }
1402 }
1403
1404 pub fn get_bls_public_key(
1406 db: &Arc<DB>,
1407 addr: &Address,
1408 state_cid: Cid,
1409 ) -> Result<BlsPublicKey, Error> {
1410 let state = StateTree::new_from_root(Arc::clone(db), &state_cid)
1411 .map_err(|e| Error::Other(e.to_string()))?;
1412 let kaddr =
1413 resolve_to_key_addr(&state, db, addr).context("Failed to resolve key address")?;
1414
1415 match kaddr.into_payload() {
1416 Payload::BLS(key) => BlsPublicKey::from_bytes(&key)
1417 .context("Failed to construct bls public key")
1418 .map_err(Error::from),
1419 _ => Err(Error::state(
1420 "Address must be BLS address to load bls public key",
1421 )),
1422 }
1423 }
1424
1425 pub fn lookup_id(&self, addr: &Address, ts: &Tipset) -> Result<Option<Address>, Error> {
1427 let state_tree = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
1428 .map_err(|e| format!("{e:?}"))?;
1429 Ok(state_tree
1430 .lookup_id(addr)
1431 .map_err(|e| Error::Other(e.to_string()))?
1432 .map(Address::new_id))
1433 }
1434
1435 pub fn lookup_required_id(&self, addr: &Address, ts: &Tipset) -> Result<Address, Error> {
1437 self.lookup_id(addr, ts)?
1438 .ok_or_else(|| Error::Other(format!("Failed to lookup the id address {addr}")))
1439 }
1440
1441 pub fn market_state(&self, ts: &Tipset) -> Result<market::State, Error> {
1443 let actor = self.get_required_actor(&Address::MARKET_ACTOR, *ts.parent_state())?;
1444 let market_state = market::State::load(self.blockstore(), actor.code, actor.state)?;
1445 Ok(market_state)
1446 }
1447
1448 pub fn market_balance(&self, addr: &Address, ts: &Tipset) -> Result<MarketBalance, Error> {
1450 let market_state = self.market_state(ts)?;
1451 let new_addr = self.lookup_required_id(addr, ts)?;
1452 let out = MarketBalance {
1453 escrow: {
1454 market_state
1455 .escrow_table(self.blockstore())?
1456 .get(&new_addr)?
1457 },
1458 locked: {
1459 market_state
1460 .locked_table(self.blockstore())?
1461 .get(&new_addr)?
1462 },
1463 };
1464
1465 Ok(out)
1466 }
1467
1468 pub fn miner_info(&self, addr: &Address, ts: &Tipset) -> Result<MinerInfo, Error> {
1470 let actor = self
1471 .get_actor(addr, *ts.parent_state())?
1472 .ok_or_else(|| Error::state("Miner actor not found"))?;
1473 let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1474
1475 Ok(state.info(self.blockstore())?)
1476 }
1477
1478 pub fn miner_faults(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
1480 self.all_partition_sectors(addr, ts, |partition| partition.faulty_sectors().clone())
1481 }
1482
1483 pub fn miner_recoveries(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
1485 self.all_partition_sectors(addr, ts, |partition| partition.recovering_sectors().clone())
1486 }
1487
1488 fn all_partition_sectors(
1489 &self,
1490 addr: &Address,
1491 ts: &Tipset,
1492 get_sector: impl Fn(Partition<'_>) -> BitField,
1493 ) -> Result<BitField, Error> {
1494 let actor = self
1495 .get_actor(addr, *ts.parent_state())?
1496 .ok_or_else(|| Error::state("Miner actor not found"))?;
1497
1498 let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1499
1500 let mut partitions = Vec::new();
1501
1502 state.for_each_deadline(
1503 &self.chain_config().policy,
1504 self.blockstore(),
1505 |_, deadline| {
1506 deadline.for_each(self.blockstore(), |_, partition| {
1507 partitions.push(get_sector(partition));
1508 Ok(())
1509 })
1510 },
1511 )?;
1512
1513 Ok(BitField::union(partitions.iter()))
1514 }
1515
1516 pub fn miner_power(&self, addr: &Address, ts: &Tipset) -> Result<MinerPower, Error> {
1518 if let Some((miner_power, total_power)) = self.get_power(ts.parent_state(), Some(addr))? {
1519 return Ok(MinerPower {
1520 miner_power,
1521 total_power,
1522 has_min_power: true,
1523 });
1524 }
1525
1526 Ok(MinerPower {
1527 has_min_power: false,
1528 miner_power: Default::default(),
1529 total_power: Default::default(),
1530 })
1531 }
1532
1533 pub async fn resolve_to_key_addr(
1536 self: &Arc<Self>,
1537 addr: &Address,
1538 ts: &Tipset,
1539 ) -> anyhow::Result<Address> {
1540 match addr.protocol() {
1541 Protocol::BLS | Protocol::Secp256k1 | Protocol::Delegated => return Ok(*addr),
1542 Protocol::Actor => {
1543 return Err(Error::Other(
1544 "cannot resolve actor address to key address".to_string(),
1545 )
1546 .into());
1547 }
1548 _ => {}
1549 };
1550
1551 let state = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())?;
1554 if let Ok(addr) = resolve_to_key_addr(&state, self.blockstore(), addr) {
1555 return Ok(addr);
1556 }
1557
1558 let TipsetState { state_root, .. } = self.load_tipset_state(ts).await?;
1560 let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?;
1561
1562 resolve_to_key_addr(&state, self.blockstore(), addr)
1563 }
1564
1565 pub async fn miner_get_base_info(
1566 self: &Arc<Self>,
1567 beacon_schedule: &BeaconSchedule,
1568 tipset: Tipset,
1569 addr: Address,
1570 epoch: ChainEpoch,
1571 ) -> anyhow::Result<Option<MiningBaseInfo>> {
1572 let prev_beacon = self
1573 .chain_store()
1574 .chain_index()
1575 .latest_beacon_entry(tipset.clone())?;
1576
1577 let entries: Vec<BeaconEntry> = beacon_schedule
1578 .beacon_entries_for_block(
1579 self.chain_config().network_version(epoch),
1580 epoch,
1581 tipset.epoch(),
1582 &prev_beacon,
1583 )
1584 .await?;
1585
1586 let base = entries.last().unwrap_or(&prev_beacon);
1587
1588 let (lb_tipset, lb_state_root) = ChainStore::get_lookback_tipset_for_round(
1589 self.chain_index(),
1590 self.chain_config(),
1591 &tipset,
1592 epoch,
1593 )?;
1594
1595 let actor = self.get_required_actor(&addr, *tipset.parent_state())?;
1600 if self.get_actor(&addr, lb_state_root)?.is_none() {
1601 return Ok(None);
1602 }
1603
1604 let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1605
1606 let addr_buf = to_vec(&addr)?;
1607 let rand = draw_randomness(
1608 base.signature(),
1609 DomainSeparationTag::WinningPoStChallengeSeed as i64,
1610 epoch,
1611 &addr_buf,
1612 )?;
1613
1614 let network_version = self.chain_config().network_version(tipset.epoch());
1615 let sectors = self.get_sectors_for_winning_post(
1616 &lb_state_root,
1617 network_version,
1618 &addr,
1619 Randomness::new(rand.to_vec()),
1620 )?;
1621
1622 if sectors.is_empty() {
1623 return Ok(None);
1624 }
1625
1626 let (miner_power, total_power) = self
1627 .get_power(&lb_state_root, Some(&addr))?
1628 .context("failed to get power")?;
1629
1630 let info = miner_state.info(self.blockstore())?;
1631
1632 let worker_key = self
1633 .resolve_to_deterministic_address(info.worker, &tipset)
1634 .await?;
1635 let eligible = self.eligible_to_mine(&addr, &tipset, &lb_tipset)?;
1636
1637 Ok(Some(MiningBaseInfo {
1638 miner_power: miner_power.quality_adj_power,
1639 network_power: total_power.quality_adj_power,
1640 sectors,
1641 worker_key,
1642 sector_size: info.sector_size,
1643 prev_beacon_entry: prev_beacon,
1644 beacon_entries: entries,
1645 eligible_for_mining: eligible,
1646 }))
1647 }
1648
1649 pub fn miner_has_min_power(
1652 &self,
1653 policy: &Policy,
1654 addr: &Address,
1655 ts: &Tipset,
1656 ) -> anyhow::Result<bool> {
1657 let actor = self
1658 .get_actor(&Address::POWER_ACTOR, *ts.parent_state())?
1659 .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
1660 let ps = power::State::load(self.blockstore(), actor.code, actor.state)?;
1661
1662 ps.miner_nominal_power_meets_consensus_minimum(policy, self.blockstore(), addr)
1663 }
1664
1665 #[tracing::instrument(skip(self))]
1687 pub fn validate_range(&self, epochs: RangeInclusive<i64>) -> anyhow::Result<()> {
1688 let heaviest = self.heaviest_tipset();
1689 let heaviest_epoch = heaviest.epoch();
1690 let end = self.chain_index().load_required_tipset_by_height(
1691 *epochs.end(),
1692 heaviest,
1693 ResolveNullTipset::TakeOlder,
1694 ).with_context(|| {
1695 format!(
1696 "couldn't get a tipset at height {} behind heaviest tipset at height {heaviest_epoch}",
1697 *epochs.end(),
1698 )})?;
1699
1700 let tipsets = end
1702 .chain(self.blockstore())
1703 .take_while(|ts| ts.epoch() >= *epochs.start());
1704
1705 self.validate_tipsets(tipsets)
1706 }
1707
1708 pub fn validate_tipsets<T>(&self, tipsets: T) -> anyhow::Result<()>
1709 where
1710 T: Iterator<Item = Tipset> + Send,
1711 {
1712 let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
1713 validate_tipsets(
1714 genesis_timestamp,
1715 self.chain_index(),
1716 self.chain_config(),
1717 self.beacon_schedule(),
1718 &self.engine,
1719 tipsets,
1720 )
1721 }
1722
1723 pub fn get_verified_registry_actor_state(
1724 &self,
1725 ts: &Tipset,
1726 ) -> anyhow::Result<verifreg::State> {
1727 let act = self
1728 .get_actor(&Address::VERIFIED_REGISTRY_ACTOR, *ts.parent_state())
1729 .map_err(Error::state)?
1730 .ok_or_else(|| Error::state("actor not found"))?;
1731 verifreg::State::load(self.blockstore(), act.code, act.state)
1732 }
1733 pub fn get_claim(
1734 &self,
1735 addr: &Address,
1736 ts: &Tipset,
1737 claim_id: ClaimID,
1738 ) -> anyhow::Result<Option<Claim>> {
1739 let id_address = self.lookup_required_id(addr, ts)?;
1740 let state = self.get_verified_registry_actor_state(ts)?;
1741 state.get_claim(self.blockstore(), id_address, claim_id)
1742 }
1743
1744 pub fn get_all_claims(&self, ts: &Tipset) -> anyhow::Result<HashMap<ClaimID, Claim>> {
1745 let state = self.get_verified_registry_actor_state(ts)?;
1746 state.get_all_claims(self.blockstore())
1747 }
1748
1749 pub fn get_allocation(
1750 &self,
1751 addr: &Address,
1752 ts: &Tipset,
1753 allocation_id: AllocationID,
1754 ) -> anyhow::Result<Option<Allocation>> {
1755 let id_address = self.lookup_required_id(addr, ts)?;
1756 let state = self.get_verified_registry_actor_state(ts)?;
1757 state.get_allocation(self.blockstore(), id_address.id()?, allocation_id)
1758 }
1759
1760 pub fn get_all_allocations(
1761 &self,
1762 ts: &Tipset,
1763 ) -> anyhow::Result<HashMap<AllocationID, Allocation>> {
1764 let state = self.get_verified_registry_actor_state(ts)?;
1765 state.get_all_allocations(self.blockstore())
1766 }
1767
1768 pub fn verified_client_status(
1769 &self,
1770 addr: &Address,
1771 ts: &Tipset,
1772 ) -> anyhow::Result<Option<DataCap>> {
1773 let id = self.lookup_required_id(addr, ts)?;
1774 let network_version = self.get_network_version(ts.epoch());
1775
1776 if (u32::from(network_version.0)) < 17 {
1780 let state = self.get_verified_registry_actor_state(ts)?;
1781 return state.verified_client_data_cap(self.blockstore(), id);
1782 }
1783
1784 let act = self
1785 .get_actor(&Address::DATACAP_TOKEN_ACTOR, *ts.parent_state())
1786 .map_err(Error::state)?
1787 .ok_or_else(|| Error::state("Miner actor not found"))?;
1788
1789 let state = datacap::State::load(self.blockstore(), act.code, act.state)?;
1790
1791 state.verified_client_data_cap(self.blockstore(), id)
1792 }
1793
1794 pub async fn resolve_to_deterministic_address(
1797 self: &Arc<Self>,
1798 address: Address,
1799 ts: &Tipset,
1800 ) -> anyhow::Result<Address> {
1801 use crate::shim::address::Protocol::*;
1802 match address.protocol() {
1803 BLS | Secp256k1 | Delegated => Ok(address),
1804 Actor => anyhow::bail!("cannot resolve actor address to key address"),
1805 ID => {
1806 let id = address.id()?;
1807 if let Some(cached) = self.id_to_deterministic_address_cache.get_cloned(&id) {
1808 return Ok(cached);
1809 }
1810 let resolved = if let Ok(state) =
1812 StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
1813 && let Ok(address) = state
1814 .resolve_to_deterministic_addr(self.chain_store().blockstore(), address)
1815 {
1816 address
1817 } else {
1818 let TipsetState { state_root, .. } = self.load_tipset_state(ts).await?;
1820 let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?;
1821 state.resolve_to_deterministic_addr(self.chain_store().blockstore(), address)?
1822 };
1823 self.id_to_deterministic_address_cache.push(id, resolved);
1824 Ok(resolved)
1825 }
1826 }
1827 }
1828
1829 pub fn execution_trace(&self, tipset: &Tipset) -> anyhow::Result<(Cid, Vec<ApiInvocResult>)> {
1830 let mut invoc_trace = vec![];
1831
1832 let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
1833
1834 let callback = |ctx: MessageCallbackCtx<'_>| {
1835 match ctx.at {
1836 CalledAt::Applied | CalledAt::Reward => {
1837 invoc_trace.push(ApiInvocResult {
1838 msg_cid: ctx.message.cid(),
1839 msg: ctx.message.message().clone(),
1840 msg_rct: Some(ctx.apply_ret.msg_receipt()),
1841 error: ctx.apply_ret.failure_info().unwrap_or_default(),
1842 duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
1843 gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
1844 execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
1845 .unwrap_or_default(),
1846 });
1847 Ok(())
1848 }
1849 _ => Ok(()), }
1851 };
1852
1853 let ExecutedTipset { state_root, .. } = apply_block_messages(
1854 genesis_timestamp,
1855 self.chain_index().shallow_clone(),
1856 self.chain_config().shallow_clone(),
1857 self.beacon_schedule().shallow_clone(),
1858 &self.engine,
1859 tipset.shallow_clone(),
1860 Some(callback),
1861 VMTrace::Traced,
1862 )?;
1863
1864 Ok((state_root, invoc_trace))
1865 }
1866}
1867
1868pub fn validate_tipsets<DB, T>(
1869 genesis_timestamp: u64,
1870 chain_index: &ChainIndex<DB>,
1871 chain_config: &Arc<ChainConfig>,
1872 beacon: &Arc<BeaconSchedule>,
1873 engine: &MultiEngine,
1874 tipsets: T,
1875) -> anyhow::Result<()>
1876where
1877 DB: Blockstore + Send + Sync + 'static,
1878 T: Iterator<Item = Tipset> + Send,
1879{
1880 for (child, parent) in tipsets.tuple_windows() {
1885 info!(height = parent.epoch(), "compute parent state");
1886 let ExecutedTipset {
1887 state_root: actual_state,
1888 receipt_root: actual_receipt,
1889 ..
1890 } = apply_block_messages(
1891 genesis_timestamp,
1892 chain_index.shallow_clone(),
1893 chain_config.shallow_clone(),
1894 beacon.shallow_clone(),
1895 engine,
1896 parent,
1897 NO_CALLBACK,
1898 VMTrace::NotTraced,
1899 )
1900 .context("couldn't compute tipset state")?;
1901 let expected_receipt = child.min_ticket_block().message_receipts;
1902 let expected_state = child.parent_state();
1903 if (expected_state, expected_receipt) != (&actual_state, actual_receipt) {
1904 error!(
1905 height = child.epoch(),
1906 ?expected_state,
1907 ?expected_receipt,
1908 ?actual_state,
1909 ?actual_receipt,
1910 "state mismatch"
1911 );
1912 bail!("state mismatch");
1913 }
1914 }
1915 Ok(())
1916}
1917
1918struct TipsetExecutor<'a, DB: Blockstore + Send + Sync + 'static> {
1923 tipset: Tipset,
1924 rand: ChainRand<DB>,
1925 chain_config: Arc<ChainConfig>,
1926 chain_index: ChainIndex<DB>,
1927 genesis_info: GenesisInfo,
1928 engine: &'a MultiEngine,
1929}
1930
1931impl<'a, DB: Blockstore + Send + Sync + 'static> TipsetExecutor<'a, DB> {
1932 fn new(
1933 chain_index: ChainIndex<DB>,
1934 chain_config: Arc<ChainConfig>,
1935 beacon: Arc<BeaconSchedule>,
1936 engine: &'a MultiEngine,
1937 tipset: Tipset,
1938 ) -> Self {
1939 let rand = ChainRand::new(
1940 chain_config.shallow_clone(),
1941 tipset.shallow_clone(),
1942 chain_index.shallow_clone(),
1943 beacon,
1944 );
1945 let genesis_info = GenesisInfo::from_chain_config(chain_config.shallow_clone());
1946 Self {
1947 tipset,
1948 rand,
1949 chain_config,
1950 chain_index,
1951 genesis_info,
1952 engine,
1953 }
1954 }
1955
1956 fn create_vm(
1957 &self,
1958 state_root: Cid,
1959 epoch: ChainEpoch,
1960 timestamp: u64,
1961 trace: VMTrace,
1962 ) -> anyhow::Result<VM<DB>> {
1963 let circ_supply = self.genesis_info.get_vm_circulating_supply(
1964 epoch,
1965 self.chain_index.db(),
1966 &state_root,
1967 )?;
1968 VM::new(
1969 ExecutionContext {
1970 heaviest_tipset: self.tipset.shallow_clone(),
1971 state_tree_root: state_root,
1972 epoch,
1973 rand: Box::new(self.rand.shallow_clone()),
1974 base_fee: self.tipset.min_ticket_block().parent_base_fee.clone(),
1975 circ_supply,
1976 chain_config: self.chain_config.shallow_clone(),
1977 chain_index: self.chain_index.shallow_clone(),
1978 timestamp,
1979 },
1980 self.engine,
1981 trace,
1982 )
1983 }
1984
1985 fn prepare_parent_state<F>(
1988 &self,
1989 genesis_timestamp: u64,
1990 null_epoch_trace: VMTrace,
1991 cron_callback: &mut Option<F>,
1992 ) -> anyhow::Result<(Cid, ChainEpoch, Vec<BlockMessages>)>
1993 where
1994 F: FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>,
1995 {
1996 use crate::shim::clock::EPOCH_DURATION_SECONDS;
1997
1998 let mut parent_state = *self.tipset.parent_state();
1999 let parent_epoch = self
2000 .chain_index
2001 .load_required_tipset(self.tipset.parents())?
2002 .epoch();
2003 let epoch = self.tipset.epoch();
2004
2005 for epoch_i in parent_epoch..epoch {
2006 if epoch_i > parent_epoch {
2007 let timestamp = genesis_timestamp + ((EPOCH_DURATION_SECONDS * epoch_i) as u64);
2008 parent_state = stacker::grow(64 << 20, || -> anyhow::Result<Cid> {
2009 let mut vm =
2010 self.create_vm(parent_state, epoch_i, timestamp, null_epoch_trace)?;
2011 if let Err(e) = vm.run_cron(epoch_i, cron_callback.as_mut()) {
2012 error!("Beginning of epoch cron failed to run: {e:#}");
2013 return Err(e);
2014 }
2015 vm.flush()
2016 })?;
2017 }
2018 if let Some(new_state) = run_state_migrations(
2019 epoch_i,
2020 &self.chain_config,
2021 self.chain_index.db(),
2022 &parent_state,
2023 )? {
2024 parent_state = new_state;
2025 }
2026 }
2027
2028 let block_messages = BlockMessages::for_tipset(self.chain_index.db(), &self.tipset)?;
2029 Ok((parent_state, epoch, block_messages))
2030 }
2031}
2032
2033#[allow(clippy::too_many_arguments)]
2110pub fn apply_block_messages<DB>(
2111 genesis_timestamp: u64,
2112 chain_index: ChainIndex<DB>,
2113 chain_config: Arc<ChainConfig>,
2114 beacon: Arc<BeaconSchedule>,
2115 engine: &MultiEngine,
2116 tipset: Tipset,
2117 mut callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
2118 enable_tracing: VMTrace,
2119) -> anyhow::Result<ExecutedTipset>
2120where
2121 DB: Blockstore + Send + Sync + 'static,
2122{
2123 if tipset.epoch() == 0 {
2132 let message_receipts = tipset.min_ticket_block().message_receipts;
2137 return Ok(ExecutedTipset {
2138 state_root: *tipset.parent_state(),
2139 receipt_root: message_receipts,
2140 executed_messages: vec![].into(),
2141 });
2142 }
2143
2144 let exec = TipsetExecutor::new(
2145 chain_index.shallow_clone(),
2146 chain_config,
2147 beacon,
2148 engine,
2149 tipset.shallow_clone(),
2150 );
2151
2152 let (parent_state, epoch, block_messages) =
2155 exec.prepare_parent_state(genesis_timestamp, enable_tracing, &mut callback)?;
2156
2157 stacker::grow(64 << 20, || -> anyhow::Result<ExecutedTipset> {
2160 let mut vm = exec.create_vm(parent_state, epoch, tipset.min_timestamp(), enable_tracing)?;
2161
2162 let (receipts, events, events_roots) =
2164 vm.apply_block_messages(&block_messages, epoch, callback)?;
2165
2166 let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts.iter())?;
2168
2169 for (events, events_root) in events.iter().zip(events_roots.iter()) {
2171 if let Some(events) = events {
2172 let event_root =
2173 events_root.context("events root should be present when events present")?;
2174 let derived_event_root = Amt::new_from_iter_with_bit_width(
2176 chain_index.db(),
2177 EVENTS_AMT_BITWIDTH,
2178 events.iter(),
2179 )
2180 .map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?;
2181
2182 ensure!(
2184 derived_event_root == event_root,
2185 "Events AMT root mismatch: derived={derived_event_root}, actual={event_root}."
2186 );
2187 }
2188 }
2189
2190 let state_root = vm.flush()?;
2191
2192 let messages: Vec<ChainMessage> = block_messages
2194 .into_iter()
2195 .flat_map(|bm| bm.messages)
2196 .collect_vec();
2197 anyhow::ensure!(
2198 messages.len() == receipts.len() && messages.len() == events.len(),
2199 "length of messages, receipts, and events should match",
2200 );
2201 Ok(ExecutedTipset {
2202 state_root,
2203 receipt_root,
2204 executed_messages: messages
2205 .into_iter()
2206 .zip(receipts)
2207 .zip(events)
2208 .map(|((message, receipt), events)| ExecutedMessage {
2209 message,
2210 receipt,
2211 events,
2212 })
2213 .collect_vec()
2214 .into(),
2215 })
2216 })
2217}
2218
2219#[allow(clippy::too_many_arguments)]
2220pub fn compute_state<DB>(
2221 _height: ChainEpoch,
2222 messages: Vec<Message>,
2223 tipset: Tipset,
2224 genesis_timestamp: u64,
2225 chain_index: ChainIndex<DB>,
2226 chain_config: Arc<ChainConfig>,
2227 beacon: Arc<BeaconSchedule>,
2228 engine: &MultiEngine,
2229 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
2230 enable_tracing: VMTrace,
2231) -> anyhow::Result<ExecutedTipset>
2232where
2233 DB: Blockstore + Send + Sync + 'static,
2234{
2235 if !messages.is_empty() {
2236 anyhow::bail!("Applying messages is not yet implemented.");
2237 }
2238
2239 let output = apply_block_messages(
2240 genesis_timestamp,
2241 chain_index,
2242 chain_config,
2243 beacon,
2244 engine,
2245 tipset,
2246 callback,
2247 enable_tracing,
2248 )?;
2249
2250 Ok(output)
2251}
2252
/// Two-state switch between flushing and skipping the flush.
///
/// The default is [`VMFlush::Skip`].
#[derive(Clone, Copy, Debug, Default)]
pub enum VMFlush {
    /// Flush.
    Flush,
    /// Do not flush (default).
    #[default]
    Skip,
}