Skip to main content

forest/state_manager/
mod.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4#[cfg(test)]
5mod tests;
6
7mod cache;
8pub mod chain_rand;
9pub mod circulating_supply;
10mod errors;
11pub mod utils;
12
13pub use self::errors::*;
14use self::utils::structured;
15
16use crate::beacon::{BeaconEntry, BeaconSchedule};
17use crate::blocks::{Tipset, TipsetKey};
18use crate::chain::{
19    ChainStore, HeadChange,
20    index::{ChainIndex, ResolveNullTipset},
21};
22use crate::interpreter::{
23    ApplyResult, BlockMessages, CalledAt, ExecutionContext, IMPLICIT_MESSAGE_GAS_LIMIT, VM,
24    resolve_to_key_addr,
25};
26use crate::interpreter::{MessageCallbackCtx, VMTrace};
27use crate::lotus_json::{LotusJson, lotus_json_with_self};
28use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage};
29use crate::networks::ChainConfig;
30use crate::rpc::state::{ApiInvocResult, InvocResult, MessageGasCost};
31use crate::rpc::types::{MiningBaseInfo, SectorOnChainInfo};
32use crate::shim::actors::init::{self, State};
33use crate::shim::actors::miner::{MinerInfo, MinerPower, Partition};
34use crate::shim::actors::verifreg::{Allocation, AllocationID, Claim};
35use crate::shim::actors::*;
36use crate::shim::crypto::{Signature, SignatureType};
37use crate::shim::{
38    actors::{
39        LoadActorStateFromBlockstore, miner::ext::MinerStateExt as _,
40        verifreg::ext::VerifiedRegistryStateExt as _,
41    },
42    executor::{ApplyRet, Receipt, StampedEvent},
43};
44use crate::shim::{
45    address::{Address, Payload, Protocol},
46    clock::ChainEpoch,
47    econ::TokenAmount,
48    machine::{GLOBAL_MULTI_ENGINE, MultiEngine},
49    message::Message,
50    randomness::Randomness,
51    runtime::Policy,
52    state_tree::{ActorState, StateTree},
53    version::NetworkVersion,
54};
55use crate::state_manager::cache::{
56    DisabledTipsetDataCache, EnabledTipsetDataCache, TipsetReceiptEventCacheHandler,
57    TipsetStateCache,
58};
59use crate::state_manager::chain_rand::draw_randomness;
60use crate::state_migration::run_state_migrations;
61use crate::utils::get_size::{
62    GetSize, vec_heap_size_helper, vec_with_stack_only_item_heap_size_helper,
63};
64use ahash::{HashMap, HashMapExt};
65use anyhow::{Context as _, bail, ensure};
66use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _};
67use chain_rand::ChainRand;
68use cid::Cid;
69pub use circulating_supply::GenesisInfo;
70use fil_actor_verifreg_state::v12::DataCap;
71use fil_actor_verifreg_state::v13::ClaimID;
72use fil_actors_shared::fvm_ipld_amt::{Amt, Amtv0};
73use fil_actors_shared::fvm_ipld_bitfield::BitField;
74use fil_actors_shared::v12::runtime::DomainSeparationTag;
75use futures::{FutureExt, channel::oneshot, select};
76use fvm_ipld_blockstore::Blockstore;
77use fvm_ipld_encoding::to_vec;
78use fvm_shared4::crypto::signature::SECP_SIG_LEN;
79use itertools::Itertools as _;
80use nonzero_ext::nonzero;
81use num::BigInt;
82use num_traits::identities::Zero;
83use rayon::prelude::ParallelBridge;
84use schemars::JsonSchema;
85use serde::{Deserialize, Serialize};
86use std::ops::RangeInclusive;
87use std::time::Duration;
88use std::{num::NonZeroUsize, sync::Arc};
89use tokio::sync::{RwLock, broadcast::error::RecvError};
90use tracing::{error, info, instrument, trace, warn};
91
/// Upper bound (1024) on the number of tipset pairs `populate_cache` walks back
/// from the head. Presumably also the default tipset cache capacity — confirm
/// against the `cache` module.
const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
/// Bit width for events AMTs, judging by the name.
/// NOTE(review): the call sites are outside this chunk — confirm the exact use.
pub const EVENTS_AMT_BITWIDTH: u32 = 5;
94
/// A pair of CIDs; `StateManager::tipset_state` returns it as
/// `(state root, message receipt root)`.
type CidPair = (Cid, Cid);
97
/// Events of a tipset together with their root CIDs; this is the value stored
/// in and returned by the receipt/event cache handler.
#[derive(Debug, Clone, GetSize)]
pub struct StateEvents {
    /// Stamped events, grouped into inner vectors
    /// (presumably one per executed message — TODO confirm).
    #[get_size(size_fn = vec_heap_size_helper)]
    pub events: Vec<Vec<StampedEvent>>,
    /// Optional events root CIDs (presumably parallel to `events` — TODO confirm).
    #[get_size(size_fn = vec_with_stack_only_item_heap_size_helper)]
    pub roots: Vec<Option<Cid>>,
}
105
/// Full result of evaluating a tipset, as produced by `compute_tipset_state`:
/// the two root CIDs plus the events and events roots collected on the way.
#[derive(Clone)]
pub struct StateOutput {
    /// Root CID of the resulting state tree.
    pub state_root: Cid,
    /// Root CID of the message receipts.
    pub receipt_root: Cid,
    /// Stamped events; becomes `StateEvents::events` when cached.
    pub events: Vec<Vec<StampedEvent>>,
    /// Optional events roots; becomes `StateEvents::roots` when cached.
    pub events_roots: Vec<Option<Cid>>,
}
113
/// The part of a [`StateOutput`] that is kept in the tipset state cache: only
/// the two root CIDs. Events are not carried over by the `From` conversions.
#[derive(Debug, Default, Clone, GetSize)]
pub struct StateOutputValue {
    /// Root CID of the resulting state tree (excluded from size accounting).
    #[get_size(ignore)]
    pub state_root: Cid,
    /// Root CID of the message receipts (excluded from size accounting).
    #[get_size(ignore)]
    pub receipt_root: Cid,
}
121
122impl From<StateOutputValue> for StateOutput {
123    fn from(value: StateOutputValue) -> Self {
124        Self {
125            state_root: value.state_root,
126            receipt_root: value.receipt_root,
127            events: vec![],
128            events_roots: vec![],
129        }
130    }
131}
132
133impl From<StateOutput> for StateOutputValue {
134    fn from(value: StateOutput) -> Self {
135        StateOutputValue {
136            state_root: value.state_root,
137            receipt_root: value.receipt_root,
138        }
139    }
140}
141
/// External format for returning market balance from state.
///
/// Serialized with `PascalCase` field names (`Escrow`, `Locked`); both amounts
/// are (de)serialized through the `crate::lotus_json` helpers.
#[derive(
    Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema,
)]
#[serde(rename_all = "PascalCase")]
pub struct MarketBalance {
    /// Escrow amount (NOTE(review): exact market-actor semantics are not visible here).
    #[schemars(with = "LotusJson<TokenAmount>")]
    #[serde(with = "crate::lotus_json")]
    pub escrow: TokenAmount,
    /// Locked amount (NOTE(review): exact market-actor semantics are not visible here).
    #[schemars(with = "LotusJson<TokenAmount>")]
    #[serde(with = "crate::lotus_json")]
    pub locked: TokenAmount,
}
// Presumably makes `MarketBalance` act as its own Lotus JSON representation — confirm in `lotus_json`.
lotus_json_with_self!(MarketBalance);
156
157/// State manager handles all interactions with the internal Filecoin actors
158/// state. This encapsulates the [`ChainStore`] functionality, which only
159/// handles chain data, to allow for interactions with the underlying state of
160/// the chain. The state manager not only allows interfacing with state, but
161/// also is used when performing state transitions.
162pub struct StateManager<DB> {
163    /// Chain store
164    cs: Arc<ChainStore<DB>>,
165    /// This is a cache which indexes tipsets to their calculated state output (state root, receipt root).
166    cache: TipsetStateCache<StateOutputValue>,
167    beacon: Arc<crate::beacon::BeaconSchedule>,
168    engine: Arc<MultiEngine>,
169    /// Handler for caching/retrieving tipset events and receipts.
170    receipt_event_cache_handler: Box<dyn TipsetReceiptEventCacheHandler>,
171}
172
/// A typed `None` for call sites that take an optional per-message callback
/// (e.g. `compute_tipset_state`) but do not want one; the concrete `fn` pointer
/// type gives the compiler something to infer the callback type from.
// The spelled-out callback type trips clippy's complexity lint; it is used only here.
#[allow(clippy::type_complexity)]
pub const NO_CALLBACK: Option<fn(MessageCallbackCtx<'_>) -> anyhow::Result<()>> = None;
175
176impl<DB> StateManager<DB>
177where
178    DB: Blockstore,
179{
180    pub fn new(cs: Arc<ChainStore<DB>>) -> Result<Self, anyhow::Error> {
181        Self::new_with_engine(cs, GLOBAL_MULTI_ENGINE.clone())
182    }
183
184    pub fn new_with_engine(
185        cs: Arc<ChainStore<DB>>,
186        engine: Arc<MultiEngine>,
187    ) -> Result<Self, anyhow::Error> {
188        let genesis = cs.genesis_block_header();
189        let beacon = Arc::new(cs.chain_config().get_beacon_schedule(genesis.timestamp));
190
191        let cache_handler: Box<dyn TipsetReceiptEventCacheHandler> =
192            if cs.chain_config().enable_receipt_event_caching {
193                Box::new(EnabledTipsetDataCache::new())
194            } else {
195                Box::new(DisabledTipsetDataCache::new())
196            };
197
198        Ok(Self {
199            cs,
200            cache: TipsetStateCache::new("state_output"), // For StateOutputValue
201            beacon,
202            engine,
203            receipt_event_cache_handler: cache_handler,
204        })
205    }
206
207    /// Returns the currently tracked heaviest tipset.
208    pub fn heaviest_tipset(&self) -> Tipset {
209        self.chain_store().heaviest_tipset()
210    }
211
212    /// Returns the currently tracked heaviest tipset and rewind to a most recent valid one if necessary.
213    /// A valid head has
214    ///     - state tree in the blockstore
215    ///     - actor bundle version in the state tree that matches chain configuration
216    pub fn maybe_rewind_heaviest_tipset(&self) -> anyhow::Result<()> {
217        while self.maybe_rewind_heaviest_tipset_once()? {}
218        Ok(())
219    }
220
221    fn maybe_rewind_heaviest_tipset_once(&self) -> anyhow::Result<bool> {
222        let head = self.heaviest_tipset();
223        if let Some(info) = self
224            .chain_config()
225            .network_height_with_actor_bundle(head.epoch())
226        {
227            let expected_height_info = info.info;
228            let expected_bundle = info.manifest(self.blockstore())?;
229            let expected_bundle_metadata = expected_bundle.metadata()?;
230            let state = self.get_state_tree(head.parent_state())?;
231            let bundle_metadata = state.get_actor_bundle_metadata()?;
232            if expected_bundle_metadata != bundle_metadata {
233                let current_epoch = head.epoch();
234                let target_head = self.chain_index().tipset_by_height(
235                    (expected_height_info.epoch - 1).max(0),
236                    head,
237                    ResolveNullTipset::TakeOlder,
238                )?;
239                let target_epoch = target_head.epoch();
240                let bundle_version = &bundle_metadata.version;
241                let expected_bundle_version = &expected_bundle_metadata.version;
242                if target_epoch < current_epoch {
243                    tracing::warn!(
244                        "rewinding chain head from {current_epoch} to {target_epoch}, actor bundle: {bundle_version}, expected: {expected_bundle_version}"
245                    );
246                    if self.blockstore().has(target_head.parent_state())? {
247                        self.chain_store().set_heaviest_tipset(target_head)?;
248                        return Ok(true);
249                    } else {
250                        anyhow::bail!(
251                            "failed to rewind, state tree @ {target_epoch} is missing from blockstore: {}",
252                            target_head.parent_state()
253                        );
254                    }
255                }
256            }
257        }
258        Ok(false)
259    }
260
261    // Given the assumption that the heaviest tipset must always be validated,
262    // we can populate our state cache by walking backwards through the
263    // block-chain. A warm cache cuts 10-20 seconds from the first state
264    // validation, and it prevents duplicate migrations.
265    pub fn populate_cache(&self) {
266        for (child, parent) in self
267            .chain_index()
268            .chain(self.heaviest_tipset())
269            .tuple_windows()
270            .take(DEFAULT_TIPSET_CACHE_SIZE.into())
271        {
272            let key = parent.key();
273            let state_root = child.min_ticket_block().state_root;
274            let receipt_root = child.min_ticket_block().message_receipts;
275            self.cache.insert(
276                key.clone(),
277                StateOutputValue {
278                    state_root,
279                    receipt_root,
280                },
281            );
282            if let Ok(receipts) = Receipt::get_receipts(self.blockstore(), receipt_root)
283                && !receipts.is_empty()
284            {
285                self.receipt_event_cache_handler
286                    .insert_receipt(key, receipts);
287            }
288        }
289    }
290
    /// Returns a reference to the beacon schedule held by this state manager.
    pub fn beacon_schedule(&self) -> &Arc<BeaconSchedule> {
        &self.beacon
    }
294
295    /// Returns network version for the given epoch.
296    pub fn get_network_version(&self, epoch: ChainEpoch) -> NetworkVersion {
297        self.chain_config().network_version(epoch)
298    }
299
300    /// Gets the state tree
301    pub fn get_state_tree(&self, state_cid: &Cid) -> anyhow::Result<StateTree<DB>> {
302        StateTree::new_from_root(self.blockstore_owned(), state_cid)
303    }
304
305    /// Gets actor from given [`Cid`], if it exists.
306    pub fn get_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<Option<ActorState>> {
307        let state = self.get_state_tree(&state_cid)?;
308        state.get_actor(addr)
309    }
310
311    /// Gets actor state from implicit actor address
312    pub fn get_actor_state<S: LoadActorStateFromBlockstore>(
313        &self,
314        ts: &Tipset,
315    ) -> anyhow::Result<S> {
316        let state_tree = self.get_state_tree(ts.parent_state())?;
317        state_tree.get_actor_state()
318    }
319
320    /// Gets actor state from explicit actor address
321    pub fn get_actor_state_from_address<S: LoadActorStateFromBlockstore>(
322        &self,
323        ts: &Tipset,
324        actor_address: &Address,
325    ) -> anyhow::Result<S> {
326        let state_tree = self.get_state_tree(ts.parent_state())?;
327        state_tree.get_actor_state_from_address(actor_address)
328    }
329
330    /// Gets required actor from given [`Cid`].
331    pub fn get_required_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<ActorState> {
332        let state = self.get_state_tree(&state_cid)?;
333        state.get_actor(addr)?.with_context(|| {
334            format!("Failed to load actor with addr={addr}, state_cid={state_cid}")
335        })
336    }
337
338    /// Returns a reference to the state manager's [`Blockstore`].
339    pub fn blockstore(&self) -> &Arc<DB> {
340        self.cs.blockstore()
341    }
342
343    pub fn blockstore_owned(&self) -> Arc<DB> {
344        self.blockstore().clone()
345    }
346
    /// Returns reference to the state manager's [`ChainStore`].
    ///
    /// The other accessors (`blockstore`, `chain_index`, `chain_config`) all
    /// read through this same chain store.
    pub fn chain_store(&self) -> &Arc<ChainStore<DB>> {
        &self.cs
    }
351
352    /// Returns reference to the state manager's [`ChainIndex`].
353    pub fn chain_index(&self) -> &Arc<ChainIndex<Arc<DB>>> {
354        self.cs.chain_index()
355    }
356
357    /// Returns reference to the state manager's [`ChainConfig`].
358    pub fn chain_config(&self) -> &Arc<ChainConfig> {
359        self.cs.chain_config()
360    }
361
362    pub fn chain_rand(&self, tipset: Tipset) -> ChainRand<DB> {
363        ChainRand::new(
364            self.chain_config().clone(),
365            tipset,
366            self.chain_index().clone(),
367            self.beacon.clone(),
368        )
369    }
370
371    /// Returns the internal, protocol-level network chain from the state.
372    pub fn get_network_state_name(
373        &self,
374        state_cid: Cid,
375    ) -> anyhow::Result<crate::networks::StateNetworkName> {
376        let init_act = self
377            .get_actor(&init::ADDRESS.into(), state_cid)?
378            .ok_or_else(|| Error::state("Init actor address could not be resolved"))?;
379        Ok(
380            State::load(self.blockstore(), init_act.code, init_act.state)?
381                .into_network_name()
382                .into(),
383        )
384    }
385
386    /// Returns true if miner has been slashed or is considered invalid.
387    pub fn is_miner_slashed(&self, addr: &Address, state_cid: &Cid) -> anyhow::Result<bool, Error> {
388        let actor = self
389            .get_actor(&Address::POWER_ACTOR, *state_cid)?
390            .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
391
392        let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
393
394        Ok(spas.miner_power(self.blockstore(), addr)?.is_none())
395    }
396
397    /// Returns raw work address of a miner given the state root.
398    pub fn get_miner_work_addr(&self, state_cid: Cid, addr: &Address) -> Result<Address, Error> {
399        let state =
400            StateTree::new_from_root(self.blockstore_owned(), &state_cid).map_err(Error::other)?;
401        let ms: miner::State = state.get_actor_state_from_address(addr)?;
402        let info = ms.info(self.blockstore()).map_err(|e| e.to_string())?;
403        let addr = resolve_to_key_addr(&state, self.blockstore(), &info.worker())?;
404        Ok(addr)
405    }
406
407    /// Returns specified actor's claimed power and total network power as a
408    /// tuple.
409    pub fn get_power(
410        &self,
411        state_cid: &Cid,
412        addr: Option<&Address>,
413    ) -> anyhow::Result<Option<(power::Claim, power::Claim)>, Error> {
414        let actor = self
415            .get_actor(&Address::POWER_ACTOR, *state_cid)?
416            .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
417
418        let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
419
420        let t_pow = spas.total_power();
421
422        if let Some(maddr) = addr {
423            let m_pow = spas
424                .miner_power(self.blockstore(), maddr)?
425                .ok_or_else(|| Error::state(format!("Miner for address {maddr} not found")))?;
426
427            let min_pow = spas.miner_nominal_power_meets_consensus_minimum(
428                &self.chain_config().policy,
429                self.blockstore(),
430                maddr,
431            )?;
432            if min_pow {
433                return Ok(Some((m_pow, t_pow)));
434            }
435        }
436
437        Ok(None)
438    }
439
440    // Returns all sectors
441    pub fn get_all_sectors(
442        &self,
443        addr: &Address,
444        ts: &Tipset,
445    ) -> anyhow::Result<Vec<SectorOnChainInfo>> {
446        let actor = self
447            .get_actor(addr, *ts.parent_state())?
448            .ok_or_else(|| Error::state("Miner actor not found"))?;
449        let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
450        state.load_sectors_ext(self.blockstore(), None)
451    }
452}
453
454impl<DB> StateManager<DB>
455where
456    DB: Blockstore + Send + Sync + 'static,
457{
458    /// Returns the pair of (state root, message receipt root). This will
459    /// either be cached or will be calculated and fill the cache. Tipset
460    /// state for a given tipset is guaranteed not to be computed twice.
461    pub async fn tipset_state(
462        self: &Arc<Self>,
463        tipset: &Tipset,
464        state_lookup: StateLookupPolicy,
465    ) -> anyhow::Result<CidPair> {
466        let StateOutput {
467            state_root,
468            receipt_root,
469            ..
470        } = self.tipset_state_output(tipset, state_lookup).await?;
471        Ok((state_root, receipt_root))
472    }
473
    /// Returns the [`StateOutput`] for `tipset`, going through the tipset state cache.
    ///
    /// On a cache miss the closure below runs: when `state_lookup` is
    /// `StateLookupPolicy::Enabled` it first tries
    /// `try_lookup_state_from_next_tipset`; otherwise, or when that finds
    /// nothing, the state is computed with `compute_tipset_state` and the
    /// receipt/event caches are updated.
    ///
    /// Note: the value comes back as a `StateOutputValue` and is converted with
    /// `StateOutput::from`, so `events` and `events_roots` of the returned value
    /// are always empty. Use `tipset_state_events` to obtain events.
    pub async fn tipset_state_output(
        self: &Arc<Self>,
        tipset: &Tipset,
        state_lookup: StateLookupPolicy,
    ) -> anyhow::Result<StateOutput> {
        let key = tipset.key();
        self.cache
            .get_or_else(key, || async move {
                info!(
                    "Evaluating tipset: EPOCH={}, blocks={}, tsk={}",
                    tipset.epoch(),
                    tipset.len(),
                    tipset.key(),
                );

                // First, try to look up the state and receipt if not found in the blockstore
                // compute it
                if matches!(state_lookup, StateLookupPolicy::Enabled)
                    && let Some(state_from_child) = self.try_lookup_state_from_next_tipset(tipset)
                {
                    return Ok(state_from_child);
                }

                trace!("Computing state for tipset at epoch {}", tipset.epoch());
                let state_output = self
                    .compute_tipset_state(tipset.clone(), NO_CALLBACK, VMTrace::NotTraced)
                    .await?;

                // Must run before the conversion below, which drops the events.
                self.update_cache_with_state_output(key, &state_output);

                let ts_state = state_output.into();

                Ok(ts_state)
            })
            .await
            .map(StateOutput::from)
    }
511
512    /// update the receipt and events caches
513    fn update_cache_with_state_output(&self, key: &TipsetKey, state_output: &StateOutput) {
514        if !state_output.events.is_empty() || !state_output.events_roots.is_empty() {
515            let events_data = StateEvents {
516                events: state_output.events.clone(),
517                roots: state_output.events_roots.clone(),
518            };
519            self.receipt_event_cache_handler
520                .insert_events(key, events_data);
521        }
522
523        if let Ok(receipts) = Receipt::get_receipts(self.blockstore(), state_output.receipt_root)
524            && !receipts.is_empty()
525        {
526            self.receipt_event_cache_handler
527                .insert_receipt(key, receipts);
528        }
529    }
530
531    #[instrument(skip(self))]
532    pub async fn tipset_message_receipts(
533        self: &Arc<Self>,
534        tipset: &Tipset,
535    ) -> anyhow::Result<Vec<Receipt>> {
536        let key = tipset.key();
537        let ts = tipset.clone();
538        let this = Arc::clone(self);
539        self.receipt_event_cache_handler
540            .get_receipt_or_else(
541                key,
542                Box::new(move || {
543                    Box::pin(async move {
544                        let StateOutput { receipt_root, .. } = this
545                            .compute_tipset_state(ts, NO_CALLBACK, VMTrace::NotTraced)
546                            .await?;
547                        trace!("Completed tipset state calculation");
548                        Receipt::get_receipts(this.blockstore(), receipt_root)
549                    })
550                }),
551            )
552            .await
553    }
554
555    #[instrument(skip(self))]
556    pub async fn tipset_state_events(
557        self: &Arc<Self>,
558        tipset: &Tipset,
559    ) -> anyhow::Result<StateEvents> {
560        let key = tipset.key();
561        let ts = tipset.clone();
562        let this = Arc::clone(self);
563        let cids = tipset.cids();
564        self.receipt_event_cache_handler
565            .get_events_or_else(
566                key,
567                Box::new(move || {
568                    Box::pin(async move {
569                        // Fallback: compute the tipset state if events not found in the blockstore
570                        let state_out = this
571                            .compute_tipset_state(ts, NO_CALLBACK, VMTrace::NotTraced)
572                            .await?;
573                        trace!("Completed tipset state calculation {:?}", cids);
574                        Ok(StateEvents {
575                            events: state_out.events,
576                            roots: state_out.events_roots,
577                        })
578                    })
579                }),
580            )
581            .await
582    }
583
584    #[instrument(skip(self, rand))]
585    fn call_raw(
586        &self,
587        state_cid: Option<Cid>,
588        msg: &Message,
589        rand: ChainRand<DB>,
590        tipset: &Tipset,
591    ) -> Result<ApiInvocResult, Error> {
592        let mut msg = msg.clone();
593
594        let state_cid = state_cid.unwrap_or(*tipset.parent_state());
595
596        let tipset_messages = self
597            .chain_store()
598            .messages_for_tipset(tipset)
599            .map_err(|err| Error::Other(err.to_string()))?;
600
601        let prior_messsages = tipset_messages
602            .iter()
603            .filter(|ts_msg| ts_msg.message().from() == msg.from());
604
605        // Handle state forks
606
607        let height = tipset.epoch();
608        let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
609        let mut vm = VM::new(
610            ExecutionContext {
611                heaviest_tipset: tipset.clone(),
612                state_tree_root: state_cid,
613                epoch: height,
614                rand: Box::new(rand),
615                base_fee: tipset.block_headers().first().parent_base_fee.clone(),
616                circ_supply: genesis_info.get_vm_circulating_supply(
617                    height,
618                    self.blockstore(),
619                    &state_cid,
620                )?,
621                chain_config: self.chain_config().clone(),
622                chain_index: self.chain_index().clone(),
623                timestamp: tipset.min_timestamp(),
624            },
625            &self.engine,
626            VMTrace::Traced,
627        )?;
628
629        for m in prior_messsages {
630            vm.apply_message(m)?;
631        }
632
633        // We flush to get the VM's view of the state tree after applying the above messages
634        // This is needed to get the correct nonce from the actor state to match the VM
635        let state_cid = vm.flush()?;
636
637        let state = StateTree::new_from_root(self.blockstore_owned(), &state_cid)?;
638
639        let from_actor = state
640            .get_actor(&msg.from())?
641            .ok_or_else(|| anyhow::anyhow!("actor not found"))?;
642        msg.set_sequence(from_actor.sequence);
643
644        // Implicit messages need to set a special gas limit
645        let mut msg = msg.clone();
646        msg.gas_limit = IMPLICIT_MESSAGE_GAS_LIMIT as u64;
647
648        let (apply_ret, duration) = vm.apply_implicit_message(&msg)?;
649
650        Ok(ApiInvocResult {
651            msg: msg.clone(),
652            msg_rct: Some(apply_ret.msg_receipt()),
653            msg_cid: msg.cid(),
654            error: apply_ret.failure_info().unwrap_or_default(),
655            duration: duration.as_nanos().clamp(0, u64::MAX as u128) as u64,
656            gas_cost: MessageGasCost::default(),
657            execution_trace: structured::parse_events(apply_ret.exec_trace()).unwrap_or_default(),
658        })
659    }
660
661    /// runs the given message and returns its result without any persisted
662    /// changes.
663    pub fn call(&self, message: &Message, tipset: Option<Tipset>) -> Result<ApiInvocResult, Error> {
664        let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
665        let chain_rand = self.chain_rand(ts.clone());
666        self.call_raw(None, message, chain_rand, &ts)
667    }
668
669    /// Same as [`StateManager::call`] but runs the message on the given state and not
670    /// on the parent state of the tipset.
671    pub fn call_on_state(
672        &self,
673        state_cid: Cid,
674        message: &Message,
675        tipset: Option<Tipset>,
676    ) -> Result<ApiInvocResult, Error> {
677        let ts = tipset.unwrap_or_else(|| self.cs.heaviest_tipset());
678        let chain_rand = self.chain_rand(ts.clone());
679        self.call_raw(Some(state_cid), message, chain_rand, &ts)
680    }
681
682    pub async fn apply_on_state_with_gas(
683        self: &Arc<Self>,
684        tipset: Option<Tipset>,
685        msg: Message,
686        state_lookup: StateLookupPolicy,
687    ) -> anyhow::Result<ApiInvocResult> {
688        let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
689
690        let from_a = self.resolve_to_key_addr(&msg.from, &ts).await?;
691
692        // Pretend that the message is signed. This has an influence on the gas
693        // cost. We obviously can't generate a valid signature. Instead, we just
694        // fill the signature with zeros. The validity is not checked.
695        let mut chain_msg = match from_a.protocol() {
696            Protocol::Secp256k1 => ChainMessage::Signed(SignedMessage::new_unchecked(
697                msg.clone(),
698                Signature::new_secp256k1(vec![0; SECP_SIG_LEN]),
699            )),
700            Protocol::Delegated => ChainMessage::Signed(SignedMessage::new_unchecked(
701                msg.clone(),
702                // In Lotus, delegated signatures have the same length as SECP256k1.
703                // This may or may not change in the future.
704                Signature::new(SignatureType::Delegated, vec![0; SECP_SIG_LEN]),
705            )),
706            _ => ChainMessage::Unsigned(msg.clone()),
707        };
708
709        let (_invoc_res, apply_ret, duration) = self
710            .call_with_gas(&mut chain_msg, &[], Some(ts), VMTrace::Traced, state_lookup)
711            .await?;
712        Ok(ApiInvocResult {
713            msg_cid: msg.cid(),
714            msg,
715            msg_rct: Some(apply_ret.msg_receipt()),
716            error: apply_ret.failure_info().unwrap_or_default(),
717            duration: duration.as_nanos().clamp(0, u64::MAX as u128) as u64,
718            gas_cost: MessageGasCost::default(),
719            execution_trace: structured::parse_events(apply_ret.exec_trace()).unwrap_or_default(),
720        })
721    }
722
    /// Computes message on the given [Tipset] state, after applying other
    /// messages and returns the values computed in the VM.
    ///
    /// * `message` - message to execute; its sequence is overwritten with the
    ///   sender's current sequence as seen by the VM.
    /// * `prior_messages` - messages applied, in order, before `message`.
    /// * `tipset` - base tipset; defaults to the heaviest tipset.
    /// * `trace_config` - whether the VM records an execution trace.
    /// * `state_lookup` - forwarded to `tipset_state` when resolving the base state.
    ///
    /// Returns the invocation result, the raw apply result and the duration
    /// reported by the VM for `message`.
    ///
    /// # Errors
    /// Fails when the tipset state cannot be loaded, the VM cannot be created,
    /// a prior message fails to apply, or the sender is not in the state tree.
    pub async fn call_with_gas(
        self: &Arc<Self>,
        message: &mut ChainMessage,
        prior_messages: &[ChainMessage],
        tipset: Option<Tipset>,
        trace_config: VMTrace,
        state_lookup: StateLookupPolicy,
    ) -> Result<(InvocResult, ApplyRet, Duration), Error> {
        let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
        // Only the state root is needed here; the receipt root is discarded.
        let (st, _) = self
            .tipset_state(&ts, state_lookup)
            .await
            .map_err(|e| Error::Other(format!("Could not load tipset state: {e}")))?;
        let chain_rand = self.chain_rand(ts.clone());

        // Since we're simulating a future message, pretend we're applying it in the
        // "next" tipset
        let epoch = ts.epoch() + 1;
        let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
        // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from
        // FVM, but that introduces some constraints, and possible deadlocks.
        let (ret, duration) = stacker::grow(64 << 20, || -> ApplyResult {
            let mut vm = VM::new(
                ExecutionContext {
                    heaviest_tipset: ts.clone(),
                    state_tree_root: st,
                    epoch,
                    rand: Box::new(chain_rand),
                    base_fee: ts.block_headers().first().parent_base_fee.clone(),
                    circ_supply: genesis_info.get_vm_circulating_supply(
                        epoch,
                        self.blockstore(),
                        &st,
                    )?,
                    chain_config: self.chain_config().clone(),
                    chain_index: self.chain_index().clone(),
                    timestamp: ts.min_timestamp(),
                },
                &self.engine,
                trace_config,
            )?;

            for msg in prior_messages {
                vm.apply_message(msg)?;
            }
            // Read the sender after the prior messages so the sequence reflects them.
            let from_actor = vm
                .get_actor(&message.from())
                .map_err(|e| Error::Other(format!("Could not get actor from state: {e}")))?
                .ok_or_else(|| Error::Other("cant find actor in state tree".to_string()))?;
            message.set_sequence(from_actor.sequence);
            vm.apply_message(message)
        })?;

        Ok((
            InvocResult::new(message.message().clone(), &ret),
            ret,
            duration,
        ))
    }
784
785    /// Replays the given message and returns the result of executing the
786    /// indicated message, assuming it was executed in the indicated tipset.
787    pub async fn replay(self: &Arc<Self>, ts: Tipset, mcid: Cid) -> Result<ApiInvocResult, Error> {
788        let this = Arc::clone(self);
789        tokio::task::spawn_blocking(move || this.replay_blocking(ts, mcid))
790            .await
791            .map_err(|e| Error::Other(format!("{e}")))?
792    }
793
    /// Blocking version of `replay`
    ///
    /// Re-executes tipset `ts` with tracing enabled and a per-message callback.
    /// The first time the callback sees the message whose CID equals `mcid`
    /// (at `CalledAt::Applied` or `CalledAt::Reward`), it records the result and
    /// aborts the rest of the execution by returning a sentinel error.
    ///
    /// # Errors
    /// Returns `Error::Other` when execution fails for any reason other than the
    /// sentinel, or when the message was never encountered ("failed to replay").
    pub fn replay_blocking(
        self: &Arc<Self>,
        ts: Tipset,
        mcid: Cid,
    ) -> Result<ApiInvocResult, Error> {
        // Sentinel error text used to stop execution once the target message is captured.
        const REPLAY_HALT: &str = "replay_halt";

        let mut api_invoc_result = None;
        let callback = |ctx: MessageCallbackCtx<'_>| {
            match ctx.at {
                // Capture only the first match; later messages are never reached
                // because of the bail below.
                CalledAt::Applied | CalledAt::Reward
                    if api_invoc_result.is_none() && ctx.cid == mcid =>
                {
                    api_invoc_result = Some(ApiInvocResult {
                        msg_cid: ctx.message.cid(),
                        msg: ctx.message.message().clone(),
                        msg_rct: Some(ctx.apply_ret.msg_receipt()),
                        error: ctx.apply_ret.failure_info().unwrap_or_default(),
                        duration: ctx.duration.as_nanos().clamp(0, u64::MAX as u128) as u64,
                        gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
                        execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
                            .unwrap_or_default(),
                    });
                    anyhow::bail!(REPLAY_HALT);
                }
                _ => Ok(()), // ignored
            }
        };
        let result = self.compute_tipset_state_blocking(ts, Some(callback), VMTrace::Traced);
        // The sentinel error is the expected way out; anything else is a real failure.
        if let Err(error_message) = result
            && error_message.to_string() != REPLAY_HALT
        {
            return Err(Error::Other(format!(
                "unexpected error during execution : {error_message:}"
            )));
        }
        api_invoc_result.ok_or_else(|| Error::Other("failed to replay".into()))
    }
833
834    /// Checks the eligibility of the miner. This is used in the validation that
835    /// a block's miner has the requirements to mine a block.
836    pub fn eligible_to_mine(
837        &self,
838        address: &Address,
839        base_tipset: &Tipset,
840        lookback_tipset: &Tipset,
841    ) -> anyhow::Result<bool, Error> {
842        let hmp =
843            self.miner_has_min_power(&self.chain_config().policy, address, lookback_tipset)?;
844        let version = self.get_network_version(base_tipset.epoch());
845
846        if version <= NetworkVersion::V3 {
847            return Ok(hmp);
848        }
849
850        if !hmp {
851            return Ok(false);
852        }
853
854        let actor = self
855            .get_actor(&Address::POWER_ACTOR, *base_tipset.parent_state())?
856            .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
857
858        let power_state = power::State::load(self.blockstore(), actor.code, actor.state)?;
859
860        let actor = self
861            .get_actor(address, *base_tipset.parent_state())?
862            .ok_or_else(|| Error::state("Miner actor address could not be resolved"))?;
863
864        let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
865
866        // Non-empty power claim.
867        let claim = power_state
868            .miner_power(self.blockstore(), address)?
869            .ok_or_else(|| Error::Other("Could not get claim".to_string()))?;
870        if claim.quality_adj_power <= BigInt::zero() {
871            return Ok(false);
872        }
873
874        // No fee debt.
875        if !miner_state.fee_debt().is_zero() {
876            return Ok(false);
877        }
878
879        // No active consensus faults.
880        let info = miner_state.info(self.blockstore())?;
881        if base_tipset.epoch() <= info.consensus_fault_elapsed {
882            return Ok(false);
883        }
884
885        Ok(true)
886    }
887
888    /// Conceptually, a [`Tipset`] consists of _blocks_ which share an _epoch_.
889    /// Each _block_ contains _messages_, which are executed by the _Filecoin Virtual Machine_.
890    ///
891    /// VM message execution essentially looks like this:
892    /// ```text
893    /// state[N-900..N] * message = state[N+1]
894    /// ```
895    ///
896    /// The `state`s above are stored in the `IPLD Blockstore`, and can be referred to by
897    /// a [`Cid`] - the _state root_.
898    /// The previous 900 states (configurable, see
899    /// <https://docs.filecoin.io/reference/general/glossary/#finality>) can be
900    /// queried when executing a message, so a store needs at least that many.
901    /// (a snapshot typically contains 2000, for example).
902    ///
903    /// Each message costs FIL to execute - this is _gas_.
904    /// After execution, the message has a _receipt_, showing how much gas was spent.
905    /// This is similarly a [`Cid`] into the block store.
906    ///
907    /// For details, see the documentation for [`apply_block_messages`].
908    ///
909    #[instrument(skip_all)]
910    pub async fn compute_tipset_state(
911        self: &Arc<Self>,
912        tipset: Tipset,
913        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
914        enable_tracing: VMTrace,
915    ) -> Result<StateOutput, Error> {
916        let this = Arc::clone(self);
917        tokio::task::spawn_blocking(move || {
918            this.compute_tipset_state_blocking(tipset, callback, enable_tracing)
919        })
920        .await?
921    }
922
923    /// Blocking version of `compute_tipset_state`
924    #[tracing::instrument(skip_all)]
925    pub fn compute_tipset_state_blocking(
926        &self,
927        tipset: Tipset,
928        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
929        enable_tracing: VMTrace,
930    ) -> Result<StateOutput, Error> {
931        let epoch = tipset.epoch();
932        let has_callback = callback.is_some();
933        Ok(apply_block_messages(
934            self.chain_store().genesis_block_header().timestamp,
935            Arc::clone(self.chain_index()),
936            Arc::clone(self.chain_config()),
937            self.beacon_schedule().clone(),
938            &self.engine,
939            tipset,
940            callback,
941            enable_tracing,
942        )
943        .map_err(|e| {
944            if has_callback {
945                e
946            } else {
947                anyhow::anyhow!("Failed to compute tipset state@{epoch}: {e}")
948            }
949        })?)
950    }
951
952    #[instrument(skip_all)]
953    pub async fn compute_state(
954        self: &Arc<Self>,
955        height: ChainEpoch,
956        messages: Vec<Message>,
957        tipset: Tipset,
958        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
959        enable_tracing: VMTrace,
960    ) -> Result<StateOutput, Error> {
961        let this = Arc::clone(self);
962        tokio::task::spawn_blocking(move || {
963            this.compute_state_blocking(height, messages, tipset, callback, enable_tracing)
964        })
965        .await?
966    }
967
968    /// Blocking version of `compute_state`
969    #[tracing::instrument(skip_all)]
970    pub fn compute_state_blocking(
971        &self,
972        height: ChainEpoch,
973        messages: Vec<Message>,
974        tipset: Tipset,
975        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
976        enable_tracing: VMTrace,
977    ) -> Result<StateOutput, Error> {
978        Ok(compute_state(
979            height,
980            messages,
981            tipset,
982            self.chain_store().genesis_block_header().timestamp,
983            Arc::clone(self.chain_index()),
984            Arc::clone(self.chain_config()),
985            self.beacon_schedule().clone(),
986            &self.engine,
987            callback,
988            enable_tracing,
989        )?)
990    }
991
992    /// Check if tipset had executed the message, by loading the receipt based
993    /// on the index of the message in the block.
994    fn tipset_executed_message(
995        &self,
996        tipset: &Tipset,
997        message: &ChainMessage,
998        allow_replaced: bool,
999    ) -> Result<Option<Receipt>, Error> {
1000        if tipset.epoch() == 0 {
1001            return Ok(None);
1002        }
1003        let message_from_address = message.from();
1004        let message_sequence = message.sequence();
1005        // Load parent state.
1006        let pts = self
1007            .chain_index()
1008            .load_required_tipset(tipset.parents())
1009            .map_err(|err| Error::Other(format!("Failed to load tipset: {err}")))?;
1010        let messages = self
1011            .cs
1012            .messages_for_tipset(&pts)
1013            .map_err(|err| Error::Other(format!("Failed to load messages for tipset: {err}")))?;
1014        messages
1015            .iter()
1016            .enumerate()
1017            // iterate in reverse because we going backwards through the chain
1018            .rev()
1019            .filter(|(_, s)| {
1020                s.sequence() == message_sequence
1021                    && s.from() == message_from_address
1022                    && s.equal_call(message)
1023            })
1024            .map(|(index, m)| {
1025                // A replacing message is a message with a different CID,
1026                // any of Gas values, and different signature, but with all
1027                // other parameters matching (source/destination, nonce, params, etc.)
1028                if !allow_replaced && message.cid() != m.cid(){
1029                    Err(Error::Other(format!(
1030                        "found message with equal nonce and call params but different CID. wanted {}, found: {}, nonce: {}, from: {}",
1031                        message.cid(),
1032                        m.cid(),
1033                        message.sequence(),
1034                        message.from(),
1035                    )))
1036                } else {
1037                    let block_header = tipset.block_headers().first();
1038                    crate::chain::get_parent_receipt(
1039                        self.blockstore(),
1040                        block_header,
1041                        index,
1042                    )
1043                    .map_err(|err| Error::Other(format!("Failed to get parent receipt (message_receipts={}, index={index}, error={err})", block_header.message_receipts)))
1044                }
1045            })
1046            .next()
1047            .unwrap_or(Ok(None))
1048    }
1049
1050    fn check_search(
1051        &self,
1052        mut current: Tipset,
1053        message: &ChainMessage,
1054        lookback_max_epoch: ChainEpoch,
1055        allow_replaced: bool,
1056    ) -> Result<Option<(Tipset, Receipt)>, Error> {
1057        let message_from_address = message.from();
1058        let message_sequence = message.sequence();
1059        let mut current_actor_state = self
1060            .get_required_actor(&message_from_address, *current.parent_state())
1061            .map_err(Error::state)?;
1062        let message_from_id = self.lookup_required_id(&message_from_address, &current)?;
1063
1064        while current.epoch() >= lookback_max_epoch {
1065            let parent_tipset = self
1066                .chain_index()
1067                .load_required_tipset(current.parents())
1068                .map_err(|err| {
1069                    Error::Other(format!(
1070                        "failed to load tipset during msg wait searchback: {err:}"
1071                    ))
1072                })?;
1073
1074            let parent_actor_state = self
1075                .get_actor(&message_from_id, *parent_tipset.parent_state())
1076                .map_err(|e| Error::State(e.to_string()))?;
1077
1078            if parent_actor_state.is_none()
1079                || (current_actor_state.sequence > message_sequence
1080                    && parent_actor_state.as_ref().unwrap().sequence <= message_sequence)
1081            {
1082                let receipt = self
1083                    .tipset_executed_message(&current, message, allow_replaced)?
1084                    .context("Failed to get receipt with tipset_executed_message")?;
1085                return Ok(Some((current, receipt)));
1086            }
1087
1088            if let Some(parent_actor_state) = parent_actor_state {
1089                current = parent_tipset;
1090                current_actor_state = parent_actor_state;
1091            } else {
1092                break;
1093            }
1094        }
1095
1096        Ok(None)
1097    }
1098
1099    /// Searches backwards through the chain for a message receipt.
1100    fn search_back_for_message(
1101        &self,
1102        current: Tipset,
1103        message: &ChainMessage,
1104        look_back_limit: Option<i64>,
1105        allow_replaced: Option<bool>,
1106    ) -> Result<Option<(Tipset, Receipt)>, Error> {
1107        let current_epoch = current.epoch();
1108        let allow_replaced = allow_replaced.unwrap_or(true);
1109
1110        // Calculate the max lookback epoch (inclusive lower bound) for the search.
1111        let lookback_max_epoch = match look_back_limit {
1112            // No search: limit = 0 means search 0 epochs
1113            Some(0) => return Ok(None),
1114            // Limited search: calculate the inclusive lower bound, clamped to genesis
1115            // Example: limit=5 at epoch=1000 → min_epoch=996, searches [996,1000] = 5 epochs
1116            // Example: limit=2000 at epoch=1000 → min_epoch=0, searches [0,1000] = 1001 epochs (all available)
1117            Some(limit) if limit > 0 => (current_epoch - limit + 1).max(0),
1118            // Search all the way to genesis (epoch 0)
1119            _ => 0,
1120        };
1121
1122        self.check_search(current, message, lookback_max_epoch, allow_replaced)
1123    }
1124
1125    /// Returns a message receipt from a given tipset and message CID.
1126    pub fn get_receipt(&self, tipset: Tipset, msg: Cid) -> Result<Receipt, Error> {
1127        let m = crate::chain::get_chain_message(self.blockstore(), &msg)
1128            .map_err(|e| Error::Other(e.to_string()))?;
1129        let message_receipt = self.tipset_executed_message(&tipset, &m, true)?;
1130        if let Some(receipt) = message_receipt {
1131            return Ok(receipt);
1132        }
1133
1134        let maybe_tuple = self.search_back_for_message(tipset, &m, None, None)?;
1135        let message_receipt = maybe_tuple
1136            .ok_or_else(|| {
1137                Error::Other("Could not get receipt from search back message".to_string())
1138            })?
1139            .1;
1140        Ok(message_receipt)
1141    }
1142
1143    /// `WaitForMessage` blocks until a message appears on chain. It looks
1144    /// backwards in the chain to see if this has already happened. It
1145    /// guarantees that the message has been on chain for at least
1146    /// confidence epochs without being reverted before returning.
1147    pub async fn wait_for_message(
1148        self: &Arc<Self>,
1149        msg_cid: Cid,
1150        confidence: i64,
1151        look_back_limit: Option<ChainEpoch>,
1152        allow_replaced: Option<bool>,
1153    ) -> Result<(Option<Tipset>, Option<Receipt>), Error> {
1154        let mut subscriber = self.cs.publisher().subscribe();
1155        let (sender, mut receiver) = oneshot::channel::<()>();
1156        let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
1157            .map_err(|err| Error::Other(format!("failed to load message {err:}")))?;
1158        let current_tipset = self.heaviest_tipset();
1159        let maybe_message_receipt =
1160            self.tipset_executed_message(&current_tipset, &message, true)?;
1161        if let Some(r) = maybe_message_receipt {
1162            return Ok((Some(current_tipset.clone()), Some(r)));
1163        }
1164
1165        let mut candidate_tipset: Option<Tipset> = None;
1166        let mut candidate_receipt: Option<Receipt> = None;
1167
1168        let sm_cloned = Arc::clone(self);
1169
1170        let message_for_task = message.clone();
1171        let height_of_head = current_tipset.epoch();
1172        let task = tokio::task::spawn(async move {
1173            let back_tuple = sm_cloned.search_back_for_message(
1174                current_tipset,
1175                &message_for_task,
1176                look_back_limit,
1177                allow_replaced,
1178            )?;
1179            sender
1180                .send(())
1181                .map_err(|e| Error::Other(format!("Could not send to channel {e:?}")))?;
1182            Ok::<_, Error>(back_tuple)
1183        });
1184
1185        let reverts: Arc<RwLock<HashMap<TipsetKey, bool>>> = Arc::new(RwLock::new(HashMap::new()));
1186        let block_revert = reverts.clone();
1187        let sm_cloned = Arc::clone(self);
1188
1189        // Wait for message to be included in head change.
1190        let mut subscriber_poll = tokio::task::spawn(async move {
1191            loop {
1192                match subscriber.recv().await {
1193                    Ok(subscriber) => match subscriber {
1194                        HeadChange::Apply(tipset) => {
1195                            if candidate_tipset
1196                                .as_ref()
1197                                .map(|s| tipset.epoch() >= s.epoch() + confidence)
1198                                .unwrap_or_default()
1199                            {
1200                                return Ok((candidate_tipset, candidate_receipt));
1201                            }
1202                            let poll_receiver = receiver.try_recv();
1203                            if let Ok(Some(_)) = poll_receiver {
1204                                block_revert
1205                                    .write()
1206                                    .await
1207                                    .insert(tipset.key().to_owned(), true);
1208                            }
1209
1210                            let maybe_receipt =
1211                                sm_cloned.tipset_executed_message(&tipset, &message, true)?;
1212                            if let Some(receipt) = maybe_receipt {
1213                                if confidence == 0 {
1214                                    return Ok((Some(tipset), Some(receipt)));
1215                                }
1216                                candidate_tipset = Some(tipset);
1217                                candidate_receipt = Some(receipt)
1218                            }
1219                        }
1220                    },
1221                    Err(RecvError::Lagged(i)) => {
1222                        warn!(
1223                            "wait for message head change subscriber lagged, skipped {} events",
1224                            i
1225                        );
1226                    }
1227                    Err(RecvError::Closed) => break,
1228                }
1229            }
1230            Ok((None, None))
1231        })
1232        .fuse();
1233
1234        // Search backwards for message.
1235        let mut search_back_poll = tokio::task::spawn(async move {
1236            let back_tuple = task.await.map_err(|e| {
1237                Error::Other(format!("Could not search backwards for message {e}"))
1238            })??;
1239            if let Some((back_tipset, back_receipt)) = back_tuple {
1240                let should_revert = *reverts
1241                    .read()
1242                    .await
1243                    .get(back_tipset.key())
1244                    .unwrap_or(&false);
1245                let larger_height_of_head = height_of_head >= back_tipset.epoch() + confidence;
1246                if !should_revert && larger_height_of_head {
1247                    return Ok::<_, Error>((Some(back_tipset), Some(back_receipt)));
1248                }
1249                return Ok((None, None));
1250            }
1251            Ok((None, None))
1252        })
1253        .fuse();
1254
1255        // Await on first future to finish.
1256        loop {
1257            select! {
1258                res = subscriber_poll => {
1259                    return res?
1260                }
1261                res = search_back_poll => {
1262                    if let Ok((Some(ts), Some(rct))) = res? {
1263                        return Ok((Some(ts), Some(rct)));
1264                    }
1265                }
1266            }
1267        }
1268    }
1269
1270    pub async fn search_for_message(
1271        &self,
1272        from: Option<Tipset>,
1273        msg_cid: Cid,
1274        look_back_limit: Option<i64>,
1275        allow_replaced: Option<bool>,
1276    ) -> Result<Option<(Tipset, Receipt)>, Error> {
1277        let from = from.unwrap_or_else(|| self.heaviest_tipset());
1278        let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
1279            .map_err(|err| Error::Other(format!("failed to load message {err}")))?;
1280        let current_tipset = self.heaviest_tipset();
1281        let maybe_message_receipt =
1282            self.tipset_executed_message(&from, &message, allow_replaced.unwrap_or(true))?;
1283        if let Some(r) = maybe_message_receipt {
1284            Ok(Some((from, r)))
1285        } else {
1286            self.search_back_for_message(current_tipset, &message, look_back_limit, allow_replaced)
1287        }
1288    }
1289
1290    /// Returns a BLS public key from provided address
1291    pub fn get_bls_public_key(
1292        db: &Arc<DB>,
1293        addr: &Address,
1294        state_cid: Cid,
1295    ) -> Result<BlsPublicKey, Error> {
1296        let state = StateTree::new_from_root(Arc::clone(db), &state_cid)
1297            .map_err(|e| Error::Other(e.to_string()))?;
1298        let kaddr = resolve_to_key_addr(&state, db, addr)
1299            .map_err(|e| format!("Failed to resolve key address, error: {e}"))?;
1300
1301        match kaddr.into_payload() {
1302            Payload::BLS(key) => BlsPublicKey::from_bytes(&key)
1303                .map_err(|e| Error::Other(format!("Failed to construct bls public key: {e}"))),
1304            _ => Err(Error::state(
1305                "Address must be BLS address to load bls public key",
1306            )),
1307        }
1308    }
1309
1310    /// Looks up ID [Address] from the state at the given [Tipset].
1311    pub fn lookup_id(&self, addr: &Address, ts: &Tipset) -> Result<Option<Address>, Error> {
1312        let state_tree = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
1313            .map_err(|e| format!("{e:?}"))?;
1314        Ok(state_tree
1315            .lookup_id(addr)
1316            .map_err(|e| Error::Other(e.to_string()))?
1317            .map(Address::new_id))
1318    }
1319
1320    /// Looks up required ID [Address] from the state at the given [Tipset].
1321    pub fn lookup_required_id(&self, addr: &Address, ts: &Tipset) -> Result<Address, Error> {
1322        self.lookup_id(addr, ts)?
1323            .ok_or_else(|| Error::Other(format!("Failed to lookup the id address {addr}")))
1324    }
1325
1326    /// Retrieves market state
1327    pub fn market_state(&self, ts: &Tipset) -> Result<market::State, Error> {
1328        let actor = self.get_required_actor(&Address::MARKET_ACTOR, *ts.parent_state())?;
1329        let market_state = market::State::load(self.blockstore(), actor.code, actor.state)?;
1330        Ok(market_state)
1331    }
1332
1333    /// Retrieves market balance in escrow and locked tables.
1334    pub fn market_balance(&self, addr: &Address, ts: &Tipset) -> Result<MarketBalance, Error> {
1335        let market_state = self.market_state(ts)?;
1336        let new_addr = self.lookup_required_id(addr, ts)?;
1337        let out = MarketBalance {
1338            escrow: {
1339                market_state
1340                    .escrow_table(self.blockstore())?
1341                    .get(&new_addr)?
1342            },
1343            locked: {
1344                market_state
1345                    .locked_table(self.blockstore())?
1346                    .get(&new_addr)?
1347            },
1348        };
1349
1350        Ok(out)
1351    }
1352
1353    /// Retrieves miner info.
1354    pub fn miner_info(&self, addr: &Address, ts: &Tipset) -> Result<MinerInfo, Error> {
1355        let actor = self
1356            .get_actor(addr, *ts.parent_state())?
1357            .ok_or_else(|| Error::state("Miner actor not found"))?;
1358        let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1359
1360        Ok(state.info(self.blockstore())?)
1361    }
1362
1363    /// Retrieves miner faults.
1364    pub fn miner_faults(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
1365        self.all_partition_sectors(addr, ts, |partition| partition.faulty_sectors().clone())
1366    }
1367
1368    /// Retrieves miner recoveries.
1369    pub fn miner_recoveries(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
1370        self.all_partition_sectors(addr, ts, |partition| partition.recovering_sectors().clone())
1371    }
1372
1373    fn all_partition_sectors(
1374        &self,
1375        addr: &Address,
1376        ts: &Tipset,
1377        get_sector: impl Fn(Partition<'_>) -> BitField,
1378    ) -> Result<BitField, Error> {
1379        let actor = self
1380            .get_actor(addr, *ts.parent_state())?
1381            .ok_or_else(|| Error::state("Miner actor not found"))?;
1382
1383        let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1384
1385        let mut partitions = Vec::new();
1386
1387        state.for_each_deadline(
1388            &self.chain_config().policy,
1389            self.blockstore(),
1390            |_, deadline| {
1391                deadline.for_each(self.blockstore(), |_, partition| {
1392                    partitions.push(get_sector(partition));
1393                    Ok(())
1394                })
1395            },
1396        )?;
1397
1398        Ok(BitField::union(partitions.iter()))
1399    }
1400
1401    /// Retrieves miner power.
1402    pub fn miner_power(&self, addr: &Address, ts: &Tipset) -> Result<MinerPower, Error> {
1403        if let Some((miner_power, total_power)) = self.get_power(ts.parent_state(), Some(addr))? {
1404            return Ok(MinerPower {
1405                miner_power,
1406                total_power,
1407                has_min_power: true,
1408            });
1409        }
1410
1411        Ok(MinerPower {
1412            has_min_power: false,
1413            miner_power: Default::default(),
1414            total_power: Default::default(),
1415        })
1416    }
1417
1418    /// Similar to `resolve_to_key_addr` in the `forest_vm` [`crate::state_manager`] but does not
1419    /// allow `Actor` type of addresses. Uses `ts` to generate the VM state.
1420    pub async fn resolve_to_key_addr(
1421        self: &Arc<Self>,
1422        addr: &Address,
1423        ts: &Tipset,
1424    ) -> Result<Address, anyhow::Error> {
1425        match addr.protocol() {
1426            Protocol::BLS | Protocol::Secp256k1 | Protocol::Delegated => return Ok(*addr),
1427            Protocol::Actor => {
1428                return Err(Error::Other(
1429                    "cannot resolve actor address to key address".to_string(),
1430                )
1431                .into());
1432            }
1433            _ => {}
1434        };
1435
1436        // First try to resolve the actor in the parent state, so we don't have to
1437        // compute anything.
1438        let state = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())?;
1439        if let Ok(addr) = resolve_to_key_addr(&state, self.blockstore(), addr) {
1440            return Ok(addr);
1441        }
1442
1443        // If that fails, compute the tip-set and try again.
1444        let (st, _) = self.tipset_state(ts, StateLookupPolicy::Enabled).await?;
1445        let state = StateTree::new_from_root(self.blockstore_owned(), &st)?;
1446
1447        resolve_to_key_addr(&state, self.blockstore(), addr)
1448    }
1449
1450    pub async fn miner_get_base_info(
1451        self: &Arc<Self>,
1452        beacon_schedule: &BeaconSchedule,
1453        tipset: Tipset,
1454        addr: Address,
1455        epoch: ChainEpoch,
1456    ) -> anyhow::Result<Option<MiningBaseInfo>> {
1457        let prev_beacon = self
1458            .chain_store()
1459            .chain_index()
1460            .latest_beacon_entry(tipset.clone())?;
1461
1462        let entries: Vec<BeaconEntry> = beacon_schedule
1463            .beacon_entries_for_block(
1464                self.chain_config().network_version(epoch),
1465                epoch,
1466                tipset.epoch(),
1467                &prev_beacon,
1468            )
1469            .await?;
1470
1471        let base = entries.last().unwrap_or(&prev_beacon);
1472
1473        let (lb_tipset, lb_state_root) = ChainStore::get_lookback_tipset_for_round(
1474            self.chain_index(),
1475            self.chain_config(),
1476            &tipset,
1477            epoch,
1478        )?;
1479
1480        // If the miner actor doesn't exist in the current tipset, it is a
1481        // user-error and we must return an error message. If the miner exists
1482        // in the current tipset but not in the lookback tipset, we may not
1483        // error and should instead return None.
1484        let actor = self.get_required_actor(&addr, *tipset.parent_state())?;
1485        if self.get_actor(&addr, lb_state_root)?.is_none() {
1486            return Ok(None);
1487        }
1488
1489        let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1490
1491        let addr_buf = to_vec(&addr)?;
1492        let rand = draw_randomness(
1493            base.signature(),
1494            DomainSeparationTag::WinningPoStChallengeSeed as i64,
1495            epoch,
1496            &addr_buf,
1497        )?;
1498
1499        let network_version = self.chain_config().network_version(tipset.epoch());
1500        let sectors = self.get_sectors_for_winning_post(
1501            &lb_state_root,
1502            network_version,
1503            &addr,
1504            Randomness::new(rand.to_vec()),
1505        )?;
1506
1507        if sectors.is_empty() {
1508            return Ok(None);
1509        }
1510
1511        let (miner_power, total_power) = self
1512            .get_power(&lb_state_root, Some(&addr))?
1513            .context("failed to get power")?;
1514
1515        let info = miner_state.info(self.blockstore())?;
1516
1517        let worker_key = self
1518            .resolve_to_deterministic_address(info.worker, &tipset)
1519            .await?;
1520        let eligible = self.eligible_to_mine(&addr, &tipset, &lb_tipset)?;
1521
1522        Ok(Some(MiningBaseInfo {
1523            miner_power: miner_power.quality_adj_power,
1524            network_power: total_power.quality_adj_power,
1525            sectors,
1526            worker_key,
1527            sector_size: info.sector_size,
1528            prev_beacon_entry: prev_beacon,
1529            beacon_entries: entries,
1530            eligible_for_mining: eligible,
1531        }))
1532    }
1533
1534    /// Checks power actor state for if miner meets consensus minimum
1535    /// requirements.
1536    pub fn miner_has_min_power(
1537        &self,
1538        policy: &Policy,
1539        addr: &Address,
1540        ts: &Tipset,
1541    ) -> anyhow::Result<bool> {
1542        let actor = self
1543            .get_actor(&Address::POWER_ACTOR, *ts.parent_state())?
1544            .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
1545        let ps = power::State::load(self.blockstore(), actor.code, actor.state)?;
1546
1547        ps.miner_nominal_power_meets_consensus_minimum(policy, self.blockstore(), addr)
1548    }
1549
1550    /// Validates all tipsets at epoch `start..=end` behind the heaviest tipset.
1551    ///
1552    /// This spawns [`rayon::current_num_threads`] threads to do the compute-heavy work
1553    /// of tipset validation.
1554    ///
1555    /// # What is validation?
1556    /// Every state transition returns a new _state root_, which is typically retained in, e.g., snapshots.
1557    /// For "full" snapshots, all state roots are retained.
1558    /// For standard snapshots, the last 2000 or so state roots are retained.
1559    ///
1560    /// _receipts_ meanwhile, are typically ephemeral, but each tipset knows the _receipt root_
1561    /// (hash) of the previous tipset.
1562    ///
1563    /// This function takes advantage of that fact to validate tipsets:
1564    /// - `tipset[N]` claims that `receipt_root[N-1]` should be `0xDEADBEEF`
1565    /// - find `tipset[N-1]`, and perform its state transition to get the actual `receipt_root`
1566    /// - assert that they match
1567    ///
1568    /// See [`Self::compute_tipset_state_blocking`] for an explanation of state transitions.
1569    ///
1570    /// # Known issues
1571    /// This function is blocking, but we do observe threads waiting and synchronizing.
1572    /// This is suspected to be due something in the VM or its `WASM` runtime.
1573    #[tracing::instrument(skip(self))]
1574    pub fn validate_range(&self, epochs: RangeInclusive<i64>) -> anyhow::Result<()> {
1575        let heaviest = self.heaviest_tipset();
1576        let heaviest_epoch = heaviest.epoch();
1577        let end = self
1578            .chain_index()
1579            .tipset_by_height(*epochs.end(), heaviest, ResolveNullTipset::TakeOlder)
1580            .with_context(|| {
1581                format!(
1582            "couldn't get a tipset at height {} behind heaviest tipset at height {heaviest_epoch}",
1583            *epochs.end(),
1584        )
1585            })?;
1586
1587        // lookup tipset parents as we go along, iterating DOWN from `end`
1588        let tipsets = self
1589            .chain_index()
1590            .chain(end)
1591            .take_while(|tipset| tipset.epoch() >= *epochs.start());
1592
1593        self.validate_tipsets(tipsets)
1594    }
1595
1596    pub fn validate_tipsets<T>(&self, tipsets: T) -> anyhow::Result<()>
1597    where
1598        T: Iterator<Item = Tipset> + Send,
1599    {
1600        let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
1601        validate_tipsets(
1602            genesis_timestamp,
1603            self.chain_index().clone(),
1604            self.chain_config().clone(),
1605            self.beacon_schedule().clone(),
1606            &self.engine,
1607            tipsets,
1608        )
1609    }
1610
1611    pub fn get_verified_registry_actor_state(
1612        &self,
1613        ts: &Tipset,
1614    ) -> anyhow::Result<verifreg::State> {
1615        let act = self
1616            .get_actor(&Address::VERIFIED_REGISTRY_ACTOR, *ts.parent_state())
1617            .map_err(Error::state)?
1618            .ok_or_else(|| Error::state("actor not found"))?;
1619        verifreg::State::load(self.blockstore(), act.code, act.state)
1620    }
1621    pub fn get_claim(
1622        &self,
1623        addr: &Address,
1624        ts: &Tipset,
1625        claim_id: ClaimID,
1626    ) -> anyhow::Result<Option<Claim>> {
1627        let id_address = self.lookup_required_id(addr, ts)?;
1628        let state = self.get_verified_registry_actor_state(ts)?;
1629        state.get_claim(self.blockstore(), id_address, claim_id)
1630    }
1631
1632    pub fn get_all_claims(&self, ts: &Tipset) -> anyhow::Result<HashMap<ClaimID, Claim>> {
1633        let state = self.get_verified_registry_actor_state(ts)?;
1634        state.get_all_claims(self.blockstore())
1635    }
1636
1637    pub fn get_allocation(
1638        &self,
1639        addr: &Address,
1640        ts: &Tipset,
1641        allocation_id: AllocationID,
1642    ) -> anyhow::Result<Option<Allocation>> {
1643        let id_address = self.lookup_required_id(addr, ts)?;
1644        let state = self.get_verified_registry_actor_state(ts)?;
1645        state.get_allocation(self.blockstore(), id_address.id()?, allocation_id)
1646    }
1647
1648    pub fn get_all_allocations(
1649        &self,
1650        ts: &Tipset,
1651    ) -> anyhow::Result<HashMap<AllocationID, Allocation>> {
1652        let state = self.get_verified_registry_actor_state(ts)?;
1653        state.get_all_allocations(self.blockstore())
1654    }
1655
1656    pub fn verified_client_status(
1657        &self,
1658        addr: &Address,
1659        ts: &Tipset,
1660    ) -> anyhow::Result<Option<DataCap>> {
1661        let id = self.lookup_required_id(addr, ts)?;
1662        let network_version = self.get_network_version(ts.epoch());
1663
1664        // This is a copy of Lotus code, we need to treat all the actors below version 9
1665        // differently. Which maps to network below version 17.
1666        // Original: https://github.com/filecoin-project/lotus/blob/5e76b05b17771da6939c7b0bf65127c3dc70ee23/node/impl/full/state.go#L1627-L1664.
1667        if (u32::from(network_version.0)) < 17 {
1668            let state = self.get_verified_registry_actor_state(ts)?;
1669            return state.verified_client_data_cap(self.blockstore(), id);
1670        }
1671
1672        let act = self
1673            .get_actor(&Address::DATACAP_TOKEN_ACTOR, *ts.parent_state())
1674            .map_err(Error::state)?
1675            .ok_or_else(|| Error::state("Miner actor not found"))?;
1676
1677        let state = datacap::State::load(self.blockstore(), act.code, act.state)?;
1678
1679        state.verified_client_data_cap(self.blockstore(), id)
1680    }
1681
1682    pub async fn resolve_to_deterministic_address(
1683        self: &Arc<Self>,
1684        address: Address,
1685        ts: &Tipset,
1686    ) -> anyhow::Result<Address> {
1687        use crate::shim::address::Protocol::*;
1688        match address.protocol() {
1689            BLS | Secp256k1 | Delegated => Ok(address),
1690            Actor => anyhow::bail!("cannot resolve actor address to key address"),
1691            _ => {
1692                // First try to resolve the actor in the parent state, so we don't have to compute anything.
1693                if let Ok(state) =
1694                    StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
1695                    && let Ok(address) = state
1696                        .resolve_to_deterministic_addr(self.chain_store().blockstore(), address)
1697                {
1698                    return Ok(address);
1699                }
1700
1701                // If that fails, compute the tip-set and try again.
1702                let (state_root, _) = self.tipset_state(ts, StateLookupPolicy::Enabled).await?;
1703                let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?;
1704                state.resolve_to_deterministic_addr(self.chain_store().blockstore(), address)
1705            }
1706        }
1707    }
1708
1709    pub fn execution_trace(&self, tipset: &Tipset) -> anyhow::Result<(Cid, Vec<ApiInvocResult>)> {
1710        let mut invoc_trace = vec![];
1711
1712        let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
1713
1714        let callback = |ctx: MessageCallbackCtx<'_>| {
1715            match ctx.at {
1716                CalledAt::Applied | CalledAt::Reward => {
1717                    invoc_trace.push(ApiInvocResult {
1718                        msg_cid: ctx.message.cid(),
1719                        msg: ctx.message.message().clone(),
1720                        msg_rct: Some(ctx.apply_ret.msg_receipt()),
1721                        error: ctx.apply_ret.failure_info().unwrap_or_default(),
1722                        duration: ctx.duration.as_nanos().clamp(0, u64::MAX as u128) as u64,
1723                        gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
1724                        execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
1725                            .unwrap_or_default(),
1726                    });
1727                    Ok(())
1728                }
1729                _ => Ok(()), // ignored
1730            }
1731        };
1732
1733        let StateOutput { state_root, .. } = apply_block_messages(
1734            genesis_timestamp,
1735            self.chain_index().clone(),
1736            self.chain_config().clone(),
1737            self.beacon_schedule().clone(),
1738            &self.engine,
1739            tipset.clone(),
1740            Some(callback),
1741            VMTrace::Traced,
1742        )?;
1743
1744        Ok((state_root, invoc_trace))
1745    }
1746
1747    /// Attempts to lookup the state and receipt root of the next tipset.
1748    /// This is a performance optimization to avoid recomputing the state and receipt root by checking the blockstore.
1749    /// It only checks the immediate next epoch, as this is the most likely place to find a child.
1750    fn try_lookup_state_from_next_tipset(&self, tipset: &Tipset) -> Option<StateOutputValue> {
1751        let epoch = tipset.epoch();
1752        let next_epoch = epoch + 1;
1753
1754        // Only check the immediate next epoch - this is the most likely place to find a child
1755        let heaviest = self.heaviest_tipset();
1756        if next_epoch > heaviest.epoch() {
1757            return None;
1758        }
1759
1760        // Check if the next tipset has the same parent
1761        if let Ok(next_tipset) =
1762            self.chain_index()
1763                .tipset_by_height(next_epoch, heaviest, ResolveNullTipset::TakeNewer)
1764        {
1765            // verify that the parent of the `next_tipset` is the same as the current tipset
1766            if !next_tipset.parents().eq(tipset.key()) {
1767                return None;
1768            }
1769
1770            let state_root = next_tipset.parent_state();
1771            let receipt_root = next_tipset.min_ticket_block().message_receipts;
1772
1773            if self.blockstore().has(state_root).unwrap_or(false)
1774                && self.blockstore().has(&receipt_root).unwrap_or(false)
1775            {
1776                return Some(StateOutputValue {
1777                    state_root: state_root.into(),
1778                    receipt_root,
1779                });
1780            }
1781        }
1782
1783        None
1784    }
1785}
1786
1787pub fn validate_tipsets<DB, T>(
1788    genesis_timestamp: u64,
1789    chain_index: Arc<ChainIndex<Arc<DB>>>,
1790    chain_config: Arc<ChainConfig>,
1791    beacon: Arc<BeaconSchedule>,
1792    engine: &MultiEngine,
1793    tipsets: T,
1794) -> anyhow::Result<()>
1795where
1796    DB: Blockstore + Send + Sync + 'static,
1797    T: Iterator<Item = Tipset> + Send,
1798{
1799    use rayon::iter::ParallelIterator as _;
1800    tipsets
1801        .tuple_windows()
1802        .par_bridge()
1803        .try_for_each(|(child, parent)| {
1804            info!(height = parent.epoch(), "compute parent state");
1805            let StateOutput {
1806                state_root: actual_state,
1807                receipt_root: actual_receipt,
1808                ..
1809            } = apply_block_messages(
1810                genesis_timestamp,
1811                chain_index.clone(),
1812                chain_config.clone(),
1813                beacon.clone(),
1814                engine,
1815                parent,
1816                NO_CALLBACK,
1817                VMTrace::NotTraced,
1818            )
1819            .map_err(|e| anyhow::anyhow!("couldn't compute tipset state: {e}"))?;
1820            let expected_receipt = child.min_ticket_block().message_receipts;
1821            let expected_state = child.parent_state();
1822            match (expected_state, expected_receipt) == (&actual_state, actual_receipt) {
1823                true => Ok(()),
1824                false => {
1825                    error!(
1826                        height = child.epoch(),
1827                        ?expected_state,
1828                        ?expected_receipt,
1829                        ?actual_state,
1830                        ?actual_receipt,
1831                        "state mismatch"
1832                    );
1833                    bail!("state mismatch");
1834                }
1835            }
1836        })
1837}
1838
1839/// Messages are transactions that produce new states. The state (usually
1840/// referred to as the 'state-tree') is a mapping from actor addresses to actor
1841/// states. Each block contains the hash of the state-tree that should be used
1842/// as the starting state when executing the block messages.
1843///
1844/// # Execution environment
1845///
1846/// Transaction execution has the following inputs:
1847/// - a current state-tree (stored as IPLD in a key-value database). This
1848///   reference is in [`Tipset::parent_state`].
1849/// - up to 900 past state-trees. See
1850///   <https://docs.filecoin.io/reference/general/glossary/#finality>.
1851/// - up to 900 past tipset IDs.
1852/// - a deterministic source of randomness.
1853/// - the circulating supply of FIL (see
1854///   <https://filecoin.io/blog/filecoin-circulating-supply/>). The circulating
1855///   supply is determined by the epoch and the states of a few key actors.
1856/// - the base fee (see <https://spec.filecoin.io/systems/filecoin_vm/gas_fee/>).
1857///   This value is defined by `tipset.parent_base_fee`.
1858/// - the genesis timestamp (UNIX epoch time when the first block was
1859///   mined/created).
1860/// - a chain configuration (maps epoch to network version, has chain specific
1861///   settings).
1862///
1863/// The result of running a set of block messages is an index to the final
1864/// state-tree and an index to an array of message receipts (listing gas used,
1865/// return codes, etc).
1866///
1867/// # Cron and null tipsets
1868///
1869/// Once per epoch, after all messages have run, a special 'cron' transaction
1870/// must be executed. The tasks of the 'cron' transaction include running batch
1871/// jobs and keeping the state up-to-date with the current epoch.
1872///
1873/// It can happen that no blocks are mined in an epoch. The tipset for such an
1874/// epoch is called a null tipset. A null tipset has no identity and cannot be
1875/// directly executed. This is a problem for 'cron' which must run for every
1876/// epoch, even if there are no messages. The fix is to run 'cron' if there are
1877/// any null tipsets between the current epoch and the parent epoch.
1878///
1879/// Imagine the blockchain looks like this with a null tipset at epoch 9:
1880///
1881/// ```text
1882/// ┌────────┐ ┌────┐ ┌───────┐  ┌───────┐
1883/// │Epoch 10│ │Null│ │Epoch 8├──►Epoch 7├─►
1884/// └───┬────┘ └────┘ └───▲───┘  └───────┘
1885///     └─────────────────┘
1886/// ```
1887///
1888/// The parent of tipset-epoch-10 is tipset-epoch-8. Before executing the
1889/// messages in epoch 10, we have to run cron for epoch 9. However, running
1890/// 'cron' requires the timestamp of the youngest block in the tipset (which
1891/// doesn't exist because there are no blocks in the tipset). Lotus dictates that
1892/// the timestamp of a null tipset is `30s * epoch` after the genesis timestamp.
1893/// So, in the above example, if the genesis block was mined at time `X`, the
1894/// null tipset for epoch 9 will have timestamp `X + 30 * 9`.
1895///
1896/// # Migrations
1897///
1898/// Migrations happen between network upgrades and modify the state tree. If a
1899/// migration is scheduled for epoch 10, it will be run _after_ the messages for
1900/// epoch 10. The tipset for epoch 11 will link the state-tree produced by the
1901/// migration.
1902///
1903/// Example timeline with a migration at epoch 10:
1904///   1. Tipset-epoch-10 executes, producing state-tree A.
1905///   2. Migration consumes state-tree A and produces state-tree B.
1906///   3. Tipset-epoch-11 executes, consuming state-tree B (rather than A).
1907///
1908/// Note: The migration actually happens when tipset-epoch-11 executes. This is
1909///       because tipset-epoch-10 may be null and therefore not executed at all.
1910///
1911/// # Caching
1912///
1913/// Scanning the blockchain to find past tipsets and state-trees may be slow.
1914/// The `ChainStore` caches recent tipsets to make these scans faster.
1915#[allow(clippy::too_many_arguments)]
1916pub fn apply_block_messages<DB>(
1917    genesis_timestamp: u64,
1918    chain_index: Arc<ChainIndex<Arc<DB>>>,
1919    chain_config: Arc<ChainConfig>,
1920    beacon: Arc<BeaconSchedule>,
1921    engine: &MultiEngine,
1922    tipset: Tipset,
1923    mut callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
1924    enable_tracing: VMTrace,
1925) -> anyhow::Result<StateOutput>
1926where
1927    DB: Blockstore + Send + Sync + 'static,
1928{
1929    // This function will:
1930    // 1. handle the genesis block as a special case
1931    // 2. run 'cron' for any null-tipsets between the current tipset and our parent tipset
1932    // 3. run migrations
1933    // 4. execute block messages
1934    // 5. write the state-tree to the DB and return the CID
1935
1936    // step 1: special case for genesis block
1937    if tipset.epoch() == 0 {
1938        // NB: This is here because the process that executes blocks requires that the
1939        // block miner reference a valid miner in the state tree. Unless we create some
1940        // magical genesis miner, this won't work properly, so we short circuit here
1941        // This avoids the question of 'who gets paid the genesis block reward'
1942        let message_receipts = tipset.min_ticket_block().message_receipts;
1943        return Ok(StateOutput {
1944            state_root: *tipset.parent_state(),
1945            receipt_root: message_receipts,
1946            events: vec![],
1947            events_roots: vec![],
1948        });
1949    }
1950
1951    let rand = ChainRand::new(
1952        Arc::clone(&chain_config),
1953        tipset.clone(),
1954        Arc::clone(&chain_index),
1955        beacon,
1956    );
1957
1958    let genesis_info = GenesisInfo::from_chain_config(chain_config.clone());
1959    let create_vm = |state_root: Cid, epoch, timestamp| {
1960        let circulating_supply =
1961            genesis_info.get_vm_circulating_supply(epoch, chain_index.db(), &state_root)?;
1962        VM::new(
1963            ExecutionContext {
1964                heaviest_tipset: tipset.clone(),
1965                state_tree_root: state_root,
1966                epoch,
1967                rand: Box::new(rand.clone()),
1968                base_fee: tipset.min_ticket_block().parent_base_fee.clone(),
1969                circ_supply: circulating_supply,
1970                chain_config: Arc::clone(&chain_config),
1971                chain_index: Arc::clone(&chain_index),
1972                timestamp,
1973            },
1974            engine,
1975            enable_tracing,
1976        )
1977    };
1978
1979    let mut parent_state = *tipset.parent_state();
1980
1981    let parent_epoch = Tipset::load_required(chain_index.db(), tipset.parents())?.epoch();
1982    let epoch = tipset.epoch();
1983
1984    for epoch_i in parent_epoch..epoch {
1985        if epoch_i > parent_epoch {
1986            // step 2: running cron for any null-tipsets
1987            let timestamp = genesis_timestamp + ((EPOCH_DURATION_SECONDS * epoch_i) as u64);
1988
1989            // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from
1990            // FVM, but that introduces some constraints, and possible deadlocks.
1991            parent_state = stacker::grow(64 << 20, || -> anyhow::Result<Cid> {
1992                let mut vm = create_vm(parent_state, epoch_i, timestamp)?;
1993                // run cron for null rounds if any
1994                if let Err(e) = vm.run_cron(epoch_i, callback.as_mut()) {
1995                    error!("Beginning of epoch cron failed to run: {}", e);
1996                }
1997                vm.flush()
1998            })?;
1999        }
2000
2001        // step 3: run migrations
2002        if let Some(new_state) =
2003            run_state_migrations(epoch_i, &chain_config, chain_index.db(), &parent_state)?
2004        {
2005            parent_state = new_state;
2006        }
2007    }
2008
2009    let block_messages = BlockMessages::for_tipset(chain_index.db(), &tipset)
2010        .map_err(|e| Error::Other(e.to_string()))?;
2011
2012    // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from
2013    // FVM, but that introduces some constraints, and possible deadlocks.
2014    stacker::grow(64 << 20, || -> anyhow::Result<StateOutput> {
2015        let mut vm = create_vm(parent_state, epoch, tipset.min_timestamp())?;
2016
2017        // step 4: apply tipset messages
2018        let (receipts, events, events_roots) =
2019            vm.apply_block_messages(&block_messages, epoch, callback)?;
2020
2021        // step 5: construct receipt root from receipts
2022        let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts)?;
2023
2024        // step 6: store events AMTs in the blockstore
2025        for (msg_events, events_root) in events.iter().zip(events_roots.iter()) {
2026            if let Some(event_root) = events_root {
2027                // Store the events AMT - the root CID should match the one computed by FVM
2028                let derived_event_root = Amt::new_from_iter_with_bit_width(
2029                    chain_index.db(),
2030                    EVENTS_AMT_BITWIDTH,
2031                    msg_events.iter(),
2032                )
2033                .map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?;
2034
2035                // Verify the stored root matches the FVM-computed root
2036                ensure!(
2037                    derived_event_root.eq(event_root),
2038                    "Events AMT root mismatch: derived={derived_event_root}, actual={event_root}."
2039                );
2040            }
2041        }
2042
2043        let state_root = vm.flush()?;
2044
2045        Ok(StateOutput {
2046            state_root,
2047            receipt_root,
2048            events,
2049            events_roots,
2050        })
2051    })
2052}
2053
2054#[allow(clippy::too_many_arguments)]
2055pub fn compute_state<DB>(
2056    _height: ChainEpoch,
2057    messages: Vec<Message>,
2058    tipset: Tipset,
2059    genesis_timestamp: u64,
2060    chain_index: Arc<ChainIndex<Arc<DB>>>,
2061    chain_config: Arc<ChainConfig>,
2062    beacon: Arc<BeaconSchedule>,
2063    engine: &MultiEngine,
2064    callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
2065    enable_tracing: VMTrace,
2066) -> anyhow::Result<StateOutput>
2067where
2068    DB: Blockstore + Send + Sync + 'static,
2069{
2070    if !messages.is_empty() {
2071        anyhow::bail!("Applying messages is not yet implemented.");
2072    }
2073
2074    let output = apply_block_messages(
2075        genesis_timestamp,
2076        chain_index,
2077        chain_config,
2078        beacon,
2079        engine,
2080        tipset,
2081        callback,
2082        enable_tracing,
2083    )?;
2084
2085    Ok(output)
2086}
2087
/// Controls whether the state output is looked up from the next tipset before
/// a state is computed.
#[derive(Clone, Copy, Debug, Default)]
pub enum StateLookupPolicy {
    /// Look the state output up from the next tipset first. This is the default.
    #[default]
    Enabled,
    /// Skip the lookup from the next tipset.
    Disabled,
}