1use super::circulating_supply::GenesisInfo;
5use super::*;
6use crate::db::EthMappingsStore;
7use crate::interpreter::{BlockMessages, ExecutionContext, VM, VMTrace};
8use crate::shim::message::Message;
9use crate::state_migration::run_state_migrations;
10use crate::utils::ShallowClone as _;
11use anyhow::{Context as _, bail, ensure};
12use fil_actors_shared::fvm_ipld_amt::{Amt, Amtv0};
13use itertools::Itertools as _;
14use tracing::{error, info, instrument};
15
16impl<DB> StateManager<DB>
17where
18 DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
19{
20 pub async fn load_tipset_state(self: &Arc<Self>, ts: &Tipset) -> anyhow::Result<TipsetState> {
22 if let Some(state) = self.cache.get_map(ts.key(), |et| et.into()) {
23 Ok(state)
24 } else {
25 match self.chain_store().load_child_tipset(ts)? {
26 Some(receipt_ts) => Ok(TipsetState {
27 state_root: *receipt_ts.parent_state(),
28 receipt_root: *receipt_ts.parent_message_receipts(),
29 }),
30 None => Ok(self.load_executed_tipset(ts).await?.into()),
31 }
32 }
33 }
34
35 pub async fn load_executed_tipset(
37 self: &Arc<Self>,
38 ts: &Tipset,
39 ) -> anyhow::Result<ExecutedTipset> {
40 if ts.epoch() >= self.heaviest_tipset().epoch()
42 && let Some(cached) = self.cache.get(ts.key())
43 {
44 if StateTree::new_from_root(self.blockstore_owned(), &cached.state_root).is_ok() {
45 return Ok(cached);
46 } else {
47 self.cache.remove(ts.key());
48 }
49 }
50 self.cache
51 .get_or_else(ts.key(), || async move {
52 let receipt_ts = self.chain_store().load_child_tipset(ts)?;
53 self.load_executed_tipset_inner(ts, receipt_ts.as_ref())
54 .await
55 })
56 .await
57 }
58
59 async fn load_executed_tipset_inner(
60 self: &Arc<Self>,
61 msg_ts: &Tipset,
62 receipt_ts: Option<&Tipset>,
64 ) -> anyhow::Result<ExecutedTipset> {
65 if let Some(receipt_ts) = receipt_ts {
66 anyhow::ensure!(
67 msg_ts.key() == receipt_ts.parents(),
68 "message tipset should be the parent of message receipt tipset"
69 );
70 }
71 let mut recomputed = false;
72 let (state_root, receipt_root, receipts) = match receipt_ts.and_then(|ts| {
73 let receipt_root = *ts.parent_message_receipts();
74 Receipt::get_receipts(self.cs.blockstore(), receipt_root)
75 .ok()
76 .map(|r| (*ts.parent_state(), receipt_root, r))
77 }) {
78 Some((state_root, receipt_root, receipts)) => (state_root, receipt_root, receipts),
79 None => {
80 let state_output = self
81 .compute_tipset_state(msg_ts.shallow_clone(), NO_CALLBACK, VMTrace::NotTraced)
82 .await?;
83 recomputed = true;
84 (
85 state_output.state_root,
86 state_output.receipt_root,
87 Receipt::get_receipts(self.cs.blockstore(), state_output.receipt_root)?,
88 )
89 }
90 };
91
92 let messages = self.chain_store().messages_for_tipset(msg_ts)?;
93 anyhow::ensure!(
94 messages.len() == receipts.len(),
95 "mismatching message and receipt counts ({} messages, {} receipts)",
96 messages.len(),
97 receipts.len()
98 );
99 let mut executed_messages = Vec::with_capacity(messages.len());
100 for (message, receipt) in messages.iter().cloned().zip(receipts) {
101 let events = if let Some(events_root) = receipt.events_root() {
102 Some(
103 match StampedEvent::get_events(self.cs.blockstore(), &events_root) {
104 Ok(events) => events,
105 Err(e) if recomputed => return Err(e),
106 Err(_) => {
107 self.compute_tipset_state(
108 msg_ts.shallow_clone(),
109 NO_CALLBACK,
110 VMTrace::NotTraced,
111 )
112 .await?;
113 recomputed = true;
114 StampedEvent::get_events(self.cs.blockstore(), &events_root)?
115 }
116 },
117 )
118 } else {
119 None
120 };
121 executed_messages.push(ExecutedMessage {
122 message,
123 receipt,
124 events,
125 });
126 }
127 Ok(ExecutedTipset {
128 state_root,
129 receipt_root,
130 executed_messages: Arc::new(executed_messages),
131 })
132 }
133
134 pub async fn compute_tipset_state(
156 self: &Arc<Self>,
157 tipset: Tipset,
158 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
159 enable_tracing: VMTrace,
160 ) -> Result<ExecutedTipset, Error> {
161 let this = Arc::clone(self);
162 tokio::task::spawn_blocking(move || {
163 this.compute_tipset_state_blocking(tipset, callback, enable_tracing)
164 })
165 .await?
166 }
167
168 pub fn compute_tipset_state_blocking(
170 &self,
171 tipset: Tipset,
172 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
173 enable_tracing: VMTrace,
174 ) -> Result<ExecutedTipset, Error> {
175 let epoch = tipset.epoch();
176 let has_callback = callback.is_some();
177 info!(
178 "Evaluating tipset: EPOCH={epoch}, blocks={}, tsk={}",
179 tipset.len(),
180 tipset.key(),
181 );
182 Ok(apply_block_messages(
183 self.chain_store().genesis_block_header().timestamp,
184 self.chain_index().shallow_clone(),
185 self.chain_config().shallow_clone(),
186 self.beacon_schedule().shallow_clone(),
187 &self.engine,
188 tipset,
189 callback,
190 enable_tracing,
191 )
192 .map_err(|e| {
193 if has_callback {
194 e
195 } else {
196 e.context(format!("Failed to compute tipset state@{epoch}"))
197 }
198 })?)
199 }
200
201 #[instrument(skip_all)]
202 pub async fn compute_state(
203 self: &Arc<Self>,
204 height: ChainEpoch,
205 messages: Vec<Message>,
206 tipset: Tipset,
207 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
208 enable_tracing: VMTrace,
209 ) -> Result<ExecutedTipset, Error> {
210 let this = Arc::clone(self);
211 tokio::task::spawn_blocking(move || {
212 this.compute_state_blocking(height, messages, tipset, callback, enable_tracing)
213 })
214 .await?
215 }
216
217 #[tracing::instrument(skip_all)]
219 pub fn compute_state_blocking(
220 &self,
221 height: ChainEpoch,
222 messages: Vec<Message>,
223 tipset: Tipset,
224 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
225 enable_tracing: VMTrace,
226 ) -> Result<ExecutedTipset, Error> {
227 Ok(compute_state(
228 height,
229 messages,
230 tipset,
231 self.chain_store().genesis_block_header().timestamp,
232 self.chain_index().shallow_clone(),
233 self.chain_config().shallow_clone(),
234 self.beacon_schedule().shallow_clone(),
235 &self.engine,
236 callback,
237 enable_tracing,
238 )?)
239 }
240}
241
242pub fn validate_tipsets<DB, T>(
243 genesis_timestamp: u64,
244 chain_index: &ChainIndex<DB>,
245 chain_config: &Arc<ChainConfig>,
246 beacon: &Arc<BeaconSchedule>,
247 engine: &MultiEngine,
248 tipsets: T,
249) -> anyhow::Result<()>
250where
251 DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
252 T: Iterator<Item = Tipset> + Send,
253{
254 for (child, parent) in tipsets.tuple_windows() {
259 info!(height = parent.epoch(), "compute parent state");
260 let ExecutedTipset {
261 state_root: actual_state,
262 receipt_root: actual_receipt,
263 ..
264 } = apply_block_messages(
265 genesis_timestamp,
266 chain_index.shallow_clone(),
267 chain_config.shallow_clone(),
268 beacon.shallow_clone(),
269 engine,
270 parent,
271 NO_CALLBACK,
272 VMTrace::NotTraced,
273 )
274 .context("couldn't compute tipset state")?;
275 let expected_receipt = child.min_ticket_block().message_receipts;
276 let expected_state = child.parent_state();
277 if (expected_state, expected_receipt) != (&actual_state, actual_receipt) {
278 error!(
279 height = child.epoch(),
280 ?expected_state,
281 ?expected_receipt,
282 ?actual_state,
283 ?actual_receipt,
284 "state mismatch"
285 );
286 bail!("state mismatch");
287 }
288 }
289 Ok(())
290}
291
292pub(in crate::state_manager) struct TipsetExecutor<'a, DB: Blockstore + Send + Sync + 'static> {
297 tipset: Tipset,
298 rand: ChainRand<DB>,
299 chain_config: Arc<ChainConfig>,
300 chain_index: ChainIndex<DB>,
301 genesis_info: GenesisInfo,
302 engine: &'a MultiEngine,
303}
304
305impl<'a, DB: Blockstore + Send + Sync + 'static> TipsetExecutor<'a, DB> {
306 pub(in crate::state_manager) fn new(
307 chain_index: ChainIndex<DB>,
308 chain_config: Arc<ChainConfig>,
309 beacon: Arc<BeaconSchedule>,
310 engine: &'a MultiEngine,
311 tipset: Tipset,
312 ) -> Self {
313 let rand = ChainRand::new(
314 chain_config.shallow_clone(),
315 tipset.shallow_clone(),
316 chain_index.shallow_clone(),
317 beacon,
318 );
319 let genesis_info = GenesisInfo::from_chain_config(chain_config.shallow_clone());
320 Self {
321 tipset,
322 rand,
323 chain_config,
324 chain_index,
325 genesis_info,
326 engine,
327 }
328 }
329
330 pub(in crate::state_manager) fn create_vm(
331 &self,
332 state_root: Cid,
333 epoch: ChainEpoch,
334 timestamp: u64,
335 trace: VMTrace,
336 ) -> anyhow::Result<VM<DB>>
337 where
338 DB: EthMappingsStore,
339 {
340 let circ_supply = self.genesis_info.get_vm_circulating_supply(
341 epoch,
342 self.chain_index.db(),
343 &state_root,
344 )?;
345 VM::new(
346 ExecutionContext {
347 heaviest_tipset: self.tipset.shallow_clone(),
348 state_tree_root: state_root,
349 epoch,
350 rand: Box::new(self.rand.shallow_clone()),
351 base_fee: self.tipset.min_ticket_block().parent_base_fee.clone(),
352 circ_supply,
353 chain_config: self.chain_config.shallow_clone(),
354 chain_index: self.chain_index.shallow_clone(),
355 timestamp,
356 },
357 self.engine,
358 trace,
359 )
360 }
361
362 pub(in crate::state_manager) fn prepare_parent_state<F>(
365 &self,
366 genesis_timestamp: u64,
367 null_epoch_trace: VMTrace,
368 cron_callback: &mut Option<F>,
369 ) -> anyhow::Result<(Cid, ChainEpoch, Vec<BlockMessages>)>
370 where
371 DB: EthMappingsStore,
372 F: FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>,
373 {
374 use crate::shim::clock::EPOCH_DURATION_SECONDS;
375
376 let mut parent_state = *self.tipset.parent_state();
377 let parent_epoch = self
378 .chain_index
379 .load_required_tipset(self.tipset.parents())?
380 .epoch();
381 let epoch = self.tipset.epoch();
382
383 for epoch_i in parent_epoch..epoch {
384 if epoch_i > parent_epoch {
385 let timestamp = genesis_timestamp + ((EPOCH_DURATION_SECONDS * epoch_i) as u64);
386 parent_state = stacker::grow(64 << 20, || -> anyhow::Result<Cid> {
387 let mut vm =
388 self.create_vm(parent_state, epoch_i, timestamp, null_epoch_trace)?;
389 if let Err(e) = vm.run_cron(epoch_i, cron_callback.as_mut()) {
390 error!("Beginning of epoch cron failed to run: {e:#}");
391 return Err(e);
392 }
393 vm.flush()
394 })?;
395 }
396 if let Some(new_state) = run_state_migrations(
397 epoch_i,
398 &self.chain_config,
399 self.chain_index.db(),
400 &parent_state,
401 )? {
402 parent_state = new_state;
403 }
404 }
405
406 let block_messages = BlockMessages::for_tipset(self.chain_index.db(), &self.tipset)?;
407 Ok((parent_state, epoch, block_messages))
408 }
409}
410
411#[allow(clippy::too_many_arguments)]
488pub fn apply_block_messages<DB>(
489 genesis_timestamp: u64,
490 chain_index: ChainIndex<DB>,
491 chain_config: Arc<ChainConfig>,
492 beacon: Arc<BeaconSchedule>,
493 engine: &MultiEngine,
494 tipset: Tipset,
495 mut callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
496 enable_tracing: VMTrace,
497) -> anyhow::Result<ExecutedTipset>
498where
499 DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
500{
501 if tipset.epoch() == 0 {
510 let message_receipts = tipset.min_ticket_block().message_receipts;
515 return Ok(ExecutedTipset {
516 state_root: *tipset.parent_state(),
517 receipt_root: message_receipts,
518 executed_messages: vec![].into(),
519 });
520 }
521
522 let exec = TipsetExecutor::new(
523 chain_index.shallow_clone(),
524 chain_config,
525 beacon,
526 engine,
527 tipset.shallow_clone(),
528 );
529
530 let (parent_state, epoch, block_messages) =
533 exec.prepare_parent_state(genesis_timestamp, enable_tracing, &mut callback)?;
534
535 stacker::grow(64 << 20, || -> anyhow::Result<ExecutedTipset> {
538 let mut vm = exec.create_vm(parent_state, epoch, tipset.min_timestamp(), enable_tracing)?;
539
540 let (receipts, events, events_roots) =
542 vm.apply_block_messages(&block_messages, epoch, callback)?;
543
544 let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts.iter())?;
546
547 for (events, events_root) in events.iter().zip(events_roots.iter()) {
549 if let Some(events) = events {
550 let event_root =
551 events_root.context("events root should be present when events present")?;
552 let derived_event_root = Amt::new_from_iter_with_bit_width(
554 chain_index.db(),
555 EVENTS_AMT_BITWIDTH,
556 events.iter(),
557 )
558 .map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?;
559
560 ensure!(
562 derived_event_root == event_root,
563 "Events AMT root mismatch: derived={derived_event_root}, actual={event_root}."
564 );
565 }
566 }
567
568 let state_root = vm.flush()?;
569
570 let messages: Vec<ChainMessage> = block_messages
572 .into_iter()
573 .flat_map(|bm| bm.messages)
574 .collect_vec();
575 anyhow::ensure!(
576 messages.len() == receipts.len() && messages.len() == events.len(),
577 "length of messages, receipts, and events should match",
578 );
579 Ok(ExecutedTipset {
580 state_root,
581 receipt_root,
582 executed_messages: messages
583 .into_iter()
584 .zip(receipts)
585 .zip(events)
586 .map(|((message, receipt), events)| ExecutedMessage {
587 message,
588 receipt,
589 events,
590 })
591 .collect_vec()
592 .into(),
593 })
594 })
595}
596
597#[allow(clippy::too_many_arguments)]
598pub(in crate::state_manager) fn compute_state<DB>(
599 _height: ChainEpoch,
600 messages: Vec<Message>,
601 tipset: Tipset,
602 genesis_timestamp: u64,
603 chain_index: ChainIndex<DB>,
604 chain_config: Arc<ChainConfig>,
605 beacon: Arc<BeaconSchedule>,
606 engine: &MultiEngine,
607 callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
608 enable_tracing: VMTrace,
609) -> anyhow::Result<ExecutedTipset>
610where
611 DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
612{
613 if !messages.is_empty() {
614 anyhow::bail!("Applying messages is not yet implemented.");
615 }
616
617 let output = apply_block_messages(
618 genesis_timestamp,
619 chain_index,
620 chain_config,
621 beacon,
622 engine,
623 tipset,
624 callback,
625 enable_tracing,
626 )?;
627
628 Ok(output)
629}