Skip to main content

forest/state_manager/
mod.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4#[cfg(test)]
5mod tests;
6
7mod cache;
8pub mod chain_rand;
9pub mod circulating_supply;
10mod errors;
11pub mod utils;
12
13pub use self::errors::*;
14use self::utils::structured;
15
16use crate::beacon::{BeaconEntry, BeaconSchedule};
17use crate::blocks::{Tipset, TipsetKey};
18use crate::chain::{
19    ChainStore,
20    index::{ChainIndex, ResolveNullTipset},
21};
22use crate::interpreter::{
23    BlockMessages, CalledAt, ExecutionContext, IMPLICIT_MESSAGE_GAS_LIMIT, VM, resolve_to_key_addr,
24};
25use crate::interpreter::{MessageCallbackCtx, VMTrace};
26use crate::lotus_json::{LotusJson, lotus_json_with_self};
27use crate::message::{ChainMessage, MessageRead as _, MessageReadWrite as _, SignedMessage};
28use crate::networks::ChainConfig;
29use crate::rpc::state::{ApiInvocResult, InvocResult, MessageGasCost};
30use crate::rpc::types::{MiningBaseInfo, SectorOnChainInfo};
31use crate::shim::actors::init::{self, State};
32use crate::shim::actors::miner::{MinerInfo, MinerPower, Partition};
33use crate::shim::actors::verifreg::{Allocation, AllocationID, Claim};
34use crate::shim::actors::*;
35use crate::shim::address::AddressId;
36use crate::shim::crypto::{Signature, SignatureType};
37use crate::shim::{
38    actors::{
39        LoadActorStateFromBlockstore, miner::ext::MinerStateExt as _,
40        verifreg::ext::VerifiedRegistryStateExt as _,
41    },
42    executor::{ApplyRet, Receipt, StampedEvent},
43};
44use crate::shim::{
45    address::{Address, Payload, Protocol},
46    clock::ChainEpoch,
47    econ::TokenAmount,
48    machine::{GLOBAL_MULTI_ENGINE, MultiEngine},
49    message::Message,
50    randomness::Randomness,
51    runtime::Policy,
52    state_tree::{ActorState, StateTree},
53    version::NetworkVersion,
54};
55use crate::state_manager::cache::TipsetStateCache;
56use crate::state_manager::chain_rand::draw_randomness;
57use crate::state_migration::run_state_migrations;
58use crate::utils::ShallowClone as _;
59use crate::utils::cache::SizeTrackingLruCache;
60use crate::utils::get_size::{GetSize, vec_heap_size_helper};
61use ahash::{HashMap, HashMapExt};
62use anyhow::{Context as _, bail, ensure};
63use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _};
64use chain_rand::ChainRand;
65use cid::Cid;
66pub use circulating_supply::GenesisInfo;
67use fil_actor_verifreg_state::v12::DataCap;
68use fil_actor_verifreg_state::v13::ClaimID;
69use fil_actors_shared::fvm_ipld_amt::{Amt, Amtv0};
70use fil_actors_shared::fvm_ipld_bitfield::BitField;
71use fil_actors_shared::v12::runtime::DomainSeparationTag;
72use futures::{FutureExt, channel::oneshot, select};
73use fvm_ipld_blockstore::Blockstore;
74use fvm_ipld_encoding::to_vec;
75use fvm_shared4::crypto::signature::SECP_SIG_LEN;
76use itertools::Itertools as _;
77use nonzero_ext::nonzero;
78use num::BigInt;
79use num_traits::identities::Zero;
80use schemars::JsonSchema;
81use serde::{Deserialize, Serialize};
82use std::ops::RangeInclusive;
83use std::time::Duration;
84use std::{num::NonZeroUsize, sync::Arc};
85use tokio::sync::{RwLock, broadcast::error::RecvError};
86use tracing::{error, info, instrument, warn};
87
/// Default capacity for a tipset cache.
// NOTE(review): not referenced in the visible part of this file — confirm where it is consumed.
const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
/// Size handed to the `id_to_deterministic_address` LRU cache when a
/// [`StateManager`] is constructed.
const DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
/// Bit width for events AMTs.
// NOTE(review): presumably must match the bit width the events AMT was written with — confirm.
pub const EVENTS_AMT_BITWIDTH: u32 = 5;
/// Size-tracking LRU cache mapping an [`AddressId`] to an [`Address`].
pub type IdToAddressCache = SizeTrackingLruCache<AddressId, Address>;
92
/// Result of executing an individual chain message in a tipset.
///
/// Includes the executed message itself, the execution receipt, and
/// optional events emitted by the actor during execution.
#[derive(Debug, Clone)]
pub struct ExecutedMessage {
    /// The chain message that was executed.
    pub message: ChainMessage,
    /// Receipt produced for `message`.
    pub receipt: Receipt,
    /// Events loaded from the receipt's events root; `None` when the receipt
    /// carries no events root.
    pub events: Option<Vec<StampedEvent>>,
}
103
104impl GetSize for ExecutedMessage {
105    fn get_heap_size(&self) -> usize {
106        self.message.get_heap_size()
107            + self.receipt.get_heap_size()
108            + self
109                .events
110                .as_ref()
111                .map(vec_heap_size_helper)
112                .unwrap_or_default()
113    }
114}
115
/// Aggregated execution result for a tipset.
///
/// This is the value type stored in the [`StateManager`] tipset cache; a
/// lighter [`TipsetState`] can be derived from it via `From`.
#[derive(Debug, Clone, GetSize)]
pub struct ExecutedTipset {
    /// Resulting state tree root after message execution
    #[get_size(ignore)]
    pub state_root: Cid,
    /// Resulting message receipts root after message execution
    #[get_size(ignore)]
    pub receipt_root: Cid,
    /// Per-message execution details.
    /// Wrapped in an `Arc` to reduce cloning cost, as this can be quite large.
    pub executed_messages: Arc<Vec<ExecutedMessage>>,
}
129
/// Basic execution result for a tipset.
///
/// Holds only the two roots; it can be built from an [`ExecutedTipset`]
/// (owned or borrowed) through the `From` implementations below.
#[derive(Debug, Clone, GetSize)]
pub struct TipsetState {
    /// Resulting state tree root after message execution
    #[get_size(ignore)]
    pub state_root: Cid,
    /// Resulting message receipts root after message execution
    // NOTE(review): `dead_code` is allowed here presumably because nothing in
    // the crate reads this field yet — confirm and document the reason.
    #[allow(dead_code)]
    #[get_size(ignore)]
    pub receipt_root: Cid,
}
141
142impl From<ExecutedTipset> for TipsetState {
143    fn from(
144        ExecutedTipset {
145            state_root,
146            receipt_root,
147            ..
148        }: ExecutedTipset,
149    ) -> Self {
150        Self {
151            state_root,
152            receipt_root,
153        }
154    }
155}
156
157impl From<&ExecutedTipset> for TipsetState {
158    fn from(
159        ExecutedTipset {
160            state_root,
161            receipt_root,
162            ..
163        }: &ExecutedTipset,
164    ) -> Self {
165        Self {
166            state_root: *state_root,
167            receipt_root: *receipt_root,
168        }
169    }
170}
171
/// External format for returning market balance from state.
///
/// Field names are serialized in `PascalCase` (`Escrow`, `Locked`) and both
/// amounts go through the `crate::lotus_json` (de)serializer.
#[derive(
    Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema,
)]
#[serde(rename_all = "PascalCase")]
pub struct MarketBalance {
    /// Escrow amount.
    #[schemars(with = "LotusJson<TokenAmount>")]
    #[serde(with = "crate::lotus_json")]
    pub escrow: TokenAmount,
    /// Locked amount.
    #[schemars(with = "LotusJson<TokenAmount>")]
    #[serde(with = "crate::lotus_json")]
    pub locked: TokenAmount,
}
// `MarketBalance` acts as its own Lotus JSON representation.
lotus_json_with_self!(MarketBalance);
186
/// State manager handles all interactions with the internal Filecoin actors
/// state. This encapsulates the [`ChainStore`] functionality, which only
/// handles chain data, to allow for interactions with the underlying state of
/// the chain. The state manager not only allows interfacing with state, but
/// also is used when performing state transitions.
pub struct StateManager<DB> {
    /// Chain store
    cs: Arc<ChainStore<DB>>,
    /// Cache of execution results, looked up by tipset key. Each entry is a
    /// full [`ExecutedTipset`] (state root, receipt root and executed messages).
    cache: TipsetStateCache<ExecutedTipset>,
    /// LRU cache from [`AddressId`] to [`Address`].
    // NOTE(review): not read in the visible part of this file — confirm its users.
    id_to_deterministic_address_cache: IdToAddressCache,
    /// Beacon schedule derived from the chain configuration and the genesis
    /// block timestamp at construction time.
    beacon: Arc<crate::beacon::BeaconSchedule>,
    /// Engine handed to every VM this manager creates.
    engine: Arc<MultiEngine>,
}
201
/// Typed `None` to pass wherever an optional per-message callback is
/// expected, so the caller does not have to spell out the callback type.
// The lint is silenced because the function-pointer type has to be written
// out in full for the constant to be usable as a generic argument.
#[allow(clippy::type_complexity)]
pub const NO_CALLBACK: Option<fn(MessageCallbackCtx<'_>) -> anyhow::Result<()>> = None;
204
205impl<DB> StateManager<DB>
206where
207    DB: Blockstore,
208{
209    pub fn new(cs: Arc<ChainStore<DB>>) -> anyhow::Result<Self> {
210        Self::new_with_engine(cs, GLOBAL_MULTI_ENGINE.clone())
211    }
212
213    pub fn new_with_engine(
214        cs: Arc<ChainStore<DB>>,
215        engine: Arc<MultiEngine>,
216    ) -> anyhow::Result<Self> {
217        let genesis = cs.genesis_block_header();
218        let beacon = Arc::new(cs.chain_config().get_beacon_schedule(genesis.timestamp));
219
220        Ok(Self {
221            cs,
222            cache: TipsetStateCache::new("executed_tipset"), // For StateOutput
223            beacon,
224            engine,
225            id_to_deterministic_address_cache: SizeTrackingLruCache::new_with_metrics(
226                "id_to_deterministic_address".into(),
227                DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE,
228            ),
229        })
230    }
231
232    /// Returns the currently tracked heaviest tipset.
233    pub fn heaviest_tipset(&self) -> Tipset {
234        self.chain_store().heaviest_tipset()
235    }
236
237    /// Returns the currently tracked heaviest tipset and rewind to a most recent valid one if necessary.
238    /// A valid head has
239    ///     - state tree in the blockstore
240    ///     - actor bundle version in the state tree that matches chain configuration
241    pub fn maybe_rewind_heaviest_tipset(&self) -> anyhow::Result<()> {
242        while self.maybe_rewind_heaviest_tipset_once()? {}
243        Ok(())
244    }
245
246    fn maybe_rewind_heaviest_tipset_once(&self) -> anyhow::Result<bool> {
247        let head = self.heaviest_tipset();
248        if let Some(info) = self
249            .chain_config()
250            .network_height_with_actor_bundle(head.epoch())
251        {
252            let expected_height_info = info.info;
253            let expected_bundle = info.manifest(self.blockstore())?;
254            let expected_bundle_metadata = expected_bundle.metadata()?;
255            let state = self.get_state_tree(head.parent_state())?;
256            let bundle_metadata = state.get_actor_bundle_metadata()?;
257            if expected_bundle_metadata != bundle_metadata {
258                let current_epoch = head.epoch();
259                let target_head = self.chain_index().load_required_tipset_by_height(
260                    (expected_height_info.epoch - 1).max(0),
261                    head,
262                    ResolveNullTipset::TakeOlder,
263                )?;
264                let target_epoch = target_head.epoch();
265                let bundle_version = &bundle_metadata.version;
266                let expected_bundle_version = &expected_bundle_metadata.version;
267                if target_epoch < current_epoch {
268                    tracing::warn!(
269                        "rewinding chain head from {current_epoch} to {target_epoch}, actor bundle: {bundle_version}, expected: {expected_bundle_version}"
270                    );
271                    if self.blockstore().has(target_head.parent_state())? {
272                        self.chain_store().set_heaviest_tipset(target_head)?;
273                        return Ok(true);
274                    } else {
275                        anyhow::bail!(
276                            "failed to rewind, state tree @ {target_epoch} is missing from blockstore: {}",
277                            target_head.parent_state()
278                        );
279                    }
280                }
281            }
282        }
283        Ok(false)
284    }
285
    /// Returns a reference to the beacon schedule built when this state
    /// manager was constructed.
    pub fn beacon_schedule(&self) -> &Arc<BeaconSchedule> {
        &self.beacon
    }
289
290    /// Returns network version for the given epoch.
291    pub fn get_network_version(&self, epoch: ChainEpoch) -> NetworkVersion {
292        self.chain_config().network_version(epoch)
293    }
294
295    /// Gets the state tree
296    pub fn get_state_tree(&self, state_cid: &Cid) -> anyhow::Result<StateTree<DB>> {
297        StateTree::new_from_root(self.blockstore_owned(), state_cid)
298    }
299
300    /// Gets actor from given [`Cid`], if it exists.
301    pub fn get_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<Option<ActorState>> {
302        let state = self.get_state_tree(&state_cid)?;
303        state.get_actor(addr)
304    }
305
306    /// Gets actor state from implicit actor address
307    pub fn get_actor_state<S: LoadActorStateFromBlockstore>(
308        &self,
309        ts: &Tipset,
310    ) -> anyhow::Result<S> {
311        let state_tree = self.get_state_tree(ts.parent_state())?;
312        state_tree.get_actor_state()
313    }
314
315    /// Gets actor state from explicit actor address
316    pub fn get_actor_state_from_address<S: LoadActorStateFromBlockstore>(
317        &self,
318        ts: &Tipset,
319        actor_address: &Address,
320    ) -> anyhow::Result<S> {
321        let state_tree = self.get_state_tree(ts.parent_state())?;
322        state_tree.get_actor_state_from_address(actor_address)
323    }
324
325    /// Gets required actor from given [`Cid`].
326    pub fn get_required_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<ActorState> {
327        let state = self.get_state_tree(&state_cid)?;
328        state.get_actor(addr)?.with_context(|| {
329            format!("Failed to load actor with addr={addr}, state_cid={state_cid}")
330        })
331    }
332
333    /// Returns a reference to the state manager's [`Blockstore`].
334    pub fn blockstore(&self) -> &Arc<DB> {
335        self.cs.blockstore()
336    }
337
338    pub fn blockstore_owned(&self) -> Arc<DB> {
339        self.blockstore().clone()
340    }
341
    /// Returns a reference to the shared [`ChainStore`] handle held by this
    /// state manager.
    pub fn chain_store(&self) -> &Arc<ChainStore<DB>> {
        &self.cs
    }
346
347    /// Returns reference to the state manager's [`ChainIndex`].
348    pub fn chain_index(&self) -> &ChainIndex<DB> {
349        self.cs.chain_index()
350    }
351
352    /// Returns reference to the state manager's [`ChainConfig`].
353    pub fn chain_config(&self) -> &Arc<ChainConfig> {
354        self.cs.chain_config()
355    }
356
357    pub fn chain_rand(&self, tipset: Tipset) -> ChainRand<DB> {
358        ChainRand::new(
359            self.chain_config().shallow_clone(),
360            tipset,
361            self.chain_index().shallow_clone(),
362            self.beacon.shallow_clone(),
363        )
364    }
365
366    /// Returns the internal, protocol-level network chain from the state.
367    pub fn get_network_state_name(
368        &self,
369        state_cid: Cid,
370    ) -> anyhow::Result<crate::networks::StateNetworkName> {
371        let init_act = self
372            .get_actor(&init::ADDRESS.into(), state_cid)?
373            .ok_or_else(|| Error::state("Init actor address could not be resolved"))?;
374        Ok(
375            State::load(self.blockstore(), init_act.code, init_act.state)?
376                .into_network_name()
377                .into(),
378        )
379    }
380
381    /// Returns true if miner has been slashed or is considered invalid.
382    pub fn is_miner_slashed(&self, addr: &Address, state_cid: &Cid) -> anyhow::Result<bool, Error> {
383        let actor = self
384            .get_actor(&Address::POWER_ACTOR, *state_cid)?
385            .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
386
387        let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
388
389        Ok(spas.miner_power(self.blockstore(), addr)?.is_none())
390    }
391
392    /// Returns raw work address of a miner given the state root.
393    pub fn get_miner_work_addr(&self, state_cid: Cid, addr: &Address) -> Result<Address, Error> {
394        let state =
395            StateTree::new_from_root(self.blockstore_owned(), &state_cid).map_err(Error::other)?;
396        let ms: miner::State = state.get_actor_state_from_address(addr)?;
397        let info = ms.info(self.blockstore()).map_err(|e| e.to_string())?;
398        let addr = resolve_to_key_addr(&state, self.blockstore(), &info.worker())?;
399        Ok(addr)
400    }
401
402    /// Returns specified actor's claimed power and total network power as a
403    /// tuple.
404    pub fn get_power(
405        &self,
406        state_cid: &Cid,
407        addr: Option<&Address>,
408    ) -> anyhow::Result<Option<(power::Claim, power::Claim)>, Error> {
409        let actor = self
410            .get_actor(&Address::POWER_ACTOR, *state_cid)?
411            .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
412
413        let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
414
415        let t_pow = spas.total_power();
416
417        if let Some(maddr) = addr {
418            let m_pow = spas
419                .miner_power(self.blockstore(), maddr)?
420                .ok_or_else(|| Error::state(format!("Miner for address {maddr} not found")))?;
421
422            let min_pow = spas.miner_nominal_power_meets_consensus_minimum(
423                &self.chain_config().policy,
424                self.blockstore(),
425                maddr,
426            )?;
427            if min_pow {
428                return Ok(Some((m_pow, t_pow)));
429            }
430        }
431
432        Ok(None)
433    }
434
435    // Returns all sectors
436    pub fn get_all_sectors(
437        &self,
438        addr: &Address,
439        ts: &Tipset,
440    ) -> anyhow::Result<Vec<SectorOnChainInfo>> {
441        let actor = self
442            .get_actor(addr, *ts.parent_state())?
443            .ok_or_else(|| Error::state("Miner actor not found"))?;
444        let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
445        state.load_sectors_ext(self.blockstore(), None)
446    }
447}
448
449impl<DB> StateManager<DB>
450where
451    DB: Blockstore + Send + Sync + 'static,
452{
453    /// Load the state of a tipset, including state root, message receipts
454    pub async fn load_tipset_state(self: &Arc<Self>, ts: &Tipset) -> anyhow::Result<TipsetState> {
455        if let Some(state) = self.cache.get_map(ts.key(), |et| et.into()) {
456            Ok(state)
457        } else {
458            match self.chain_store().load_child_tipset(ts)? {
459                Some(receipt_ts) => Ok(TipsetState {
460                    state_root: *receipt_ts.parent_state(),
461                    receipt_root: *receipt_ts.parent_message_receipts(),
462                }),
463                None => Ok(self.load_executed_tipset(ts).await?.into()),
464            }
465        }
466    }
467
468    /// Load an executed tipset, including state root, message receipts and events with caching.
469    pub async fn load_executed_tipset(
470        self: &Arc<Self>,
471        ts: &Tipset,
472    ) -> anyhow::Result<ExecutedTipset> {
473        // validate the existence of state trees for post-chain-head-epoch tipsets in case chain head is reset(e.g. manually or via GC).
474        if ts.epoch() >= self.heaviest_tipset().epoch()
475            && let Some(cached) = self.cache.get(ts.key())
476        {
477            if StateTree::new_from_root(self.blockstore_owned(), &cached.state_root).is_ok() {
478                return Ok(cached);
479            } else {
480                self.cache.remove(ts.key());
481            }
482        }
483        self.cache
484            .get_or_else(ts.key(), || async move {
485                let receipt_ts = self.chain_store().load_child_tipset(ts)?;
486                self.load_executed_tipset_inner(ts, receipt_ts.as_ref())
487                    .await
488            })
489            .await
490    }
491
    /// Builds the [`ExecutedTipset`] for `msg_ts` without consulting the cache.
    ///
    /// Receipts are first looked up through `receipt_ts` (the child tipset);
    /// if there is no child or its receipts cannot be read, the tipset state
    /// is computed. Events are then loaded per receipt; a failed event load
    /// triggers one computation of the tipset state (if none has happened
    /// yet) followed by a retry.
    ///
    /// # Errors
    /// Fails when `receipt_ts` is not a child of `msg_ts`, when the message
    /// and receipt counts differ, when the state computation fails, or when
    /// receipts/events are still unreadable after a computation.
    async fn load_executed_tipset_inner(
        self: &Arc<Self>,
        msg_ts: &Tipset,
        // when `msg_ts` is the current head, `receipt_ts` is `None`
        receipt_ts: Option<&Tipset>,
    ) -> anyhow::Result<ExecutedTipset> {
        if let Some(receipt_ts) = receipt_ts {
            anyhow::ensure!(
                msg_ts.key() == receipt_ts.parents(),
                "message tipset should be the parent of message receipt tipset"
            );
        }
        // Tracks whether the tipset state has already been computed in this
        // call, so that it happens at most once.
        let mut recomputed = false;
        // Fast path: take the roots from the child tipset, provided the
        // receipts they point at can be read from the blockstore.
        let (state_root, receipt_root, receipts) = match receipt_ts.and_then(|ts| {
            let receipt_root = *ts.parent_message_receipts();
            Receipt::get_receipts(self.cs.blockstore(), receipt_root)
                .ok()
                .map(|r| (*ts.parent_state(), receipt_root, r))
        }) {
            Some((state_root, receipt_root, receipts)) => (state_root, receipt_root, receipts),
            None => {
                // Slow path: compute the tipset state, then read the receipts
                // from the resulting receipt root.
                let state_output = self
                    .compute_tipset_state(msg_ts.shallow_clone(), NO_CALLBACK, VMTrace::NotTraced)
                    .await?;
                recomputed = true;
                (
                    state_output.state_root,
                    state_output.receipt_root,
                    Receipt::get_receipts(self.cs.blockstore(), state_output.receipt_root)?,
                )
            }
        };

        let messages = self.chain_store().messages_for_tipset(msg_ts)?;
        // Messages and receipts are paired positionally below, so the counts
        // must agree.
        anyhow::ensure!(
            messages.len() == receipts.len(),
            "mismatching message and receipt counts ({} messages, {} receipts)",
            messages.len(),
            receipts.len()
        );
        let mut executed_messages = Vec::with_capacity(messages.len());
        for (message, receipt) in messages.iter().cloned().zip(receipts) {
            let events = if let Some(events_root) = receipt.events_root() {
                Some(
                    match StampedEvent::get_events(self.cs.blockstore(), &events_root) {
                        Ok(events) => events,
                        // The state was already computed in this call, so
                        // computing it again is not attempted: give up.
                        Err(e) if recomputed => return Err(e),
                        Err(_) => {
                            // Events are unreadable: compute the tipset state
                            // once, then retry the load.
                            self.compute_tipset_state(
                                msg_ts.shallow_clone(),
                                NO_CALLBACK,
                                VMTrace::NotTraced,
                            )
                            .await?;
                            recomputed = true;
                            StampedEvent::get_events(self.cs.blockstore(), &events_root)?
                        }
                    },
                )
            } else {
                None
            };
            executed_messages.push(ExecutedMessage {
                message,
                receipt,
                events,
            });
        }
        Ok(ExecutedTipset {
            state_root,
            receipt_root,
            executed_messages: Arc::new(executed_messages),
        })
    }
566
567    #[instrument(skip(self, rand))]
568    fn call_raw(
569        &self,
570        state_cid: Option<Cid>,
571        msg: &Message,
572        rand: ChainRand<DB>,
573        tipset: &Tipset,
574    ) -> Result<ApiInvocResult, Error> {
575        let mut msg = msg.clone();
576
577        let state_cid = state_cid.unwrap_or(*tipset.parent_state());
578
579        let tipset_messages = self
580            .chain_store()
581            .messages_for_tipset(tipset)
582            .map_err(|err| Error::Other(err.to_string()))?;
583
584        let prior_messsages = tipset_messages
585            .iter()
586            .filter(|ts_msg| ts_msg.message().from() == msg.from());
587
588        // Handle state forks
589
590        let height = tipset.epoch();
591        let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
592        let mut vm = VM::new(
593            ExecutionContext {
594                heaviest_tipset: tipset.shallow_clone(),
595                state_tree_root: state_cid,
596                epoch: height,
597                rand: Box::new(rand),
598                base_fee: tipset.block_headers().first().parent_base_fee.clone(),
599                circ_supply: genesis_info.get_vm_circulating_supply(
600                    height,
601                    self.blockstore(),
602                    &state_cid,
603                )?,
604                chain_config: self.chain_config().shallow_clone(),
605                chain_index: self.chain_index().shallow_clone(),
606                timestamp: tipset.min_timestamp(),
607            },
608            &self.engine,
609            VMTrace::Traced,
610        )?;
611
612        for m in prior_messsages {
613            vm.apply_message(m)?;
614        }
615
616        // We flush to get the VM's view of the state tree after applying the above messages
617        // This is needed to get the correct nonce from the actor state to match the VM
618        let state_cid = vm.flush()?;
619
620        let state = StateTree::new_from_root(self.blockstore_owned(), &state_cid)?;
621
622        let from_actor = state
623            .get_actor(&msg.from())?
624            .ok_or_else(|| anyhow::anyhow!("actor not found"))?;
625        msg.set_sequence(from_actor.sequence);
626
627        // Implicit messages need to set a special gas limit
628        let mut msg = msg.clone();
629        msg.gas_limit = IMPLICIT_MESSAGE_GAS_LIMIT as u64;
630
631        let (apply_ret, duration) = vm.apply_implicit_message(&msg)?;
632
633        Ok(ApiInvocResult {
634            msg: msg.clone(),
635            msg_rct: Some(apply_ret.msg_receipt()),
636            msg_cid: msg.cid(),
637            error: apply_ret.failure_info().unwrap_or_default(),
638            duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
639            gas_cost: MessageGasCost::default(),
640            execution_trace: structured::parse_events(apply_ret.exec_trace()).unwrap_or_default(),
641        })
642    }
643
644    /// runs the given message and returns its result without any persisted
645    /// changes.
646    pub fn call(&self, message: &Message, tipset: Option<Tipset>) -> Result<ApiInvocResult, Error> {
647        let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
648        let chain_rand = self.chain_rand(ts.shallow_clone());
649        self.call_raw(None, message, chain_rand, &ts)
650    }
651
652    /// Same as [`StateManager::call`] but runs the message on the given state and not
653    /// on the parent state of the tipset.
654    pub fn call_on_state(
655        &self,
656        state_cid: Cid,
657        message: &Message,
658        tipset: Option<Tipset>,
659    ) -> Result<ApiInvocResult, Error> {
660        let ts = tipset.unwrap_or_else(|| self.cs.heaviest_tipset());
661        let chain_rand = self.chain_rand(ts.shallow_clone());
662        self.call_raw(Some(state_cid), message, chain_rand, &ts)
663    }
664
665    pub async fn apply_on_state_with_gas(
666        self: &Arc<Self>,
667        tipset: Option<Tipset>,
668        msg: Message,
669        vm_flush: VMFlush,
670    ) -> anyhow::Result<(ApiInvocResult, Option<Cid>)> {
671        let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
672
673        let from_a = self.resolve_to_key_addr(&msg.from, &ts).await?;
674
675        // Pretend that the message is signed. This has an influence on the gas
676        // cost. We obviously can't generate a valid signature. Instead, we just
677        // fill the signature with zeros. The validity is not checked.
678        let mut chain_msg = match from_a.protocol() {
679            Protocol::Secp256k1 => SignedMessage::new_unchecked(
680                msg.clone(),
681                Signature::new_secp256k1(vec![0; SECP_SIG_LEN]),
682            )
683            .into(),
684            Protocol::Delegated => SignedMessage::new_unchecked(
685                msg.clone(),
686                // In Lotus, delegated signatures have the same length as SECP256k1.
687                // This may or may not change in the future.
688                Signature::new(SignatureType::Delegated, vec![0; SECP_SIG_LEN]),
689            )
690            .into(),
691            _ => msg.clone().into(),
692        };
693
694        let (_invoc_res, apply_ret, duration, state_root) = self
695            .call_with_gas(&mut chain_msg, &[], Some(ts), vm_flush)
696            .await?;
697
698        Ok((
699            ApiInvocResult {
700                msg_cid: msg.cid(),
701                msg,
702                msg_rct: Some(apply_ret.msg_receipt()),
703                error: apply_ret.failure_info().unwrap_or_default(),
704                duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
705                gas_cost: MessageGasCost::default(),
706                execution_trace: structured::parse_events(apply_ret.exec_trace())
707                    .unwrap_or_default(),
708            },
709            state_root,
710        ))
711    }
712
    /// Computes message on the given [Tipset] state, after applying other
    /// messages and returns the values computed in the VM.
    ///
    /// * `message` - message to execute; its sequence is overwritten with the
    ///   sender's sequence as seen by the VM after `prior_messages`.
    /// * `prior_messages` - messages applied, in order, before `message`.
    /// * `tipset` - base tipset; defaults to the heaviest tipset.
    /// * `vm_flush` - whether to flush the VM afterwards and return the
    ///   resulting state root.
    ///
    /// Returns the invocation result, the raw apply result, the duration
    /// reported by the VM, and the flushed state root (`None` for
    /// `VMFlush::Skip`).
    ///
    /// # Errors
    /// Fails when the tipset state cannot be loaded, the VM cannot be
    /// created, the sender actor is missing, or a message fails to apply.
    pub async fn call_with_gas(
        self: &Arc<Self>,
        message: &mut ChainMessage,
        prior_messages: &[ChainMessage],
        tipset: Option<Tipset>,
        vm_flush: VMFlush,
    ) -> Result<(InvocResult, ApplyRet, Duration, Option<Cid>), Error> {
        let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
        // Execute on the state that results from `ts`, not on its parent state.
        let TipsetState { state_root, .. } = self
            .load_tipset_state(&ts)
            .await
            .map_err(|e| Error::Other(format!("Could not load tipset state: {e:#}")))?;
        let chain_rand = self.chain_rand(ts.clone());

        // Since we're simulating a future message, pretend we're applying it in the
        // "next" tipset
        let epoch = ts.epoch() + 1;
        let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
        // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from
        // FVM, but that introduces some constraints, and possible deadlocks.
        let (ret, duration, state_cid) = stacker::grow(64 << 20, || -> anyhow::Result<_> {
            let mut vm = VM::new(
                ExecutionContext {
                    heaviest_tipset: ts.clone(),
                    state_tree_root: state_root,
                    epoch,
                    rand: Box::new(chain_rand),
                    base_fee: ts.block_headers().first().parent_base_fee.clone(),
                    circ_supply: genesis_info.get_vm_circulating_supply(
                        epoch,
                        self.blockstore(),
                        &state_root,
                    )?,
                    chain_config: self.chain_config().shallow_clone(),
                    chain_index: self.chain_index().shallow_clone(),
                    timestamp: ts.min_timestamp(),
                },
                &self.engine,
                VMTrace::NotTraced,
            )?;

            for msg in prior_messages {
                vm.apply_message(msg)?;
            }
            // Read the sender after the prior messages have been applied, so
            // the sequence reflects them.
            let from_actor = vm
                .get_actor(&message.from())
                .map_err(|e| Error::Other(format!("Could not get actor from state: {e:#}")))?
                .ok_or_else(|| Error::Other("cant find actor in state tree".to_string()))?;

            message.set_sequence(from_actor.sequence);
            let (ret, duration) = vm.apply_message(message)?;
            let state_root = match vm_flush {
                VMFlush::Flush => Some(vm.flush()?),
                VMFlush::Skip => None,
            };
            Ok((ret, duration, state_root))
        })?;

        Ok((
            InvocResult::new(message.message().clone(), &ret),
            ret,
            duration,
            state_cid,
        ))
    }
780
781    /// Replays the given message and returns the result of executing the
782    /// indicated message, assuming it was executed in the indicated tipset.
783    pub async fn replay(self: &Arc<Self>, ts: Tipset, mcid: Cid) -> Result<ApiInvocResult, Error> {
784        let this = Arc::clone(self);
785        tokio::task::spawn_blocking(move || this.replay_blocking(ts, mcid)).await?
786    }
787
    /// Blocking version of `replay`
    ///
    /// Runs `compute_tipset_state_blocking` on `ts` with tracing enabled and
    /// a callback that captures the result of the first `Applied`/`Reward`
    /// callback whose CID equals `mcid`, then aborts the run.
    ///
    /// # Errors
    /// Returns `Error::Other` when the computation fails for any reason other
    /// than the deliberate abort, or when no callback matched `mcid`.
    pub fn replay_blocking(
        self: &Arc<Self>,
        ts: Tipset,
        mcid: Cid,
    ) -> Result<ApiInvocResult, Error> {
        // Sentinel error message used to stop the execution once the target
        // message has been captured.
        const REPLAY_HALT: &str = "replay_halt";

        let mut api_invoc_result = None;
        let callback = |ctx: MessageCallbackCtx<'_>| {
            match ctx.at {
                // Only the first match is recorded.
                CalledAt::Applied | CalledAt::Reward
                    if api_invoc_result.is_none() && ctx.cid == mcid =>
                {
                    api_invoc_result = Some(ApiInvocResult {
                        msg_cid: ctx.message.cid(),
                        msg: ctx.message.message().clone(),
                        msg_rct: Some(ctx.apply_ret.msg_receipt()),
                        error: ctx.apply_ret.failure_info().unwrap_or_default(),
                        duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
                        gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
                        execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
                            .unwrap_or_default(),
                    });
                    // Abort the remaining execution; recognised below by its message.
                    anyhow::bail!(REPLAY_HALT);
                }
                _ => Ok(()), // ignored
            }
        };
        let result = self.compute_tipset_state_blocking(ts, Some(callback), VMTrace::Traced);
        // The sentinel error is expected; anything else is a real failure.
        if let Err(error_message) = result
            && error_message.to_string() != REPLAY_HALT
        {
            return Err(Error::Other(format!(
                "unexpected error during execution : {error_message:}"
            )));
        }
        api_invoc_result.ok_or_else(|| Error::Other("failed to replay".into()))
    }
827
828    /// Replays a tipset up to a target message, capturing the state root before
829    /// and after execution.
830    pub async fn replay_for_prestate(
831        self: &Arc<Self>,
832        ts: Tipset,
833        target_message_cid: Cid,
834    ) -> Result<(Cid, ApiInvocResult, Cid), Error> {
835        let this = Arc::clone(self);
836        tokio::task::spawn_blocking(move || {
837            this.replay_for_prestate_blocking(ts, target_message_cid)
838        })
839        .await
840        .map_err(|e| Error::Other(format!("{e}")))?
841    }
842
843    fn replay_for_prestate_blocking(
844        self: &Arc<Self>,
845        ts: Tipset,
846        target_msg_cid: Cid,
847    ) -> Result<(Cid, ApiInvocResult, Cid), Error> {
848        if ts.epoch() == 0 {
849            return Err(Error::Other(
850                "cannot trace messages in the genesis block".into(),
851            ));
852        }
853
854        let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
855        let exec = TipsetExecutor::new(
856            self.chain_index().shallow_clone(),
857            self.chain_config().shallow_clone(),
858            self.beacon_schedule().shallow_clone(),
859            &self.engine,
860            ts.shallow_clone(),
861        );
862        let mut no_cb = NO_CALLBACK;
863        let (parent_state, epoch, block_messages) =
864            exec.prepare_parent_state(genesis_timestamp, VMTrace::NotTraced, &mut no_cb)?;
865
866        Ok(stacker::grow(64 << 20, || {
867            let mut vm =
868                exec.create_vm(parent_state, epoch, ts.min_timestamp(), VMTrace::NotTraced)?;
869            let mut processed = ahash::HashSet::default();
870
871            for block in block_messages.iter() {
872                let mut penalty = TokenAmount::zero();
873                let mut gas_reward = TokenAmount::zero();
874
875                for msg in block.messages.iter() {
876                    let cid = msg.cid();
877                    if processed.contains(&cid) {
878                        continue;
879                    }
880
881                    processed.insert(cid);
882
883                    if cid == target_msg_cid {
884                        let pre_root = vm.flush()?;
885                        let mut traced_vm =
886                            exec.create_vm(pre_root, epoch, ts.min_timestamp(), VMTrace::Traced)?;
887                        let (ret, duration) = traced_vm.apply_message(msg)?;
888                        let post_root = traced_vm.flush()?;
889
890                        return Ok((
891                            pre_root,
892                            ApiInvocResult {
893                                msg_cid: cid,
894                                msg: msg.message().clone(),
895                                msg_rct: Some(ret.msg_receipt()),
896                                error: ret.failure_info().unwrap_or_default(),
897                                duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
898                                gas_cost: MessageGasCost::default(),
899                                execution_trace: structured::parse_events(ret.exec_trace())
900                                    .unwrap_or_default(),
901                            },
902                            post_root,
903                        ));
904                    }
905
906                    let (ret, _) = vm.apply_message(msg)?;
907                    gas_reward += ret.miner_tip();
908                    penalty += ret.penalty();
909                }
910
911                if let Some(rew_msg) =
912                    vm.reward_message(epoch, block.miner, block.win_count, penalty, gas_reward)?
913                {
914                    let (ret, _) = vm.apply_implicit_message(&rew_msg)?;
915                    if let Some(err) = ret.failure_info() {
916                        bail!(
917                            "failed to apply reward message for miner {}: {err}",
918                            block.miner
919                        );
920                    }
921
922                    // This is more of a sanity check, this should not be able to be hit.
923                    if !ret.msg_receipt().exit_code().is_success() {
924                        bail!(
925                            "reward application message failed (exit: {:?})",
926                            ret.msg_receipt().exit_code()
927                        );
928                    }
929                }
930            }
931
932            bail!("message {target_msg_cid} not found in tipset")
933        })?)
934    }
935
936    /// Checks the eligibility of the miner. This is used in the validation that
937    /// a block's miner has the requirements to mine a block.
938    pub fn eligible_to_mine(
939        &self,
940        address: &Address,
941        base_tipset: &Tipset,
942        lookback_tipset: &Tipset,
943    ) -> anyhow::Result<bool, Error> {
944        let hmp =
945            self.miner_has_min_power(&self.chain_config().policy, address, lookback_tipset)?;
946        let version = self.get_network_version(base_tipset.epoch());
947
948        if version <= NetworkVersion::V3 {
949            return Ok(hmp);
950        }
951
952        if !hmp {
953            return Ok(false);
954        }
955
956        let actor = self
957            .get_actor(&Address::POWER_ACTOR, *base_tipset.parent_state())?
958            .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
959
960        let power_state = power::State::load(self.blockstore(), actor.code, actor.state)?;
961
962        let actor = self
963            .get_actor(address, *base_tipset.parent_state())?
964            .ok_or_else(|| Error::state("Miner actor address could not be resolved"))?;
965
966        let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
967
968        // Non-empty power claim.
969        let claim = power_state
970            .miner_power(self.blockstore(), address)?
971            .ok_or_else(|| Error::Other("Could not get claim".to_string()))?;
972        if claim.quality_adj_power <= BigInt::zero() {
973            return Ok(false);
974        }
975
976        // No fee debt.
977        if !miner_state.fee_debt().is_zero() {
978            return Ok(false);
979        }
980
981        // No active consensus faults.
982        let info = miner_state.info(self.blockstore())?;
983        if base_tipset.epoch() <= info.consensus_fault_elapsed {
984            return Ok(false);
985        }
986
987        Ok(true)
988    }
989
990    /// Conceptually, a [`Tipset`] consists of _blocks_ which share an _epoch_.
991    /// Each _block_ contains _messages_, which are executed by the _Filecoin Virtual Machine_.
992    ///
993    /// VM message execution essentially looks like this:
994    /// ```text
995    /// state[N-900..N] * message = state[N+1]
996    /// ```
997    ///
998    /// The `state`s above are stored in the `IPLD Blockstore`, and can be referred to by
999    /// a [`Cid`] - the _state root_.
1000    /// The previous 900 states (configurable, see
1001    /// <https://docs.filecoin.io/reference/general/glossary/#finality>) can be
1002    /// queried when executing a message, so a store needs at least that many.
1003    /// (a snapshot typically contains 2000, for example).
1004    ///
1005    /// Each message costs FIL to execute - this is _gas_.
1006    /// After execution, the message has a _receipt_, showing how much gas was spent.
1007    /// This is similarly a [`Cid`] into the block store.
1008    ///
1009    /// For details, see the documentation for [`apply_block_messages`].
1010    ///
1011    pub async fn compute_tipset_state(
1012        self: &Arc<Self>,
1013        tipset: Tipset,
1014        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
1015        enable_tracing: VMTrace,
1016    ) -> Result<ExecutedTipset, Error> {
1017        let this = Arc::clone(self);
1018        tokio::task::spawn_blocking(move || {
1019            this.compute_tipset_state_blocking(tipset, callback, enable_tracing)
1020        })
1021        .await?
1022    }
1023
1024    /// Blocking version of `compute_tipset_state`
1025    pub fn compute_tipset_state_blocking(
1026        &self,
1027        tipset: Tipset,
1028        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
1029        enable_tracing: VMTrace,
1030    ) -> Result<ExecutedTipset, Error> {
1031        let epoch = tipset.epoch();
1032        let has_callback = callback.is_some();
1033        info!(
1034            "Evaluating tipset: EPOCH={epoch}, blocks={}, tsk={}",
1035            tipset.len(),
1036            tipset.key(),
1037        );
1038        Ok(apply_block_messages(
1039            self.chain_store().genesis_block_header().timestamp,
1040            self.chain_index().shallow_clone(),
1041            self.chain_config().shallow_clone(),
1042            self.beacon_schedule().shallow_clone(),
1043            &self.engine,
1044            tipset,
1045            callback,
1046            enable_tracing,
1047        )
1048        .map_err(|e| {
1049            if has_callback {
1050                e
1051            } else {
1052                e.context(format!("Failed to compute tipset state@{epoch}"))
1053            }
1054        })?)
1055    }
1056
1057    #[instrument(skip_all)]
1058    pub async fn compute_state(
1059        self: &Arc<Self>,
1060        height: ChainEpoch,
1061        messages: Vec<Message>,
1062        tipset: Tipset,
1063        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
1064        enable_tracing: VMTrace,
1065    ) -> Result<ExecutedTipset, Error> {
1066        let this = Arc::clone(self);
1067        tokio::task::spawn_blocking(move || {
1068            this.compute_state_blocking(height, messages, tipset, callback, enable_tracing)
1069        })
1070        .await?
1071    }
1072
1073    /// Blocking version of `compute_state`
1074    #[tracing::instrument(skip_all)]
1075    pub fn compute_state_blocking(
1076        &self,
1077        height: ChainEpoch,
1078        messages: Vec<Message>,
1079        tipset: Tipset,
1080        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
1081        enable_tracing: VMTrace,
1082    ) -> Result<ExecutedTipset, Error> {
1083        Ok(compute_state(
1084            height,
1085            messages,
1086            tipset,
1087            self.chain_store().genesis_block_header().timestamp,
1088            self.chain_index().shallow_clone(),
1089            self.chain_config().shallow_clone(),
1090            self.beacon_schedule().shallow_clone(),
1091            &self.engine,
1092            callback,
1093            enable_tracing,
1094        )?)
1095    }
1096
1097    /// Check if tipset had executed the message, by loading the receipt based
1098    /// on the index of the message in the block.
1099    fn tipset_executed_message(
1100        &self,
1101        tipset: &Tipset,
1102        message: &ChainMessage,
1103        allow_replaced: bool,
1104    ) -> Result<Option<Receipt>, Error> {
1105        if tipset.epoch() == 0 {
1106            return Ok(None);
1107        }
1108        let message_from_address = message.from();
1109        let message_sequence = message.sequence();
1110        // Load parent state.
1111        let pts = self
1112            .chain_index()
1113            .load_required_tipset(tipset.parents())
1114            .map_err(|err| Error::Other(format!("Failed to load tipset: {err}")))?;
1115        let messages = self
1116            .cs
1117            .messages_for_tipset(&pts)
1118            .map_err(|err| Error::Other(format!("Failed to load messages for tipset: {err}")))?;
1119        messages
1120            .iter()
1121            .enumerate()
1122            // iterate in reverse because we going backwards through the chain
1123            .rev()
1124            .filter(|(_, s)| {
1125                s.sequence() == message_sequence
1126                    && s.from() == message_from_address
1127                    && s.equal_call(message)
1128            })
1129            .map(|(index, m)| {
1130                // A replacing message is a message with a different CID,
1131                // any of Gas values, and different signature, but with all
1132                // other parameters matching (source/destination, nonce, params, etc.)
1133                if !allow_replaced && message.cid() != m.cid(){
1134                    Err(Error::Other(format!(
1135                        "found message with equal nonce and call params but different CID. wanted {}, found: {}, nonce: {}, from: {}",
1136                        message.cid(),
1137                        m.cid(),
1138                        message.sequence(),
1139                        message.from(),
1140                    )))
1141                } else {
1142                    let block_header = tipset.block_headers().first();
1143                    crate::chain::get_parent_receipt(
1144                        self.blockstore(),
1145                        block_header,
1146                        index,
1147                    )
1148                    .map_err(|err| Error::Other(format!("Failed to get parent receipt (message_receipts={}, index={index}, error={err})", block_header.message_receipts)))
1149                }
1150            })
1151            .next()
1152            .unwrap_or(Ok(None))
1153    }
1154
1155    fn check_search(
1156        &self,
1157        mut current: Tipset,
1158        message: &ChainMessage,
1159        lookback_max_epoch: ChainEpoch,
1160        allow_replaced: bool,
1161    ) -> Result<Option<(Tipset, Receipt)>, Error> {
1162        let message_from_address = message.from();
1163        let message_sequence = message.sequence();
1164        let mut current_actor_state = self
1165            .get_required_actor(&message_from_address, *current.parent_state())
1166            .map_err(Error::state)?;
1167        let message_from_id = self.lookup_required_id(&message_from_address, &current)?;
1168
1169        while current.epoch() >= lookback_max_epoch {
1170            let parent_tipset = self
1171                .chain_index()
1172                .load_required_tipset(current.parents())
1173                .map_err(|err| {
1174                    Error::Other(format!(
1175                        "failed to load tipset during msg wait searchback: {err:}"
1176                    ))
1177                })?;
1178
1179            let parent_actor_state = self
1180                .get_actor(&message_from_id, *parent_tipset.parent_state())
1181                .map_err(|e| Error::State(e.to_string()))?;
1182
1183            if parent_actor_state.is_none()
1184                || (current_actor_state.sequence > message_sequence
1185                    && parent_actor_state.as_ref().unwrap().sequence <= message_sequence)
1186            {
1187                let receipt = self
1188                    .tipset_executed_message(&current, message, allow_replaced)?
1189                    .context("Failed to get receipt with tipset_executed_message")?;
1190                return Ok(Some((current, receipt)));
1191            }
1192
1193            if let Some(parent_actor_state) = parent_actor_state {
1194                current = parent_tipset;
1195                current_actor_state = parent_actor_state;
1196            } else {
1197                break;
1198            }
1199        }
1200
1201        Ok(None)
1202    }
1203
1204    /// Searches backwards through the chain for a message receipt.
1205    fn search_back_for_message(
1206        &self,
1207        current: Tipset,
1208        message: &ChainMessage,
1209        look_back_limit: Option<i64>,
1210        allow_replaced: Option<bool>,
1211    ) -> Result<Option<(Tipset, Receipt)>, Error> {
1212        let current_epoch = current.epoch();
1213        let allow_replaced = allow_replaced.unwrap_or(true);
1214
1215        // Calculate the max lookback epoch (inclusive lower bound) for the search.
1216        let lookback_max_epoch = match look_back_limit {
1217            // No search: limit = 0 means search 0 epochs
1218            Some(0) => return Ok(None),
1219            // Limited search: calculate the inclusive lower bound, clamped to genesis
1220            // Example: limit=5 at epoch=1000 → min_epoch=996, searches [996,1000] = 5 epochs
1221            // Example: limit=2000 at epoch=1000 → min_epoch=0, searches [0,1000] = 1001 epochs (all available)
1222            Some(limit) if limit > 0 => (current_epoch - limit + 1).max(0),
1223            // Search all the way to genesis (epoch 0)
1224            _ => 0,
1225        };
1226
1227        self.check_search(current, message, lookback_max_epoch, allow_replaced)
1228    }
1229
1230    /// Returns a message receipt from a given tipset and message CID.
1231    pub fn get_receipt(&self, tipset: Tipset, msg: Cid) -> Result<Receipt, Error> {
1232        let m = crate::chain::get_chain_message(self.blockstore(), &msg)
1233            .map_err(|e| Error::Other(e.to_string()))?;
1234        let message_receipt = self.tipset_executed_message(&tipset, &m, true)?;
1235        if let Some(receipt) = message_receipt {
1236            return Ok(receipt);
1237        }
1238
1239        let maybe_tuple = self.search_back_for_message(tipset, &m, None, None)?;
1240        let message_receipt = maybe_tuple
1241            .ok_or_else(|| {
1242                Error::Other("Could not get receipt from search back message".to_string())
1243            })?
1244            .1;
1245        Ok(message_receipt)
1246    }
1247
    /// `WaitForMessage` blocks until a message appears on chain. It looks
    /// backwards in the chain to see if this has already happened, and at the
    /// same time watches head changes for the message being executed.
    ///
    /// A message found through head changes or through the backward search is
    /// only returned once the chain has advanced at least `confidence` epochs
    /// past the tipset it was found in; a candidate whose tipset is reverted in
    /// the meantime is dropped.
    ///
    /// Returns `(Some(tipset), Some(receipt))` on success and `(None, None)`
    /// when the head-change subscription is closed before the message is
    /// confirmed.
    ///
    /// NOTE(review): when the current heaviest tipset already executed the
    /// message, the result is returned immediately, without applying
    /// `confidence`.
    pub async fn wait_for_message(
        self: &Arc<Self>,
        msg_cid: Cid,
        confidence: i64,
        look_back_limit: Option<ChainEpoch>,
        allow_replaced: Option<bool>,
    ) -> Result<(Option<Tipset>, Option<Receipt>), Error> {
        // Subscribe before inspecting the head so head changes that happen while
        // the checks below run are still delivered.
        let mut head_changes_rx = self.cs.subscribe_head_changes();
        // Signals the subscriber task that the backward search has finished.
        let (sender, mut receiver) = oneshot::channel::<()>();
        let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
            .map_err(|err| Error::Other(format!("failed to load message {err:}")))?;
        let current_tipset = self.heaviest_tipset();
        // Fast path: the message was executed by the current head.
        // NOTE(review): `true` is passed here instead of the caller's
        // `allow_replaced` — confirm this is intended.
        let maybe_message_receipt =
            self.tipset_executed_message(&current_tipset, &message, true)?;
        if let Some(r) = maybe_message_receipt {
            return Ok((Some(current_tipset.shallow_clone()), Some(r)));
        }

        // Best candidate seen by the head-change subscriber so far.
        let mut candidate_tipset: Option<Tipset> = None;
        let mut candidate_receipt: Option<Receipt> = None;

        let sm_cloned = self.shallow_clone();

        let message_for_task = message.clone();
        let height_of_head = current_tipset.epoch();
        // Backward search from the head observed above; notifies `receiver` when done.
        let task = tokio::task::spawn(async move {
            let back_tuple = sm_cloned.search_back_for_message(
                current_tipset,
                &message_for_task,
                look_back_limit,
                allow_replaced,
            )?;
            sender
                .send(())
                .map_err(|e| Error::Other(format!("Could not send to channel {e:?}")))?;
            Ok::<_, Error>(back_tuple)
        });

        // Tipset keys recorded by the subscriber task and consulted by the
        // backward-search task before it reports a result.
        let reverts: Arc<RwLock<HashMap<TipsetKey, bool>>> = Arc::new(RwLock::new(HashMap::new()));
        let block_revert = reverts.clone();
        let sm_cloned = Arc::clone(self);

        // Wait for message to be included in head change.
        let mut subscriber_poll = tokio::task::spawn(async move {
            loop {
                match head_changes_rx.recv().await {
                    Ok(head_changes) => {
                        // A reverted tipset invalidates the candidate found in it.
                        for tipset in head_changes.reverts {
                            if candidate_tipset
                                .as_ref()
                                .is_some_and(|candidate| candidate.key() == tipset.key())
                            {
                                candidate_tipset = None;
                                candidate_receipt = None;
                            }
                        }
                        for tipset in head_changes.applies {
                            // The candidate is confirmed once the chain is `confidence`
                            // epochs past it.
                            if candidate_tipset
                                .as_ref()
                                .map(|s| tipset.epoch() >= s.epoch() + confidence)
                                .unwrap_or_default()
                            {
                                return Ok((candidate_tipset, candidate_receipt));
                            }
                            // If the backward search has signalled completion, record
                            // this applied tipset's key in the shared map.
                            let poll_receiver = receiver.try_recv();
                            if let Ok(Some(_)) = poll_receiver {
                                block_revert
                                    .write()
                                    .await
                                    .insert(tipset.key().to_owned(), true);
                            }

                            // NOTE(review): `true` is passed instead of the caller's
                            // `allow_replaced` — confirm this is intended.
                            let maybe_receipt =
                                sm_cloned.tipset_executed_message(&tipset, &message, true)?;
                            if let Some(receipt) = maybe_receipt {
                                // No confirmation requested: report right away.
                                if confidence == 0 {
                                    return Ok((Some(tipset), Some(receipt)));
                                }
                                candidate_tipset = Some(tipset);
                                candidate_receipt = Some(receipt)
                            }
                        }
                    }
                    // Missed events are only logged; the loop keeps listening.
                    Err(RecvError::Lagged(i)) => {
                        warn!(
                            "wait for message head change subscriber lagged, skipped {} events",
                            i
                        );
                    }
                    Err(RecvError::Closed) => break,
                }
            }
            Ok((None, None))
        })
        .fuse();

        // Search backwards for message.
        let mut search_back_poll = tokio::task::spawn(async move {
            let back_tuple = task.await.map_err(|e| {
                Error::Other(format!("Could not search backwards for message {e}"))
            })??;
            if let Some((back_tipset, back_receipt)) = back_tuple {
                // Skip the result if the subscriber recorded this tipset's key.
                let should_revert = *reverts
                    .read()
                    .await
                    .get(back_tipset.key())
                    .unwrap_or(&false);
                // The head observed at the start must already be `confidence`
                // epochs past the tipset that executed the message.
                let larger_height_of_head = height_of_head >= back_tipset.epoch() + confidence;
                if !should_revert && larger_height_of_head {
                    return Ok::<_, Error>((Some(back_tipset), Some(back_receipt)));
                }
                return Ok((None, None));
            }
            Ok((None, None))
        })
        .fuse();

        // Await on first future to finish.
        // The subscriber's result is always final. The backward search only ends
        // the wait when it found the message; otherwise the loop keeps waiting on
        // the subscriber.
        // NOTE(review): an `Err` produced inside the backward-search task does not
        // match the pattern below and is therefore dropped (only a join error is
        // propagated by `res?`) — confirm this is intended.
        loop {
            select! {
                res = subscriber_poll => {
                    return res?
                }
                res = search_back_poll => {
                    if let Ok((Some(ts), Some(rct))) = res? {
                        return Ok((Some(ts), Some(rct)));
                    }
                }
            }
        }
    }
1383
1384    pub async fn search_for_message(
1385        &self,
1386        from: Option<Tipset>,
1387        msg_cid: Cid,
1388        look_back_limit: Option<i64>,
1389        allow_replaced: Option<bool>,
1390    ) -> Result<Option<(Tipset, Receipt)>, Error> {
1391        let from = from.unwrap_or_else(|| self.heaviest_tipset());
1392        let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
1393            .map_err(|err| Error::Other(format!("failed to load message {err}")))?;
1394        let current_tipset = self.heaviest_tipset();
1395        let maybe_message_receipt =
1396            self.tipset_executed_message(&from, &message, allow_replaced.unwrap_or(true))?;
1397        if let Some(r) = maybe_message_receipt {
1398            Ok(Some((from, r)))
1399        } else {
1400            self.search_back_for_message(current_tipset, &message, look_back_limit, allow_replaced)
1401        }
1402    }
1403
1404    /// Returns a BLS public key from provided address
1405    pub fn get_bls_public_key(
1406        db: &Arc<DB>,
1407        addr: &Address,
1408        state_cid: Cid,
1409    ) -> Result<BlsPublicKey, Error> {
1410        let state = StateTree::new_from_root(Arc::clone(db), &state_cid)
1411            .map_err(|e| Error::Other(e.to_string()))?;
1412        let kaddr =
1413            resolve_to_key_addr(&state, db, addr).context("Failed to resolve key address")?;
1414
1415        match kaddr.into_payload() {
1416            Payload::BLS(key) => BlsPublicKey::from_bytes(&key)
1417                .context("Failed to construct bls public key")
1418                .map_err(Error::from),
1419            _ => Err(Error::state(
1420                "Address must be BLS address to load bls public key",
1421            )),
1422        }
1423    }
1424
1425    /// Looks up ID [Address] from the state at the given [Tipset].
1426    pub fn lookup_id(&self, addr: &Address, ts: &Tipset) -> Result<Option<Address>, Error> {
1427        let state_tree = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
1428            .map_err(|e| format!("{e:?}"))?;
1429        Ok(state_tree
1430            .lookup_id(addr)
1431            .map_err(|e| Error::Other(e.to_string()))?
1432            .map(Address::new_id))
1433    }
1434
1435    /// Looks up required ID [Address] from the state at the given [Tipset].
1436    pub fn lookup_required_id(&self, addr: &Address, ts: &Tipset) -> Result<Address, Error> {
1437        self.lookup_id(addr, ts)?
1438            .ok_or_else(|| Error::Other(format!("Failed to lookup the id address {addr}")))
1439    }
1440
1441    /// Retrieves market state
1442    pub fn market_state(&self, ts: &Tipset) -> Result<market::State, Error> {
1443        let actor = self.get_required_actor(&Address::MARKET_ACTOR, *ts.parent_state())?;
1444        let market_state = market::State::load(self.blockstore(), actor.code, actor.state)?;
1445        Ok(market_state)
1446    }
1447
1448    /// Retrieves market balance in escrow and locked tables.
1449    pub fn market_balance(&self, addr: &Address, ts: &Tipset) -> Result<MarketBalance, Error> {
1450        let market_state = self.market_state(ts)?;
1451        let new_addr = self.lookup_required_id(addr, ts)?;
1452        let out = MarketBalance {
1453            escrow: {
1454                market_state
1455                    .escrow_table(self.blockstore())?
1456                    .get(&new_addr)?
1457            },
1458            locked: {
1459                market_state
1460                    .locked_table(self.blockstore())?
1461                    .get(&new_addr)?
1462            },
1463        };
1464
1465        Ok(out)
1466    }
1467
1468    /// Retrieves miner info.
1469    pub fn miner_info(&self, addr: &Address, ts: &Tipset) -> Result<MinerInfo, Error> {
1470        let actor = self
1471            .get_actor(addr, *ts.parent_state())?
1472            .ok_or_else(|| Error::state("Miner actor not found"))?;
1473        let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1474
1475        Ok(state.info(self.blockstore())?)
1476    }
1477
1478    /// Retrieves miner faults.
1479    pub fn miner_faults(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
1480        self.all_partition_sectors(addr, ts, |partition| partition.faulty_sectors().clone())
1481    }
1482
1483    /// Retrieves miner recoveries.
1484    pub fn miner_recoveries(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
1485        self.all_partition_sectors(addr, ts, |partition| partition.recovering_sectors().clone())
1486    }
1487
1488    fn all_partition_sectors(
1489        &self,
1490        addr: &Address,
1491        ts: &Tipset,
1492        get_sector: impl Fn(Partition<'_>) -> BitField,
1493    ) -> Result<BitField, Error> {
1494        let actor = self
1495            .get_actor(addr, *ts.parent_state())?
1496            .ok_or_else(|| Error::state("Miner actor not found"))?;
1497
1498        let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1499
1500        let mut partitions = Vec::new();
1501
1502        state.for_each_deadline(
1503            &self.chain_config().policy,
1504            self.blockstore(),
1505            |_, deadline| {
1506                deadline.for_each(self.blockstore(), |_, partition| {
1507                    partitions.push(get_sector(partition));
1508                    Ok(())
1509                })
1510            },
1511        )?;
1512
1513        Ok(BitField::union(partitions.iter()))
1514    }
1515
1516    /// Retrieves miner power.
1517    pub fn miner_power(&self, addr: &Address, ts: &Tipset) -> Result<MinerPower, Error> {
1518        if let Some((miner_power, total_power)) = self.get_power(ts.parent_state(), Some(addr))? {
1519            return Ok(MinerPower {
1520                miner_power,
1521                total_power,
1522                has_min_power: true,
1523            });
1524        }
1525
1526        Ok(MinerPower {
1527            has_min_power: false,
1528            miner_power: Default::default(),
1529            total_power: Default::default(),
1530        })
1531    }
1532
1533    /// Similar to `resolve_to_key_addr` in the `forest_vm` [`crate::state_manager`] but does not
1534    /// allow `Actor` type of addresses. Uses `ts` to generate the VM state.
1535    pub async fn resolve_to_key_addr(
1536        self: &Arc<Self>,
1537        addr: &Address,
1538        ts: &Tipset,
1539    ) -> anyhow::Result<Address> {
1540        match addr.protocol() {
1541            Protocol::BLS | Protocol::Secp256k1 | Protocol::Delegated => return Ok(*addr),
1542            Protocol::Actor => {
1543                return Err(Error::Other(
1544                    "cannot resolve actor address to key address".to_string(),
1545                )
1546                .into());
1547            }
1548            _ => {}
1549        };
1550
1551        // First try to resolve the actor in the parent state, so we don't have to
1552        // compute anything.
1553        let state = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())?;
1554        if let Ok(addr) = resolve_to_key_addr(&state, self.blockstore(), addr) {
1555            return Ok(addr);
1556        }
1557
1558        // If that fails, compute the tip-set and try again.
1559        let TipsetState { state_root, .. } = self.load_tipset_state(ts).await?;
1560        let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?;
1561
1562        resolve_to_key_addr(&state, self.blockstore(), addr)
1563    }
1564
1565    pub async fn miner_get_base_info(
1566        self: &Arc<Self>,
1567        beacon_schedule: &BeaconSchedule,
1568        tipset: Tipset,
1569        addr: Address,
1570        epoch: ChainEpoch,
1571    ) -> anyhow::Result<Option<MiningBaseInfo>> {
1572        let prev_beacon = self
1573            .chain_store()
1574            .chain_index()
1575            .latest_beacon_entry(tipset.clone())?;
1576
1577        let entries: Vec<BeaconEntry> = beacon_schedule
1578            .beacon_entries_for_block(
1579                self.chain_config().network_version(epoch),
1580                epoch,
1581                tipset.epoch(),
1582                &prev_beacon,
1583            )
1584            .await?;
1585
1586        let base = entries.last().unwrap_or(&prev_beacon);
1587
1588        let (lb_tipset, lb_state_root) = ChainStore::get_lookback_tipset_for_round(
1589            self.chain_index(),
1590            self.chain_config(),
1591            &tipset,
1592            epoch,
1593        )?;
1594
1595        // If the miner actor doesn't exist in the current tipset, it is a
1596        // user-error and we must return an error message. If the miner exists
1597        // in the current tipset but not in the lookback tipset, we may not
1598        // error and should instead return None.
1599        let actor = self.get_required_actor(&addr, *tipset.parent_state())?;
1600        if self.get_actor(&addr, lb_state_root)?.is_none() {
1601            return Ok(None);
1602        }
1603
1604        let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1605
1606        let addr_buf = to_vec(&addr)?;
1607        let rand = draw_randomness(
1608            base.signature(),
1609            DomainSeparationTag::WinningPoStChallengeSeed as i64,
1610            epoch,
1611            &addr_buf,
1612        )?;
1613
1614        let network_version = self.chain_config().network_version(tipset.epoch());
1615        let sectors = self.get_sectors_for_winning_post(
1616            &lb_state_root,
1617            network_version,
1618            &addr,
1619            Randomness::new(rand.to_vec()),
1620        )?;
1621
1622        if sectors.is_empty() {
1623            return Ok(None);
1624        }
1625
1626        let (miner_power, total_power) = self
1627            .get_power(&lb_state_root, Some(&addr))?
1628            .context("failed to get power")?;
1629
1630        let info = miner_state.info(self.blockstore())?;
1631
1632        let worker_key = self
1633            .resolve_to_deterministic_address(info.worker, &tipset)
1634            .await?;
1635        let eligible = self.eligible_to_mine(&addr, &tipset, &lb_tipset)?;
1636
1637        Ok(Some(MiningBaseInfo {
1638            miner_power: miner_power.quality_adj_power,
1639            network_power: total_power.quality_adj_power,
1640            sectors,
1641            worker_key,
1642            sector_size: info.sector_size,
1643            prev_beacon_entry: prev_beacon,
1644            beacon_entries: entries,
1645            eligible_for_mining: eligible,
1646        }))
1647    }
1648
1649    /// Checks power actor state for if miner meets consensus minimum
1650    /// requirements.
1651    pub fn miner_has_min_power(
1652        &self,
1653        policy: &Policy,
1654        addr: &Address,
1655        ts: &Tipset,
1656    ) -> anyhow::Result<bool> {
1657        let actor = self
1658            .get_actor(&Address::POWER_ACTOR, *ts.parent_state())?
1659            .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
1660        let ps = power::State::load(self.blockstore(), actor.code, actor.state)?;
1661
1662        ps.miner_nominal_power_meets_consensus_minimum(policy, self.blockstore(), addr)
1663    }
1664
1665    /// Validates all tipsets at epoch `start..=end` behind the heaviest tipset.
1666    ///
1667    /// Tipsets are processed sequentially. The compute-intensive work inside each
1668    /// tipset (`bellperson` proof verification, FVM batch seal verification, etc.)
1669    /// is already heavily rayon-parallelized. Parallelizing the outer loop actually introduces
1670    /// some issues due to locks in the aforementioned crates. So don't do it.
1671    ///
1672    /// # What is validation?
1673    /// Every state transition returns a new _state root_, which is typically retained in, e.g., snapshots.
1674    /// For "full" snapshots, all state roots are retained.
1675    /// For standard snapshots, the last 2000 or so state roots are retained.
1676    ///
1677    /// _receipts_ meanwhile, are typically ephemeral, but each tipset knows the _receipt root_
1678    /// (hash) of the previous tipset.
1679    ///
1680    /// This function takes advantage of that fact to validate tipsets:
1681    /// - `tipset[N]` claims that `receipt_root[N-1]` should be `0xDEADBEEF`
1682    /// - find `tipset[N-1]`, and perform its state transition to get the actual `receipt_root`
1683    /// - assert that they match
1684    ///
1685    /// See [`Self::compute_tipset_state_blocking`] for an explanation of state transitions.
1686    #[tracing::instrument(skip(self))]
1687    pub fn validate_range(&self, epochs: RangeInclusive<i64>) -> anyhow::Result<()> {
1688        let heaviest = self.heaviest_tipset();
1689        let heaviest_epoch = heaviest.epoch();
1690        let end = self.chain_index().load_required_tipset_by_height(
1691            *epochs.end(),
1692            heaviest,
1693            ResolveNullTipset::TakeOlder,
1694        ).with_context(|| {
1695            format!(
1696        "couldn't get a tipset at height {} behind heaviest tipset at height {heaviest_epoch}",
1697        *epochs.end(),
1698    )})?;
1699
1700        // lookup tipset parents as we go along, iterating DOWN from `end`
1701        let tipsets = end
1702            .chain(self.blockstore())
1703            .take_while(|ts| ts.epoch() >= *epochs.start());
1704
1705        self.validate_tipsets(tipsets)
1706    }
1707
1708    pub fn validate_tipsets<T>(&self, tipsets: T) -> anyhow::Result<()>
1709    where
1710        T: Iterator<Item = Tipset> + Send,
1711    {
1712        let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
1713        validate_tipsets(
1714            genesis_timestamp,
1715            self.chain_index(),
1716            self.chain_config(),
1717            self.beacon_schedule(),
1718            &self.engine,
1719            tipsets,
1720        )
1721    }
1722
1723    pub fn get_verified_registry_actor_state(
1724        &self,
1725        ts: &Tipset,
1726    ) -> anyhow::Result<verifreg::State> {
1727        let act = self
1728            .get_actor(&Address::VERIFIED_REGISTRY_ACTOR, *ts.parent_state())
1729            .map_err(Error::state)?
1730            .ok_or_else(|| Error::state("actor not found"))?;
1731        verifreg::State::load(self.blockstore(), act.code, act.state)
1732    }
1733    pub fn get_claim(
1734        &self,
1735        addr: &Address,
1736        ts: &Tipset,
1737        claim_id: ClaimID,
1738    ) -> anyhow::Result<Option<Claim>> {
1739        let id_address = self.lookup_required_id(addr, ts)?;
1740        let state = self.get_verified_registry_actor_state(ts)?;
1741        state.get_claim(self.blockstore(), id_address, claim_id)
1742    }
1743
1744    pub fn get_all_claims(&self, ts: &Tipset) -> anyhow::Result<HashMap<ClaimID, Claim>> {
1745        let state = self.get_verified_registry_actor_state(ts)?;
1746        state.get_all_claims(self.blockstore())
1747    }
1748
1749    pub fn get_allocation(
1750        &self,
1751        addr: &Address,
1752        ts: &Tipset,
1753        allocation_id: AllocationID,
1754    ) -> anyhow::Result<Option<Allocation>> {
1755        let id_address = self.lookup_required_id(addr, ts)?;
1756        let state = self.get_verified_registry_actor_state(ts)?;
1757        state.get_allocation(self.blockstore(), id_address.id()?, allocation_id)
1758    }
1759
1760    pub fn get_all_allocations(
1761        &self,
1762        ts: &Tipset,
1763    ) -> anyhow::Result<HashMap<AllocationID, Allocation>> {
1764        let state = self.get_verified_registry_actor_state(ts)?;
1765        state.get_all_allocations(self.blockstore())
1766    }
1767
1768    pub fn verified_client_status(
1769        &self,
1770        addr: &Address,
1771        ts: &Tipset,
1772    ) -> anyhow::Result<Option<DataCap>> {
1773        let id = self.lookup_required_id(addr, ts)?;
1774        let network_version = self.get_network_version(ts.epoch());
1775
1776        // This is a copy of Lotus code, we need to treat all the actors below version 9
1777        // differently. Which maps to network below version 17.
1778        // Original: https://github.com/filecoin-project/lotus/blob/5e76b05b17771da6939c7b0bf65127c3dc70ee23/node/impl/full/state.go#L1627-L1664.
1779        if (u32::from(network_version.0)) < 17 {
1780            let state = self.get_verified_registry_actor_state(ts)?;
1781            return state.verified_client_data_cap(self.blockstore(), id);
1782        }
1783
1784        let act = self
1785            .get_actor(&Address::DATACAP_TOKEN_ACTOR, *ts.parent_state())
1786            .map_err(Error::state)?
1787            .ok_or_else(|| Error::state("Miner actor not found"))?;
1788
1789        let state = datacap::State::load(self.blockstore(), act.code, act.state)?;
1790
1791        state.verified_client_data_cap(self.blockstore(), id)
1792    }
1793
1794    /// Similar to [`StateTree::resolve_to_deterministic_addr`] but does not allow [`crate::shim::address::Protocol::Actor`] type of addresses.
1795    /// Uses the [`Tipset`] `ts` to generate the VM state.
1796    pub async fn resolve_to_deterministic_address(
1797        self: &Arc<Self>,
1798        address: Address,
1799        ts: &Tipset,
1800    ) -> anyhow::Result<Address> {
1801        use crate::shim::address::Protocol::*;
1802        match address.protocol() {
1803            BLS | Secp256k1 | Delegated => Ok(address),
1804            Actor => anyhow::bail!("cannot resolve actor address to key address"),
1805            ID => {
1806                let id = address.id()?;
1807                if let Some(cached) = self.id_to_deterministic_address_cache.get_cloned(&id) {
1808                    return Ok(cached);
1809                }
1810                // First try to resolve the actor in the parent state, so we don't have to compute anything.
1811                let resolved = if let Ok(state) =
1812                    StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
1813                    && let Ok(address) = state
1814                        .resolve_to_deterministic_addr(self.chain_store().blockstore(), address)
1815                {
1816                    address
1817                } else {
1818                    // If that fails, compute the tip-set and try again.
1819                    let TipsetState { state_root, .. } = self.load_tipset_state(ts).await?;
1820                    let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?;
1821                    state.resolve_to_deterministic_addr(self.chain_store().blockstore(), address)?
1822                };
1823                self.id_to_deterministic_address_cache.push(id, resolved);
1824                Ok(resolved)
1825            }
1826        }
1827    }
1828
1829    pub fn execution_trace(&self, tipset: &Tipset) -> anyhow::Result<(Cid, Vec<ApiInvocResult>)> {
1830        let mut invoc_trace = vec![];
1831
1832        let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
1833
1834        let callback = |ctx: MessageCallbackCtx<'_>| {
1835            match ctx.at {
1836                CalledAt::Applied | CalledAt::Reward => {
1837                    invoc_trace.push(ApiInvocResult {
1838                        msg_cid: ctx.message.cid(),
1839                        msg: ctx.message.message().clone(),
1840                        msg_rct: Some(ctx.apply_ret.msg_receipt()),
1841                        error: ctx.apply_ret.failure_info().unwrap_or_default(),
1842                        duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
1843                        gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
1844                        execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
1845                            .unwrap_or_default(),
1846                    });
1847                    Ok(())
1848                }
1849                _ => Ok(()), // ignored
1850            }
1851        };
1852
1853        let ExecutedTipset { state_root, .. } = apply_block_messages(
1854            genesis_timestamp,
1855            self.chain_index().shallow_clone(),
1856            self.chain_config().shallow_clone(),
1857            self.beacon_schedule().shallow_clone(),
1858            &self.engine,
1859            tipset.shallow_clone(),
1860            Some(callback),
1861            VMTrace::Traced,
1862        )?;
1863
1864        Ok((state_root, invoc_trace))
1865    }
1866}
1867
1868pub fn validate_tipsets<DB, T>(
1869    genesis_timestamp: u64,
1870    chain_index: &ChainIndex<DB>,
1871    chain_config: &Arc<ChainConfig>,
1872    beacon: &Arc<BeaconSchedule>,
1873    engine: &MultiEngine,
1874    tipsets: T,
1875) -> anyhow::Result<()>
1876where
1877    DB: Blockstore + Send + Sync + 'static,
1878    T: Iterator<Item = Tipset> + Send,
1879{
1880    // Validate one tipset at a time. Parallelizing the outer loop across tipsets
1881    // might wedge the global rayon pool.
1882    // Sequential outer iteration leaves the entire rayon pool free for that
1883    // already-rich inner parallelism.
1884    for (child, parent) in tipsets.tuple_windows() {
1885        info!(height = parent.epoch(), "compute parent state");
1886        let ExecutedTipset {
1887            state_root: actual_state,
1888            receipt_root: actual_receipt,
1889            ..
1890        } = apply_block_messages(
1891            genesis_timestamp,
1892            chain_index.shallow_clone(),
1893            chain_config.shallow_clone(),
1894            beacon.shallow_clone(),
1895            engine,
1896            parent,
1897            NO_CALLBACK,
1898            VMTrace::NotTraced,
1899        )
1900        .context("couldn't compute tipset state")?;
1901        let expected_receipt = child.min_ticket_block().message_receipts;
1902        let expected_state = child.parent_state();
1903        if (expected_state, expected_receipt) != (&actual_state, actual_receipt) {
1904            error!(
1905                height = child.epoch(),
1906                ?expected_state,
1907                ?expected_receipt,
1908                ?actual_state,
1909                ?actual_receipt,
1910                "state mismatch"
1911            );
1912            bail!("state mismatch");
1913        }
1914    }
1915    Ok(())
1916}
1917
1918/// Shared context for creating VMs and preparing tipset state.
1919///
1920/// Encapsulates randomness source, genesis info, VM construction,
1921/// null-epoch cron handling, and state migrations.
1922struct TipsetExecutor<'a, DB: Blockstore + Send + Sync + 'static> {
1923    tipset: Tipset,
1924    rand: ChainRand<DB>,
1925    chain_config: Arc<ChainConfig>,
1926    chain_index: ChainIndex<DB>,
1927    genesis_info: GenesisInfo,
1928    engine: &'a MultiEngine,
1929}
1930
1931impl<'a, DB: Blockstore + Send + Sync + 'static> TipsetExecutor<'a, DB> {
1932    fn new(
1933        chain_index: ChainIndex<DB>,
1934        chain_config: Arc<ChainConfig>,
1935        beacon: Arc<BeaconSchedule>,
1936        engine: &'a MultiEngine,
1937        tipset: Tipset,
1938    ) -> Self {
1939        let rand = ChainRand::new(
1940            chain_config.shallow_clone(),
1941            tipset.shallow_clone(),
1942            chain_index.shallow_clone(),
1943            beacon,
1944        );
1945        let genesis_info = GenesisInfo::from_chain_config(chain_config.shallow_clone());
1946        Self {
1947            tipset,
1948            rand,
1949            chain_config,
1950            chain_index,
1951            genesis_info,
1952            engine,
1953        }
1954    }
1955
1956    fn create_vm(
1957        &self,
1958        state_root: Cid,
1959        epoch: ChainEpoch,
1960        timestamp: u64,
1961        trace: VMTrace,
1962    ) -> anyhow::Result<VM<DB>> {
1963        let circ_supply = self.genesis_info.get_vm_circulating_supply(
1964            epoch,
1965            self.chain_index.db(),
1966            &state_root,
1967        )?;
1968        VM::new(
1969            ExecutionContext {
1970                heaviest_tipset: self.tipset.shallow_clone(),
1971                state_tree_root: state_root,
1972                epoch,
1973                rand: Box::new(self.rand.shallow_clone()),
1974                base_fee: self.tipset.min_ticket_block().parent_base_fee.clone(),
1975                circ_supply,
1976                chain_config: self.chain_config.shallow_clone(),
1977                chain_index: self.chain_index.shallow_clone(),
1978                timestamp,
1979            },
1980            self.engine,
1981            trace,
1982        )
1983    }
1984
1985    /// Produces the state root ready for message execution by running
1986    /// null-epoch `crons` and any pending state migrations.
1987    fn prepare_parent_state<F>(
1988        &self,
1989        genesis_timestamp: u64,
1990        null_epoch_trace: VMTrace,
1991        cron_callback: &mut Option<F>,
1992    ) -> anyhow::Result<(Cid, ChainEpoch, Vec<BlockMessages>)>
1993    where
1994        F: FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>,
1995    {
1996        use crate::shim::clock::EPOCH_DURATION_SECONDS;
1997
1998        let mut parent_state = *self.tipset.parent_state();
1999        let parent_epoch = self
2000            .chain_index
2001            .load_required_tipset(self.tipset.parents())?
2002            .epoch();
2003        let epoch = self.tipset.epoch();
2004
2005        for epoch_i in parent_epoch..epoch {
2006            if epoch_i > parent_epoch {
2007                let timestamp = genesis_timestamp + ((EPOCH_DURATION_SECONDS * epoch_i) as u64);
2008                parent_state = stacker::grow(64 << 20, || -> anyhow::Result<Cid> {
2009                    let mut vm =
2010                        self.create_vm(parent_state, epoch_i, timestamp, null_epoch_trace)?;
2011                    if let Err(e) = vm.run_cron(epoch_i, cron_callback.as_mut()) {
2012                        error!("Beginning of epoch cron failed to run: {e:#}");
2013                        return Err(e);
2014                    }
2015                    vm.flush()
2016                })?;
2017            }
2018            if let Some(new_state) = run_state_migrations(
2019                epoch_i,
2020                &self.chain_config,
2021                self.chain_index.db(),
2022                &parent_state,
2023            )? {
2024                parent_state = new_state;
2025            }
2026        }
2027
2028        let block_messages = BlockMessages::for_tipset(self.chain_index.db(), &self.tipset)?;
2029        Ok((parent_state, epoch, block_messages))
2030    }
2031}
2032
2033/// Messages are transactions that produce new states. The state (usually
2034/// referred to as the 'state-tree') is a mapping from actor addresses to actor
2035/// states. Each block contains the hash of the state-tree that should be used
2036/// as the starting state when executing the block messages.
2037///
2038/// # Execution environment
2039///
2040/// Transaction execution has the following inputs:
2041/// - a current state-tree (stored as IPLD in a key-value database). This
2042///   reference is in [`Tipset::parent_state`].
2043/// - up to 900 past state-trees. See
2044///   <https://docs.filecoin.io/reference/general/glossary/#finality>.
2045/// - up to 900 past tipset IDs.
2046/// - a deterministic source of randomness.
2047/// - the circulating supply of FIL (see
2048///   <https://filecoin.io/blog/filecoin-circulating-supply/>). The circulating
2049///   supply is determined by the epoch and the states of a few key actors.
2050/// - the base fee (see <https://spec.filecoin.io/systems/filecoin_vm/gas_fee/>).
2051///   This value is defined by `tipset.parent_base_fee`.
2052/// - the genesis timestamp (UNIX epoch time when the first block was
2053///   mined/created).
2054/// - a chain configuration (maps epoch to network version, has chain specific
2055///   settings).
2056///
2057/// The result of running a set of block messages is an index to the final
2058/// state-tree and an index to an array of message receipts (listing gas used,
2059/// return codes, etc).
2060///
2061/// # Cron and null tipsets
2062///
2063/// Once per epoch, after all messages have run, a special 'cron' transaction
2064/// must be executed. The tasks of the 'cron' transaction include running batch
2065/// jobs and keeping the state up-to-date with the current epoch.
2066///
2067/// It can happen that no blocks are mined in an epoch. The tipset for such an
2068/// epoch is called a null tipset. A null tipset has no identity and cannot be
2069/// directly executed. This is a problem for 'cron' which must run for every
2070/// epoch, even if there are no messages. The fix is to run 'cron' if there are
2071/// any null tipsets between the current epoch and the parent epoch.
2072///
2073/// Imagine the blockchain looks like this with a null tipset at epoch 9:
2074///
2075/// ```text
2076/// ┌────────┐ ┌────┐ ┌───────┐  ┌───────┐
2077/// │Epoch 10│ │Null│ │Epoch 8├──►Epoch 7├─►
2078/// └───┬────┘ └────┘ └───▲───┘  └───────┘
2079///     └─────────────────┘
2080/// ```
2081///
2082/// The parent of tipset-epoch-10 is tipset-epoch-8. Before executing the
2083/// messages in epoch 10, we have to run cron for epoch 9. However, running
2084/// 'cron' requires the timestamp of the youngest block in the tipset (which
2085/// doesn't exist because there are no blocks in the tipset). Lotus dictates that
2086/// the timestamp of a null tipset is `30s * epoch` after the genesis timestamp.
2087/// So, in the above example, if the genesis block was mined at time `X`, the
2088/// null tipset for epoch 9 will have timestamp `X + 30 * 9`.
2089///
2090/// # Migrations
2091///
2092/// Migrations happen between network upgrades and modify the state tree. If a
2093/// migration is scheduled for epoch 10, it will be run _after_ the messages for
2094/// epoch 10. The tipset for epoch 11 will link the state-tree produced by the
2095/// migration.
2096///
2097/// Example timeline with a migration at epoch 10:
2098///   1. Tipset-epoch-10 executes, producing state-tree A.
2099///   2. Migration consumes state-tree A and produces state-tree B.
2100///   3. Tipset-epoch-11 executes, consuming state-tree B (rather than A).
2101///
2102/// Note: The migration actually happens when tipset-epoch-11 executes. This is
2103///       because tipset-epoch-10 may be null and therefore not executed at all.
2104///
2105/// # Caching
2106///
2107/// Scanning the blockchain to find past tipsets and state-trees may be slow.
2108/// The `ChainStore` caches recent tipsets to make these scans faster.
2109#[allow(clippy::too_many_arguments)]
2110pub fn apply_block_messages<DB>(
2111    genesis_timestamp: u64,
2112    chain_index: ChainIndex<DB>,
2113    chain_config: Arc<ChainConfig>,
2114    beacon: Arc<BeaconSchedule>,
2115    engine: &MultiEngine,
2116    tipset: Tipset,
2117    mut callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
2118    enable_tracing: VMTrace,
2119) -> anyhow::Result<ExecutedTipset>
2120where
2121    DB: Blockstore + Send + Sync + 'static,
2122{
2123    // This function will:
2124    // 1. handle the genesis block as a special case
2125    // 2. run 'cron' for any null-tipsets between the current tipset and our parent tipset
2126    // 3. run migrations
2127    // 4. execute block messages
2128    // 5. write the state-tree to the DB and return the CID
2129
2130    // step 1: special case for genesis block
2131    if tipset.epoch() == 0 {
2132        // NB: This is here because the process that executes blocks requires that the
2133        // block miner reference a valid miner in the state tree. Unless we create some
2134        // magical genesis miner, this won't work properly, so we short circuit here
2135        // This avoids the question of 'who gets paid the genesis block reward'
2136        let message_receipts = tipset.min_ticket_block().message_receipts;
2137        return Ok(ExecutedTipset {
2138            state_root: *tipset.parent_state(),
2139            receipt_root: message_receipts,
2140            executed_messages: vec![].into(),
2141        });
2142    }
2143
2144    let exec = TipsetExecutor::new(
2145        chain_index.shallow_clone(),
2146        chain_config,
2147        beacon,
2148        engine,
2149        tipset.shallow_clone(),
2150    );
2151
2152    // step 2: running cron for any null-tipsets
2153    // step 3: run migrations
2154    let (parent_state, epoch, block_messages) =
2155        exec.prepare_parent_state(genesis_timestamp, enable_tracing, &mut callback)?;
2156
2157    // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from
2158    // FVM, but that introduces some constraints, and possible deadlocks.
2159    stacker::grow(64 << 20, || -> anyhow::Result<ExecutedTipset> {
2160        let mut vm = exec.create_vm(parent_state, epoch, tipset.min_timestamp(), enable_tracing)?;
2161
2162        // step 4: apply tipset messages
2163        let (receipts, events, events_roots) =
2164            vm.apply_block_messages(&block_messages, epoch, callback)?;
2165
2166        // step 5: construct receipt root from receipts
2167        let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts.iter())?;
2168
2169        // step 6: store events AMTs in the blockstore
2170        for (events, events_root) in events.iter().zip(events_roots.iter()) {
2171            if let Some(events) = events {
2172                let event_root =
2173                    events_root.context("events root should be present when events present")?;
2174                // Store the events AMT - the root CID should match the one computed by FVM
2175                let derived_event_root = Amt::new_from_iter_with_bit_width(
2176                    chain_index.db(),
2177                    EVENTS_AMT_BITWIDTH,
2178                    events.iter(),
2179                )
2180                .map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?;
2181
2182                // Verify the stored root matches the FVM-computed root
2183                ensure!(
2184                    derived_event_root == event_root,
2185                    "Events AMT root mismatch: derived={derived_event_root}, actual={event_root}."
2186                );
2187            }
2188        }
2189
2190        let state_root = vm.flush()?;
2191
2192        // Update executed tipset cache
2193        let messages: Vec<ChainMessage> = block_messages
2194            .into_iter()
2195            .flat_map(|bm| bm.messages)
2196            .collect_vec();
2197        anyhow::ensure!(
2198            messages.len() == receipts.len() && messages.len() == events.len(),
2199            "length of messages, receipts, and events should match",
2200        );
2201        Ok(ExecutedTipset {
2202            state_root,
2203            receipt_root,
2204            executed_messages: messages
2205                .into_iter()
2206                .zip(receipts)
2207                .zip(events)
2208                .map(|((message, receipt), events)| ExecutedMessage {
2209                    message,
2210                    receipt,
2211                    events,
2212                })
2213                .collect_vec()
2214                .into(),
2215        })
2216    })
2217}
2218
2219#[allow(clippy::too_many_arguments)]
2220pub fn compute_state<DB>(
2221    _height: ChainEpoch,
2222    messages: Vec<Message>,
2223    tipset: Tipset,
2224    genesis_timestamp: u64,
2225    chain_index: ChainIndex<DB>,
2226    chain_config: Arc<ChainConfig>,
2227    beacon: Arc<BeaconSchedule>,
2228    engine: &MultiEngine,
2229    callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
2230    enable_tracing: VMTrace,
2231) -> anyhow::Result<ExecutedTipset>
2232where
2233    DB: Blockstore + Send + Sync + 'static,
2234{
2235    if !messages.is_empty() {
2236        anyhow::bail!("Applying messages is not yet implemented.");
2237    }
2238
2239    let output = apply_block_messages(
2240        genesis_timestamp,
2241        chain_index,
2242        chain_config,
2243        beacon,
2244        engine,
2245        tipset,
2246        callback,
2247        enable_tracing,
2248    )?;
2249
2250    Ok(output)
2251}
2252
/// Selects whether the VM flushes its state once execution has finished.
#[derive(Debug, Clone, Copy, Default)]
pub enum VMFlush {
    /// Flush the state after execution.
    Flush,
    /// Do not flush the state after execution. This is the default.
    #[default]
    Skip,
}