Skip to main content

forest/state_manager/
mod.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4#[cfg(test)]
5mod tests;
6
7mod cache;
8pub mod chain_rand;
9pub mod circulating_supply;
10mod errors;
11pub mod utils;
12
13pub use self::errors::*;
14use self::utils::structured;
15
16use crate::beacon::{BeaconEntry, BeaconSchedule};
17use crate::blocks::{Tipset, TipsetKey};
18use crate::chain::{
19    ChainStore,
20    index::{ChainIndex, ResolveNullTipset},
21};
22use crate::interpreter::{
23    BlockMessages, CalledAt, ExecutionContext, IMPLICIT_MESSAGE_GAS_LIMIT, VM, resolve_to_key_addr,
24};
25use crate::interpreter::{MessageCallbackCtx, VMTrace};
26use crate::lotus_json::{LotusJson, lotus_json_with_self};
27use crate::message::{ChainMessage, MessageRead as _, MessageReadWrite as _, SignedMessage};
28use crate::networks::ChainConfig;
29use crate::rpc::state::{ApiInvocResult, InvocResult, MessageGasCost};
30use crate::rpc::types::{MiningBaseInfo, SectorOnChainInfo};
31use crate::shim::actors::init::{self, State};
32use crate::shim::actors::miner::{MinerInfo, MinerPower, Partition};
33use crate::shim::actors::verifreg::{Allocation, AllocationID, Claim};
34use crate::shim::actors::*;
35use crate::shim::address::AddressId;
36use crate::shim::crypto::{Signature, SignatureType};
37use crate::shim::{
38    actors::{
39        LoadActorStateFromBlockstore, miner::ext::MinerStateExt as _,
40        verifreg::ext::VerifiedRegistryStateExt as _,
41    },
42    executor::{ApplyRet, Receipt, StampedEvent},
43};
44use crate::shim::{
45    address::{Address, Payload, Protocol},
46    clock::ChainEpoch,
47    econ::TokenAmount,
48    machine::{GLOBAL_MULTI_ENGINE, MultiEngine},
49    message::Message,
50    randomness::Randomness,
51    runtime::Policy,
52    state_tree::{ActorState, StateTree},
53    version::NetworkVersion,
54};
55use crate::state_manager::cache::TipsetStateCache;
56use crate::state_manager::chain_rand::draw_randomness;
57use crate::state_migration::run_state_migrations;
58use crate::utils::ShallowClone as _;
59use crate::utils::cache::SizeTrackingLruCache;
60use crate::utils::get_size::{GetSize, vec_heap_size_helper};
61use ahash::{HashMap, HashMapExt};
62use anyhow::{Context as _, bail, ensure};
63use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _};
64use chain_rand::ChainRand;
65use cid::Cid;
66pub use circulating_supply::GenesisInfo;
67use fil_actor_verifreg_state::v12::DataCap;
68use fil_actor_verifreg_state::v13::ClaimID;
69use fil_actors_shared::fvm_ipld_amt::{Amt, Amtv0};
70use fil_actors_shared::fvm_ipld_bitfield::BitField;
71use fil_actors_shared::v12::runtime::DomainSeparationTag;
72use futures::{FutureExt, channel::oneshot, select};
73use fvm_ipld_blockstore::Blockstore;
74use fvm_ipld_encoding::to_vec;
75use fvm_shared4::crypto::signature::SECP_SIG_LEN;
76use itertools::Itertools as _;
77use nonzero_ext::nonzero;
78use num::BigInt;
79use num_traits::identities::Zero;
80use schemars::JsonSchema;
81use serde::{Deserialize, Serialize};
82use std::ops::RangeInclusive;
83use std::time::Duration;
84use std::{num::NonZeroUsize, sync::Arc};
85use tokio::sync::{RwLock, broadcast::error::RecvError};
86use tracing::{error, info, instrument, warn};
87
88const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
89const DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
90pub const EVENTS_AMT_BITWIDTH: u32 = 5;
91pub type IdToAddressCache = SizeTrackingLruCache<AddressId, Address>;
92
93/// Result of executing an individual chain message in a tipset.
94///
95/// Includes the executed message itself, the execution receipt, and
96/// optional events emitted by the actor during execution.
97#[derive(Debug, Clone)]
98pub struct ExecutedMessage {
99    pub message: ChainMessage,
100    pub receipt: Receipt,
101    pub events: Option<Vec<StampedEvent>>,
102}
103
104impl GetSize for ExecutedMessage {
105    fn get_heap_size(&self) -> usize {
106        self.message.get_heap_size()
107            + self.receipt.get_heap_size()
108            + self
109                .events
110                .as_ref()
111                .map(vec_heap_size_helper)
112                .unwrap_or_default()
113    }
114}
115
116/// Aggregated execution result for a tipset.
117#[derive(Debug, Clone, GetSize)]
118pub struct ExecutedTipset {
119    /// Resulting state tree root after message execution
120    #[get_size(ignore)]
121    pub state_root: Cid,
122    /// Resulting message receipts root after message execution
123    #[get_size(ignore)]
124    pub receipt_root: Cid,
125    /// Per-message execution details.
126    /// Wrapped in an `Arc` to reduce cloning cost, as this can be quite large.
127    pub executed_messages: Arc<Vec<ExecutedMessage>>,
128}
129
130/// Basic execution result for a tipset.
131#[derive(Debug, Clone, GetSize)]
132pub struct TipsetState {
133    /// Resulting state tree root after message execution
134    #[get_size(ignore)]
135    pub state_root: Cid,
136    /// Resulting message receipts root after message execution
137    #[allow(dead_code)]
138    #[get_size(ignore)]
139    pub receipt_root: Cid,
140}
141
142impl From<ExecutedTipset> for TipsetState {
143    fn from(
144        ExecutedTipset {
145            state_root,
146            receipt_root,
147            ..
148        }: ExecutedTipset,
149    ) -> Self {
150        Self {
151            state_root,
152            receipt_root,
153        }
154    }
155}
156
157impl From<&ExecutedTipset> for TipsetState {
158    fn from(
159        ExecutedTipset {
160            state_root,
161            receipt_root,
162            ..
163        }: &ExecutedTipset,
164    ) -> Self {
165        Self {
166            state_root: *state_root,
167            receipt_root: *receipt_root,
168        }
169    }
170}
171
172/// External format for returning market balance from state.
173#[derive(
174    Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema,
175)]
176#[serde(rename_all = "PascalCase")]
177pub struct MarketBalance {
178    #[schemars(with = "LotusJson<TokenAmount>")]
179    #[serde(with = "crate::lotus_json")]
180    pub escrow: TokenAmount,
181    #[schemars(with = "LotusJson<TokenAmount>")]
182    #[serde(with = "crate::lotus_json")]
183    pub locked: TokenAmount,
184}
185lotus_json_with_self!(MarketBalance);
186
187/// State manager handles all interactions with the internal Filecoin actors
188/// state. This encapsulates the [`ChainStore`] functionality, which only
189/// handles chain data, to allow for interactions with the underlying state of
190/// the chain. The state manager not only allows interfacing with state, but
191/// also is used when performing state transitions.
192pub struct StateManager<DB> {
193    /// Chain store
194    cs: Arc<ChainStore<DB>>,
195    /// This is a cache which indexes tipsets to their calculated state output (state root, receipt root).
196    cache: TipsetStateCache<ExecutedTipset>,
197    id_to_deterministic_address_cache: IdToAddressCache,
198    beacon: Arc<crate::beacon::BeaconSchedule>,
199    engine: Arc<MultiEngine>,
200}
201
202#[allow(clippy::type_complexity)]
203pub const NO_CALLBACK: Option<fn(MessageCallbackCtx<'_>) -> anyhow::Result<()>> = None;
204
205impl<DB> StateManager<DB>
206where
207    DB: Blockstore,
208{
209    pub fn new(cs: Arc<ChainStore<DB>>) -> anyhow::Result<Self> {
210        Self::new_with_engine(cs, GLOBAL_MULTI_ENGINE.clone())
211    }
212
213    pub fn new_with_engine(
214        cs: Arc<ChainStore<DB>>,
215        engine: Arc<MultiEngine>,
216    ) -> anyhow::Result<Self> {
217        let genesis = cs.genesis_block_header();
218        let beacon = Arc::new(cs.chain_config().get_beacon_schedule(genesis.timestamp));
219
220        Ok(Self {
221            cs,
222            cache: TipsetStateCache::new("executed_tipset"), // For StateOutput
223            beacon,
224            engine,
225            id_to_deterministic_address_cache: SizeTrackingLruCache::new_with_metrics(
226                "id_to_deterministic_address".into(),
227                DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE,
228            ),
229        })
230    }
231
232    /// Returns the currently tracked heaviest tipset.
233    pub fn heaviest_tipset(&self) -> Tipset {
234        self.chain_store().heaviest_tipset()
235    }
236
237    /// Returns the currently tracked heaviest tipset and rewind to a most recent valid one if necessary.
238    /// A valid head has
239    ///     - state tree in the blockstore
240    ///     - actor bundle version in the state tree that matches chain configuration
241    pub fn maybe_rewind_heaviest_tipset(&self) -> anyhow::Result<()> {
242        while self.maybe_rewind_heaviest_tipset_once()? {}
243        Ok(())
244    }
245
246    fn maybe_rewind_heaviest_tipset_once(&self) -> anyhow::Result<bool> {
247        let head = self.heaviest_tipset();
248        if let Some(info) = self
249            .chain_config()
250            .network_height_with_actor_bundle(head.epoch())
251        {
252            let expected_height_info = info.info;
253            let expected_bundle = info.manifest(self.blockstore())?;
254            let expected_bundle_metadata = expected_bundle.metadata()?;
255            let state = self.get_state_tree(head.parent_state())?;
256            let bundle_metadata = state.get_actor_bundle_metadata()?;
257            if expected_bundle_metadata != bundle_metadata {
258                let current_epoch = head.epoch();
259                let target_head = self.chain_index().tipset_by_height(
260                    (expected_height_info.epoch - 1).max(0),
261                    head,
262                    ResolveNullTipset::TakeOlder,
263                )?;
264                let target_epoch = target_head.epoch();
265                let bundle_version = &bundle_metadata.version;
266                let expected_bundle_version = &expected_bundle_metadata.version;
267                if target_epoch < current_epoch {
268                    tracing::warn!(
269                        "rewinding chain head from {current_epoch} to {target_epoch}, actor bundle: {bundle_version}, expected: {expected_bundle_version}"
270                    );
271                    if self.blockstore().has(target_head.parent_state())? {
272                        self.chain_store().set_heaviest_tipset(target_head)?;
273                        return Ok(true);
274                    } else {
275                        anyhow::bail!(
276                            "failed to rewind, state tree @ {target_epoch} is missing from blockstore: {}",
277                            target_head.parent_state()
278                        );
279                    }
280                }
281            }
282        }
283        Ok(false)
284    }
285
286    pub fn beacon_schedule(&self) -> &Arc<BeaconSchedule> {
287        &self.beacon
288    }
289
290    /// Returns network version for the given epoch.
291    pub fn get_network_version(&self, epoch: ChainEpoch) -> NetworkVersion {
292        self.chain_config().network_version(epoch)
293    }
294
295    /// Gets the state tree
296    pub fn get_state_tree(&self, state_cid: &Cid) -> anyhow::Result<StateTree<DB>> {
297        StateTree::new_from_root(self.blockstore_owned(), state_cid)
298    }
299
300    /// Gets actor from given [`Cid`], if it exists.
301    pub fn get_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<Option<ActorState>> {
302        let state = self.get_state_tree(&state_cid)?;
303        state.get_actor(addr)
304    }
305
306    /// Gets actor state from implicit actor address
307    pub fn get_actor_state<S: LoadActorStateFromBlockstore>(
308        &self,
309        ts: &Tipset,
310    ) -> anyhow::Result<S> {
311        let state_tree = self.get_state_tree(ts.parent_state())?;
312        state_tree.get_actor_state()
313    }
314
315    /// Gets actor state from explicit actor address
316    pub fn get_actor_state_from_address<S: LoadActorStateFromBlockstore>(
317        &self,
318        ts: &Tipset,
319        actor_address: &Address,
320    ) -> anyhow::Result<S> {
321        let state_tree = self.get_state_tree(ts.parent_state())?;
322        state_tree.get_actor_state_from_address(actor_address)
323    }
324
325    /// Gets required actor from given [`Cid`].
326    pub fn get_required_actor(&self, addr: &Address, state_cid: Cid) -> anyhow::Result<ActorState> {
327        let state = self.get_state_tree(&state_cid)?;
328        state.get_actor(addr)?.with_context(|| {
329            format!("Failed to load actor with addr={addr}, state_cid={state_cid}")
330        })
331    }
332
333    /// Returns a reference to the state manager's [`Blockstore`].
334    pub fn blockstore(&self) -> &Arc<DB> {
335        self.cs.blockstore()
336    }
337
338    pub fn blockstore_owned(&self) -> Arc<DB> {
339        self.blockstore().clone()
340    }
341
342    /// Returns reference to the state manager's [`ChainStore`].
343    pub fn chain_store(&self) -> &Arc<ChainStore<DB>> {
344        &self.cs
345    }
346
347    /// Returns reference to the state manager's [`ChainIndex`].
348    pub fn chain_index(&self) -> &ChainIndex<DB> {
349        self.cs.chain_index()
350    }
351
352    /// Returns reference to the state manager's [`ChainConfig`].
353    pub fn chain_config(&self) -> &Arc<ChainConfig> {
354        self.cs.chain_config()
355    }
356
357    pub fn chain_rand(&self, tipset: Tipset) -> ChainRand<DB> {
358        ChainRand::new(
359            self.chain_config().shallow_clone(),
360            tipset,
361            self.chain_index().shallow_clone(),
362            self.beacon.shallow_clone(),
363        )
364    }
365
366    /// Returns the internal, protocol-level network chain from the state.
367    pub fn get_network_state_name(
368        &self,
369        state_cid: Cid,
370    ) -> anyhow::Result<crate::networks::StateNetworkName> {
371        let init_act = self
372            .get_actor(&init::ADDRESS.into(), state_cid)?
373            .ok_or_else(|| Error::state("Init actor address could not be resolved"))?;
374        Ok(
375            State::load(self.blockstore(), init_act.code, init_act.state)?
376                .into_network_name()
377                .into(),
378        )
379    }
380
381    /// Returns true if miner has been slashed or is considered invalid.
382    pub fn is_miner_slashed(&self, addr: &Address, state_cid: &Cid) -> anyhow::Result<bool, Error> {
383        let actor = self
384            .get_actor(&Address::POWER_ACTOR, *state_cid)?
385            .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
386
387        let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
388
389        Ok(spas.miner_power(self.blockstore(), addr)?.is_none())
390    }
391
392    /// Returns raw work address of a miner given the state root.
393    pub fn get_miner_work_addr(&self, state_cid: Cid, addr: &Address) -> Result<Address, Error> {
394        let state =
395            StateTree::new_from_root(self.blockstore_owned(), &state_cid).map_err(Error::other)?;
396        let ms: miner::State = state.get_actor_state_from_address(addr)?;
397        let info = ms.info(self.blockstore()).map_err(|e| e.to_string())?;
398        let addr = resolve_to_key_addr(&state, self.blockstore(), &info.worker())?;
399        Ok(addr)
400    }
401
402    /// Returns specified actor's claimed power and total network power as a
403    /// tuple.
404    pub fn get_power(
405        &self,
406        state_cid: &Cid,
407        addr: Option<&Address>,
408    ) -> anyhow::Result<Option<(power::Claim, power::Claim)>, Error> {
409        let actor = self
410            .get_actor(&Address::POWER_ACTOR, *state_cid)?
411            .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
412
413        let spas = power::State::load(self.blockstore(), actor.code, actor.state)?;
414
415        let t_pow = spas.total_power();
416
417        if let Some(maddr) = addr {
418            let m_pow = spas
419                .miner_power(self.blockstore(), maddr)?
420                .ok_or_else(|| Error::state(format!("Miner for address {maddr} not found")))?;
421
422            let min_pow = spas.miner_nominal_power_meets_consensus_minimum(
423                &self.chain_config().policy,
424                self.blockstore(),
425                maddr,
426            )?;
427            if min_pow {
428                return Ok(Some((m_pow, t_pow)));
429            }
430        }
431
432        Ok(None)
433    }
434
435    // Returns all sectors
436    pub fn get_all_sectors(
437        &self,
438        addr: &Address,
439        ts: &Tipset,
440    ) -> anyhow::Result<Vec<SectorOnChainInfo>> {
441        let actor = self
442            .get_actor(addr, *ts.parent_state())?
443            .ok_or_else(|| Error::state("Miner actor not found"))?;
444        let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
445        state.load_sectors_ext(self.blockstore(), None)
446    }
447}
448
449impl<DB> StateManager<DB>
450where
451    DB: Blockstore + Send + Sync + 'static,
452{
453    /// Load the state of a tipset, including state root, message receipts
454    pub async fn load_tipset_state(self: &Arc<Self>, ts: &Tipset) -> anyhow::Result<TipsetState> {
455        if let Some(state) = self.cache.get_map(ts.key(), |et| et.into()) {
456            Ok(state)
457        } else if let Ok(receipt_ts) = self.chain_store().load_child_tipset(ts) {
458            Ok(TipsetState {
459                state_root: *receipt_ts.parent_state(),
460                receipt_root: *receipt_ts.parent_message_receipts(),
461            })
462        } else {
463            Ok(self.load_executed_tipset(ts).await?.into())
464        }
465    }
466
467    /// Load an executed tipset, including state root, message receipts and events with caching.
468    pub async fn load_executed_tipset(
469        self: &Arc<Self>,
470        ts: &Tipset,
471    ) -> anyhow::Result<ExecutedTipset> {
472        // validate the existence of state trees for post-chain-head-epoch tipsets in case chain head is reset(e.g. manually or via GC).
473        if ts.epoch() >= self.heaviest_tipset().epoch()
474            && let Some(cached) = self.cache.get(ts.key())
475        {
476            if StateTree::new_from_root(self.blockstore_owned(), &cached.state_root).is_ok() {
477                return Ok(cached);
478            } else {
479                self.cache.remove(ts.key());
480            }
481        }
482        self.cache
483            .get_or_else(ts.key(), || async move {
484                let receipt_ts = self.chain_store().load_child_tipset(ts).ok();
485                self.load_executed_tipset_inner(ts, receipt_ts.as_ref())
486                    .await
487            })
488            .await
489    }
490
491    async fn load_executed_tipset_inner(
492        self: &Arc<Self>,
493        msg_ts: &Tipset,
494        // when `msg_ts` is the current head, `receipt_ts` is `None`
495        receipt_ts: Option<&Tipset>,
496    ) -> anyhow::Result<ExecutedTipset> {
497        if let Some(receipt_ts) = receipt_ts {
498            anyhow::ensure!(
499                msg_ts.key() == receipt_ts.parents(),
500                "message tipset should be the parent of message receipt tipset"
501            );
502        }
503        let mut recomputed = false;
504        let (state_root, receipt_root, receipts) = match receipt_ts.and_then(|ts| {
505            let receipt_root = *ts.parent_message_receipts();
506            Receipt::get_receipts(self.cs.blockstore(), receipt_root)
507                .ok()
508                .map(|r| (*ts.parent_state(), receipt_root, r))
509        }) {
510            Some((state_root, receipt_root, receipts)) => (state_root, receipt_root, receipts),
511            None => {
512                let state_output = self
513                    .compute_tipset_state(msg_ts.shallow_clone(), NO_CALLBACK, VMTrace::NotTraced)
514                    .await?;
515                recomputed = true;
516                (
517                    state_output.state_root,
518                    state_output.receipt_root,
519                    Receipt::get_receipts(self.cs.blockstore(), state_output.receipt_root)?,
520                )
521            }
522        };
523
524        let messages = self.chain_store().messages_for_tipset(msg_ts)?;
525        anyhow::ensure!(
526            messages.len() == receipts.len(),
527            "mismatching message and receipt counts ({} messages, {} receipts)",
528            messages.len(),
529            receipts.len()
530        );
531        let mut executed_messages = Vec::with_capacity(messages.len());
532        for (message, receipt) in messages.iter().cloned().zip(receipts) {
533            let events = if let Some(events_root) = receipt.events_root() {
534                Some(
535                    match StampedEvent::get_events(self.cs.blockstore(), &events_root) {
536                        Ok(events) => events,
537                        Err(e) if recomputed => return Err(e),
538                        Err(_) => {
539                            self.compute_tipset_state(
540                                msg_ts.shallow_clone(),
541                                NO_CALLBACK,
542                                VMTrace::NotTraced,
543                            )
544                            .await?;
545                            recomputed = true;
546                            StampedEvent::get_events(self.cs.blockstore(), &events_root)?
547                        }
548                    },
549                )
550            } else {
551                None
552            };
553            executed_messages.push(ExecutedMessage {
554                message,
555                receipt,
556                events,
557            });
558        }
559        Ok(ExecutedTipset {
560            state_root,
561            receipt_root,
562            executed_messages: Arc::new(executed_messages),
563        })
564    }
565
566    #[instrument(skip(self, rand))]
567    fn call_raw(
568        &self,
569        state_cid: Option<Cid>,
570        msg: &Message,
571        rand: ChainRand<DB>,
572        tipset: &Tipset,
573    ) -> Result<ApiInvocResult, Error> {
574        let mut msg = msg.clone();
575
576        let state_cid = state_cid.unwrap_or(*tipset.parent_state());
577
578        let tipset_messages = self
579            .chain_store()
580            .messages_for_tipset(tipset)
581            .map_err(|err| Error::Other(err.to_string()))?;
582
583        let prior_messsages = tipset_messages
584            .iter()
585            .filter(|ts_msg| ts_msg.message().from() == msg.from());
586
587        // Handle state forks
588
589        let height = tipset.epoch();
590        let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
591        let mut vm = VM::new(
592            ExecutionContext {
593                heaviest_tipset: tipset.shallow_clone(),
594                state_tree_root: state_cid,
595                epoch: height,
596                rand: Box::new(rand),
597                base_fee: tipset.block_headers().first().parent_base_fee.clone(),
598                circ_supply: genesis_info.get_vm_circulating_supply(
599                    height,
600                    self.blockstore(),
601                    &state_cid,
602                )?,
603                chain_config: self.chain_config().shallow_clone(),
604                chain_index: self.chain_index().shallow_clone(),
605                timestamp: tipset.min_timestamp(),
606            },
607            &self.engine,
608            VMTrace::Traced,
609        )?;
610
611        for m in prior_messsages {
612            vm.apply_message(m)?;
613        }
614
615        // We flush to get the VM's view of the state tree after applying the above messages
616        // This is needed to get the correct nonce from the actor state to match the VM
617        let state_cid = vm.flush()?;
618
619        let state = StateTree::new_from_root(self.blockstore_owned(), &state_cid)?;
620
621        let from_actor = state
622            .get_actor(&msg.from())?
623            .ok_or_else(|| anyhow::anyhow!("actor not found"))?;
624        msg.set_sequence(from_actor.sequence);
625
626        // Implicit messages need to set a special gas limit
627        let mut msg = msg.clone();
628        msg.gas_limit = IMPLICIT_MESSAGE_GAS_LIMIT as u64;
629
630        let (apply_ret, duration) = vm.apply_implicit_message(&msg)?;
631
632        Ok(ApiInvocResult {
633            msg: msg.clone(),
634            msg_rct: Some(apply_ret.msg_receipt()),
635            msg_cid: msg.cid(),
636            error: apply_ret.failure_info().unwrap_or_default(),
637            duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
638            gas_cost: MessageGasCost::default(),
639            execution_trace: structured::parse_events(apply_ret.exec_trace()).unwrap_or_default(),
640        })
641    }
642
643    /// runs the given message and returns its result without any persisted
644    /// changes.
645    pub fn call(&self, message: &Message, tipset: Option<Tipset>) -> Result<ApiInvocResult, Error> {
646        let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
647        let chain_rand = self.chain_rand(ts.shallow_clone());
648        self.call_raw(None, message, chain_rand, &ts)
649    }
650
651    /// Same as [`StateManager::call`] but runs the message on the given state and not
652    /// on the parent state of the tipset.
653    pub fn call_on_state(
654        &self,
655        state_cid: Cid,
656        message: &Message,
657        tipset: Option<Tipset>,
658    ) -> Result<ApiInvocResult, Error> {
659        let ts = tipset.unwrap_or_else(|| self.cs.heaviest_tipset());
660        let chain_rand = self.chain_rand(ts.shallow_clone());
661        self.call_raw(Some(state_cid), message, chain_rand, &ts)
662    }
663
664    pub async fn apply_on_state_with_gas(
665        self: &Arc<Self>,
666        tipset: Option<Tipset>,
667        msg: Message,
668        vm_flush: VMFlush,
669    ) -> anyhow::Result<(ApiInvocResult, Option<Cid>)> {
670        let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
671
672        let from_a = self.resolve_to_key_addr(&msg.from, &ts).await?;
673
674        // Pretend that the message is signed. This has an influence on the gas
675        // cost. We obviously can't generate a valid signature. Instead, we just
676        // fill the signature with zeros. The validity is not checked.
677        let mut chain_msg = match from_a.protocol() {
678            Protocol::Secp256k1 => SignedMessage::new_unchecked(
679                msg.clone(),
680                Signature::new_secp256k1(vec![0; SECP_SIG_LEN]),
681            )
682            .into(),
683            Protocol::Delegated => SignedMessage::new_unchecked(
684                msg.clone(),
685                // In Lotus, delegated signatures have the same length as SECP256k1.
686                // This may or may not change in the future.
687                Signature::new(SignatureType::Delegated, vec![0; SECP_SIG_LEN]),
688            )
689            .into(),
690            _ => msg.clone().into(),
691        };
692
693        let (_invoc_res, apply_ret, duration, state_root) = self
694            .call_with_gas(&mut chain_msg, &[], Some(ts), vm_flush)
695            .await?;
696
697        Ok((
698            ApiInvocResult {
699                msg_cid: msg.cid(),
700                msg,
701                msg_rct: Some(apply_ret.msg_receipt()),
702                error: apply_ret.failure_info().unwrap_or_default(),
703                duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
704                gas_cost: MessageGasCost::default(),
705                execution_trace: structured::parse_events(apply_ret.exec_trace())
706                    .unwrap_or_default(),
707            },
708            state_root,
709        ))
710    }
711
712    /// Computes message on the given [Tipset] state, after applying other
713    /// messages and returns the values computed in the VM.
714    pub async fn call_with_gas(
715        self: &Arc<Self>,
716        message: &mut ChainMessage,
717        prior_messages: &[ChainMessage],
718        tipset: Option<Tipset>,
719        vm_flush: VMFlush,
720    ) -> Result<(InvocResult, ApplyRet, Duration, Option<Cid>), Error> {
721        let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
722        let TipsetState { state_root, .. } = self
723            .load_tipset_state(&ts)
724            .await
725            .map_err(|e| Error::Other(format!("Could not load tipset state: {e:#}")))?;
726        let chain_rand = self.chain_rand(ts.clone());
727
728        // Since we're simulating a future message, pretend we're applying it in the
729        // "next" tipset
730        let epoch = ts.epoch() + 1;
731        let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
732        // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from
733        // FVM, but that introduces some constraints, and possible deadlocks.
734        let (ret, duration, state_cid) = stacker::grow(64 << 20, || -> anyhow::Result<_> {
735            let mut vm = VM::new(
736                ExecutionContext {
737                    heaviest_tipset: ts.clone(),
738                    state_tree_root: state_root,
739                    epoch,
740                    rand: Box::new(chain_rand),
741                    base_fee: ts.block_headers().first().parent_base_fee.clone(),
742                    circ_supply: genesis_info.get_vm_circulating_supply(
743                        epoch,
744                        self.blockstore(),
745                        &state_root,
746                    )?,
747                    chain_config: self.chain_config().shallow_clone(),
748                    chain_index: self.chain_index().shallow_clone(),
749                    timestamp: ts.min_timestamp(),
750                },
751                &self.engine,
752                VMTrace::NotTraced,
753            )?;
754
755            for msg in prior_messages {
756                vm.apply_message(msg)?;
757            }
758            let from_actor = vm
759                .get_actor(&message.from())
760                .map_err(|e| Error::Other(format!("Could not get actor from state: {e:#}")))?
761                .ok_or_else(|| Error::Other("cant find actor in state tree".to_string()))?;
762
763            message.set_sequence(from_actor.sequence);
764            let (ret, duration) = vm.apply_message(message)?;
765            let state_root = match vm_flush {
766                VMFlush::Flush => Some(vm.flush()?),
767                VMFlush::Skip => None,
768            };
769            Ok((ret, duration, state_root))
770        })?;
771
772        Ok((
773            InvocResult::new(message.message().clone(), &ret),
774            ret,
775            duration,
776            state_cid,
777        ))
778    }
779
780    /// Replays the given message and returns the result of executing the
781    /// indicated message, assuming it was executed in the indicated tipset.
782    pub async fn replay(self: &Arc<Self>, ts: Tipset, mcid: Cid) -> Result<ApiInvocResult, Error> {
783        let this = Arc::clone(self);
784        tokio::task::spawn_blocking(move || this.replay_blocking(ts, mcid)).await?
785    }
786
787    /// Blocking version of `replay`
788    pub fn replay_blocking(
789        self: &Arc<Self>,
790        ts: Tipset,
791        mcid: Cid,
792    ) -> Result<ApiInvocResult, Error> {
793        const REPLAY_HALT: &str = "replay_halt";
794
795        let mut api_invoc_result = None;
796        let callback = |ctx: MessageCallbackCtx<'_>| {
797            match ctx.at {
798                CalledAt::Applied | CalledAt::Reward
799                    if api_invoc_result.is_none() && ctx.cid == mcid =>
800                {
801                    api_invoc_result = Some(ApiInvocResult {
802                        msg_cid: ctx.message.cid(),
803                        msg: ctx.message.message().clone(),
804                        msg_rct: Some(ctx.apply_ret.msg_receipt()),
805                        error: ctx.apply_ret.failure_info().unwrap_or_default(),
806                        duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
807                        gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
808                        execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
809                            .unwrap_or_default(),
810                    });
811                    anyhow::bail!(REPLAY_HALT);
812                }
813                _ => Ok(()), // ignored
814            }
815        };
816        let result = self.compute_tipset_state_blocking(ts, Some(callback), VMTrace::Traced);
817        if let Err(error_message) = result
818            && error_message.to_string() != REPLAY_HALT
819        {
820            return Err(Error::Other(format!(
821                "unexpected error during execution : {error_message:}"
822            )));
823        }
824        api_invoc_result.ok_or_else(|| Error::Other("failed to replay".into()))
825    }
826
827    /// Replays a tipset up to a target message, capturing the state root before
828    /// and after execution.
829    pub async fn replay_for_prestate(
830        self: &Arc<Self>,
831        ts: Tipset,
832        target_message_cid: Cid,
833    ) -> Result<(Cid, ApiInvocResult, Cid), Error> {
834        let this = Arc::clone(self);
835        tokio::task::spawn_blocking(move || {
836            this.replay_for_prestate_blocking(ts, target_message_cid)
837        })
838        .await
839        .map_err(|e| Error::Other(format!("{e}")))?
840    }
841
842    fn replay_for_prestate_blocking(
843        self: &Arc<Self>,
844        ts: Tipset,
845        target_msg_cid: Cid,
846    ) -> Result<(Cid, ApiInvocResult, Cid), Error> {
847        if ts.epoch() == 0 {
848            return Err(Error::Other(
849                "cannot trace messages in the genesis block".into(),
850            ));
851        }
852
853        let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
854        let exec = TipsetExecutor::new(
855            self.chain_index().shallow_clone(),
856            self.chain_config().shallow_clone(),
857            self.beacon_schedule().shallow_clone(),
858            &self.engine,
859            ts.shallow_clone(),
860        );
861        let mut no_cb = NO_CALLBACK;
862        let (parent_state, epoch, block_messages) =
863            exec.prepare_parent_state(genesis_timestamp, VMTrace::NotTraced, &mut no_cb)?;
864
865        Ok(stacker::grow(64 << 20, || {
866            let mut vm =
867                exec.create_vm(parent_state, epoch, ts.min_timestamp(), VMTrace::NotTraced)?;
868            let mut processed = ahash::HashSet::default();
869
870            for block in block_messages.iter() {
871                let mut penalty = TokenAmount::zero();
872                let mut gas_reward = TokenAmount::zero();
873
874                for msg in block.messages.iter() {
875                    let cid = msg.cid();
876                    if processed.contains(&cid) {
877                        continue;
878                    }
879
880                    processed.insert(cid);
881
882                    if cid == target_msg_cid {
883                        let pre_root = vm.flush()?;
884                        let mut traced_vm =
885                            exec.create_vm(pre_root, epoch, ts.min_timestamp(), VMTrace::Traced)?;
886                        let (ret, duration) = traced_vm.apply_message(msg)?;
887                        let post_root = traced_vm.flush()?;
888
889                        return Ok((
890                            pre_root,
891                            ApiInvocResult {
892                                msg_cid: cid,
893                                msg: msg.message().clone(),
894                                msg_rct: Some(ret.msg_receipt()),
895                                error: ret.failure_info().unwrap_or_default(),
896                                duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
897                                gas_cost: MessageGasCost::default(),
898                                execution_trace: structured::parse_events(ret.exec_trace())
899                                    .unwrap_or_default(),
900                            },
901                            post_root,
902                        ));
903                    }
904
905                    let (ret, _) = vm.apply_message(msg)?;
906                    gas_reward += ret.miner_tip();
907                    penalty += ret.penalty();
908                }
909
910                if let Some(rew_msg) =
911                    vm.reward_message(epoch, block.miner, block.win_count, penalty, gas_reward)?
912                {
913                    let (ret, _) = vm.apply_implicit_message(&rew_msg)?;
914                    if let Some(err) = ret.failure_info() {
915                        bail!(
916                            "failed to apply reward message for miner {}: {err}",
917                            block.miner
918                        );
919                    }
920
921                    // This is more of a sanity check, this should not be able to be hit.
922                    if !ret.msg_receipt().exit_code().is_success() {
923                        bail!(
924                            "reward application message failed (exit: {:?})",
925                            ret.msg_receipt().exit_code()
926                        );
927                    }
928                }
929            }
930
931            bail!("message {target_msg_cid} not found in tipset")
932        })?)
933    }
934
935    /// Checks the eligibility of the miner. This is used in the validation that
936    /// a block's miner has the requirements to mine a block.
937    pub fn eligible_to_mine(
938        &self,
939        address: &Address,
940        base_tipset: &Tipset,
941        lookback_tipset: &Tipset,
942    ) -> anyhow::Result<bool, Error> {
943        let hmp =
944            self.miner_has_min_power(&self.chain_config().policy, address, lookback_tipset)?;
945        let version = self.get_network_version(base_tipset.epoch());
946
947        if version <= NetworkVersion::V3 {
948            return Ok(hmp);
949        }
950
951        if !hmp {
952            return Ok(false);
953        }
954
955        let actor = self
956            .get_actor(&Address::POWER_ACTOR, *base_tipset.parent_state())?
957            .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
958
959        let power_state = power::State::load(self.blockstore(), actor.code, actor.state)?;
960
961        let actor = self
962            .get_actor(address, *base_tipset.parent_state())?
963            .ok_or_else(|| Error::state("Miner actor address could not be resolved"))?;
964
965        let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
966
967        // Non-empty power claim.
968        let claim = power_state
969            .miner_power(self.blockstore(), address)?
970            .ok_or_else(|| Error::Other("Could not get claim".to_string()))?;
971        if claim.quality_adj_power <= BigInt::zero() {
972            return Ok(false);
973        }
974
975        // No fee debt.
976        if !miner_state.fee_debt().is_zero() {
977            return Ok(false);
978        }
979
980        // No active consensus faults.
981        let info = miner_state.info(self.blockstore())?;
982        if base_tipset.epoch() <= info.consensus_fault_elapsed {
983            return Ok(false);
984        }
985
986        Ok(true)
987    }
988
989    /// Conceptually, a [`Tipset`] consists of _blocks_ which share an _epoch_.
990    /// Each _block_ contains _messages_, which are executed by the _Filecoin Virtual Machine_.
991    ///
992    /// VM message execution essentially looks like this:
993    /// ```text
994    /// state[N-900..N] * message = state[N+1]
995    /// ```
996    ///
997    /// The `state`s above are stored in the `IPLD Blockstore`, and can be referred to by
998    /// a [`Cid`] - the _state root_.
999    /// The previous 900 states (configurable, see
1000    /// <https://docs.filecoin.io/reference/general/glossary/#finality>) can be
1001    /// queried when executing a message, so a store needs at least that many.
1002    /// (a snapshot typically contains 2000, for example).
1003    ///
1004    /// Each message costs FIL to execute - this is _gas_.
1005    /// After execution, the message has a _receipt_, showing how much gas was spent.
1006    /// This is similarly a [`Cid`] into the block store.
1007    ///
1008    /// For details, see the documentation for [`apply_block_messages`].
1009    ///
1010    pub async fn compute_tipset_state(
1011        self: &Arc<Self>,
1012        tipset: Tipset,
1013        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
1014        enable_tracing: VMTrace,
1015    ) -> Result<ExecutedTipset, Error> {
1016        let this = Arc::clone(self);
1017        tokio::task::spawn_blocking(move || {
1018            this.compute_tipset_state_blocking(tipset, callback, enable_tracing)
1019        })
1020        .await?
1021    }
1022
1023    /// Blocking version of `compute_tipset_state`
1024    pub fn compute_tipset_state_blocking(
1025        &self,
1026        tipset: Tipset,
1027        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
1028        enable_tracing: VMTrace,
1029    ) -> Result<ExecutedTipset, Error> {
1030        let epoch = tipset.epoch();
1031        let has_callback = callback.is_some();
1032        info!(
1033            "Evaluating tipset: EPOCH={epoch}, blocks={}, tsk={}",
1034            tipset.len(),
1035            tipset.key(),
1036        );
1037        Ok(apply_block_messages(
1038            self.chain_store().genesis_block_header().timestamp,
1039            self.chain_index().shallow_clone(),
1040            self.chain_config().shallow_clone(),
1041            self.beacon_schedule().shallow_clone(),
1042            &self.engine,
1043            tipset,
1044            callback,
1045            enable_tracing,
1046        )
1047        .map_err(|e| {
1048            if has_callback {
1049                e
1050            } else {
1051                e.context(format!("Failed to compute tipset state@{epoch}"))
1052            }
1053        })?)
1054    }
1055
1056    #[instrument(skip_all)]
1057    pub async fn compute_state(
1058        self: &Arc<Self>,
1059        height: ChainEpoch,
1060        messages: Vec<Message>,
1061        tipset: Tipset,
1062        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
1063        enable_tracing: VMTrace,
1064    ) -> Result<ExecutedTipset, Error> {
1065        let this = Arc::clone(self);
1066        tokio::task::spawn_blocking(move || {
1067            this.compute_state_blocking(height, messages, tipset, callback, enable_tracing)
1068        })
1069        .await?
1070    }
1071
1072    /// Blocking version of `compute_state`
1073    #[tracing::instrument(skip_all)]
1074    pub fn compute_state_blocking(
1075        &self,
1076        height: ChainEpoch,
1077        messages: Vec<Message>,
1078        tipset: Tipset,
1079        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
1080        enable_tracing: VMTrace,
1081    ) -> Result<ExecutedTipset, Error> {
1082        Ok(compute_state(
1083            height,
1084            messages,
1085            tipset,
1086            self.chain_store().genesis_block_header().timestamp,
1087            self.chain_index().shallow_clone(),
1088            self.chain_config().shallow_clone(),
1089            self.beacon_schedule().shallow_clone(),
1090            &self.engine,
1091            callback,
1092            enable_tracing,
1093        )?)
1094    }
1095
1096    /// Check if tipset had executed the message, by loading the receipt based
1097    /// on the index of the message in the block.
1098    fn tipset_executed_message(
1099        &self,
1100        tipset: &Tipset,
1101        message: &ChainMessage,
1102        allow_replaced: bool,
1103    ) -> Result<Option<Receipt>, Error> {
1104        if tipset.epoch() == 0 {
1105            return Ok(None);
1106        }
1107        let message_from_address = message.from();
1108        let message_sequence = message.sequence();
1109        // Load parent state.
1110        let pts = self
1111            .chain_index()
1112            .load_required_tipset(tipset.parents())
1113            .map_err(|err| Error::Other(format!("Failed to load tipset: {err}")))?;
1114        let messages = self
1115            .cs
1116            .messages_for_tipset(&pts)
1117            .map_err(|err| Error::Other(format!("Failed to load messages for tipset: {err}")))?;
1118        messages
1119            .iter()
1120            .enumerate()
1121            // iterate in reverse because we going backwards through the chain
1122            .rev()
1123            .filter(|(_, s)| {
1124                s.sequence() == message_sequence
1125                    && s.from() == message_from_address
1126                    && s.equal_call(message)
1127            })
1128            .map(|(index, m)| {
1129                // A replacing message is a message with a different CID,
1130                // any of Gas values, and different signature, but with all
1131                // other parameters matching (source/destination, nonce, params, etc.)
1132                if !allow_replaced && message.cid() != m.cid(){
1133                    Err(Error::Other(format!(
1134                        "found message with equal nonce and call params but different CID. wanted {}, found: {}, nonce: {}, from: {}",
1135                        message.cid(),
1136                        m.cid(),
1137                        message.sequence(),
1138                        message.from(),
1139                    )))
1140                } else {
1141                    let block_header = tipset.block_headers().first();
1142                    crate::chain::get_parent_receipt(
1143                        self.blockstore(),
1144                        block_header,
1145                        index,
1146                    )
1147                    .map_err(|err| Error::Other(format!("Failed to get parent receipt (message_receipts={}, index={index}, error={err})", block_header.message_receipts)))
1148                }
1149            })
1150            .next()
1151            .unwrap_or(Ok(None))
1152    }
1153
1154    fn check_search(
1155        &self,
1156        mut current: Tipset,
1157        message: &ChainMessage,
1158        lookback_max_epoch: ChainEpoch,
1159        allow_replaced: bool,
1160    ) -> Result<Option<(Tipset, Receipt)>, Error> {
1161        let message_from_address = message.from();
1162        let message_sequence = message.sequence();
1163        let mut current_actor_state = self
1164            .get_required_actor(&message_from_address, *current.parent_state())
1165            .map_err(Error::state)?;
1166        let message_from_id = self.lookup_required_id(&message_from_address, &current)?;
1167
1168        while current.epoch() >= lookback_max_epoch {
1169            let parent_tipset = self
1170                .chain_index()
1171                .load_required_tipset(current.parents())
1172                .map_err(|err| {
1173                    Error::Other(format!(
1174                        "failed to load tipset during msg wait searchback: {err:}"
1175                    ))
1176                })?;
1177
1178            let parent_actor_state = self
1179                .get_actor(&message_from_id, *parent_tipset.parent_state())
1180                .map_err(|e| Error::State(e.to_string()))?;
1181
1182            if parent_actor_state.is_none()
1183                || (current_actor_state.sequence > message_sequence
1184                    && parent_actor_state.as_ref().unwrap().sequence <= message_sequence)
1185            {
1186                let receipt = self
1187                    .tipset_executed_message(&current, message, allow_replaced)?
1188                    .context("Failed to get receipt with tipset_executed_message")?;
1189                return Ok(Some((current, receipt)));
1190            }
1191
1192            if let Some(parent_actor_state) = parent_actor_state {
1193                current = parent_tipset;
1194                current_actor_state = parent_actor_state;
1195            } else {
1196                break;
1197            }
1198        }
1199
1200        Ok(None)
1201    }
1202
1203    /// Searches backwards through the chain for a message receipt.
1204    fn search_back_for_message(
1205        &self,
1206        current: Tipset,
1207        message: &ChainMessage,
1208        look_back_limit: Option<i64>,
1209        allow_replaced: Option<bool>,
1210    ) -> Result<Option<(Tipset, Receipt)>, Error> {
1211        let current_epoch = current.epoch();
1212        let allow_replaced = allow_replaced.unwrap_or(true);
1213
1214        // Calculate the max lookback epoch (inclusive lower bound) for the search.
1215        let lookback_max_epoch = match look_back_limit {
1216            // No search: limit = 0 means search 0 epochs
1217            Some(0) => return Ok(None),
1218            // Limited search: calculate the inclusive lower bound, clamped to genesis
1219            // Example: limit=5 at epoch=1000 → min_epoch=996, searches [996,1000] = 5 epochs
1220            // Example: limit=2000 at epoch=1000 → min_epoch=0, searches [0,1000] = 1001 epochs (all available)
1221            Some(limit) if limit > 0 => (current_epoch - limit + 1).max(0),
1222            // Search all the way to genesis (epoch 0)
1223            _ => 0,
1224        };
1225
1226        self.check_search(current, message, lookback_max_epoch, allow_replaced)
1227    }
1228
1229    /// Returns a message receipt from a given tipset and message CID.
1230    pub fn get_receipt(&self, tipset: Tipset, msg: Cid) -> Result<Receipt, Error> {
1231        let m = crate::chain::get_chain_message(self.blockstore(), &msg)
1232            .map_err(|e| Error::Other(e.to_string()))?;
1233        let message_receipt = self.tipset_executed_message(&tipset, &m, true)?;
1234        if let Some(receipt) = message_receipt {
1235            return Ok(receipt);
1236        }
1237
1238        let maybe_tuple = self.search_back_for_message(tipset, &m, None, None)?;
1239        let message_receipt = maybe_tuple
1240            .ok_or_else(|| {
1241                Error::Other("Could not get receipt from search back message".to_string())
1242            })?
1243            .1;
1244        Ok(message_receipt)
1245    }
1246
1247    /// `WaitForMessage` blocks until a message appears on chain. It looks
1248    /// backwards in the chain to see if this has already happened. It
1249    /// guarantees that the message has been on chain for at least
1250    /// confidence epochs without being reverted before returning.
1251    pub async fn wait_for_message(
1252        self: &Arc<Self>,
1253        msg_cid: Cid,
1254        confidence: i64,
1255        look_back_limit: Option<ChainEpoch>,
1256        allow_replaced: Option<bool>,
1257    ) -> Result<(Option<Tipset>, Option<Receipt>), Error> {
1258        let mut head_changes_rx = self.cs.subscribe_head_changes();
1259        let (sender, mut receiver) = oneshot::channel::<()>();
1260        let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
1261            .map_err(|err| Error::Other(format!("failed to load message {err:}")))?;
1262        let current_tipset = self.heaviest_tipset();
1263        let maybe_message_receipt =
1264            self.tipset_executed_message(&current_tipset, &message, true)?;
1265        if let Some(r) = maybe_message_receipt {
1266            return Ok((Some(current_tipset.shallow_clone()), Some(r)));
1267        }
1268
1269        let mut candidate_tipset: Option<Tipset> = None;
1270        let mut candidate_receipt: Option<Receipt> = None;
1271
1272        let sm_cloned = self.shallow_clone();
1273
1274        let message_for_task = message.clone();
1275        let height_of_head = current_tipset.epoch();
1276        let task = tokio::task::spawn(async move {
1277            let back_tuple = sm_cloned.search_back_for_message(
1278                current_tipset,
1279                &message_for_task,
1280                look_back_limit,
1281                allow_replaced,
1282            )?;
1283            sender
1284                .send(())
1285                .map_err(|e| Error::Other(format!("Could not send to channel {e:?}")))?;
1286            Ok::<_, Error>(back_tuple)
1287        });
1288
1289        let reverts: Arc<RwLock<HashMap<TipsetKey, bool>>> = Arc::new(RwLock::new(HashMap::new()));
1290        let block_revert = reverts.clone();
1291        let sm_cloned = Arc::clone(self);
1292
1293        // Wait for message to be included in head change.
1294        let mut subscriber_poll = tokio::task::spawn(async move {
1295            loop {
1296                match head_changes_rx.recv().await {
1297                    Ok(head_changes) => {
1298                        for tipset in head_changes.reverts {
1299                            if candidate_tipset
1300                                .as_ref()
1301                                .is_some_and(|candidate| candidate.key() == tipset.key())
1302                            {
1303                                candidate_tipset = None;
1304                                candidate_receipt = None;
1305                            }
1306                        }
1307                        for tipset in head_changes.applies {
1308                            if candidate_tipset
1309                                .as_ref()
1310                                .map(|s| tipset.epoch() >= s.epoch() + confidence)
1311                                .unwrap_or_default()
1312                            {
1313                                return Ok((candidate_tipset, candidate_receipt));
1314                            }
1315                            let poll_receiver = receiver.try_recv();
1316                            if let Ok(Some(_)) = poll_receiver {
1317                                block_revert
1318                                    .write()
1319                                    .await
1320                                    .insert(tipset.key().to_owned(), true);
1321                            }
1322
1323                            let maybe_receipt =
1324                                sm_cloned.tipset_executed_message(&tipset, &message, true)?;
1325                            if let Some(receipt) = maybe_receipt {
1326                                if confidence == 0 {
1327                                    return Ok((Some(tipset), Some(receipt)));
1328                                }
1329                                candidate_tipset = Some(tipset);
1330                                candidate_receipt = Some(receipt)
1331                            }
1332                        }
1333                    }
1334                    Err(RecvError::Lagged(i)) => {
1335                        warn!(
1336                            "wait for message head change subscriber lagged, skipped {} events",
1337                            i
1338                        );
1339                    }
1340                    Err(RecvError::Closed) => break,
1341                }
1342            }
1343            Ok((None, None))
1344        })
1345        .fuse();
1346
1347        // Search backwards for message.
1348        let mut search_back_poll = tokio::task::spawn(async move {
1349            let back_tuple = task.await.map_err(|e| {
1350                Error::Other(format!("Could not search backwards for message {e}"))
1351            })??;
1352            if let Some((back_tipset, back_receipt)) = back_tuple {
1353                let should_revert = *reverts
1354                    .read()
1355                    .await
1356                    .get(back_tipset.key())
1357                    .unwrap_or(&false);
1358                let larger_height_of_head = height_of_head >= back_tipset.epoch() + confidence;
1359                if !should_revert && larger_height_of_head {
1360                    return Ok::<_, Error>((Some(back_tipset), Some(back_receipt)));
1361                }
1362                return Ok((None, None));
1363            }
1364            Ok((None, None))
1365        })
1366        .fuse();
1367
1368        // Await on first future to finish.
1369        loop {
1370            select! {
1371                res = subscriber_poll => {
1372                    return res?
1373                }
1374                res = search_back_poll => {
1375                    if let Ok((Some(ts), Some(rct))) = res? {
1376                        return Ok((Some(ts), Some(rct)));
1377                    }
1378                }
1379            }
1380        }
1381    }
1382
1383    pub async fn search_for_message(
1384        &self,
1385        from: Option<Tipset>,
1386        msg_cid: Cid,
1387        look_back_limit: Option<i64>,
1388        allow_replaced: Option<bool>,
1389    ) -> Result<Option<(Tipset, Receipt)>, Error> {
1390        let from = from.unwrap_or_else(|| self.heaviest_tipset());
1391        let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid)
1392            .map_err(|err| Error::Other(format!("failed to load message {err}")))?;
1393        let current_tipset = self.heaviest_tipset();
1394        let maybe_message_receipt =
1395            self.tipset_executed_message(&from, &message, allow_replaced.unwrap_or(true))?;
1396        if let Some(r) = maybe_message_receipt {
1397            Ok(Some((from, r)))
1398        } else {
1399            self.search_back_for_message(current_tipset, &message, look_back_limit, allow_replaced)
1400        }
1401    }
1402
1403    /// Returns a BLS public key from provided address
1404    pub fn get_bls_public_key(
1405        db: &Arc<DB>,
1406        addr: &Address,
1407        state_cid: Cid,
1408    ) -> Result<BlsPublicKey, Error> {
1409        let state = StateTree::new_from_root(Arc::clone(db), &state_cid)
1410            .map_err(|e| Error::Other(e.to_string()))?;
1411        let kaddr =
1412            resolve_to_key_addr(&state, db, addr).context("Failed to resolve key address")?;
1413
1414        match kaddr.into_payload() {
1415            Payload::BLS(key) => BlsPublicKey::from_bytes(&key)
1416                .context("Failed to construct bls public key")
1417                .map_err(Error::from),
1418            _ => Err(Error::state(
1419                "Address must be BLS address to load bls public key",
1420            )),
1421        }
1422    }
1423
1424    /// Looks up ID [Address] from the state at the given [Tipset].
1425    pub fn lookup_id(&self, addr: &Address, ts: &Tipset) -> Result<Option<Address>, Error> {
1426        let state_tree = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
1427            .map_err(|e| format!("{e:?}"))?;
1428        Ok(state_tree
1429            .lookup_id(addr)
1430            .map_err(|e| Error::Other(e.to_string()))?
1431            .map(Address::new_id))
1432    }
1433
1434    /// Looks up required ID [Address] from the state at the given [Tipset].
1435    pub fn lookup_required_id(&self, addr: &Address, ts: &Tipset) -> Result<Address, Error> {
1436        self.lookup_id(addr, ts)?
1437            .ok_or_else(|| Error::Other(format!("Failed to lookup the id address {addr}")))
1438    }
1439
1440    /// Retrieves market state
1441    pub fn market_state(&self, ts: &Tipset) -> Result<market::State, Error> {
1442        let actor = self.get_required_actor(&Address::MARKET_ACTOR, *ts.parent_state())?;
1443        let market_state = market::State::load(self.blockstore(), actor.code, actor.state)?;
1444        Ok(market_state)
1445    }
1446
1447    /// Retrieves market balance in escrow and locked tables.
1448    pub fn market_balance(&self, addr: &Address, ts: &Tipset) -> Result<MarketBalance, Error> {
1449        let market_state = self.market_state(ts)?;
1450        let new_addr = self.lookup_required_id(addr, ts)?;
1451        let out = MarketBalance {
1452            escrow: {
1453                market_state
1454                    .escrow_table(self.blockstore())?
1455                    .get(&new_addr)?
1456            },
1457            locked: {
1458                market_state
1459                    .locked_table(self.blockstore())?
1460                    .get(&new_addr)?
1461            },
1462        };
1463
1464        Ok(out)
1465    }
1466
1467    /// Retrieves miner info.
1468    pub fn miner_info(&self, addr: &Address, ts: &Tipset) -> Result<MinerInfo, Error> {
1469        let actor = self
1470            .get_actor(addr, *ts.parent_state())?
1471            .ok_or_else(|| Error::state("Miner actor not found"))?;
1472        let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1473
1474        Ok(state.info(self.blockstore())?)
1475    }
1476
1477    /// Retrieves miner faults.
1478    pub fn miner_faults(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
1479        self.all_partition_sectors(addr, ts, |partition| partition.faulty_sectors().clone())
1480    }
1481
1482    /// Retrieves miner recoveries.
1483    pub fn miner_recoveries(&self, addr: &Address, ts: &Tipset) -> Result<BitField, Error> {
1484        self.all_partition_sectors(addr, ts, |partition| partition.recovering_sectors().clone())
1485    }
1486
1487    fn all_partition_sectors(
1488        &self,
1489        addr: &Address,
1490        ts: &Tipset,
1491        get_sector: impl Fn(Partition<'_>) -> BitField,
1492    ) -> Result<BitField, Error> {
1493        let actor = self
1494            .get_actor(addr, *ts.parent_state())?
1495            .ok_or_else(|| Error::state("Miner actor not found"))?;
1496
1497        let state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1498
1499        let mut partitions = Vec::new();
1500
1501        state.for_each_deadline(
1502            &self.chain_config().policy,
1503            self.blockstore(),
1504            |_, deadline| {
1505                deadline.for_each(self.blockstore(), |_, partition| {
1506                    partitions.push(get_sector(partition));
1507                    Ok(())
1508                })
1509            },
1510        )?;
1511
1512        Ok(BitField::union(partitions.iter()))
1513    }
1514
1515    /// Retrieves miner power.
1516    pub fn miner_power(&self, addr: &Address, ts: &Tipset) -> Result<MinerPower, Error> {
1517        if let Some((miner_power, total_power)) = self.get_power(ts.parent_state(), Some(addr))? {
1518            return Ok(MinerPower {
1519                miner_power,
1520                total_power,
1521                has_min_power: true,
1522            });
1523        }
1524
1525        Ok(MinerPower {
1526            has_min_power: false,
1527            miner_power: Default::default(),
1528            total_power: Default::default(),
1529        })
1530    }
1531
1532    /// Similar to `resolve_to_key_addr` in the `forest_vm` [`crate::state_manager`] but does not
1533    /// allow `Actor` type of addresses. Uses `ts` to generate the VM state.
1534    pub async fn resolve_to_key_addr(
1535        self: &Arc<Self>,
1536        addr: &Address,
1537        ts: &Tipset,
1538    ) -> anyhow::Result<Address> {
1539        match addr.protocol() {
1540            Protocol::BLS | Protocol::Secp256k1 | Protocol::Delegated => return Ok(*addr),
1541            Protocol::Actor => {
1542                return Err(Error::Other(
1543                    "cannot resolve actor address to key address".to_string(),
1544                )
1545                .into());
1546            }
1547            _ => {}
1548        };
1549
1550        // First try to resolve the actor in the parent state, so we don't have to
1551        // compute anything.
1552        let state = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())?;
1553        if let Ok(addr) = resolve_to_key_addr(&state, self.blockstore(), addr) {
1554            return Ok(addr);
1555        }
1556
1557        // If that fails, compute the tip-set and try again.
1558        let TipsetState { state_root, .. } = self.load_tipset_state(ts).await?;
1559        let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?;
1560
1561        resolve_to_key_addr(&state, self.blockstore(), addr)
1562    }
1563
1564    pub async fn miner_get_base_info(
1565        self: &Arc<Self>,
1566        beacon_schedule: &BeaconSchedule,
1567        tipset: Tipset,
1568        addr: Address,
1569        epoch: ChainEpoch,
1570    ) -> anyhow::Result<Option<MiningBaseInfo>> {
1571        let prev_beacon = self
1572            .chain_store()
1573            .chain_index()
1574            .latest_beacon_entry(tipset.clone())?;
1575
1576        let entries: Vec<BeaconEntry> = beacon_schedule
1577            .beacon_entries_for_block(
1578                self.chain_config().network_version(epoch),
1579                epoch,
1580                tipset.epoch(),
1581                &prev_beacon,
1582            )
1583            .await?;
1584
1585        let base = entries.last().unwrap_or(&prev_beacon);
1586
1587        let (lb_tipset, lb_state_root) = ChainStore::get_lookback_tipset_for_round(
1588            self.chain_index(),
1589            self.chain_config(),
1590            &tipset,
1591            epoch,
1592        )?;
1593
1594        // If the miner actor doesn't exist in the current tipset, it is a
1595        // user-error and we must return an error message. If the miner exists
1596        // in the current tipset but not in the lookback tipset, we may not
1597        // error and should instead return None.
1598        let actor = self.get_required_actor(&addr, *tipset.parent_state())?;
1599        if self.get_actor(&addr, lb_state_root)?.is_none() {
1600            return Ok(None);
1601        }
1602
1603        let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?;
1604
1605        let addr_buf = to_vec(&addr)?;
1606        let rand = draw_randomness(
1607            base.signature(),
1608            DomainSeparationTag::WinningPoStChallengeSeed as i64,
1609            epoch,
1610            &addr_buf,
1611        )?;
1612
1613        let network_version = self.chain_config().network_version(tipset.epoch());
1614        let sectors = self.get_sectors_for_winning_post(
1615            &lb_state_root,
1616            network_version,
1617            &addr,
1618            Randomness::new(rand.to_vec()),
1619        )?;
1620
1621        if sectors.is_empty() {
1622            return Ok(None);
1623        }
1624
1625        let (miner_power, total_power) = self
1626            .get_power(&lb_state_root, Some(&addr))?
1627            .context("failed to get power")?;
1628
1629        let info = miner_state.info(self.blockstore())?;
1630
1631        let worker_key = self
1632            .resolve_to_deterministic_address(info.worker, &tipset)
1633            .await?;
1634        let eligible = self.eligible_to_mine(&addr, &tipset, &lb_tipset)?;
1635
1636        Ok(Some(MiningBaseInfo {
1637            miner_power: miner_power.quality_adj_power,
1638            network_power: total_power.quality_adj_power,
1639            sectors,
1640            worker_key,
1641            sector_size: info.sector_size,
1642            prev_beacon_entry: prev_beacon,
1643            beacon_entries: entries,
1644            eligible_for_mining: eligible,
1645        }))
1646    }
1647
1648    /// Checks power actor state for if miner meets consensus minimum
1649    /// requirements.
1650    pub fn miner_has_min_power(
1651        &self,
1652        policy: &Policy,
1653        addr: &Address,
1654        ts: &Tipset,
1655    ) -> anyhow::Result<bool> {
1656        let actor = self
1657            .get_actor(&Address::POWER_ACTOR, *ts.parent_state())?
1658            .ok_or_else(|| Error::state("Power actor address could not be resolved"))?;
1659        let ps = power::State::load(self.blockstore(), actor.code, actor.state)?;
1660
1661        ps.miner_nominal_power_meets_consensus_minimum(policy, self.blockstore(), addr)
1662    }
1663
1664    /// Validates all tipsets at epoch `start..=end` behind the heaviest tipset.
1665    ///
1666    /// Tipsets are processed sequentially. The compute-intensive work inside each
1667    /// tipset (`bellperson` proof verification, FVM batch seal verification, etc.)
1668    /// is already heavily rayon-parallelized. Parallelizing the outer loop actually introduces
1669    /// some issues due to locks in the aforementioned crates. So don't do it.
1670    ///
1671    /// # What is validation?
1672    /// Every state transition returns a new _state root_, which is typically retained in, e.g., snapshots.
1673    /// For "full" snapshots, all state roots are retained.
1674    /// For standard snapshots, the last 2000 or so state roots are retained.
1675    ///
1676    /// _receipts_ meanwhile, are typically ephemeral, but each tipset knows the _receipt root_
1677    /// (hash) of the previous tipset.
1678    ///
1679    /// This function takes advantage of that fact to validate tipsets:
1680    /// - `tipset[N]` claims that `receipt_root[N-1]` should be `0xDEADBEEF`
1681    /// - find `tipset[N-1]`, and perform its state transition to get the actual `receipt_root`
1682    /// - assert that they match
1683    ///
1684    /// See [`Self::compute_tipset_state_blocking`] for an explanation of state transitions.
1685    #[tracing::instrument(skip(self))]
1686    pub fn validate_range(&self, epochs: RangeInclusive<i64>) -> anyhow::Result<()> {
1687        let heaviest = self.heaviest_tipset();
1688        let heaviest_epoch = heaviest.epoch();
1689        let end = self
1690            .chain_index()
1691            .tipset_by_height(*epochs.end(), heaviest, ResolveNullTipset::TakeOlder)
1692            .with_context(|| {
1693                format!(
1694            "couldn't get a tipset at height {} behind heaviest tipset at height {heaviest_epoch}",
1695            *epochs.end(),
1696        )
1697            })?;
1698
1699        // lookup tipset parents as we go along, iterating DOWN from `end`
1700        let tipsets = end
1701            .chain(self.blockstore())
1702            .take_while(|ts| ts.epoch() >= *epochs.start());
1703
1704        self.validate_tipsets(tipsets)
1705    }
1706
1707    pub fn validate_tipsets<T>(&self, tipsets: T) -> anyhow::Result<()>
1708    where
1709        T: Iterator<Item = Tipset> + Send,
1710    {
1711        let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
1712        validate_tipsets(
1713            genesis_timestamp,
1714            self.chain_index(),
1715            self.chain_config(),
1716            self.beacon_schedule(),
1717            &self.engine,
1718            tipsets,
1719        )
1720    }
1721
1722    pub fn get_verified_registry_actor_state(
1723        &self,
1724        ts: &Tipset,
1725    ) -> anyhow::Result<verifreg::State> {
1726        let act = self
1727            .get_actor(&Address::VERIFIED_REGISTRY_ACTOR, *ts.parent_state())
1728            .map_err(Error::state)?
1729            .ok_or_else(|| Error::state("actor not found"))?;
1730        verifreg::State::load(self.blockstore(), act.code, act.state)
1731    }
1732    pub fn get_claim(
1733        &self,
1734        addr: &Address,
1735        ts: &Tipset,
1736        claim_id: ClaimID,
1737    ) -> anyhow::Result<Option<Claim>> {
1738        let id_address = self.lookup_required_id(addr, ts)?;
1739        let state = self.get_verified_registry_actor_state(ts)?;
1740        state.get_claim(self.blockstore(), id_address, claim_id)
1741    }
1742
1743    pub fn get_all_claims(&self, ts: &Tipset) -> anyhow::Result<HashMap<ClaimID, Claim>> {
1744        let state = self.get_verified_registry_actor_state(ts)?;
1745        state.get_all_claims(self.blockstore())
1746    }
1747
1748    pub fn get_allocation(
1749        &self,
1750        addr: &Address,
1751        ts: &Tipset,
1752        allocation_id: AllocationID,
1753    ) -> anyhow::Result<Option<Allocation>> {
1754        let id_address = self.lookup_required_id(addr, ts)?;
1755        let state = self.get_verified_registry_actor_state(ts)?;
1756        state.get_allocation(self.blockstore(), id_address.id()?, allocation_id)
1757    }
1758
1759    pub fn get_all_allocations(
1760        &self,
1761        ts: &Tipset,
1762    ) -> anyhow::Result<HashMap<AllocationID, Allocation>> {
1763        let state = self.get_verified_registry_actor_state(ts)?;
1764        state.get_all_allocations(self.blockstore())
1765    }
1766
1767    pub fn verified_client_status(
1768        &self,
1769        addr: &Address,
1770        ts: &Tipset,
1771    ) -> anyhow::Result<Option<DataCap>> {
1772        let id = self.lookup_required_id(addr, ts)?;
1773        let network_version = self.get_network_version(ts.epoch());
1774
1775        // This is a copy of Lotus code, we need to treat all the actors below version 9
1776        // differently. Which maps to network below version 17.
1777        // Original: https://github.com/filecoin-project/lotus/blob/5e76b05b17771da6939c7b0bf65127c3dc70ee23/node/impl/full/state.go#L1627-L1664.
1778        if (u32::from(network_version.0)) < 17 {
1779            let state = self.get_verified_registry_actor_state(ts)?;
1780            return state.verified_client_data_cap(self.blockstore(), id);
1781        }
1782
1783        let act = self
1784            .get_actor(&Address::DATACAP_TOKEN_ACTOR, *ts.parent_state())
1785            .map_err(Error::state)?
1786            .ok_or_else(|| Error::state("Miner actor not found"))?;
1787
1788        let state = datacap::State::load(self.blockstore(), act.code, act.state)?;
1789
1790        state.verified_client_data_cap(self.blockstore(), id)
1791    }
1792
1793    /// Similar to [`StateTree::resolve_to_deterministic_addr`] but does not allow [`crate::shim::address::Protocol::Actor`] type of addresses.
1794    /// Uses the [`Tipset`] `ts` to generate the VM state.
1795    pub async fn resolve_to_deterministic_address(
1796        self: &Arc<Self>,
1797        address: Address,
1798        ts: &Tipset,
1799    ) -> anyhow::Result<Address> {
1800        use crate::shim::address::Protocol::*;
1801        match address.protocol() {
1802            BLS | Secp256k1 | Delegated => Ok(address),
1803            Actor => anyhow::bail!("cannot resolve actor address to key address"),
1804            ID => {
1805                let id = address.id()?;
1806                if let Some(cached) = self.id_to_deterministic_address_cache.get_cloned(&id) {
1807                    return Ok(cached);
1808                }
1809                // First try to resolve the actor in the parent state, so we don't have to compute anything.
1810                let resolved = if let Ok(state) =
1811                    StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
1812                    && let Ok(address) = state
1813                        .resolve_to_deterministic_addr(self.chain_store().blockstore(), address)
1814                {
1815                    address
1816                } else {
1817                    // If that fails, compute the tip-set and try again.
1818                    let TipsetState { state_root, .. } = self.load_tipset_state(ts).await?;
1819                    let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?;
1820                    state.resolve_to_deterministic_addr(self.chain_store().blockstore(), address)?
1821                };
1822                self.id_to_deterministic_address_cache.push(id, resolved);
1823                Ok(resolved)
1824            }
1825        }
1826    }
1827
1828    pub fn execution_trace(&self, tipset: &Tipset) -> anyhow::Result<(Cid, Vec<ApiInvocResult>)> {
1829        let mut invoc_trace = vec![];
1830
1831        let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
1832
1833        let callback = |ctx: MessageCallbackCtx<'_>| {
1834            match ctx.at {
1835                CalledAt::Applied | CalledAt::Reward => {
1836                    invoc_trace.push(ApiInvocResult {
1837                        msg_cid: ctx.message.cid(),
1838                        msg: ctx.message.message().clone(),
1839                        msg_rct: Some(ctx.apply_ret.msg_receipt()),
1840                        error: ctx.apply_ret.failure_info().unwrap_or_default(),
1841                        duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
1842                        gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
1843                        execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
1844                            .unwrap_or_default(),
1845                    });
1846                    Ok(())
1847                }
1848                _ => Ok(()), // ignored
1849            }
1850        };
1851
1852        let ExecutedTipset { state_root, .. } = apply_block_messages(
1853            genesis_timestamp,
1854            self.chain_index().shallow_clone(),
1855            self.chain_config().shallow_clone(),
1856            self.beacon_schedule().shallow_clone(),
1857            &self.engine,
1858            tipset.shallow_clone(),
1859            Some(callback),
1860            VMTrace::Traced,
1861        )?;
1862
1863        Ok((state_root, invoc_trace))
1864    }
1865}
1866
1867pub fn validate_tipsets<DB, T>(
1868    genesis_timestamp: u64,
1869    chain_index: &ChainIndex<DB>,
1870    chain_config: &Arc<ChainConfig>,
1871    beacon: &Arc<BeaconSchedule>,
1872    engine: &MultiEngine,
1873    tipsets: T,
1874) -> anyhow::Result<()>
1875where
1876    DB: Blockstore + Send + Sync + 'static,
1877    T: Iterator<Item = Tipset> + Send,
1878{
1879    // Validate one tipset at a time. Parallelizing the outer loop across tipsets
1880    // might wedge the global rayon pool.
1881    // Sequential outer iteration leaves the entire rayon pool free for that
1882    // already-rich inner parallelism.
1883    for (child, parent) in tipsets.tuple_windows() {
1884        info!(height = parent.epoch(), "compute parent state");
1885        let ExecutedTipset {
1886            state_root: actual_state,
1887            receipt_root: actual_receipt,
1888            ..
1889        } = apply_block_messages(
1890            genesis_timestamp,
1891            chain_index.shallow_clone(),
1892            chain_config.shallow_clone(),
1893            beacon.shallow_clone(),
1894            engine,
1895            parent,
1896            NO_CALLBACK,
1897            VMTrace::NotTraced,
1898        )
1899        .context("couldn't compute tipset state")?;
1900        let expected_receipt = child.min_ticket_block().message_receipts;
1901        let expected_state = child.parent_state();
1902        if (expected_state, expected_receipt) != (&actual_state, actual_receipt) {
1903            error!(
1904                height = child.epoch(),
1905                ?expected_state,
1906                ?expected_receipt,
1907                ?actual_state,
1908                ?actual_receipt,
1909                "state mismatch"
1910            );
1911            bail!("state mismatch");
1912        }
1913    }
1914    Ok(())
1915}
1916
1917/// Shared context for creating VMs and preparing tipset state.
1918///
1919/// Encapsulates randomness source, genesis info, VM construction,
1920/// null-epoch cron handling, and state migrations.
1921struct TipsetExecutor<'a, DB: Blockstore + Send + Sync + 'static> {
1922    tipset: Tipset,
1923    rand: ChainRand<DB>,
1924    chain_config: Arc<ChainConfig>,
1925    chain_index: ChainIndex<DB>,
1926    genesis_info: GenesisInfo,
1927    engine: &'a MultiEngine,
1928}
1929
1930impl<'a, DB: Blockstore + Send + Sync + 'static> TipsetExecutor<'a, DB> {
1931    fn new(
1932        chain_index: ChainIndex<DB>,
1933        chain_config: Arc<ChainConfig>,
1934        beacon: Arc<BeaconSchedule>,
1935        engine: &'a MultiEngine,
1936        tipset: Tipset,
1937    ) -> Self {
1938        let rand = ChainRand::new(
1939            chain_config.shallow_clone(),
1940            tipset.shallow_clone(),
1941            chain_index.shallow_clone(),
1942            beacon,
1943        );
1944        let genesis_info = GenesisInfo::from_chain_config(chain_config.shallow_clone());
1945        Self {
1946            tipset,
1947            rand,
1948            chain_config,
1949            chain_index,
1950            genesis_info,
1951            engine,
1952        }
1953    }
1954
1955    fn create_vm(
1956        &self,
1957        state_root: Cid,
1958        epoch: ChainEpoch,
1959        timestamp: u64,
1960        trace: VMTrace,
1961    ) -> anyhow::Result<VM<DB>> {
1962        let circ_supply = self.genesis_info.get_vm_circulating_supply(
1963            epoch,
1964            self.chain_index.db(),
1965            &state_root,
1966        )?;
1967        VM::new(
1968            ExecutionContext {
1969                heaviest_tipset: self.tipset.shallow_clone(),
1970                state_tree_root: state_root,
1971                epoch,
1972                rand: Box::new(self.rand.shallow_clone()),
1973                base_fee: self.tipset.min_ticket_block().parent_base_fee.clone(),
1974                circ_supply,
1975                chain_config: self.chain_config.shallow_clone(),
1976                chain_index: self.chain_index.shallow_clone(),
1977                timestamp,
1978            },
1979            self.engine,
1980            trace,
1981        )
1982    }
1983
1984    /// Produces the state root ready for message execution by running
1985    /// null-epoch `crons` and any pending state migrations.
1986    fn prepare_parent_state<F>(
1987        &self,
1988        genesis_timestamp: u64,
1989        null_epoch_trace: VMTrace,
1990        cron_callback: &mut Option<F>,
1991    ) -> anyhow::Result<(Cid, ChainEpoch, Vec<BlockMessages>)>
1992    where
1993        F: FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>,
1994    {
1995        use crate::shim::clock::EPOCH_DURATION_SECONDS;
1996
1997        let mut parent_state = *self.tipset.parent_state();
1998        let parent_epoch = self
1999            .chain_index
2000            .load_required_tipset(self.tipset.parents())?
2001            .epoch();
2002        let epoch = self.tipset.epoch();
2003
2004        for epoch_i in parent_epoch..epoch {
2005            if epoch_i > parent_epoch {
2006                let timestamp = genesis_timestamp + ((EPOCH_DURATION_SECONDS * epoch_i) as u64);
2007                parent_state = stacker::grow(64 << 20, || -> anyhow::Result<Cid> {
2008                    let mut vm =
2009                        self.create_vm(parent_state, epoch_i, timestamp, null_epoch_trace)?;
2010                    if let Err(e) = vm.run_cron(epoch_i, cron_callback.as_mut()) {
2011                        error!("Beginning of epoch cron failed to run: {e:#}");
2012                        return Err(e);
2013                    }
2014                    vm.flush()
2015                })?;
2016            }
2017            if let Some(new_state) = run_state_migrations(
2018                epoch_i,
2019                &self.chain_config,
2020                self.chain_index.db(),
2021                &parent_state,
2022            )? {
2023                parent_state = new_state;
2024            }
2025        }
2026
2027        let block_messages = BlockMessages::for_tipset(self.chain_index.db(), &self.tipset)?;
2028        Ok((parent_state, epoch, block_messages))
2029    }
2030}
2031
2032/// Messages are transactions that produce new states. The state (usually
2033/// referred to as the 'state-tree') is a mapping from actor addresses to actor
2034/// states. Each block contains the hash of the state-tree that should be used
2035/// as the starting state when executing the block messages.
2036///
2037/// # Execution environment
2038///
2039/// Transaction execution has the following inputs:
2040/// - a current state-tree (stored as IPLD in a key-value database). This
2041///   reference is in [`Tipset::parent_state`].
2042/// - up to 900 past state-trees. See
2043///   <https://docs.filecoin.io/reference/general/glossary/#finality>.
2044/// - up to 900 past tipset IDs.
2045/// - a deterministic source of randomness.
2046/// - the circulating supply of FIL (see
2047///   <https://filecoin.io/blog/filecoin-circulating-supply/>). The circulating
2048///   supply is determined by the epoch and the states of a few key actors.
2049/// - the base fee (see <https://spec.filecoin.io/systems/filecoin_vm/gas_fee/>).
2050///   This value is defined by `tipset.parent_base_fee`.
2051/// - the genesis timestamp (UNIX epoch time when the first block was
2052///   mined/created).
2053/// - a chain configuration (maps epoch to network version, has chain specific
2054///   settings).
2055///
2056/// The result of running a set of block messages is an index to the final
2057/// state-tree and an index to an array of message receipts (listing gas used,
2058/// return codes, etc).
2059///
2060/// # Cron and null tipsets
2061///
2062/// Once per epoch, after all messages have run, a special 'cron' transaction
2063/// must be executed. The tasks of the 'cron' transaction include running batch
2064/// jobs and keeping the state up-to-date with the current epoch.
2065///
2066/// It can happen that no blocks are mined in an epoch. The tipset for such an
2067/// epoch is called a null tipset. A null tipset has no identity and cannot be
2068/// directly executed. This is a problem for 'cron' which must run for every
2069/// epoch, even if there are no messages. The fix is to run 'cron' if there are
2070/// any null tipsets between the current epoch and the parent epoch.
2071///
2072/// Imagine the blockchain looks like this with a null tipset at epoch 9:
2073///
2074/// ```text
2075/// ┌────────┐ ┌────┐ ┌───────┐  ┌───────┐
2076/// │Epoch 10│ │Null│ │Epoch 8├──►Epoch 7├─►
2077/// └───┬────┘ └────┘ └───▲───┘  └───────┘
2078///     └─────────────────┘
2079/// ```
2080///
2081/// The parent of tipset-epoch-10 is tipset-epoch-8. Before executing the
2082/// messages in epoch 10, we have to run cron for epoch 9. However, running
2083/// 'cron' requires the timestamp of the youngest block in the tipset (which
2084/// doesn't exist because there are no blocks in the tipset). Lotus dictates that
2085/// the timestamp of a null tipset is `30s * epoch` after the genesis timestamp.
2086/// So, in the above example, if the genesis block was mined at time `X`, the
2087/// null tipset for epoch 9 will have timestamp `X + 30 * 9`.
2088///
2089/// # Migrations
2090///
2091/// Migrations happen between network upgrades and modify the state tree. If a
2092/// migration is scheduled for epoch 10, it will be run _after_ the messages for
2093/// epoch 10. The tipset for epoch 11 will link the state-tree produced by the
2094/// migration.
2095///
2096/// Example timeline with a migration at epoch 10:
2097///   1. Tipset-epoch-10 executes, producing state-tree A.
2098///   2. Migration consumes state-tree A and produces state-tree B.
2099///   3. Tipset-epoch-11 executes, consuming state-tree B (rather than A).
2100///
2101/// Note: The migration actually happens when tipset-epoch-11 executes. This is
2102///       because tipset-epoch-10 may be null and therefore not executed at all.
2103///
2104/// # Caching
2105///
2106/// Scanning the blockchain to find past tipsets and state-trees may be slow.
2107/// The `ChainStore` caches recent tipsets to make these scans faster.
2108#[allow(clippy::too_many_arguments)]
2109pub fn apply_block_messages<DB>(
2110    genesis_timestamp: u64,
2111    chain_index: ChainIndex<DB>,
2112    chain_config: Arc<ChainConfig>,
2113    beacon: Arc<BeaconSchedule>,
2114    engine: &MultiEngine,
2115    tipset: Tipset,
2116    mut callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
2117    enable_tracing: VMTrace,
2118) -> anyhow::Result<ExecutedTipset>
2119where
2120    DB: Blockstore + Send + Sync + 'static,
2121{
2122    // This function will:
2123    // 1. handle the genesis block as a special case
2124    // 2. run 'cron' for any null-tipsets between the current tipset and our parent tipset
2125    // 3. run migrations
2126    // 4. execute block messages
2127    // 5. write the state-tree to the DB and return the CID
2128
2129    // step 1: special case for genesis block
2130    if tipset.epoch() == 0 {
2131        // NB: This is here because the process that executes blocks requires that the
2132        // block miner reference a valid miner in the state tree. Unless we create some
2133        // magical genesis miner, this won't work properly, so we short circuit here
2134        // This avoids the question of 'who gets paid the genesis block reward'
2135        let message_receipts = tipset.min_ticket_block().message_receipts;
2136        return Ok(ExecutedTipset {
2137            state_root: *tipset.parent_state(),
2138            receipt_root: message_receipts,
2139            executed_messages: vec![].into(),
2140        });
2141    }
2142
2143    let exec = TipsetExecutor::new(
2144        chain_index.shallow_clone(),
2145        chain_config,
2146        beacon,
2147        engine,
2148        tipset.shallow_clone(),
2149    );
2150
2151    // step 2: running cron for any null-tipsets
2152    // step 3: run migrations
2153    let (parent_state, epoch, block_messages) =
2154        exec.prepare_parent_state(genesis_timestamp, enable_tracing, &mut callback)?;
2155
2156    // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from
2157    // FVM, but that introduces some constraints, and possible deadlocks.
2158    stacker::grow(64 << 20, || -> anyhow::Result<ExecutedTipset> {
2159        let mut vm = exec.create_vm(parent_state, epoch, tipset.min_timestamp(), enable_tracing)?;
2160
2161        // step 4: apply tipset messages
2162        let (receipts, events, events_roots) =
2163            vm.apply_block_messages(&block_messages, epoch, callback)?;
2164
2165        // step 5: construct receipt root from receipts
2166        let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts.iter())?;
2167
2168        // step 6: store events AMTs in the blockstore
2169        for (events, events_root) in events.iter().zip(events_roots.iter()) {
2170            if let Some(events) = events {
2171                let event_root =
2172                    events_root.context("events root should be present when events present")?;
2173                // Store the events AMT - the root CID should match the one computed by FVM
2174                let derived_event_root = Amt::new_from_iter_with_bit_width(
2175                    chain_index.db(),
2176                    EVENTS_AMT_BITWIDTH,
2177                    events.iter(),
2178                )
2179                .map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?;
2180
2181                // Verify the stored root matches the FVM-computed root
2182                ensure!(
2183                    derived_event_root == event_root,
2184                    "Events AMT root mismatch: derived={derived_event_root}, actual={event_root}."
2185                );
2186            }
2187        }
2188
2189        let state_root = vm.flush()?;
2190
2191        // Update executed tipset cache
2192        let messages: Vec<ChainMessage> = block_messages
2193            .into_iter()
2194            .flat_map(|bm| bm.messages)
2195            .collect_vec();
2196        anyhow::ensure!(
2197            messages.len() == receipts.len() && messages.len() == events.len(),
2198            "length of messages, receipts, and events should match",
2199        );
2200        Ok(ExecutedTipset {
2201            state_root,
2202            receipt_root,
2203            executed_messages: messages
2204                .into_iter()
2205                .zip(receipts)
2206                .zip(events)
2207                .map(|((message, receipt), events)| ExecutedMessage {
2208                    message,
2209                    receipt,
2210                    events,
2211                })
2212                .collect_vec()
2213                .into(),
2214        })
2215    })
2216}
2217
2218#[allow(clippy::too_many_arguments)]
2219pub fn compute_state<DB>(
2220    _height: ChainEpoch,
2221    messages: Vec<Message>,
2222    tipset: Tipset,
2223    genesis_timestamp: u64,
2224    chain_index: ChainIndex<DB>,
2225    chain_config: Arc<ChainConfig>,
2226    beacon: Arc<BeaconSchedule>,
2227    engine: &MultiEngine,
2228    callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
2229    enable_tracing: VMTrace,
2230) -> anyhow::Result<ExecutedTipset>
2231where
2232    DB: Blockstore + Send + Sync + 'static,
2233{
2234    if !messages.is_empty() {
2235        anyhow::bail!("Applying messages is not yet implemented.");
2236    }
2237
2238    let output = apply_block_messages(
2239        genesis_timestamp,
2240        chain_index,
2241        chain_config,
2242        beacon,
2243        engine,
2244        tipset,
2245        callback,
2246        enable_tracing,
2247    )?;
2248
2249    Ok(output)
2250}
2251
2252/// Controls whether the VM should flush its state after execution
2253#[derive(Debug, Copy, Clone, Default)]
2254pub enum VMFlush {
2255    Flush,
2256    #[default]
2257    Skip,
2258}