Skip to main content

forest/state_manager/
state_computation.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use super::circulating_supply::GenesisInfo;
5use super::*;
6use crate::db::EthMappingsStore;
7use crate::interpreter::{BlockMessages, ExecutionContext, VM, VMTrace};
8use crate::shim::message::Message;
9use crate::state_migration::run_state_migrations;
10use crate::utils::ShallowClone as _;
11use anyhow::{Context as _, bail, ensure};
12use fil_actors_shared::fvm_ipld_amt::{Amt, Amtv0};
13use itertools::Itertools as _;
14use tracing::{error, info, instrument};
15
16impl<DB> StateManager<DB>
17where
18    DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
19{
20    /// Load the state of a tipset, including state root, message receipts
21    pub async fn load_tipset_state(self: &Arc<Self>, ts: &Tipset) -> anyhow::Result<TipsetState> {
22        if let Some(state) = self.cache.get_map(ts.key(), |et| et.into()) {
23            Ok(state)
24        } else {
25            match self.chain_store().load_child_tipset(ts)? {
26                Some(receipt_ts) => Ok(TipsetState {
27                    state_root: *receipt_ts.parent_state(),
28                    receipt_root: *receipt_ts.parent_message_receipts(),
29                }),
30                None => Ok(self.load_executed_tipset(ts).await?.into()),
31            }
32        }
33    }
34
35    /// Load an executed tipset, including state root, message receipts and events with caching.
36    pub async fn load_executed_tipset(
37        self: &Arc<Self>,
38        ts: &Tipset,
39    ) -> anyhow::Result<ExecutedTipset> {
40        // validate the existence of state trees for post-chain-head-epoch tipsets in case chain head is reset(e.g. manually or via GC).
41        if ts.epoch() >= self.heaviest_tipset().epoch()
42            && let Some(cached) = self.cache.get(ts.key())
43        {
44            if StateTree::new_from_root(self.blockstore_owned(), &cached.state_root).is_ok() {
45                return Ok(cached);
46            } else {
47                self.cache.remove(ts.key());
48            }
49        }
50        self.cache
51            .get_or_else(ts.key(), || async move {
52                let receipt_ts = self.chain_store().load_child_tipset(ts)?;
53                self.load_executed_tipset_inner(ts, receipt_ts.as_ref())
54                    .await
55            })
56            .await
57    }
58
59    async fn load_executed_tipset_inner(
60        self: &Arc<Self>,
61        msg_ts: &Tipset,
62        // when `msg_ts` is the current head, `receipt_ts` is `None`
63        receipt_ts: Option<&Tipset>,
64    ) -> anyhow::Result<ExecutedTipset> {
65        if let Some(receipt_ts) = receipt_ts {
66            anyhow::ensure!(
67                msg_ts.key() == receipt_ts.parents(),
68                "message tipset should be the parent of message receipt tipset"
69            );
70        }
71        let mut recomputed = false;
72        let (state_root, receipt_root, receipts) = match receipt_ts.and_then(|ts| {
73            let receipt_root = *ts.parent_message_receipts();
74            Receipt::get_receipts(self.cs.blockstore(), receipt_root)
75                .ok()
76                .map(|r| (*ts.parent_state(), receipt_root, r))
77        }) {
78            Some((state_root, receipt_root, receipts)) => (state_root, receipt_root, receipts),
79            None => {
80                let state_output = self
81                    .compute_tipset_state(msg_ts.shallow_clone(), NO_CALLBACK, VMTrace::NotTraced)
82                    .await?;
83                recomputed = true;
84                (
85                    state_output.state_root,
86                    state_output.receipt_root,
87                    Receipt::get_receipts(self.cs.blockstore(), state_output.receipt_root)?,
88                )
89            }
90        };
91
92        let messages = self.chain_store().messages_for_tipset(msg_ts)?;
93        anyhow::ensure!(
94            messages.len() == receipts.len(),
95            "mismatching message and receipt counts ({} messages, {} receipts)",
96            messages.len(),
97            receipts.len()
98        );
99        let mut executed_messages = Vec::with_capacity(messages.len());
100        for (message, receipt) in messages.iter().cloned().zip(receipts) {
101            let events = if let Some(events_root) = receipt.events_root() {
102                Some(
103                    match StampedEvent::get_events(self.cs.blockstore(), &events_root) {
104                        Ok(events) => events,
105                        Err(e) if recomputed => return Err(e),
106                        Err(_) => {
107                            self.compute_tipset_state(
108                                msg_ts.shallow_clone(),
109                                NO_CALLBACK,
110                                VMTrace::NotTraced,
111                            )
112                            .await?;
113                            recomputed = true;
114                            StampedEvent::get_events(self.cs.blockstore(), &events_root)?
115                        }
116                    },
117                )
118            } else {
119                None
120            };
121            executed_messages.push(ExecutedMessage {
122                message,
123                receipt,
124                events,
125            });
126        }
127        Ok(ExecutedTipset {
128            state_root,
129            receipt_root,
130            executed_messages: Arc::new(executed_messages),
131        })
132    }
133
134    /// Conceptually, a [`Tipset`] consists of _blocks_ which share an _epoch_.
135    /// Each _block_ contains _messages_, which are executed by the _Filecoin Virtual Machine_.
136    ///
137    /// VM message execution essentially looks like this:
138    /// ```text
139    /// state[N-900..N] * message = state[N+1]
140    /// ```
141    ///
142    /// The `state`s above are stored in the `IPLD Blockstore`, and can be referred to by
143    /// a [`Cid`] - the _state root_.
144    /// The previous 900 states (configurable, see
145    /// <https://docs.filecoin.io/reference/general/glossary/#finality>) can be
146    /// queried when executing a message, so a store needs at least that many.
147    /// (a snapshot typically contains 2000, for example).
148    ///
149    /// Each message costs FIL to execute - this is _gas_.
150    /// After execution, the message has a _receipt_, showing how much gas was spent.
151    /// This is similarly a [`Cid`] into the block store.
152    ///
153    /// For details, see the documentation for [`apply_block_messages`].
154    ///
155    pub async fn compute_tipset_state(
156        self: &Arc<Self>,
157        tipset: Tipset,
158        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
159        enable_tracing: VMTrace,
160    ) -> Result<ExecutedTipset, Error> {
161        let this = Arc::clone(self);
162        tokio::task::spawn_blocking(move || {
163            this.compute_tipset_state_blocking(tipset, callback, enable_tracing)
164        })
165        .await?
166    }
167
168    /// Blocking version of `compute_tipset_state`
169    pub fn compute_tipset_state_blocking(
170        &self,
171        tipset: Tipset,
172        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
173        enable_tracing: VMTrace,
174    ) -> Result<ExecutedTipset, Error> {
175        let epoch = tipset.epoch();
176        let has_callback = callback.is_some();
177        info!(
178            "Evaluating tipset: EPOCH={epoch}, blocks={}, tsk={}",
179            tipset.len(),
180            tipset.key(),
181        );
182        Ok(apply_block_messages(
183            self.chain_store().genesis_block_header().timestamp,
184            self.chain_index().shallow_clone(),
185            self.chain_config().shallow_clone(),
186            self.beacon_schedule().shallow_clone(),
187            &self.engine,
188            tipset,
189            callback,
190            enable_tracing,
191        )
192        .map_err(|e| {
193            if has_callback {
194                e
195            } else {
196                e.context(format!("Failed to compute tipset state@{epoch}"))
197            }
198        })?)
199    }
200
201    #[instrument(skip_all)]
202    pub async fn compute_state(
203        self: &Arc<Self>,
204        height: ChainEpoch,
205        messages: Vec<Message>,
206        tipset: Tipset,
207        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
208        enable_tracing: VMTrace,
209    ) -> Result<ExecutedTipset, Error> {
210        let this = Arc::clone(self);
211        tokio::task::spawn_blocking(move || {
212            this.compute_state_blocking(height, messages, tipset, callback, enable_tracing)
213        })
214        .await?
215    }
216
217    /// Blocking version of `compute_state`
218    #[tracing::instrument(skip_all)]
219    pub fn compute_state_blocking(
220        &self,
221        height: ChainEpoch,
222        messages: Vec<Message>,
223        tipset: Tipset,
224        callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
225        enable_tracing: VMTrace,
226    ) -> Result<ExecutedTipset, Error> {
227        Ok(compute_state(
228            height,
229            messages,
230            tipset,
231            self.chain_store().genesis_block_header().timestamp,
232            self.chain_index().shallow_clone(),
233            self.chain_config().shallow_clone(),
234            self.beacon_schedule().shallow_clone(),
235            &self.engine,
236            callback,
237            enable_tracing,
238        )?)
239    }
240}
241
242pub fn validate_tipsets<DB, T>(
243    genesis_timestamp: u64,
244    chain_index: &ChainIndex<DB>,
245    chain_config: &Arc<ChainConfig>,
246    beacon: &Arc<BeaconSchedule>,
247    engine: &MultiEngine,
248    tipsets: T,
249) -> anyhow::Result<()>
250where
251    DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
252    T: Iterator<Item = Tipset> + Send,
253{
254    // Validate one tipset at a time. Parallelizing the outer loop across tipsets
255    // might wedge the global rayon pool.
256    // Sequential outer iteration leaves the entire rayon pool free for that
257    // already-rich inner parallelism.
258    for (child, parent) in tipsets.tuple_windows() {
259        info!(height = parent.epoch(), "compute parent state");
260        let ExecutedTipset {
261            state_root: actual_state,
262            receipt_root: actual_receipt,
263            ..
264        } = apply_block_messages(
265            genesis_timestamp,
266            chain_index.shallow_clone(),
267            chain_config.shallow_clone(),
268            beacon.shallow_clone(),
269            engine,
270            parent,
271            NO_CALLBACK,
272            VMTrace::NotTraced,
273        )
274        .context("couldn't compute tipset state")?;
275        let expected_receipt = child.min_ticket_block().message_receipts;
276        let expected_state = child.parent_state();
277        if (expected_state, expected_receipt) != (&actual_state, actual_receipt) {
278            error!(
279                height = child.epoch(),
280                ?expected_state,
281                ?expected_receipt,
282                ?actual_state,
283                ?actual_receipt,
284                "state mismatch"
285            );
286            bail!("state mismatch");
287        }
288    }
289    Ok(())
290}
291
292/// Shared context for creating VMs and preparing tipset state.
293///
294/// Encapsulates randomness source, genesis info, VM construction,
295/// null-epoch cron handling, and state migrations.
296pub(in crate::state_manager) struct TipsetExecutor<'a, DB: Blockstore + Send + Sync + 'static> {
297    tipset: Tipset,
298    rand: ChainRand<DB>,
299    chain_config: Arc<ChainConfig>,
300    chain_index: ChainIndex<DB>,
301    genesis_info: GenesisInfo,
302    engine: &'a MultiEngine,
303}
304
305impl<'a, DB: Blockstore + Send + Sync + 'static> TipsetExecutor<'a, DB> {
306    pub(in crate::state_manager) fn new(
307        chain_index: ChainIndex<DB>,
308        chain_config: Arc<ChainConfig>,
309        beacon: Arc<BeaconSchedule>,
310        engine: &'a MultiEngine,
311        tipset: Tipset,
312    ) -> Self {
313        let rand = ChainRand::new(
314            chain_config.shallow_clone(),
315            tipset.shallow_clone(),
316            chain_index.shallow_clone(),
317            beacon,
318        );
319        let genesis_info = GenesisInfo::from_chain_config(chain_config.shallow_clone());
320        Self {
321            tipset,
322            rand,
323            chain_config,
324            chain_index,
325            genesis_info,
326            engine,
327        }
328    }
329
330    pub(in crate::state_manager) fn create_vm(
331        &self,
332        state_root: Cid,
333        epoch: ChainEpoch,
334        timestamp: u64,
335        trace: VMTrace,
336    ) -> anyhow::Result<VM<DB>>
337    where
338        DB: EthMappingsStore,
339    {
340        let circ_supply = self.genesis_info.get_vm_circulating_supply(
341            epoch,
342            self.chain_index.db(),
343            &state_root,
344        )?;
345        VM::new(
346            ExecutionContext {
347                heaviest_tipset: self.tipset.shallow_clone(),
348                state_tree_root: state_root,
349                epoch,
350                rand: Box::new(self.rand.shallow_clone()),
351                base_fee: self.tipset.min_ticket_block().parent_base_fee.clone(),
352                circ_supply,
353                chain_config: self.chain_config.shallow_clone(),
354                chain_index: self.chain_index.shallow_clone(),
355                timestamp,
356            },
357            self.engine,
358            trace,
359        )
360    }
361
362    /// Produces the state root ready for message execution by running
363    /// null-epoch `crons` and any pending state migrations.
364    pub(in crate::state_manager) fn prepare_parent_state<F>(
365        &self,
366        genesis_timestamp: u64,
367        null_epoch_trace: VMTrace,
368        cron_callback: &mut Option<F>,
369    ) -> anyhow::Result<(Cid, ChainEpoch, Vec<BlockMessages>)>
370    where
371        DB: EthMappingsStore,
372        F: FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>,
373    {
374        use crate::shim::clock::EPOCH_DURATION_SECONDS;
375
376        let mut parent_state = *self.tipset.parent_state();
377        let parent_epoch = self
378            .chain_index
379            .load_required_tipset(self.tipset.parents())?
380            .epoch();
381        let epoch = self.tipset.epoch();
382
383        for epoch_i in parent_epoch..epoch {
384            if epoch_i > parent_epoch {
385                let timestamp = genesis_timestamp + ((EPOCH_DURATION_SECONDS * epoch_i) as u64);
386                parent_state = stacker::grow(64 << 20, || -> anyhow::Result<Cid> {
387                    let mut vm =
388                        self.create_vm(parent_state, epoch_i, timestamp, null_epoch_trace)?;
389                    if let Err(e) = vm.run_cron(epoch_i, cron_callback.as_mut()) {
390                        error!("Beginning of epoch cron failed to run: {e:#}");
391                        return Err(e);
392                    }
393                    vm.flush()
394                })?;
395            }
396            if let Some(new_state) = run_state_migrations(
397                epoch_i,
398                &self.chain_config,
399                self.chain_index.db(),
400                &parent_state,
401            )? {
402                parent_state = new_state;
403            }
404        }
405
406        let block_messages = BlockMessages::for_tipset(self.chain_index.db(), &self.tipset)?;
407        Ok((parent_state, epoch, block_messages))
408    }
409}
410
411/// Messages are transactions that produce new states. The state (usually
412/// referred to as the 'state-tree') is a mapping from actor addresses to actor
413/// states. Each block contains the hash of the state-tree that should be used
414/// as the starting state when executing the block messages.
415///
416/// # Execution environment
417///
418/// Transaction execution has the following inputs:
419/// - a current state-tree (stored as IPLD in a key-value database). This
420///   reference is in [`Tipset::parent_state`].
421/// - up to 900 past state-trees. See
422///   <https://docs.filecoin.io/reference/general/glossary/#finality>.
423/// - up to 900 past tipset IDs.
424/// - a deterministic source of randomness.
425/// - the circulating supply of FIL (see
426///   <https://filecoin.io/blog/filecoin-circulating-supply/>). The circulating
427///   supply is determined by the epoch and the states of a few key actors.
428/// - the base fee (see <https://spec.filecoin.io/systems/filecoin_vm/gas_fee/>).
429///   This value is defined by `tipset.parent_base_fee`.
430/// - the genesis timestamp (UNIX epoch time when the first block was
431///   mined/created).
432/// - a chain configuration (maps epoch to network version, has chain specific
433///   settings).
434///
435/// The result of running a set of block messages is an index to the final
436/// state-tree and an index to an array of message receipts (listing gas used,
437/// return codes, etc).
438///
439/// # Cron and null tipsets
440///
441/// Once per epoch, after all messages have run, a special 'cron' transaction
442/// must be executed. The tasks of the 'cron' transaction include running batch
443/// jobs and keeping the state up-to-date with the current epoch.
444///
445/// It can happen that no blocks are mined in an epoch. The tipset for such an
446/// epoch is called a null tipset. A null tipset has no identity and cannot be
447/// directly executed. This is a problem for 'cron' which must run for every
448/// epoch, even if there are no messages. The fix is to run 'cron' if there are
449/// any null tipsets between the current epoch and the parent epoch.
450///
451/// Imagine the blockchain looks like this with a null tipset at epoch 9:
452///
453/// ```text
454/// ┌────────┐ ┌────┐ ┌───────┐  ┌───────┐
455/// │Epoch 10│ │Null│ │Epoch 8├──►Epoch 7├─►
456/// └───┬────┘ └────┘ └───▲───┘  └───────┘
457///     └─────────────────┘
458/// ```
459///
460/// The parent of tipset-epoch-10 is tipset-epoch-8. Before executing the
461/// messages in epoch 10, we have to run cron for epoch 9. However, running
462/// 'cron' requires the timestamp of the youngest block in the tipset (which
463/// doesn't exist because there are no blocks in the tipset). Lotus dictates that
464/// the timestamp of a null tipset is `30s * epoch` after the genesis timestamp.
465/// So, in the above example, if the genesis block was mined at time `X`, the
466/// null tipset for epoch 9 will have timestamp `X + 30 * 9`.
467///
468/// # Migrations
469///
470/// Migrations happen between network upgrades and modify the state tree. If a
471/// migration is scheduled for epoch 10, it will be run _after_ the messages for
472/// epoch 10. The tipset for epoch 11 will link the state-tree produced by the
473/// migration.
474///
475/// Example timeline with a migration at epoch 10:
476///   1. Tipset-epoch-10 executes, producing state-tree A.
477///   2. Migration consumes state-tree A and produces state-tree B.
478///   3. Tipset-epoch-11 executes, consuming state-tree B (rather than A).
479///
480/// Note: The migration actually happens when tipset-epoch-11 executes. This is
481///       because tipset-epoch-10 may be null and therefore not executed at all.
482///
483/// # Caching
484///
485/// Scanning the blockchain to find past tipsets and state-trees may be slow.
486/// The `ChainStore` caches recent tipsets to make these scans faster.
487#[allow(clippy::too_many_arguments)]
488pub fn apply_block_messages<DB>(
489    genesis_timestamp: u64,
490    chain_index: ChainIndex<DB>,
491    chain_config: Arc<ChainConfig>,
492    beacon: Arc<BeaconSchedule>,
493    engine: &MultiEngine,
494    tipset: Tipset,
495    mut callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
496    enable_tracing: VMTrace,
497) -> anyhow::Result<ExecutedTipset>
498where
499    DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
500{
501    // This function will:
502    // 1. handle the genesis block as a special case
503    // 2. run 'cron' for any null-tipsets between the current tipset and our parent tipset
504    // 3. run migrations
505    // 4. execute block messages
506    // 5. write the state-tree to the DB and return the CID
507
508    // step 1: special case for genesis block
509    if tipset.epoch() == 0 {
510        // NB: This is here because the process that executes blocks requires that the
511        // block miner reference a valid miner in the state tree. Unless we create some
512        // magical genesis miner, this won't work properly, so we short circuit here
513        // This avoids the question of 'who gets paid the genesis block reward'
514        let message_receipts = tipset.min_ticket_block().message_receipts;
515        return Ok(ExecutedTipset {
516            state_root: *tipset.parent_state(),
517            receipt_root: message_receipts,
518            executed_messages: vec![].into(),
519        });
520    }
521
522    let exec = TipsetExecutor::new(
523        chain_index.shallow_clone(),
524        chain_config,
525        beacon,
526        engine,
527        tipset.shallow_clone(),
528    );
529
530    // step 2: running cron for any null-tipsets
531    // step 3: run migrations
532    let (parent_state, epoch, block_messages) =
533        exec.prepare_parent_state(genesis_timestamp, enable_tracing, &mut callback)?;
534
535    // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from
536    // FVM, but that introduces some constraints, and possible deadlocks.
537    stacker::grow(64 << 20, || -> anyhow::Result<ExecutedTipset> {
538        let mut vm = exec.create_vm(parent_state, epoch, tipset.min_timestamp(), enable_tracing)?;
539
540        // step 4: apply tipset messages
541        let (receipts, events, events_roots) =
542            vm.apply_block_messages(&block_messages, epoch, callback)?;
543
544        // step 5: construct receipt root from receipts
545        let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts.iter())?;
546
547        // step 6: store events AMTs in the blockstore
548        for (events, events_root) in events.iter().zip(events_roots.iter()) {
549            if let Some(events) = events {
550                let event_root =
551                    events_root.context("events root should be present when events present")?;
552                // Store the events AMT - the root CID should match the one computed by FVM
553                let derived_event_root = Amt::new_from_iter_with_bit_width(
554                    chain_index.db(),
555                    EVENTS_AMT_BITWIDTH,
556                    events.iter(),
557                )
558                .map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?;
559
560                // Verify the stored root matches the FVM-computed root
561                ensure!(
562                    derived_event_root == event_root,
563                    "Events AMT root mismatch: derived={derived_event_root}, actual={event_root}."
564                );
565            }
566        }
567
568        let state_root = vm.flush()?;
569
570        // Update executed tipset cache
571        let messages: Vec<ChainMessage> = block_messages
572            .into_iter()
573            .flat_map(|bm| bm.messages)
574            .collect_vec();
575        anyhow::ensure!(
576            messages.len() == receipts.len() && messages.len() == events.len(),
577            "length of messages, receipts, and events should match",
578        );
579        Ok(ExecutedTipset {
580            state_root,
581            receipt_root,
582            executed_messages: messages
583                .into_iter()
584                .zip(receipts)
585                .zip(events)
586                .map(|((message, receipt), events)| ExecutedMessage {
587                    message,
588                    receipt,
589                    events,
590                })
591                .collect_vec()
592                .into(),
593        })
594    })
595}
596
597#[allow(clippy::too_many_arguments)]
598pub(in crate::state_manager) fn compute_state<DB>(
599    _height: ChainEpoch,
600    messages: Vec<Message>,
601    tipset: Tipset,
602    genesis_timestamp: u64,
603    chain_index: ChainIndex<DB>,
604    chain_config: Arc<ChainConfig>,
605    beacon: Arc<BeaconSchedule>,
606    engine: &MultiEngine,
607    callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
608    enable_tracing: VMTrace,
609) -> anyhow::Result<ExecutedTipset>
610where
611    DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
612{
613    if !messages.is_empty() {
614        anyhow::bail!("Applying messages is not yet implemented.");
615    }
616
617    let output = apply_block_messages(
618        genesis_timestamp,
619        chain_index,
620        chain_config,
621        beacon,
622        engine,
623        tipset,
624        callback,
625        enable_tracing,
626    )?;
627
628    Ok(output)
629}