Skip to main content

forest/state_manager/
mod.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4#[cfg(test)]
5mod tests;
6
7mod cache;
8pub mod chain_rand;
9pub mod circulating_supply;
10mod errors;
11pub mod utils;
12
13pub use self::errors::*;
14use self::utils::structured;
15
16use crate::beacon::{BeaconEntry, BeaconSchedule};
17use crate::blocks::{Tipset, TipsetKey};
18use crate::chain::{
19    ChainStore, HeadChange,
20    index::{ChainIndex, ResolveNullTipset},
21};
22use crate::interpreter::{
23    BlockMessages, CalledAt, ExecutionContext, IMPLICIT_MESSAGE_GAS_LIMIT, VM, resolve_to_key_addr,
24};
25use crate::interpreter::{MessageCallbackCtx, VMTrace};
26use crate::lotus_json::{LotusJson, lotus_json_with_self};
27use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage};
28use crate::networks::ChainConfig;
29use crate::rpc::state::{ApiInvocResult, InvocResult, MessageGasCost};
30use crate::rpc::types::{MiningBaseInfo, SectorOnChainInfo};
31use crate::shim::actors::init::{self, State};
32use crate::shim::actors::miner::{MinerInfo, MinerPower, Partition};
33use crate::shim::actors::verifreg::{Allocation, AllocationID, Claim};
34use crate::shim::actors::*;
35use crate::shim::crypto::{Signature, SignatureType};
36use crate::shim::{
37    actors::{
38        LoadActorStateFromBlockstore, miner::ext::MinerStateExt as _,
39        verifreg::ext::VerifiedRegistryStateExt as _,
40    },
41    executor::{ApplyRet, Receipt, StampedEvent},
42};
43use crate::shim::{
44    address::{Address, Payload, Protocol},
45    clock::ChainEpoch,
46    econ::TokenAmount,
47    machine::{GLOBAL_MULTI_ENGINE, MultiEngine},
48    message::Message,
49    randomness::Randomness,
50    runtime::Policy,
51    state_tree::{ActorState, StateTree},
52    version::NetworkVersion,
53};
54use crate::state_manager::cache::{
55    DisabledTipsetDataCache, EnabledTipsetDataCache, TipsetReceiptEventCacheHandler,
56    TipsetStateCache,
57};
58use crate::state_manager::chain_rand::draw_randomness;
59use crate::state_migration::run_state_migrations;
60use crate::utils::get_size::{
61    GetSize, vec_heap_size_helper, vec_with_stack_only_item_heap_size_helper,
62};
63use ahash::{HashMap, HashMapExt};
64use anyhow::{Context as _, bail, ensure};
65use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _};
66use chain_rand::ChainRand;
67use cid::Cid;
68pub use circulating_supply::GenesisInfo;
69use fil_actor_verifreg_state::v12::DataCap;
70use fil_actor_verifreg_state::v13::ClaimID;
71use fil_actors_shared::fvm_ipld_amt::{Amt, Amtv0};
72use fil_actors_shared::fvm_ipld_bitfield::BitField;
73use fil_actors_shared::v12::runtime::DomainSeparationTag;
74use futures::{FutureExt, channel::oneshot, select};
75use fvm_ipld_blockstore::Blockstore;
76use fvm_ipld_encoding::to_vec;
77use fvm_shared4::crypto::signature::SECP_SIG_LEN;
78use itertools::Itertools as _;
79use nonzero_ext::nonzero;
80use num::BigInt;
81use num_traits::identities::Zero;
82use rayon::prelude::ParallelBridge;
83use schemars::JsonSchema;
84use serde::{Deserialize, Serialize};
85use std::ops::RangeInclusive;
86use std::time::Duration;
87use std::{num::NonZeroUsize, sync::Arc};
88use tokio::sync::{RwLock, broadcast::error::RecvError};
89use tracing::{error, info, instrument, trace, warn};
90
91const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
92pub const EVENTS_AMT_BITWIDTH: u32 = 5;
93
94/// Intermediary for retrieving state objects and updating actor states.
95type CidPair = (Cid, Cid);
96
97#[derive(Debug, Clone, GetSize)] // Added Debug
98pub struct StateEvents {
99    #[get_size(size_fn = vec_heap_size_helper)]
100    pub events: Vec<Vec<StampedEvent>>,
101    #[get_size(size_fn = vec_with_stack_only_item_heap_size_helper)]
102    pub roots: Vec<Option<Cid>>,
103}
104
105#[derive(Clone)]
106pub struct StateOutput {
107    pub state_root: Cid,
108    pub receipt_root: Cid,
109    pub events: Vec<Vec<StampedEvent>>,
110    pub events_roots: Vec<Option<Cid>>,
111}
112
113#[derive(Debug, Default, Clone, GetSize)]
114pub struct StateOutputValue {
115    #[get_size(ignore)]
116    pub state_root: Cid,
117    #[get_size(ignore)]
118    pub receipt_root: Cid,
119}
120
121impl From<StateOutputValue> for StateOutput {
122    fn from(value: StateOutputValue) -> Self {
123        Self {
124            state_root: value.state_root,
125            receipt_root: value.receipt_root,
126            events: vec![],
127            events_roots: vec![],
128        }
129    }
130}
131
132impl From<StateOutput> for StateOutputValue {
133    fn from(value: StateOutput) -> Self {
134        StateOutputValue {
135            state_root: value.state_root,
136            receipt_root: value.receipt_root,
137        }
138    }
139}
140
141/// External format for returning market balance from state.
142#[derive(
143    Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema,
144)]
145#[serde(rename_all = "PascalCase")]
146pub struct MarketBalance {
147    #[schemars(with = "LotusJson<TokenAmount>")]
148    #[serde(with = "crate::lotus_json")]
149    pub escrow: TokenAmount,
150    #[schemars(with = "LotusJson<TokenAmount>")]
151    #[serde(with = "crate::lotus_json")]
152    pub locked: TokenAmount,
153}
154lotus_json_with_self!(MarketBalance);
155
156/// State manager handles all interactions with the internal Filecoin actors
157/// state. This encapsulates the [`ChainStore`] functionality, which only
158/// handles chain data, to allow for interactions with the underlying state of
159/// the chain. The state manager not only allows interfacing with state, but
160/// also is used when performing state transitions.
161pub struct StateManager<DB> {
162    /// Chain store
163    cs: Arc<ChainStore<DB>>,
164    /// This is a cache which indexes tipsets to their calculated state output (state root, receipt root).
165    cache: TipsetStateCache<StateOutputValue>,
166    beacon: Arc<crate::beacon::BeaconSchedule>,
167    engine: Arc<MultiEngine>,
168    /// Handler for caching/retrieving tipset events and receipts.
169    receipt_event_cache_handler: Box<dyn TipsetReceiptEventCacheHandler>,
170}
171
172#[allow(clippy::type_complexity)]
173pub const NO_CALLBACK: Option<fn(MessageCallbackCtx<'_>) -> anyhow::Result<()>> = None;
174
175impl<DB> StateManager<DB>
176where
177    DB: Blockstore,
178{
179    pub fn new(cs: Arc<ChainStore<DB>>) -> Result<Self, anyhow::Error> {
180        Self::new_with_engine(cs, GLOBAL_MULTI_ENGINE.clone())
181    }
182
183    pub fn new_with_engine(
184        cs: Arc<ChainStore<DB>>,
185        engine: Arc<MultiEngine>,
186    ) -> Result<Self, anyhow::Error> {
187        let genesis = cs.genesis_block_header();
188        let beacon = Arc::new(cs.chain_config().get_beacon_schedule(genesis.timestamp));
189
190        let cache_handler: Box<dyn TipsetReceiptEventCacheHandler> =
191            if cs.chain_config().enable_receipt_event_caching {
192                Box::new(EnabledTipsetDataCache::new())
193            } else {
194                Box::new(DisabledTipsetDataCache::new())
195            };
196
197        Ok(Self {
198            cs,
199            cache: TipsetStateCache::new("state_output"), // For StateOutputValue
200            beacon,
201            engine,
202            receipt_event_cache_handler: cache_handler,
203        })
204    }
205
206    /// Returns the currently tracked heaviest tipset.
207    pub fn heaviest_tipset(&self) -> Tipset {
208        self.chain_store().heaviest_tipset()
209    }
210
211    /// Returns the currently tracked heaviest tipset and rewind to a most recent valid one if necessary.
212    /// A valid head has
213    ///     - state tree in the blockstore
214    ///     - actor bundle version in the state tree that matches chain configuration
215    pub fn maybe_rewind_heaviest_tipset(&self) -> anyhow::Result<()> {
216        while self.maybe_rewind_heaviest_tipset_once()? {}
217        Ok(())
218    }
219
220    fn maybe_rewind_heaviest_tipset_once(&self) -> anyhow::Result<bool> {
221        let head = self.heaviest_tipset();
222        if let Some(info) = self
223            .chain_config()
224            .network_height_with_actor_bundle(head.epoch())
225        {
226            let expected_height_info = info.info;
227            let expected_bundle = info.manifest(self.blockstore())?;
228            let expected_bundle_metadata = expected_bundle.metadata()?;
229            let state = self.get_state_tree(head.parent_state())?;
230            let bundle_metadata = state.get_actor_bundle_metadata()?;
231            if expected_bundle_metadata != bundle_metadata {
232                let current_epoch = head.epoch();
233                let target_head = self.chain_index().tipset_by_height(
234                    (expected_height_info.epoch - 1).max(0),
235                    head,
236                    ResolveNullTipset::TakeOlder,
237                )?;
238                let target_epoch = target_head.epoch();
239                let bundle_version = &bundle_metadata.version;
240                let expected_bundle_version = &expected_bundle_metadata.version;
241                if target_epoch < current_epoch {
242                    tracing::warn!(
243                        "rewinding chain head from {current_epoch} to {target_epoch}, actor bundle: {bundle_version}, expected: {expected_bundle_version}"
244                    );
245                    if self.blockstore().has(target_head.parent_state())? {
246                        self.chain_store().set_heaviest_tipset(target_head)?;
247                        return Ok(true);
248                    } else {
249                        anyhow::bail!(
250                            "failed to rewind, state tree @ {target_epoch} is missing from blockstore: {}",
251                            target_head.parent_state()
252                        );
253                    }
254                }
255            }
256        }
257        Ok(false)
258    }
259
260    // Given the assumption that the heaviest tipset must always be validated,
261    // we can populate our state cache by walking backwards through the
262    // block-chain. A warm cache cuts 10-20 seconds from the first state
263    // validation, and it prevents duplicate migrations.
264    pub fn populate_cache(&self) {
265        for (child, parent) in self
266            .chain_index()
267            .chain(self.heaviest_tipset())
268            .tuple_windows()
269            .take(DEFAULT_TIPSET_CACHE_SIZE.into())
270        {
271            let key = parent.key();
272            let state_root = child.min_ticket_block().state_root;
273            let receipt_root = child.min_ticket_block().message_receipts;
274            self.cache.insert(
275                key.clone(),
276                StateOutputValue {
277                    state_root,
278                    receipt_root,
279                },
280            );
281            if let Ok(receipts) = Receipt::get_receipts(self.blockstore(), receipt_root)
282                && !receipts.is_empty()
283            {
284                self.receipt_event_cache_handler
285                    .insert_receipt(key, receipts);
286            }
287        }
288    }
289
290    pub fn beacon_schedule(&self) -> &Arc<BeaconSchedule> {
291        &self.beacon
292    }
293
294    /// Returns network version for the given epoch.
295    pub fn get_network_version(&self, epoch: ChainEpoch) -> NetworkVersion {
296        self.chain_config().network_version(epoch)
297    }
298
299    /// Gets the state tree
300    pub fn get_state_tree(&self, state_cid: &Cid) -> anyhow::Result<StateTree<DB>> {
301        StateTree::new_from_root(self.blockstore_owned(), state_cid)
302    }
303
304    /// Gets actor from given [`Cid`], if it exists.
305    pub fn get_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<Option<ActorState>> {
306        let state = self.get_state_tree(&state_cid)?;
307        state.get_actor(addr)
308    }
309
310    /// Gets actor state from implicit actor address
311    pub fn get_actor_state<S: LoadActorStateFromBlockstore>(
312        &self,
313        ts: &Tipset,
314    ) -> anyhow::Result<S> {
315        let state_tree = self.get_state_tree(ts.parent_state())?;
316        state_tree.get_actor_state()
317    }
318
319    /// Gets actor state from explicit actor address
320    pub fn get_actor_state_from_address<S: LoadActorStateFromBlockstore>(
321        &self,
322        ts: &Tipset,
323        actor_address: &Address,
324    ) -> anyhow::Result<S> {
325        let state_tree = self.get_state_tree(ts.parent_state())?;
326        state_tree.get_actor_state_from_address(actor_address)
327    }
328
329    /// Gets required actor from given [`Cid`].
330    pub fn get_required_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<ActorState> {
331        let state = self.get_state_tree(&state_cid)?;
332        state.get_actor(addr)?.with_context(|| {
333            format!("Failed to load actor with addr={addr}, state_cid={state_cid}")
334        })
335    }
336
337    /// Returns a reference to the state manager's [`Blockstore`].
338    pub fn blockstore(&self) -> &Arc<DB> {
339        self.cs.blockstore()
340    }
341
342    pub fn blockstore_owned(&self) -> Arc<DB> {
343        self.blockstore().clone()
344    }
345
346    /// Returns reference to the state manager's [`ChainStore`].
347    pub fn chain_store(&self) -> &Arc<ChainStore<DB>> {
348        &self.cs
349    }
350
351    /// Returns reference to the state manager's [`ChainIndex`].
352    pub fn chain_index(&self) -> &Arc<ChainIndex<Arc<DB>>> {
353        self.cs.chain_index()
354    }
355
356    /// Returns reference to the state manager's [`ChainConfig`].
357    pub fn chain_config(&self) -> &Arc<ChainConfig> {
358        self.cs.chain_config()
359    }
360
361    pub fn chain_rand(&self, tipset: Tipset) -> ChainRand<DB> {
362        ChainRand::new(
363            self.chain_config().clone(),
364            tipset,
365            self.chain_index().clone(),
366            self.beacon.clone(),
367        )
368    }
369
370    /// Returns the internal, protocol-level network chain from the state.
371    pub fn get_network_state_name(
372        &self,
373        state_cid: Cid,
374    ) -> anyhow::Result<crate::networks::StateNetworkName> {
375        let init_act = self
376            .get_actor(&init::ADDRESS.into(), state_cid)?
377            .ok_or_else(|| Error::state("Init actor address could not be resolved"))?;
378        Ok(
379            State::load(self.blockstore(), init_act.code, init_act.state)?
380                .into_network_name()
381                .into(),
382        )
383    }
384
385    /// Returns true if miner has been slashed or is considered invalid.
386    pub fn is_miner_slashed(&self, addr: &Address, state_cid: &Cid) -> anyhow::Result<bool, Error> {
387        let actor = self
388            .get_actor(&Address::POWER_ACTOR, *state_cid)?
389            .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
390
391        let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
392
393        Ok(spas.miner_power(self.blockstore(), addr)?.is_none())
394    }
395
396    /// Returns raw work address of a miner given the state root.
397    pub fn get_miner_work_addr(&self, state_cid: Cid, addr: &Address) -> Result<Address, Error> {
398        let state =
399            StateTree::new_from_root(self.blockstore_owned(), &state_cid).map_err(Error::other)?;
400        let ms: miner::State = state.get_actor_state_from_address(addr)?;
401        let info = ms.info(self.blockstore()).map_err(|e| e.to_string())?;
402        let addr = resolve_to_key_addr(&state, self.blockstore(), &info.worker())?;
403        Ok(addr)
404    }
405
406    /// Returns specified actor's claimed power and total network power as a
407    /// tuple.
408    pub fn get_power(
409        &self,
410        state_cid: &Cid,
411        addr: Option<&Address>,
412    ) -> anyhow::Result<Option<(power::Claim, power::Claim)>, Error> {
413        let actor = self
414            .get_actor(&Address::POWER_ACTOR, *state_cid)?
415            .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
416
417        let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
418
419        let t_pow = spas.total_power();
420
421        if let Some(maddr) = addr {
422            let m_pow = spas
423                .miner_power(self.blockstore(), maddr)?
424                .ok_or_else(|| Error::state(format!("Miner for address {maddr} not found")))?;
425
426            let min_pow = spas.miner_nominal_power_meets_consensus_minimum(
427                &self.chain_config().policy,
428                self.blockstore(),
429                maddr,
430            )?;
431            if min_pow {
432                return Ok(Some((m_pow, t_pow)));
433            }
434        }
435
436        Ok(None)
437    }
438
439    // Returns all sectors
440    pub fn get_all_sectors(
441        &self,
442        addr: &Address,
443        ts: &Tipset,
444    ) -> anyhow::Result<Vec<SectorOnChainInfo>> {
445        let actor = self
446            .get_actor(addr, *ts.parent_state())?
447            .ok_or_else(|| Error::state("Miner actor not found"))?;
448        let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
449        state.load_sectors_ext(self.blockstore(), None)
450    }
451}
452
453impl<DB> StateManager<DB>
454where
455    DB: Blockstore + Send + Sync + 'static,
456{
457    /// Returns the pair of (state root, message receipt root). This will
458    /// either be cached or will be calculated and fill the cache. Tipset
459    /// state for a given tipset is guaranteed not to be computed twice.
460    pub async fn tipset_state(
461        self: &Arc<Self>,
462        tipset: &Tipset,
463        state_lookup: StateLookupPolicy,
464    ) -> anyhow::Result<CidPair> {
465        let StateOutput {
466            state_root,
467            receipt_root,
468            ..
469        } = self.tipset_state_output(tipset, state_lookup).await?;
470        Ok((state_root, receipt_root))
471    }
472
473    pub async fn tipset_state_output(
474        self: &Arc<Self>,
475        tipset: &Tipset,
476        state_lookup: StateLookupPolicy,
477    ) -> anyhow::Result<StateOutput> {
478        let key = tipset.key();
479        self.cache
480            .get_or_else(key, || async move {
481                info!(
482                    "Evaluating tipset: EPOCH={}, blocks={}, tsk={}",
483                    tipset.epoch(),
484                    tipset.len(),
485                    tipset.key(),
486                );
487
488                // First, try to look up the state and receipt if not found in the blockstore
489                // compute it
490                if matches!(state_lookup, StateLookupPolicy::Enabled)
491                    && let Some(state_from_child) = self.try_lookup_state_from_next_tipset(tipset)
492                {
493                    return Ok(state_from_child);
494                }
495
496                trace!("Computing state for tipset at epoch {}", tipset.epoch());
497                let state_output = self
498                    .compute_tipset_state(tipset.clone(), NO_CALLBACK, VMTrace::NotTraced)
499                    .await?;
500
501                self.update_cache_with_state_output(key, &state_output);
502
503                let ts_state = state_output.into();
504
505                Ok(ts_state)
506            })
507            .await
508            .map(StateOutput::from)
509    }
510
511    /// update the receipt and events caches
512    fn update_cache_with_state_output(&self, key: &TipsetKey, state_output: &StateOutput) {
513        if !state_output.events.is_empty() || !state_output.events_roots.is_empty() {
514            let events_data = StateEvents {
515                events: state_output.events.clone(),
516                roots: state_output.events_roots.clone(),
517            };
518            self.receipt_event_cache_handler
519                .insert_events(key, events_data);
520        }
521
522        if let Ok(receipts) = Receipt::get_receipts(self.blockstore(), state_output.receipt_root)
523            && !receipts.is_empty()
524        {
525            self.receipt_event_cache_handler
526                .insert_receipt(key, receipts);
527        }
528    }
529
530    #[instrument(skip(self))]
531    pub async fn tipset_message_receipts(
532        self: &Arc<Self>,
533        tipset: &Tipset,
534    ) -> anyhow::Result<Vec<Receipt>> {
535        let key = tipset.key();
536        let ts = tipset.clone();
537        let this = Arc::clone(self);
538        self.receipt_event_cache_handler
539            .get_receipt_or_else(
540                key,
541                Box::new(move || {
542                    Box::pin(async move {
543                        let StateOutput { receipt_root, .. } = this
544                            .compute_tipset_state(ts, NO_CALLBACK, VMTrace::NotTraced)
545                            .await?;
546                        trace!("Completed tipset state calculation");
547                        Receipt::get_receipts(this.blockstore(), receipt_root)
548                    })
549                }),
550            )
551            .await
552    }
553
554    #[instrument(skip(self))]
555    pub async fn tipset_state_events(
556        self: &Arc<Self>,
557        tipset: &Tipset,
558    ) -> anyhow::Result<StateEvents> {
559        let key = tipset.key();
560        let ts = tipset.clone();
561        let this = Arc::clone(self);
562        let cids = tipset.cids();
563        self.receipt_event_cache_handler
564            .get_events_or_else(
565                key,
566                Box::new(move || {
567                    Box::pin(async move {
568                        // Fallback: compute the tipset state if events not found in the blockstore
569                        let state_out = this
570                            .compute_tipset_state(ts, NO_CALLBACK, VMTrace::NotTraced)
571                            .await?;
572                        trace!("Completed tipset state calculation {:?}", cids);
573                        Ok(StateEvents {
574                            events: state_out.events,
575                            roots: state_out.events_roots,
576                        })
577                    })
578                }),
579            )
580            .await
581    }
582
583    #[instrument(skip(self, rand))]
584    fn call_raw(
585        &self,
586        state_cid: Option<Cid>,
587        msg: &Message,
588        rand: ChainRand<DB>,
589        tipset: &Tipset,
590    ) -> Result<ApiInvocResult, Error> {
591        let mut msg = msg.clone();
592
593        let state_cid = state_cid.unwrap_or(*tipset.parent_state());
594
595        let tipset_messages = self
596            .chain_store()
597            .messages_for_tipset(tipset)
598            .map_err(|err| Error::Other(err.to_string()))?;
599
600        let prior_messsages = tipset_messages
601            .iter()
602            .filter(|ts_msg| ts_msg.message().from() == msg.from());
603
604        // Handle state forks
605
606        let height = tipset.epoch();
607        let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
608        let mut vm = VM::new(
609            ExecutionContext {
610                heaviest_tipset: tipset.clone(),
611                state_tree_root: state_cid,
612                epoch: height,
613                rand: Box::new(rand),
614                base_fee: tipset.block_headers().first().parent_base_fee.clone(),
615                circ_supply: genesis_info.get_vm_circulating_supply(
616                    height,
617                    self.blockstore(),
618                    &state_cid,
619                )?,
620                chain_config: self.chain_config().clone(),
621                chain_index: self.chain_index().clone(),
622                timestamp: tipset.min_timestamp(),
623            },
624            &self.engine,
625            VMTrace::Traced,
626        )?;
627
628        for m in prior_messsages {
629            vm.apply_message(m)?;
630        }
631
632        // We flush to get the VM's view of the state tree after applying the above messages
633        // This is needed to get the correct nonce from the actor state to match the VM
634        let state_cid = vm.flush()?;
635
636        let state = StateTree::new_from_root(self.blockstore_owned(), &state_cid)?;
637
638        let from_actor = state
639            .get_actor(&msg.from())?
640            .ok_or_else(|| anyhow::anyhow!("actor not found"))?;
641        msg.set_sequence(from_actor.sequence);
642
643        // Implicit messages need to set a special gas limit
644        let mut msg = msg.clone();
645        msg.gas_limit = IMPLICIT_MESSAGE_GAS_LIMIT as u64;
646
647        let (apply_ret, duration) = vm.apply_implicit_message(&msg)?;
648
649        Ok(ApiInvocResult {
650            msg: msg.clone(),
651            msg_rct: Some(apply_ret.msg_receipt()),
652            msg_cid: msg.cid(),
653            error: apply_ret.failure_info().unwrap_or_default(),
654            duration: duration.as_nanos().clamp(0, u64::MAX as u128) as u64,
655            gas_cost: MessageGasCost::default(),
656            execution_trace: structured::parse_events(apply_ret.exec_trace()).unwrap_or_default(),
657        })
658    }
659
660    /// runs the given message and returns its result without any persisted
661    /// changes.
662    pub fn call(&self, message: &Message, tipset: Option<Tipset>) -> Result<ApiInvocResult, Error> {
663        let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
664        let chain_rand = self.chain_rand(ts.clone());
665        self.call_raw(None, message, chain_rand, &ts)
666    }
667
668    /// Same as [`StateManager::call`] but runs the message on the given state and not
669    /// on the parent state of the tipset.
670    pub fn call_on_state(
671        &self,
672        state_cid: Cid,
673        message: &Message,
674        tipset: Option<Tipset>,
675    ) -> Result<ApiInvocResult, Error> {
676        let ts = tipset.unwrap_or_else(|| self.cs.heaviest_tipset());
677        let chain_rand = self.chain_rand(ts.clone());
678        self.call_raw(Some(state_cid), message, chain_rand, &ts)
679    }
680
681    pub async fn apply_on_state_with_gas(
682        self: &Arc<Self>,
683        tipset: Option<Tipset>,
684        msg: Message,
685        state_lookup: StateLookupPolicy,
686        vm_flush: VMFlush,
687    ) -> anyhow::Result<(ApiInvocResult, Option<Cid>)> {
688        let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
689
690        let from_a = self.resolve_to_key_addr(&msg.from, &ts).await?;
691
692        // Pretend that the message is signed. This has an influence on the gas
693        // cost. We obviously can't generate a valid signature. Instead, we just
694        // fill the signature with zeros. The validity is not checked.
695        let mut chain_msg = match from_a.protocol() {
696            Protocol::Secp256k1 => ChainMessage::Signed(SignedMessage::new_unchecked(
697                msg.clone(),
698                Signature::new_secp256k1(vec![0; SECP_SIG_LEN]),
699            )),
700            Protocol::Delegated => ChainMessage::Signed(SignedMessage::new_unchecked(
701                msg.clone(),
702                // In Lotus, delegated signatures have the same length as SECP256k1.
703                // This may or may not change in the future.
704                Signature::new(SignatureType::Delegated, vec![0; SECP_SIG_LEN]),
705            )),
706            _ => ChainMessage::Unsigned(msg.clone()),
707        };
708
709        let (_invoc_res, apply_ret, duration, state_root) = self
710            .call_with_gas(
711                &mut chain_msg,
712                &[],
713                Some(ts),
714                VMTrace::Traced,
715                state_lookup,
716                vm_flush,
717            )
718            .await?;
719
720        Ok((
721            ApiInvocResult {
722                msg_cid: msg.cid(),
723                msg,
724                msg_rct: Some(apply_ret.msg_receipt()),
725                error: apply_ret.failure_info().unwrap_or_default(),
726                duration: duration.as_nanos().clamp(0, u64::MAX as u128) as u64,
727                gas_cost: MessageGasCost::default(),
728                execution_trace: structured::parse_events(apply_ret.exec_trace())
729                    .unwrap_or_default(),
730            },
731            state_root,
732        ))
733    }
734
735    /// Computes message on the given [Tipset] state, after applying other
736    /// messages and returns the values computed in the VM.
737    pub async fn call_with_gas(
738        self: &Arc<Self>,
739        message: &mut ChainMessage,
740        prior_messages: &[ChainMessage],
741        tipset: Option<Tipset>,
742        trace_config: VMTrace,
743        state_lookup: StateLookupPolicy,
744        vm_flush: VMFlush,
745    ) -> Result<(InvocResult, ApplyRet, Duration, Option<Cid>), Error> {
746        let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
747        let (st, _) = self
748            .tipset_state(&ts, state_lookup)
749            .await
750            .map_err(|e| Error::Other(format!("Could not load tipset state: {e}")))?;
751        let chain_rand = self.chain_rand(ts.clone());
752
753        // Since we're simulating a future message, pretend we're applying it in the
754        // "next" tipset
755        let epoch = ts.epoch() + 1;
756        let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
757        // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from
758        // FVM, but that introduces some constraints, and possible deadlocks.
759        let (ret, duration, state_cid) = stacker::grow(64 << 20, || -> anyhow::Result<_> {
760            let mut vm = VM::new(
761                ExecutionContext {
762                    heaviest_tipset: ts.clone(),
763                    state_tree_root: st,
764                    epoch,
765                    rand: Box::new(chain_rand),
766                    base_fee: ts.block_headers().first().parent_base_fee.clone(),
767                    circ_supply: genesis_info.get_vm_circulating_supply(
768                        epoch,
769                        self.blockstore(),
770                        &st,
771                    )?,
772                    chain_config: self.chain_config().clone(),
773                    chain_index: self.chain_index().clone(),
774                    timestamp: ts.min_timestamp(),
775                },
776                &self.engine,
777                trace_config,
778            )?;
779
780            for msg in prior_messages {
781                vm.apply_message(msg)?;
782            }
783            let from_actor = vm
784                .get_actor(&message.from())
785                .map_err(|e| Error::Other(format!("Could not get actor from state: {e}")))?
786                .ok_or_else(|| Error::Other("cant find actor in state tree".to_string()))?;
787
788            message.set_sequence(from_actor.sequence);
789            let (ret, duration) = vm.apply_message(message)?;
790            let state_root = match vm_flush {
791                VMFlush::Flush => Some(vm.flush()?),
792                VMFlush::Skip => None,
793            };
794            Ok((ret, duration, state_root))
795        })?;
796
797        Ok((
798            InvocResult::new(message.message().clone(), &ret),
799            ret,
800            duration,
801            state_cid,
802        ))
803    }
804
805    /// Replays the given message and returns the result of executing the
806    /// indicated message, assuming it was executed in the indicated tipset.
807    pub async fn replay(self: &Arc<Self>, ts: Tipset, mcid: Cid) -> Result<ApiInvocResult, Error> {
808        let this = Arc::clone(self);
809        tokio::task::spawn_blocking(move || this.replay_blocking(ts, mcid))
810            .await
811            .map_err(|e| Error::Other(format!("{e}")))?
812    }
813
814    /// Blocking version of `replay`
815    pub fn replay_blocking(
816        self: &Arc<Self>,
817        ts: Tipset,
818        mcid: Cid,
819    ) -> Result<ApiInvocResult, Error> {
820        const REPLAY_HALT: &str = "replay_halt";
821
822        let mut api_invoc_result = None;
823        let callback = |ctx: MessageCallbackCtx<'_>| {
824            match ctx.at {
825                CalledAt::Applied | CalledAt::Reward
826                    if api_invoc_result.is_none() && ctx.cid == mcid =>
827                {
828                    api_invoc_result = Some(ApiInvocResult {
829                        msg_cid: ctx.message.cid(),
830                        msg: ctx.message.message().clone(),
831                        msg_rct: Some(ctx.apply_ret.msg_receipt()),
832                        error: ctx.apply_ret.failure_info().unwrap_or_default(),
833                        duration: ctx.duration.as_nanos().clamp(0, u64::MAX as u128) as u64,
834                        gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
835                        execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
836                            .unwrap_or_default(),
837                    });
838                    anyhow::bail!(REPLAY_HALT);
839                }
840                _ => Ok(()), // ignored
841            }
842        };
843        let result = self.compute_tipset_state_blocking(ts, Some(callback), VMTrace::Traced);
844        if let Err(error_message) = result
845            && error_message.to_string() != REPLAY_HALT
846        {
847            return Err(Error::Other(format!(
848                "unexpected error during execution : {error_message:}"
849            )));
850        }
851        api_invoc_result.ok_or_else(|| Error::Other("failed to replay".into()))
852    }
853
854    /// Checks the eligibility of the miner. This is used in the validation that
855    /// a block's miner has the requirements to mine a block.
856    pub fn eligible_to_mine(
857        &self,
858        address: &Address,
859        base_tipset: &Tipset,
860        lookback_tipset: &Tipset,
861    ) -> anyhow::Result<bool, Error> {
862        let hmp =
863            self.miner_has_min_power(&self.chain_config().policy, address, lookback_tipset)?;
864        let version = self.get_network_version(base_tipset.epoch());
865
866        if version <= NetworkVersion::V3 {
867            return Ok(hmp);
868        }
869
870        if !hmp {
871            return Ok(false);
872        }
873
874        let actor = self
875            .get_actor(&Address::POWER_ACTOR, *base_tipset.parent_state())?
876            .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
877
878        let power_state = power::State::load(self.blockstore(), actor.code, actor.state)?;
879
880        let actor = self
881            .get_actor(address, *base_tipset.parent_state())?
882            .ok_or_else(|| Error::state("Miner actor address could not be resolved"))?;
883
884        let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
885
886        // Non-empty power claim.
887        let claim = power_state
888            .miner_power(self.blockstore(), address)?
889            .ok_or_else(|| Error::Other("Could not get claim".to_string()))?;
890        if claim.quality_adj_power <= BigInt::zero() {
891            return Ok(false);
892        }
893
894        // No fee debt.
895        if !miner_state.fee_debt().is_zero() {
896            return Ok(false);
897        }
898
899        // No active consensus faults.
900        let info = miner_state.info(self.blockstore())?;
901        if base_tipset.epoch() <= info.consensus_fault_elapsed {
902            return Ok(false);
903        }
904
905        Ok(true)
906    }
907
908    /// Conceptually, a [`Tipset`] consists of _blocks_ which share an _epoch_.
909    /// Each _block_ contains _messages_, which are executed by the _Filecoin Virtual Machine_.
910    ///
911    /// VM message execution essentially looks like this:
912    /// ```text
913    /// state[N-900..N] * message = state[N+1]
914    /// ```
915    ///
916    /// The `state`s above are stored in the `IPLD Blockstore`, and can be referred to by
917    /// a [`Cid`] - the _state root_.
918    /// The previous 900 states (configurable, see
919    /// <https://docs.filecoin.io/reference/general/glossary/#finality>) can be
920    /// queried when executing a message, so a store needs at least that many.
921    /// (a snapshot typically contains 2000, for example).
922    ///
923    /// Each message costs FIL to execute - this is _gas_.
924    /// After execution, the message has a _receipt_, showing how much gas was spent.
925    /// This is similarly a [`Cid`] into the block store.
926    ///
927    /// For details, see the documentation for [`apply_block_messages`].
928    ///
929    #[instrument(skip_all)]
930    pub async fn compute_tipset_state(
931        self: &Arc<Self>,
932        tipset: Tipset,
933        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
934        enable_tracing: VMTrace,
935    ) -> Result<StateOutput, Error> {
936        let this = Arc::clone(self);
937        tokio::task::spawn_blocking(move || {
938            this.compute_tipset_state_blocking(tipset, callback, enable_tracing)
939        })
940        .await?
941    }
942
943    /// Blocking version of `compute_tipset_state`
944    #[tracing::instrument(skip_all)]
945    pub fn compute_tipset_state_blocking(
946        &self,
947        tipset: Tipset,
948        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
949        enable_tracing: VMTrace,
950    ) -> Result<StateOutput, Error> {
951        let epoch = tipset.epoch();
952        let has_callback = callback.is_some();
953        Ok(apply_block_messages(
954            self.chain_store().genesis_block_header().timestamp,
955            Arc::clone(self.chain_index()),
956            Arc::clone(self.chain_config()),
957            self.beacon_schedule().clone(),
958            &self.engine,
959            tipset,
960            callback,
961            enable_tracing,
962        )
963        .map_err(|e| {
964            if has_callback {
965                e
966            } else {
967                anyhow::anyhow!("Failed to compute tipset state@{epoch}: {e}")
968            }
969        })?)
970    }
971
972    #[instrument(skip_all)]
973    pub async fn compute_state(
974        self: &Arc<Self>,
975        height: ChainEpoch,
976        messages: Vec<Message>,
977        tipset: Tipset,
978        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
979        enable_tracing: VMTrace,
980    ) -> Result<StateOutput, Error> {
981        let this = Arc::clone(self);
982        tokio::task::spawn_blocking(move || {
983            this.compute_state_blocking(height, messages, tipset, callback, enable_tracing)
984        })
985        .await?
986    }
987
988    /// Blocking version of `compute_state`
989    #[tracing::instrument(skip_all)]
990    pub fn compute_state_blocking(
991        &self,
992        height: ChainEpoch,
993        messages: Vec<Message>,
994        tipset: Tipset,
995        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
996        enable_tracing: VMTrace,
997    ) -> Result<StateOutput, Error> {
998        Ok(compute_state(
999            height,
1000            messages,
1001            tipset,
1002            self.chain_store().genesis_block_header().timestamp,
1003            Arc::clone(self.chain_index()),
1004            Arc::clone(self.chain_config()),
1005            self.beacon_schedule().clone(),
1006            &self.engine,
1007            callback,
1008            enable_tracing,
1009        )?)
1010    }
1011
1012    /// Check if tipset had executed the message, by loading the receipt based
1013    /// on the index of the message in the block.
1014    fn tipset_executed_message(
1015        &self,
1016        tipset: &Tipset,
1017        message: &ChainMessage,
1018        allow_replaced: bool,
1019    ) -> Result<Option<Receipt>, Error> {
1020        if tipset.epoch() == 0 {
1021            return Ok(None);
1022        }
1023        let message_from_address = message.from();
1024        let message_sequence = message.sequence();
1025        // Load parent state.
1026        let pts = self
1027            .chain_index()
1028            .load_required_tipset(tipset.parents())
1029            .map_err(|err| Error::Other(format!("Failed to load tipset: {err}")))?;
1030        let messages = self
1031            .cs
1032            .messages_for_tipset(&pts)
1033            .map_err(|err| Error::Other(format!("Failed to load messages for tipset: {err}")))?;
1034        messages
1035            .iter()
1036            .enumerate()
1037            // iterate in reverse because we going backwards through the chain
1038            .rev()
1039            .filter(|(_, s)| {
1040                s.sequence() == message_sequence
1041                    && s.from() == message_from_address
1042                    && s.equal_call(message)
1043            })
1044            .map(|(index, m)| {
1045                // A replacing message is a message with a different CID,
1046                // any of Gas values, and different signature, but with all
1047                // other parameters matching (source/destination, nonce, params, etc.)
1048                if !allow_replaced && message.cid() != m.cid(){
1049                    Err(Error::Other(format!(
1050                        "found message with equal nonce and call params but different CID. wanted {}, found: {}, nonce: {}, from: {}",
1051                        message.cid(),
1052                        m.cid(),
1053                        message.sequence(),
1054                        message.from(),
1055                    )))
1056                } else {
1057                    let block_header = tipset.block_headers().first();
1058                    crate::chain::get_parent_receipt(
1059                        self.blockstore(),
1060                        block_header,
1061                        index,
1062                    )
1063                    .map_err(|err| Error::Other(format!("Failed to get parent receipt (message_receipts={}, index={index}, error={err})", block_header.message_receipts)))
1064                }
1065            })
1066            .next()
1067            .unwrap_or(Ok(None))
1068    }
1069
1070    fn check_search(
1071        &self,
1072        mut current: Tipset,
1073        message: &ChainMessage,
1074        lookback_max_epoch: ChainEpoch,
1075        allow_replaced: bool,
1076    ) -> Result<Option<(Tipset, Receipt)>, Error> {
1077        let message_from_address = message.from();
1078        let message_sequence = message.sequence();
1079        let mut current_actor_state = self
1080            .get_required_actor(&message_from_address, *current.parent_state())
1081            .map_err(Error::state)?;
1082        let message_from_id = self.lookup_required_id(&message_from_address, &current)?;
1083
1084        while current.epoch() >= lookback_max_epoch {
1085            let parent_tipset = self
1086                .chain_index()
1087                .load_required_tipset(current.parents())
1088                .map_err(|err| {
1089                    Error::Other(format!(
1090                        "failed to load tipset during msg wait searchback: {err:}"
1091                    ))
1092                })?;
1093
1094            let parent_actor_state = self
1095                .get_actor(&message_from_id, *parent_tipset.parent_state())
1096                .map_err(|e| Error::State(e.to_string()))?;
1097
1098            if parent_actor_state.is_none()
1099                || (current_actor_state.sequence > message_sequence
1100                    && parent_actor_state.as_ref().unwrap().sequence <= message_sequence)
1101            {
1102                let receipt = self
1103                    .tipset_executed_message(&current, message, allow_replaced)?
1104                    .context("Failed to get receipt with tipset_executed_message")?;
1105                return Ok(Some((current, receipt)));
1106            }
1107
1108            if let Some(parent_actor_state) = parent_actor_state {
1109                current = parent_tipset;
1110                current_actor_state = parent_actor_state;
1111            } else {
1112                break;
1113            }
1114        }
1115
1116        Ok(None)
1117    }
1118
1119    /// Searches backwards through the chain for a message receipt.
1120    fn search_back_for_message(
1121        &self,
1122        current: Tipset,
1123        message: &ChainMessage,
1124        look_back_limit: Option<i64>,
1125        allow_replaced: Option<bool>,
1126    ) -> Result<Option<(Tipset, Receipt)>, Error> {
1127        let current_epoch = current.epoch();
1128        let allow_replaced = allow_replaced.unwrap_or(true);
1129
1130        // Calculate the max lookback epoch (inclusive lower bound) for the search.
1131        let lookback_max_epoch = match look_back_limit {
1132            // No search: limit = 0 means search 0 epochs
1133            Some(0) => return Ok(None),
1134            // Limited search: calculate the inclusive lower bound, clamped to genesis
1135            // Example: limit=5 at epoch=1000 → min_epoch=996, searches [996,1000] = 5 epochs
1136            // Example: limit=2000 at epoch=1000 → min_epoch=0, searches [0,1000] = 1001 epochs (all available)
1137            Some(limit) if limit > 0 => (current_epoch - limit + 1).max(0),
1138            // Search all the way to genesis (epoch 0)
1139            _ => 0,
1140        };
1141
1142        self.check_search(current, message, lookback_max_epoch, allow_replaced)
1143    }
1144
1145    /// Returns a message receipt from a given tipset and message CID.
1146    pub fn get_receipt(&self, tipset: Tipset, msg: Cid) -> Result<Receipt, Error> {
1147        let m = crate::chain::get_chain_message(self.blockstore(), &msg)
1148            .map_err(|e| Error::Other(e.to_string()))?;
1149        let message_receipt = self.tipset_executed_message(&tipset, &m, true)?;
1150        if let Some(receipt) = message_receipt {
1151            return Ok(receipt);
1152        }
1153
1154        let maybe_tuple = self.search_back_for_message(tipset, &m, None, None)?;
1155        let message_receipt = maybe_tuple
1156            .ok_or_else(|| {
1157                Error::Other("Could not get receipt from search back message".to_string())
1158            })?
1159            .1;
1160        Ok(message_receipt)
1161    }
1162
1163    /// `WaitForMessage` blocks until a message appears on chain. It looks
1164    /// backwards in the chain to see if this has already happened. It
1165    /// guarantees that the message has been on chain for at least
1166    /// confidence epochs without being reverted before returning.
1167    pub async fn wait_for_message(
1168        self: &Arc<Self>,
1169        msg_cid: Cid,
1170        confidence: i64,
1171        look_back_limit: Option<ChainEpoch>,
1172        allow_replaced: Option<bool>,
1173    ) -> Result<(Option<Tipset>, Option<Receipt>), Error> {
1174        let mut subscriber = self.cs.publisher().subscribe();
1175        let (sender, mut receiver) = oneshot::channel::<()>();
1176        let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
1177            .map_err(|err| Error::Other(format!("failed to load message {err:}")))?;
1178        let current_tipset = self.heaviest_tipset();
1179        let maybe_message_receipt =
1180            self.tipset_executed_message(&current_tipset, &message, true)?;
1181        if let Some(r) = maybe_message_receipt {
1182            return Ok((Some(current_tipset.clone()), Some(r)));
1183        }
1184
1185        let mut candidate_tipset: Option<Tipset> = None;
1186        let mut candidate_receipt: Option<Receipt> = None;
1187
1188        let sm_cloned = Arc::clone(self);
1189
1190        let message_for_task = message.clone();
1191        let height_of_head = current_tipset.epoch();
1192        let task = tokio::task::spawn(async move {
1193            let back_tuple = sm_cloned.search_back_for_message(
1194                current_tipset,
1195                &message_for_task,
1196                look_back_limit,
1197                allow_replaced,
1198            )?;
1199            sender
1200                .send(())
1201                .map_err(|e| Error::Other(format!("Could not send to channel {e:?}")))?;
1202            Ok::<_, Error>(back_tuple)
1203        });
1204
1205        let reverts: Arc<RwLock<HashMap<TipsetKey, bool>>> = Arc::new(RwLock::new(HashMap::new()));
1206        let block_revert = reverts.clone();
1207        let sm_cloned = Arc::clone(self);
1208
1209        // Wait for message to be included in head change.
1210        let mut subscriber_poll = tokio::task::spawn(async move {
1211            loop {
1212                match subscriber.recv().await {
1213                    Ok(subscriber) => match subscriber {
1214                        HeadChange::Apply(tipset) => {
1215                            if candidate_tipset
1216                                .as_ref()
1217                                .map(|s| tipset.epoch() >= s.epoch() + confidence)
1218                                .unwrap_or_default()
1219                            {
1220                                return Ok((candidate_tipset, candidate_receipt));
1221                            }
1222                            let poll_receiver = receiver.try_recv();
1223                            if let Ok(Some(_)) = poll_receiver {
1224                                block_revert
1225                                    .write()
1226                                    .await
1227                                    .insert(tipset.key().to_owned(), true);
1228                            }
1229
1230                            let maybe_receipt =
1231                                sm_cloned.tipset_executed_message(&tipset, &message, true)?;
1232                            if let Some(receipt) = maybe_receipt {
1233                                if confidence == 0 {
1234                                    return Ok((Some(tipset), Some(receipt)));
1235                                }
1236                                candidate_tipset = Some(tipset);
1237                                candidate_receipt = Some(receipt)
1238                            }
1239                        }
1240                    },
1241                    Err(RecvError::Lagged(i)) => {
1242                        warn!(
1243                            "wait for message head change subscriber lagged, skipped {} events",
1244                            i
1245                        );
1246                    }
1247                    Err(RecvError::Closed) => break,
1248                }
1249            }
1250            Ok((None, None))
1251        })
1252        .fuse();
1253
1254        // Search backwards for message.
1255        let mut search_back_poll = tokio::task::spawn(async move {
1256            let back_tuple = task.await.map_err(|e| {
1257                Error::Other(format!("Could not search backwards for message {e}"))
1258            })??;
1259            if let Some((back_tipset, back_receipt)) = back_tuple {
1260                let should_revert = *reverts
1261                    .read()
1262                    .await
1263                    .get(back_tipset.key())
1264                    .unwrap_or(&false);
1265                let larger_height_of_head = height_of_head >= back_tipset.epoch() + confidence;
1266                if !should_revert && larger_height_of_head {
1267                    return Ok::<_, Error>((Some(back_tipset), Some(back_receipt)));
1268                }
1269                return Ok((None, None));
1270            }
1271            Ok((None, None))
1272        })
1273        .fuse();
1274
1275        // Await on first future to finish.
1276        loop {
1277            select! {
1278                res = subscriber_poll => {
1279                    return res?
1280                }
1281                res = search_back_poll => {
1282                    if let Ok((Some(ts), Some(rct))) = res? {
1283                        return Ok((Some(ts), Some(rct)));
1284                    }
1285                }
1286            }
1287        }
1288    }
1289
1290    pub async fn search_for_message(
1291        &self,
1292        from: Option<Tipset>,
1293        msg_cid: Cid,
1294        look_back_limit: Option<i64>,
1295        allow_replaced: Option<bool>,
1296    ) -> Result<Option<(Tipset, Receipt)>, Error> {
1297        let from = from.unwrap_or_else(|| self.heaviest_tipset());
1298        let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
1299            .map_err(|err| Error::Other(format!("failed to load message {err}")))?;
1300        let current_tipset = self.heaviest_tipset();
1301        let maybe_message_receipt =
1302            self.tipset_executed_message(&from, &message, allow_replaced.unwrap_or(true))?;
1303        if let Some(r) = maybe_message_receipt {
1304            Ok(Some((from, r)))
1305        } else {
1306            self.search_back_for_message(current_tipset, &message, look_back_limit, allow_replaced)
1307        }
1308    }
1309
1310    /// Returns a BLS public key from provided address
1311    pub fn get_bls_public_key(
1312        db: &Arc<DB>,
1313        addr: &Address,
1314        state_cid: Cid,
1315    ) -> Result<BlsPublicKey, Error> {
1316        let state = StateTree::new_from_root(Arc::clone(db), &state_cid)
1317            .map_err(|e| Error::Other(e.to_string()))?;
1318        let kaddr = resolve_to_key_addr(&state, db, addr)
1319            .map_err(|e| format!("Failed to resolve key address, error: {e}"))?;
1320
1321        match kaddr.into_payload() {
1322            Payload::BLS(key) => BlsPublicKey::from_bytes(&key)
1323                .map_err(|e| Error::Other(format!("Failed to construct bls public key: {e}"))),
1324            _ => Err(Error::state(
1325                "Address must be BLS address to load bls public key",
1326            )),
1327        }
1328    }
1329
1330    /// Looks up ID [Address] from the state at the given [Tipset].
1331    pub fn lookup_id(&self, addr: &Address, ts: &Tipset) -> Result<Option<Address>, Error> {
1332        let state_tree = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
1333            .map_err(|e| format!("{e:?}"))?;
1334        Ok(state_tree
1335            .lookup_id(addr)
1336            .map_err(|e| Error::Other(e.to_string()))?
1337            .map(Address::new_id))
1338    }
1339
1340    /// Looks up required ID [Address] from the state at the given [Tipset].
1341    pub fn lookup_required_id(&self, addr: &Address, ts: &Tipset) -> Result<Address, Error> {
1342        self.lookup_id(addr, ts)?
1343            .ok_or_else(|| Error::Other(format!("Failed to lookup the id address {addr}")))
1344    }
1345
1346    /// Retrieves market state
1347    pub fn market_state(&self, ts: &Tipset) -> Result<market::State, Error> {
1348        let actor = self.get_required_actor(&Address::MARKET_ACTOR, *ts.parent_state())?;
1349        let market_state = market::State::load(self.blockstore(), actor.code, actor.state)?;
1350        Ok(market_state)
1351    }
1352
1353    /// Retrieves market balance in escrow and locked tables.
1354    pub fn market_balance(&self, addr: &Address, ts: &Tipset) -> Result<MarketBalance, Error> {
1355        let market_state = self.market_state(ts)?;
1356        let new_addr = self.lookup_required_id(addr, ts)?;
1357        let out = MarketBalance {
1358            escrow: {
1359                market_state
1360                    .escrow_table(self.blockstore())?
1361                    .get(&new_addr)?
1362            },
1363            locked: {
1364                market_state
1365                    .locked_table(self.blockstore())?
1366                    .get(&new_addr)?
1367            },
1368        };
1369
1370        Ok(out)
1371    }
1372
1373    /// Retrieves miner info.
1374    pub fn miner_info(&self, addr: &Address, ts: &Tipset) -> Result<MinerInfo, Error> {
1375        let actor = self
1376            .get_actor(addr, *ts.parent_state())?
1377            .ok_or_else(|| Error::state("Miner actor not found"))?;
1378        let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1379
1380        Ok(state.info(self.blockstore())?)
1381    }
1382
1383    /// Retrieves miner faults.
1384    pub fn miner_faults(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
1385        self.all_partition_sectors(addr, ts, |partition| partition.faulty_sectors().clone())
1386    }
1387
1388    /// Retrieves miner recoveries.
1389    pub fn miner_recoveries(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
1390        self.all_partition_sectors(addr, ts, |partition| partition.recovering_sectors().clone())
1391    }
1392
1393    fn all_partition_sectors(
1394        &self,
1395        addr: &Address,
1396        ts: &Tipset,
1397        get_sector: impl Fn(Partition<'_>) -> BitField,
1398    ) -> Result<BitField, Error> {
1399        let actor = self
1400            .get_actor(addr, *ts.parent_state())?
1401            .ok_or_else(|| Error::state("Miner actor not found"))?;
1402
1403        let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1404
1405        let mut partitions = Vec::new();
1406
1407        state.for_each_deadline(
1408            &self.chain_config().policy,
1409            self.blockstore(),
1410            |_, deadline| {
1411                deadline.for_each(self.blockstore(), |_, partition| {
1412                    partitions.push(get_sector(partition));
1413                    Ok(())
1414                })
1415            },
1416        )?;
1417
1418        Ok(BitField::union(partitions.iter()))
1419    }
1420
1421    /// Retrieves miner power.
1422    pub fn miner_power(&self, addr: &Address, ts: &Tipset) -> Result<MinerPower, Error> {
1423        if let Some((miner_power, total_power)) = self.get_power(ts.parent_state(), Some(addr))? {
1424            return Ok(MinerPower {
1425                miner_power,
1426                total_power,
1427                has_min_power: true,
1428            });
1429        }
1430
1431        Ok(MinerPower {
1432            has_min_power: false,
1433            miner_power: Default::default(),
1434            total_power: Default::default(),
1435        })
1436    }
1437
1438    /// Similar to `resolve_to_key_addr` in the `forest_vm` [`crate::state_manager`] but does not
1439    /// allow `Actor` type of addresses. Uses `ts` to generate the VM state.
1440    pub async fn resolve_to_key_addr(
1441        self: &Arc<Self>,
1442        addr: &Address,
1443        ts: &Tipset,
1444    ) -> Result<Address, anyhow::Error> {
1445        match addr.protocol() {
1446            Protocol::BLS | Protocol::Secp256k1 | Protocol::Delegated => return Ok(*addr),
1447            Protocol::Actor => {
1448                return Err(Error::Other(
1449                    "cannot resolve actor address to key address".to_string(),
1450                )
1451                .into());
1452            }
1453            _ => {}
1454        };
1455
1456        // First try to resolve the actor in the parent state, so we don't have to
1457        // compute anything.
1458        let state = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())?;
1459        if let Ok(addr) = resolve_to_key_addr(&state, self.blockstore(), addr) {
1460            return Ok(addr);
1461        }
1462
1463        // If that fails, compute the tip-set and try again.
1464        let (st, _) = self.tipset_state(ts, StateLookupPolicy::Enabled).await?;
1465        let state = StateTree::new_from_root(self.blockstore_owned(), &st)?;
1466
1467        resolve_to_key_addr(&state, self.blockstore(), addr)
1468    }
1469
1470    pub async fn miner_get_base_info(
1471        self: &Arc<Self>,
1472        beacon_schedule: &BeaconSchedule,
1473        tipset: Tipset,
1474        addr: Address,
1475        epoch: ChainEpoch,
1476    ) -> anyhow::Result<Option<MiningBaseInfo>> {
1477        let prev_beacon = self
1478            .chain_store()
1479            .chain_index()
1480            .latest_beacon_entry(tipset.clone())?;
1481
1482        let entries: Vec<BeaconEntry> = beacon_schedule
1483            .beacon_entries_for_block(
1484                self.chain_config().network_version(epoch),
1485                epoch,
1486                tipset.epoch(),
1487                &prev_beacon,
1488            )
1489            .await?;
1490
1491        let base = entries.last().unwrap_or(&prev_beacon);
1492
1493        let (lb_tipset, lb_state_root) = ChainStore::get_lookback_tipset_for_round(
1494            self.chain_index(),
1495            self.chain_config(),
1496            &tipset,
1497            epoch,
1498        )?;
1499
1500        // If the miner actor doesn't exist in the current tipset, it is a
1501        // user-error and we must return an error message. If the miner exists
1502        // in the current tipset but not in the lookback tipset, we may not
1503        // error and should instead return None.
1504        let actor = self.get_required_actor(&addr, *tipset.parent_state())?;
1505        if self.get_actor(&addr, lb_state_root)?.is_none() {
1506            return Ok(None);
1507        }
1508
1509        let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1510
1511        let addr_buf = to_vec(&addr)?;
1512        let rand = draw_randomness(
1513            base.signature(),
1514            DomainSeparationTag::WinningPoStChallengeSeed as i64,
1515            epoch,
1516            &addr_buf,
1517        )?;
1518
1519        let network_version = self.chain_config().network_version(tipset.epoch());
1520        let sectors = self.get_sectors_for_winning_post(
1521            &lb_state_root,
1522            network_version,
1523            &addr,
1524            Randomness::new(rand.to_vec()),
1525        )?;
1526
1527        if sectors.is_empty() {
1528            return Ok(None);
1529        }
1530
1531        let (miner_power, total_power) = self
1532            .get_power(&lb_state_root, Some(&addr))?
1533            .context("failed to get power")?;
1534
1535        let info = miner_state.info(self.blockstore())?;
1536
1537        let worker_key = self
1538            .resolve_to_deterministic_address(info.worker, &tipset)
1539            .await?;
1540        let eligible = self.eligible_to_mine(&addr, &tipset, &lb_tipset)?;
1541
1542        Ok(Some(MiningBaseInfo {
1543            miner_power: miner_power.quality_adj_power,
1544            network_power: total_power.quality_adj_power,
1545            sectors,
1546            worker_key,
1547            sector_size: info.sector_size,
1548            prev_beacon_entry: prev_beacon,
1549            beacon_entries: entries,
1550            eligible_for_mining: eligible,
1551        }))
1552    }
1553
1554    /// Checks power actor state for if miner meets consensus minimum
1555    /// requirements.
1556    pub fn miner_has_min_power(
1557        &self,
1558        policy: &Policy,
1559        addr: &Address,
1560        ts: &Tipset,
1561    ) -> anyhow::Result<bool> {
1562        let actor = self
1563            .get_actor(&Address::POWER_ACTOR, *ts.parent_state())?
1564            .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
1565        let ps = power::State::load(self.blockstore(), actor.code, actor.state)?;
1566
1567        ps.miner_nominal_power_meets_consensus_minimum(policy, self.blockstore(), addr)
1568    }
1569
1570    /// Validates all tipsets at epoch `start..=end` behind the heaviest tipset.
1571    ///
1572    /// This spawns [`rayon::current_num_threads`] threads to do the compute-heavy work
1573    /// of tipset validation.
1574    ///
1575    /// # What is validation?
1576    /// Every state transition returns a new _state root_, which is typically retained in, e.g., snapshots.
1577    /// For "full" snapshots, all state roots are retained.
1578    /// For standard snapshots, the last 2000 or so state roots are retained.
1579    ///
1580    /// _receipts_ meanwhile, are typically ephemeral, but each tipset knows the _receipt root_
1581    /// (hash) of the previous tipset.
1582    ///
1583    /// This function takes advantage of that fact to validate tipsets:
1584    /// - `tipset[N]` claims that `receipt_root[N-1]` should be `0xDEADBEEF`
1585    /// - find `tipset[N-1]`, and perform its state transition to get the actual `receipt_root`
1586    /// - assert that they match
1587    ///
1588    /// See [`Self::compute_tipset_state_blocking`] for an explanation of state transitions.
1589    ///
1590    /// # Known issues
1591    /// This function is blocking, but we do observe threads waiting and synchronizing.
1592    /// This is suspected to be due something in the VM or its `WASM` runtime.
1593    #[tracing::instrument(skip(self))]
1594    pub fn validate_range(&self, epochs: RangeInclusive<i64>) -> anyhow::Result<()> {
1595        let heaviest = self.heaviest_tipset();
1596        let heaviest_epoch = heaviest.epoch();
1597        let end = self
1598            .chain_index()
1599            .tipset_by_height(*epochs.end(), heaviest, ResolveNullTipset::TakeOlder)
1600            .with_context(|| {
1601                format!(
1602            "couldn't get a tipset at height {} behind heaviest tipset at height {heaviest_epoch}",
1603            *epochs.end(),
1604        )
1605            })?;
1606
1607        // lookup tipset parents as we go along, iterating DOWN from `end`
1608        let tipsets = self
1609            .chain_index()
1610            .chain(end)
1611            .take_while(|tipset| tipset.epoch() >= *epochs.start());
1612
1613        self.validate_tipsets(tipsets)
1614    }
1615
1616    pub fn validate_tipsets<T>(&self, tipsets: T) -> anyhow::Result<()>
1617    where
1618        T: Iterator<Item = Tipset> + Send,
1619    {
1620        let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
1621        validate_tipsets(
1622            genesis_timestamp,
1623            self.chain_index().clone(),
1624            self.chain_config().clone(),
1625            self.beacon_schedule().clone(),
1626            &self.engine,
1627            tipsets,
1628        )
1629    }
1630
1631    pub fn get_verified_registry_actor_state(
1632        &self,
1633        ts: &Tipset,
1634    ) -> anyhow::Result<verifreg::State> {
1635        let act = self
1636            .get_actor(&Address::VERIFIED_REGISTRY_ACTOR, *ts.parent_state())
1637            .map_err(Error::state)?
1638            .ok_or_else(|| Error::state("actor not found"))?;
1639        verifreg::State::load(self.blockstore(), act.code, act.state)
1640    }
1641    pub fn get_claim(
1642        &self,
1643        addr: &Address,
1644        ts: &Tipset,
1645        claim_id: ClaimID,
1646    ) -> anyhow::Result<Option<Claim>> {
1647        let id_address = self.lookup_required_id(addr, ts)?;
1648        let state = self.get_verified_registry_actor_state(ts)?;
1649        state.get_claim(self.blockstore(), id_address, claim_id)
1650    }
1651
1652    pub fn get_all_claims(&self, ts: &Tipset) -> anyhow::Result<HashMap<ClaimID, Claim>> {
1653        let state = self.get_verified_registry_actor_state(ts)?;
1654        state.get_all_claims(self.blockstore())
1655    }
1656
1657    pub fn get_allocation(
1658        &self,
1659        addr: &Address,
1660        ts: &Tipset,
1661        allocation_id: AllocationID,
1662    ) -> anyhow::Result<Option<Allocation>> {
1663        let id_address = self.lookup_required_id(addr, ts)?;
1664        let state = self.get_verified_registry_actor_state(ts)?;
1665        state.get_allocation(self.blockstore(), id_address.id()?, allocation_id)
1666    }
1667
1668    pub fn get_all_allocations(
1669        &self,
1670        ts: &Tipset,
1671    ) -> anyhow::Result<HashMap<AllocationID, Allocation>> {
1672        let state = self.get_verified_registry_actor_state(ts)?;
1673        state.get_all_allocations(self.blockstore())
1674    }
1675
1676    pub fn verified_client_status(
1677        &self,
1678        addr: &Address,
1679        ts: &Tipset,
1680    ) -> anyhow::Result<Option<DataCap>> {
1681        let id = self.lookup_required_id(addr, ts)?;
1682        let network_version = self.get_network_version(ts.epoch());
1683
1684        // This is a copy of Lotus code, we need to treat all the actors below version 9
1685        // differently. Which maps to network below version 17.
1686        // Original: https://github.com/filecoin-project/lotus/blob/5e76b05b17771da6939c7b0bf65127c3dc70ee23/node/impl/full/state.go#L1627-L1664.
1687        if (u32::from(network_version.0)) < 17 {
1688            let state = self.get_verified_registry_actor_state(ts)?;
1689            return state.verified_client_data_cap(self.blockstore(), id);
1690        }
1691
1692        let act = self
1693            .get_actor(&Address::DATACAP_TOKEN_ACTOR, *ts.parent_state())
1694            .map_err(Error::state)?
1695            .ok_or_else(|| Error::state("Miner actor not found"))?;
1696
1697        let state = datacap::State::load(self.blockstore(), act.code, act.state)?;
1698
1699        state.verified_client_data_cap(self.blockstore(), id)
1700    }
1701
1702    pub async fn resolve_to_deterministic_address(
1703        self: &Arc<Self>,
1704        address: Address,
1705        ts: &Tipset,
1706    ) -> anyhow::Result<Address> {
1707        use crate::shim::address::Protocol::*;
1708        match address.protocol() {
1709            BLS | Secp256k1 | Delegated => Ok(address),
1710            Actor => anyhow::bail!("cannot resolve actor address to key address"),
1711            _ => {
1712                // First try to resolve the actor in the parent state, so we don't have to compute anything.
1713                if let Ok(state) =
1714                    StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
1715                    && let Ok(address) = state
1716                        .resolve_to_deterministic_addr(self.chain_store().blockstore(), address)
1717                {
1718                    return Ok(address);
1719                }
1720
1721                // If that fails, compute the tip-set and try again.
1722                let (state_root, _) = self.tipset_state(ts, StateLookupPolicy::Enabled).await?;
1723                let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?;
1724                state.resolve_to_deterministic_addr(self.chain_store().blockstore(), address)
1725            }
1726        }
1727    }
1728
1729    pub fn execution_trace(&self, tipset: &Tipset) -> anyhow::Result<(Cid, Vec<ApiInvocResult>)> {
1730        let mut invoc_trace = vec![];
1731
1732        let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
1733
1734        let callback = |ctx: MessageCallbackCtx<'_>| {
1735            match ctx.at {
1736                CalledAt::Applied | CalledAt::Reward => {
1737                    invoc_trace.push(ApiInvocResult {
1738                        msg_cid: ctx.message.cid(),
1739                        msg: ctx.message.message().clone(),
1740                        msg_rct: Some(ctx.apply_ret.msg_receipt()),
1741                        error: ctx.apply_ret.failure_info().unwrap_or_default(),
1742                        duration: ctx.duration.as_nanos().clamp(0, u64::MAX as u128) as u64,
1743                        gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
1744                        execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
1745                            .unwrap_or_default(),
1746                    });
1747                    Ok(())
1748                }
1749                _ => Ok(()), // ignored
1750            }
1751        };
1752
1753        let StateOutput { state_root, .. } = apply_block_messages(
1754            genesis_timestamp,
1755            self.chain_index().clone(),
1756            self.chain_config().clone(),
1757            self.beacon_schedule().clone(),
1758            &self.engine,
1759            tipset.clone(),
1760            Some(callback),
1761            VMTrace::Traced,
1762        )?;
1763
1764        Ok((state_root, invoc_trace))
1765    }
1766
1767    /// Attempts to lookup the state and receipt root of the next tipset.
1768    /// This is a performance optimization to avoid recomputing the state and receipt root by checking the blockstore.
1769    /// It only checks the immediate next epoch, as this is the most likely place to find a child.
1770    fn try_lookup_state_from_next_tipset(&self, tipset: &Tipset) -> Option<StateOutputValue> {
1771        let epoch = tipset.epoch();
1772        let next_epoch = epoch + 1;
1773
1774        // Only check the immediate next epoch - this is the most likely place to find a child
1775        let heaviest = self.heaviest_tipset();
1776        if next_epoch > heaviest.epoch() {
1777            return None;
1778        }
1779
1780        // Check if the next tipset has the same parent
1781        if let Ok(next_tipset) =
1782            self.chain_index()
1783                .tipset_by_height(next_epoch, heaviest, ResolveNullTipset::TakeNewer)
1784        {
1785            // verify that the parent of the `next_tipset` is the same as the current tipset
1786            if !next_tipset.parents().eq(tipset.key()) {
1787                return None;
1788            }
1789
1790            let state_root = next_tipset.parent_state();
1791            let receipt_root = next_tipset.min_ticket_block().message_receipts;
1792
1793            if self.blockstore().has(state_root).unwrap_or(false)
1794                && self.blockstore().has(&receipt_root).unwrap_or(false)
1795            {
1796                return Some(StateOutputValue {
1797                    state_root: state_root.into(),
1798                    receipt_root,
1799                });
1800            }
1801        }
1802
1803        None
1804    }
1805}
1806
1807pub fn validate_tipsets<DB, T>(
1808    genesis_timestamp: u64,
1809    chain_index: Arc<ChainIndex<Arc<DB>>>,
1810    chain_config: Arc<ChainConfig>,
1811    beacon: Arc<BeaconSchedule>,
1812    engine: &MultiEngine,
1813    tipsets: T,
1814) -> anyhow::Result<()>
1815where
1816    DB: Blockstore + Send + Sync + 'static,
1817    T: Iterator<Item = Tipset> + Send,
1818{
1819    use rayon::iter::ParallelIterator as _;
1820    tipsets
1821        .tuple_windows()
1822        .par_bridge()
1823        .try_for_each(|(child, parent)| {
1824            info!(height = parent.epoch(), "compute parent state");
1825            let StateOutput {
1826                state_root: actual_state,
1827                receipt_root: actual_receipt,
1828                ..
1829            } = apply_block_messages(
1830                genesis_timestamp,
1831                chain_index.clone(),
1832                chain_config.clone(),
1833                beacon.clone(),
1834                engine,
1835                parent,
1836                NO_CALLBACK,
1837                VMTrace::NotTraced,
1838            )
1839            .map_err(|e| anyhow::anyhow!("couldn't compute tipset state: {e}"))?;
1840            let expected_receipt = child.min_ticket_block().message_receipts;
1841            let expected_state = child.parent_state();
1842            match (expected_state, expected_receipt) == (&actual_state, actual_receipt) {
1843                true => Ok(()),
1844                false => {
1845                    error!(
1846                        height = child.epoch(),
1847                        ?expected_state,
1848                        ?expected_receipt,
1849                        ?actual_state,
1850                        ?actual_receipt,
1851                        "state mismatch"
1852                    );
1853                    bail!("state mismatch");
1854                }
1855            }
1856        })
1857}
1858
1859/// Messages are transactions that produce new states. The state (usually
1860/// referred to as the 'state-tree') is a mapping from actor addresses to actor
1861/// states. Each block contains the hash of the state-tree that should be used
1862/// as the starting state when executing the block messages.
1863///
1864/// # Execution environment
1865///
1866/// Transaction execution has the following inputs:
1867/// - a current state-tree (stored as IPLD in a key-value database). This
1868///   reference is in [`Tipset::parent_state`].
1869/// - up to 900 past state-trees. See
1870///   <https://docs.filecoin.io/reference/general/glossary/#finality>.
1871/// - up to 900 past tipset IDs.
1872/// - a deterministic source of randomness.
1873/// - the circulating supply of FIL (see
1874///   <https://filecoin.io/blog/filecoin-circulating-supply/>). The circulating
1875///   supply is determined by the epoch and the states of a few key actors.
1876/// - the base fee (see <https://spec.filecoin.io/systems/filecoin_vm/gas_fee/>).
1877///   This value is defined by `tipset.parent_base_fee`.
1878/// - the genesis timestamp (UNIX epoch time when the first block was
1879///   mined/created).
1880/// - a chain configuration (maps epoch to network version, has chain specific
1881///   settings).
1882///
1883/// The result of running a set of block messages is an index to the final
1884/// state-tree and an index to an array of message receipts (listing gas used,
1885/// return codes, etc).
1886///
1887/// # Cron and null tipsets
1888///
1889/// Once per epoch, after all messages have run, a special 'cron' transaction
1890/// must be executed. The tasks of the 'cron' transaction include running batch
1891/// jobs and keeping the state up-to-date with the current epoch.
1892///
1893/// It can happen that no blocks are mined in an epoch. The tipset for such an
1894/// epoch is called a null tipset. A null tipset has no identity and cannot be
1895/// directly executed. This is a problem for 'cron' which must run for every
1896/// epoch, even if there are no messages. The fix is to run 'cron' if there are
1897/// any null tipsets between the current epoch and the parent epoch.
1898///
1899/// Imagine the blockchain looks like this with a null tipset at epoch 9:
1900///
1901/// ```text
1902/// ┌────────┐ ┌────┐ ┌───────┐  ┌───────┐
1903/// │Epoch 10│ │Null│ │Epoch 8├──►Epoch 7├─►
1904/// └───┬────┘ └────┘ └───▲───┘  └───────┘
1905///     └─────────────────┘
1906/// ```
1907///
1908/// The parent of tipset-epoch-10 is tipset-epoch-8. Before executing the
1909/// messages in epoch 10, we have to run cron for epoch 9. However, running
1910/// 'cron' requires the timestamp of the youngest block in the tipset (which
1911/// doesn't exist because there are no blocks in the tipset). Lotus dictates that
1912/// the timestamp of a null tipset is `30s * epoch` after the genesis timestamp.
1913/// So, in the above example, if the genesis block was mined at time `X`, the
1914/// null tipset for epoch 9 will have timestamp `X + 30 * 9`.
1915///
1916/// # Migrations
1917///
1918/// Migrations happen between network upgrades and modify the state tree. If a
1919/// migration is scheduled for epoch 10, it will be run _after_ the messages for
1920/// epoch 10. The tipset for epoch 11 will link the state-tree produced by the
1921/// migration.
1922///
1923/// Example timeline with a migration at epoch 10:
1924///   1. Tipset-epoch-10 executes, producing state-tree A.
1925///   2. Migration consumes state-tree A and produces state-tree B.
1926///   3. Tipset-epoch-11 executes, consuming state-tree B (rather than A).
1927///
1928/// Note: The migration actually happens when tipset-epoch-11 executes. This is
1929///       because tipset-epoch-10 may be null and therefore not executed at all.
1930///
1931/// # Caching
1932///
1933/// Scanning the blockchain to find past tipsets and state-trees may be slow.
1934/// The `ChainStore` caches recent tipsets to make these scans faster.
1935#[allow(clippy::too_many_arguments)]
1936pub fn apply_block_messages<DB>(
1937    genesis_timestamp: u64,
1938    chain_index: Arc<ChainIndex<Arc<DB>>>,
1939    chain_config: Arc<ChainConfig>,
1940    beacon: Arc<BeaconSchedule>,
1941    engine: &MultiEngine,
1942    tipset: Tipset,
1943    mut callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
1944    enable_tracing: VMTrace,
1945) -> anyhow::Result<StateOutput>
1946where
1947    DB: Blockstore + Send + Sync + 'static,
1948{
1949    // This function will:
1950    // 1. handle the genesis block as a special case
1951    // 2. run 'cron' for any null-tipsets between the current tipset and our parent tipset
1952    // 3. run migrations
1953    // 4. execute block messages
1954    // 5. write the state-tree to the DB and return the CID
1955
1956    // step 1: special case for genesis block
1957    if tipset.epoch() == 0 {
1958        // NB: This is here because the process that executes blocks requires that the
1959        // block miner reference a valid miner in the state tree. Unless we create some
1960        // magical genesis miner, this won't work properly, so we short circuit here
1961        // This avoids the question of 'who gets paid the genesis block reward'
1962        let message_receipts = tipset.min_ticket_block().message_receipts;
1963        return Ok(StateOutput {
1964            state_root: *tipset.parent_state(),
1965            receipt_root: message_receipts,
1966            events: vec![],
1967            events_roots: vec![],
1968        });
1969    }
1970
1971    let rand = ChainRand::new(
1972        Arc::clone(&chain_config),
1973        tipset.clone(),
1974        Arc::clone(&chain_index),
1975        beacon,
1976    );
1977
1978    let genesis_info = GenesisInfo::from_chain_config(chain_config.clone());
1979    let create_vm = |state_root: Cid, epoch, timestamp| {
1980        let circulating_supply =
1981            genesis_info.get_vm_circulating_supply(epoch, chain_index.db(), &state_root)?;
1982        VM::new(
1983            ExecutionContext {
1984                heaviest_tipset: tipset.clone(),
1985                state_tree_root: state_root,
1986                epoch,
1987                rand: Box::new(rand.clone()),
1988                base_fee: tipset.min_ticket_block().parent_base_fee.clone(),
1989                circ_supply: circulating_supply,
1990                chain_config: Arc::clone(&chain_config),
1991                chain_index: Arc::clone(&chain_index),
1992                timestamp,
1993            },
1994            engine,
1995            enable_tracing,
1996        )
1997    };
1998
1999    let mut parent_state = *tipset.parent_state();
2000
2001    let parent_epoch = Tipset::load_required(chain_index.db(), tipset.parents())?.epoch();
2002    let epoch = tipset.epoch();
2003
2004    for epoch_i in parent_epoch..epoch {
2005        if epoch_i > parent_epoch {
2006            // step 2: running cron for any null-tipsets
2007            let timestamp = genesis_timestamp + ((EPOCH_DURATION_SECONDS * epoch_i) as u64);
2008
2009            // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from
2010            // FVM, but that introduces some constraints, and possible deadlocks.
2011            parent_state = stacker::grow(64 << 20, || -> anyhow::Result<Cid> {
2012                let mut vm = create_vm(parent_state, epoch_i, timestamp)?;
2013                // run cron for null rounds if any
2014                if let Err(e) = vm.run_cron(epoch_i, callback.as_mut()) {
2015                    error!("Beginning of epoch cron failed to run: {}", e);
2016                }
2017                vm.flush()
2018            })?;
2019        }
2020
2021        // step 3: run migrations
2022        if let Some(new_state) =
2023            run_state_migrations(epoch_i, &chain_config, chain_index.db(), &parent_state)?
2024        {
2025            parent_state = new_state;
2026        }
2027    }
2028
2029    let block_messages = BlockMessages::for_tipset(chain_index.db(), &tipset)
2030        .map_err(|e| Error::Other(e.to_string()))?;
2031
2032    // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from
2033    // FVM, but that introduces some constraints, and possible deadlocks.
2034    stacker::grow(64 << 20, || -> anyhow::Result<StateOutput> {
2035        let mut vm = create_vm(parent_state, epoch, tipset.min_timestamp())?;
2036
2037        // step 4: apply tipset messages
2038        let (receipts, events, events_roots) =
2039            vm.apply_block_messages(&block_messages, epoch, callback)?;
2040
2041        // step 5: construct receipt root from receipts
2042        let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts)?;
2043
2044        // step 6: store events AMTs in the blockstore
2045        for (msg_events, events_root) in events.iter().zip(events_roots.iter()) {
2046            if let Some(event_root) = events_root {
2047                // Store the events AMT - the root CID should match the one computed by FVM
2048                let derived_event_root = Amt::new_from_iter_with_bit_width(
2049                    chain_index.db(),
2050                    EVENTS_AMT_BITWIDTH,
2051                    msg_events.iter(),
2052                )
2053                .map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?;
2054
2055                // Verify the stored root matches the FVM-computed root
2056                ensure!(
2057                    derived_event_root.eq(event_root),
2058                    "Events AMT root mismatch: derived={derived_event_root}, actual={event_root}."
2059                );
2060            }
2061        }
2062
2063        let state_root = vm.flush()?;
2064
2065        Ok(StateOutput {
2066            state_root,
2067            receipt_root,
2068            events,
2069            events_roots,
2070        })
2071    })
2072}
2073
2074#[allow(clippy::too_many_arguments)]
2075pub fn compute_state<DB>(
2076    _height: ChainEpoch,
2077    messages: Vec<Message>,
2078    tipset: Tipset,
2079    genesis_timestamp: u64,
2080    chain_index: Arc<ChainIndex<Arc<DB>>>,
2081    chain_config: Arc<ChainConfig>,
2082    beacon: Arc<BeaconSchedule>,
2083    engine: &MultiEngine,
2084    callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
2085    enable_tracing: VMTrace,
2086) -> anyhow::Result<StateOutput>
2087where
2088    DB: Blockstore + Send + Sync + 'static,
2089{
2090    if !messages.is_empty() {
2091        anyhow::bail!("Applying messages is not yet implemented.");
2092    }
2093
2094    let output = apply_block_messages(
2095        genesis_timestamp,
2096        chain_index,
2097        chain_config,
2098        beacon,
2099        engine,
2100        tipset,
2101        callback,
2102        enable_tracing,
2103    )?;
2104
2105    Ok(output)
2106}
2107
2108/// Whether or not to lookup the state output from the next tipset before computing a state
2109#[derive(Debug, Copy, Clone, Default)]
2110pub enum StateLookupPolicy {
2111    #[default]
2112    Enabled,
2113    Disabled,
2114}
2115
2116/// Controls whether the VM should flush its state after execution
2117#[derive(Debug, Copy, Clone, Default)]
2118pub enum VMFlush {
2119    Flush,
2120    #[default]
2121    Skip,
2122}