1use super::state_computation::{TipsetExecutor, apply_block_messages, validate_tipsets};
5use super::utils::structured;
6use super::*;
7use crate::interpreter::{CalledAt, VMTrace};
8use crate::rpc::state::{ApiInvocResult, MessageGasCost};
9use crate::utils::ShallowClone as _;
10use anyhow::{Context as _, bail};
11use num_traits::identities::Zero;
12use std::ops::RangeInclusive;
13
14impl<DB> StateManager<DB>
15where
16 DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
17{
18 pub async fn replay(self: &Arc<Self>, ts: Tipset, mcid: Cid) -> Result<ApiInvocResult, Error> {
21 let this = Arc::clone(self);
22 tokio::task::spawn_blocking(move || this.replay_blocking(ts, mcid)).await?
23 }
24
25 pub fn replay_blocking(
27 self: &Arc<Self>,
28 ts: Tipset,
29 mcid: Cid,
30 ) -> Result<ApiInvocResult, Error> {
31 const REPLAY_HALT: &str = "replay_halt";
32
33 let mut api_invoc_result = None;
34 let callback = |ctx: MessageCallbackCtx<'_>| {
35 match ctx.at {
36 CalledAt::Applied | CalledAt::Reward
37 if api_invoc_result.is_none() && ctx.cid == mcid =>
38 {
39 api_invoc_result = Some(ApiInvocResult {
40 msg_cid: ctx.message.cid(),
41 msg: ctx.message.message().clone(),
42 msg_rct: Some(ctx.apply_ret.msg_receipt()),
43 error: ctx.apply_ret.failure_info().unwrap_or_default(),
44 duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
45 gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
46 execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
47 .unwrap_or_default(),
48 });
49 anyhow::bail!(REPLAY_HALT);
50 }
51 _ => Ok(()), }
53 };
54 let result = self.compute_tipset_state_blocking(ts, Some(callback), VMTrace::Traced);
55 if let Err(error_message) = result
56 && error_message.to_string() != REPLAY_HALT
57 {
58 return Err(Error::Other(format!(
59 "unexpected error during execution : {error_message:}"
60 )));
61 }
62 api_invoc_result.ok_or_else(|| Error::Other("failed to replay".into()))
63 }
64
65 pub async fn replay_for_prestate(
68 self: &Arc<Self>,
69 ts: Tipset,
70 target_message_cid: Cid,
71 ) -> Result<(Cid, ApiInvocResult, Cid), Error> {
72 let this = Arc::clone(self);
73 tokio::task::spawn_blocking(move || {
74 this.replay_for_prestate_blocking(ts, target_message_cid)
75 })
76 .await
77 .map_err(|e| Error::Other(format!("{e}")))?
78 }
79
80 fn replay_for_prestate_blocking(
81 self: &Arc<Self>,
82 ts: Tipset,
83 target_msg_cid: Cid,
84 ) -> Result<(Cid, ApiInvocResult, Cid), Error> {
85 if ts.epoch() == 0 {
86 return Err(Error::Other(
87 "cannot trace messages in the genesis block".into(),
88 ));
89 }
90
91 let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
92 let exec = TipsetExecutor::new(
93 self.chain_index().shallow_clone(),
94 self.chain_config().shallow_clone(),
95 self.beacon_schedule().shallow_clone(),
96 &self.engine,
97 ts.shallow_clone(),
98 );
99 let mut no_cb = NO_CALLBACK;
100 let (parent_state, epoch, block_messages) =
101 exec.prepare_parent_state(genesis_timestamp, VMTrace::NotTraced, &mut no_cb)?;
102
103 Ok(stacker::grow(64 << 20, || {
104 let mut vm =
105 exec.create_vm(parent_state, epoch, ts.min_timestamp(), VMTrace::NotTraced)?;
106 let mut processed = ahash::HashSet::default();
107
108 for block in block_messages.iter() {
109 let mut penalty = TokenAmount::zero();
110 let mut gas_reward = TokenAmount::zero();
111
112 for msg in block.messages.iter() {
113 let cid = msg.cid();
114 if processed.contains(&cid) {
115 continue;
116 }
117
118 processed.insert(cid);
119
120 if cid == target_msg_cid {
121 let pre_root = vm.flush()?;
122 let mut traced_vm =
123 exec.create_vm(pre_root, epoch, ts.min_timestamp(), VMTrace::Traced)?;
124 let (ret, duration) = traced_vm.apply_message(msg)?;
125 let post_root = traced_vm.flush()?;
126
127 return Ok((
128 pre_root,
129 ApiInvocResult {
130 msg_cid: cid,
131 msg: msg.message().clone(),
132 msg_rct: Some(ret.msg_receipt()),
133 error: ret.failure_info().unwrap_or_default(),
134 duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
135 gas_cost: MessageGasCost::default(),
136 execution_trace: structured::parse_events(ret.exec_trace())
137 .unwrap_or_default(),
138 },
139 post_root,
140 ));
141 }
142
143 let (ret, _) = vm.apply_message(msg)?;
144 gas_reward += ret.miner_tip();
145 penalty += ret.penalty();
146 }
147
148 if let Some(rew_msg) =
149 vm.reward_message(epoch, block.miner, block.win_count, penalty, gas_reward)?
150 {
151 let (ret, _) = vm.apply_implicit_message(&rew_msg)?;
152 if let Some(err) = ret.failure_info() {
153 bail!(
154 "failed to apply reward message for miner {}: {err}",
155 block.miner
156 );
157 }
158
159 if !ret.msg_receipt().exit_code().is_success() {
161 bail!(
162 "reward application message failed (exit: {:?})",
163 ret.msg_receipt().exit_code()
164 );
165 }
166 }
167 }
168
169 bail!("message {target_msg_cid} not found in tipset")
170 })?)
171 }
172
173 #[tracing::instrument(skip(self))]
195 pub fn validate_range(&self, epochs: RangeInclusive<i64>) -> anyhow::Result<()> {
196 let heaviest = self.heaviest_tipset();
197 let heaviest_epoch = heaviest.epoch();
198 let end = self.chain_index().load_required_tipset_by_height(
199 *epochs.end(),
200 heaviest,
201 ResolveNullTipset::TakeOlder,
202 ).with_context(|| {
203 format!(
204 "couldn't get a tipset at height {} behind heaviest tipset at height {heaviest_epoch}",
205 *epochs.end(),
206 )})?;
207
208 let tipsets = end
210 .chain(self.blockstore())
211 .take_while(|ts| ts.epoch() >= *epochs.start());
212
213 self.validate_tipsets(tipsets)
214 }
215
216 pub fn validate_tipsets<T>(&self, tipsets: T) -> anyhow::Result<()>
217 where
218 T: Iterator<Item = Tipset> + Send,
219 {
220 let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
221 validate_tipsets(
222 genesis_timestamp,
223 self.chain_index(),
224 self.chain_config(),
225 self.beacon_schedule(),
226 &self.engine,
227 tipsets,
228 )
229 }
230
231 pub fn execution_trace(&self, tipset: &Tipset) -> anyhow::Result<(Cid, Vec<ApiInvocResult>)> {
232 let mut invoc_trace = vec![];
233
234 let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;
235
236 let callback = |ctx: MessageCallbackCtx<'_>| {
237 match ctx.at {
238 CalledAt::Applied | CalledAt::Reward => {
239 invoc_trace.push(ApiInvocResult {
240 msg_cid: ctx.message.cid(),
241 msg: ctx.message.message().clone(),
242 msg_rct: Some(ctx.apply_ret.msg_receipt()),
243 error: ctx.apply_ret.failure_info().unwrap_or_default(),
244 duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64,
245 gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
246 execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
247 .unwrap_or_default(),
248 });
249 Ok(())
250 }
251 _ => Ok(()), }
253 };
254
255 let ExecutedTipset { state_root, .. } = apply_block_messages(
256 genesis_timestamp,
257 self.chain_index().shallow_clone(),
258 self.chain_config().shallow_clone(),
259 self.beacon_schedule().shallow_clone(),
260 &self.engine,
261 tipset.shallow_clone(),
262 Some(callback),
263 VMTrace::Traced,
264 )?;
265
266 Ok((state_root, invoc_trace))
267 }
268}