Skip to main content

forest/state_manager/
execution.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use super::state_computation::{TipsetExecutor, apply_block_messages, validate_tipsets};
5use super::utils::structured;
6use super::*;
7use crate::interpreter::{CalledAt, VMTrace};
8use crate::rpc::state::{ApiInvocResult, MessageGasCost};
9use crate::utils::ShallowClone as _;
10use anyhow::{Context as _, bail};
11use num_traits::identities::Zero;
12use std::ops::RangeInclusive;
13
14impl<DB> StateManager<DB>
15where
16    DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
17{
18    /// Replays the given message and returns the result of executing the
19    /// indicated message, assuming it was executed in the indicated tipset.
20    pub async fn replay(self: &Arc<Self>, ts: Tipset, mcid: Cid) -> Result<ApiInvocResult, Error> {
21        let this = Arc::clone(self);
22        tokio::task::spawn_blocking(move || this.replay_blocking(ts, mcid)).await?
23    }
24
25    /// Blocking version of `replay`
26    pub fn replay_blocking(
27        self: &Arc<Self>,
28        ts: Tipset,
29        mcid: Cid,
30    ) -> Result<ApiInvocResult, Error> {
31        const REPLAY_HALT: &str = "replay_halt";
32
33        let mut api_invoc_result = None;
34        let callback = |ctx: MessageCallbackCtx<'_>| {
35            match ctx.at {
36                CalledAt::Applied | CalledAt::Reward
37                    if api_invoc_result.is_none() && ctx.cid == mcid =>
38                {
39                    api_invoc_result = Some(ApiInvocResult {
40                        msg_cid: ctx.message.cid(),
41                        msg: ctx.message.message().clone(),
42                        msg_rct: Some(ctx.apply_ret.msg_receipt()),
43                        error: ctx.apply_ret.failure_info().unwrap_or_default(),
44                        duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
45                        gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
46                        execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
47                            .unwrap_or_default(),
48                    });
49                    anyhow::bail!(REPLAY_HALT);
50                }
51                _ => Ok(()), // ignored
52            }
53        };
54        let result = self.compute_tipset_state_blocking(ts, Some(callback), VMTrace::Traced);
55        if let Err(error_message) = result
56            && error_message.to_string() != REPLAY_HALT
57        {
58            return Err(Error::Other(format!(
59                "unexpected error during execution : {error_message:}"
60            )));
61        }
62        api_invoc_result.ok_or_else(|| Error::Other("failed to replay".into()))
63    }
64
65    /// Replays a tipset up to a target message, capturing the state root before
66    /// and after execution.
67    pub async fn replay_for_prestate(
68        self: &Arc<Self>,
69        ts: Tipset,
70        target_message_cid: Cid,
71    ) -> Result<(Cid, ApiInvocResult, Cid), Error> {
72        let this = Arc::clone(self);
73        tokio::task::spawn_blocking(move || {
74            this.replay_for_prestate_blocking(ts, target_message_cid)
75        })
76        .await
77        .map_err(|e| Error::Other(format!("{e}")))?
78    }
79
80    fn replay_for_prestate_blocking(
81        self: &Arc<Self>,
82        ts: Tipset,
83        target_msg_cid: Cid,
84    ) -> Result<(Cid, ApiInvocResult, Cid), Error> {
85        if ts.epoch() == 0 {
86            return Err(Error::Other(
87                "cannot trace messages in the genesis block".into(),
88            ));
89        }
90
91        let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
92        let exec = TipsetExecutor::new(
93            self.chain_index().shallow_clone(),
94            self.chain_config().shallow_clone(),
95            self.beacon_schedule().shallow_clone(),
96            &self.engine,
97            ts.shallow_clone(),
98        );
99        let mut no_cb = NO_CALLBACK;
100        let (parent_state, epoch, block_messages) =
101            exec.prepare_parent_state(genesis_timestamp, VMTrace::NotTraced, &mut no_cb)?;
102
103        Ok(stacker::grow(64 << 20, || {
104            let mut vm =
105                exec.create_vm(parent_state, epoch, ts.min_timestamp(), VMTrace::NotTraced)?;
106            let mut processed = ahash::HashSet::default();
107
108            for block in block_messages.iter() {
109                let mut penalty = TokenAmount::zero();
110                let mut gas_reward = TokenAmount::zero();
111
112                for msg in block.messages.iter() {
113                    let cid = msg.cid();
114                    if processed.contains(&cid) {
115                        continue;
116                    }
117
118                    processed.insert(cid);
119
120                    if cid == target_msg_cid {
121                        let pre_root = vm.flush()?;
122                        let mut traced_vm =
123                            exec.create_vm(pre_root, epoch, ts.min_timestamp(), VMTrace::Traced)?;
124                        let (ret, duration) = traced_vm.apply_message(msg)?;
125                        let post_root = traced_vm.flush()?;
126
127                        return Ok((
128                            pre_root,
129                            ApiInvocResult {
130                                msg_cid: cid,
131                                msg: msg.message().clone(),
132                                msg_rct: Some(ret.msg_receipt()),
133                                error: ret.failure_info().unwrap_or_default(),
134                                duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
135                                gas_cost: MessageGasCost::default(),
136                                execution_trace: structured::parse_events(ret.exec_trace())
137                                    .unwrap_or_default(),
138                            },
139                            post_root,
140                        ));
141                    }
142
143                    let (ret, _) = vm.apply_message(msg)?;
144                    gas_reward += ret.miner_tip();
145                    penalty += ret.penalty();
146                }
147
148                if let Some(rew_msg) =
149                    vm.reward_message(epoch, block.miner, block.win_count, penalty, gas_reward)?
150                {
151                    let (ret, _) = vm.apply_implicit_message(&rew_msg)?;
152                    if let Some(err) = ret.failure_info() {
153                        bail!(
154                            "failed to apply reward message for miner {}: {err}",
155                            block.miner
156                        );
157                    }
158
159                    // This is more of a sanity check, this should not be able to be hit.
160                    if !ret.msg_receipt().exit_code().is_success() {
161                        bail!(
162                            "reward application message failed (exit: {:?})",
163                            ret.msg_receipt().exit_code()
164                        );
165                    }
166                }
167            }
168
169            bail!("message {target_msg_cid} not found in tipset")
170        })?)
171    }
172
173    /// Validates all tipsets at epoch `start..=end` behind the heaviest tipset.
174    ///
175    /// Tipsets are processed sequentially. The compute-intensive work inside each
176    /// tipset (`bellperson` proof verification, FVM batch seal verification, etc.)
177    /// is already heavily rayon-parallelized. Parallelizing the outer loop actually introduces
178    /// some issues due to locks in the aforementioned crates. So don't do it.
179    ///
180    /// # What is validation?
181    /// Every state transition returns a new _state root_, which is typically retained in, e.g., snapshots.
182    /// For "full" snapshots, all state roots are retained.
183    /// For standard snapshots, the last 2000 or so state roots are retained.
184    ///
185    /// _receipts_ meanwhile, are typically ephemeral, but each tipset knows the _receipt root_
186    /// (hash) of the previous tipset.
187    ///
188    /// This function takes advantage of that fact to validate tipsets:
189    /// - `tipset[N]` claims that `receipt_root[N-1]` should be `0xDEADBEEF`
190    /// - find `tipset[N-1]`, and perform its state transition to get the actual `receipt_root`
191    /// - assert that they match
192    ///
193    /// See [`Self::compute_tipset_state_blocking`] for an explanation of state transitions.
194    #[tracing::instrument(skip(self))]
195    pub fn validate_range(&self, epochs: RangeInclusive<i64>) -> anyhow::Result<()> {
196        let heaviest = self.heaviest_tipset();
197        let heaviest_epoch = heaviest.epoch();
198        let end = self.chain_index().load_required_tipset_by_height(
199            *epochs.end(),
200            heaviest,
201            ResolveNullTipset::TakeOlder,
202        ).with_context(|| {
203            format!(
204                "couldn't get a tipset at height {} behind heaviest tipset at height {heaviest_epoch}",
205                *epochs.end(),
206            )})?;
207
208        // lookup tipset parents as we go along, iterating DOWN from `end`
209        let tipsets = end
210            .chain(self.blockstore())
211            .take_while(|ts| ts.epoch() >= *epochs.start());
212
213        self.validate_tipsets(tipsets)
214    }
215
216    pub fn validate_tipsets<T>(&self, tipsets: T) -> anyhow::Result<()>
217    where
218        T: Iterator<Item = Tipset> + Send,
219    {
220        let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
221        validate_tipsets(
222            genesis_timestamp,
223            self.chain_index(),
224            self.chain_config(),
225            self.beacon_schedule(),
226            &self.engine,
227            tipsets,
228        )
229    }
230
231    pub fn execution_trace(&self, tipset: &Tipset) -> anyhow::Result<(Cid, Vec<ApiInvocResult>)> {
232        let mut invoc_trace = vec![];
233
234        let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
235
236        let callback = |ctx: MessageCallbackCtx<'_>| {
237            match ctx.at {
238                CalledAt::Applied | CalledAt::Reward => {
239                    invoc_trace.push(ApiInvocResult {
240                        msg_cid: ctx.message.cid(),
241                        msg: ctx.message.message().clone(),
242                        msg_rct: Some(ctx.apply_ret.msg_receipt()),
243                        error: ctx.apply_ret.failure_info().unwrap_or_default(),
244                        duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
245                        gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
246                        execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
247                            .unwrap_or_default(),
248                    });
249                    Ok(())
250                }
251                _ => Ok(()), // ignored
252            }
253        };
254
255        let ExecutedTipset { state_root, .. } = apply_block_messages(
256            genesis_timestamp,
257            self.chain_index().shallow_clone(),
258            self.chain_config().shallow_clone(),
259            self.beacon_schedule().shallow_clone(),
260            &self.engine,
261            tipset.shallow_clone(),
262            Some(callback),
263            VMTrace::Traced,
264        )?;
265
266        Ok((state_root, invoc_trace))
267    }
268}