1use std::sync::Arc;
5
6use crate::chain_sync::BadBlockCache;
7use crate::db::EthMappingsStore;
8use crate::networks::Height;
9use crate::shim::clock::ALLOWABLE_CLOCK_DRIFT;
10use crate::shim::crypto::SignatureType;
11use crate::shim::{
12 address::Address, crypto::verify_bls_aggregate, econ::BLOCK_GAS_LIMIT,
13 gas::price_list_by_network_version, message::Message, state_tree::StateTree,
14};
15use crate::state_manager::ExecutedTipset;
16use crate::state_manager::{Error as StateManagerError, StateManager, utils::is_valid_for_sending};
17use crate::utils::ShallowClone as _;
18use crate::{
19 blocks::{Block, CachingBlockHeader, Error as ForestBlockError, FullTipset, Tipset},
20 fil_cns::{self, FilecoinConsensus, FilecoinConsensusError},
21};
22use crate::{
23 chain::{ChainStore, Error as ChainStoreError},
24 metrics::HistogramTimerExt,
25};
26use crate::{
27 eth::is_valid_eth_tx_for_sending,
28 message::{MessageRead as _, valid_for_block_inclusion},
29};
30use ahash::HashMap;
31use cid::Cid;
32use futures::TryFutureExt;
33use fvm_ipld_blockstore::Blockstore;
34use fvm_ipld_encoding::to_vec;
35use itertools::Itertools;
36use nunny::Vec as NonEmpty;
37use thiserror::Error;
38use tokio::task::JoinSet;
39use tracing::{trace, warn};
40
41use crate::chain_sync::{consensus::collect_errs, metrics, validation::TipsetValidator};
42
43#[derive(Debug, Error)]
44pub enum TipsetSyncerError {
45 #[error("Block must have a signature")]
46 BlockWithoutSignature,
47 #[error("Block without BLS aggregate signature")]
48 BlockWithoutBlsAggregate,
49 #[error("Block received from the future: now = {0}, block = {1}")]
50 TimeTravellingBlock(u64, u64),
51 #[error("Validation error: {0}")]
52 Validation(String),
53 #[error("Processing error: {0}")]
54 Calculation(String),
55 #[error("Chain store error: {0}")]
56 ChainStore(#[from] ChainStoreError),
57 #[error("StateManager error: {0}")]
58 StateManager(#[from] StateManagerError),
59 #[error("Block error: {0}")]
60 BlockError(#[from] ForestBlockError),
61 #[error("Querying tipsets from the network failed: {0}")]
62 NetworkTipsetQueryFailed(String),
63 #[error("BLS aggregate signature {0} was invalid for msgs {1}")]
64 BlsAggregateSignatureInvalid(String, String),
65 #[error("Message signature invalid: {0}")]
66 MessageSignatureInvalid(String),
67 #[error("Block message root does not match: expected {0}, computed {1}")]
68 BlockMessageRootInvalid(String, String),
69 #[error("Computing message root failed: {0}")]
70 ComputingMessageRoot(String),
71 #[error("Resolving address from message failed: {0}")]
72 ResolvingAddressFromMessage(String),
73 #[error("Loading tipset parent from the store failed: {0}")]
74 TipsetParentNotFound(ChainStoreError),
75 #[error("Consensus error: {0}")]
76 ConsensusError(FilecoinConsensusError),
77}
78
79impl From<tokio::task::JoinError> for TipsetSyncerError {
80 fn from(err: tokio::task::JoinError) -> Self {
81 TipsetSyncerError::NetworkTipsetQueryFailed(format!("{err}"))
82 }
83}
84
85impl TipsetSyncerError {
86 fn concat(errs: NonEmpty<TipsetSyncerError>) -> Self {
89 let msg = errs.iter().map(|e| e.to_string()).collect_vec().join(", ");
90
91 TipsetSyncerError::Validation(msg)
92 }
93}
94
95pub async fn validate_tipset<DB: Blockstore + EthMappingsStore + Send + Sync + 'static>(
100 state_manager: &Arc<StateManager<DB>>,
101 full_tipset: FullTipset,
102 bad_block_cache: Option<Arc<BadBlockCache>>,
103) -> Result<(), TipsetSyncerError> {
104 if full_tipset
105 .key()
106 .eq(state_manager.chain_store().genesis_tipset().key())
107 {
108 trace!("Skipping genesis tipset validation");
109 return Ok(());
110 }
111
112 let timer = metrics::TIPSET_PROCESSING_TIME.start_timer();
113
114 let epoch = full_tipset.epoch();
115 let parent_state = *full_tipset.parent_state();
116 let tipset_key = full_tipset.key();
117 trace!("Tipset keys: {tipset_key}");
118 let blocks = full_tipset.into_blocks();
119 let mut validations = JoinSet::new();
120 for b in blocks {
121 validations.spawn(validate_block(state_manager.clone(), Arc::new(b)));
122 }
123
124 while let Some(result) = validations.join_next().await {
125 match result? {
126 Ok(block) => {
127 state_manager
128 .chain_store()
129 .add_to_tipset_tracker(block.header());
130 }
131 Err((cid, why)) => {
132 warn!(
133 "Validating block [CID = {cid}, PARENT_STATE = {parent_state}] in EPOCH = {epoch} failed: {why}",
134 );
135 match &why {
136 TipsetSyncerError::TimeTravellingBlock(_, _) => {
137 }
140 _ => {
141 if StateTree::new_from_root(state_manager.blockstore_owned(), &parent_state)
143 .is_ok()
144 && let Some(bad_block_cache) = bad_block_cache
145 {
146 bad_block_cache.push(cid);
147 }
148 }
149 };
150 return Err(why);
151 }
152 }
153 }
154 drop(timer);
155 Ok(())
156}
157
158async fn validate_block<DB: Blockstore + EthMappingsStore + Sync + Send + 'static>(
178 state_manager: Arc<StateManager<DB>>,
179 block: Arc<Block>,
180) -> Result<Arc<Block>, (Cid, TipsetSyncerError)> {
181 let consensus = FilecoinConsensus::new(state_manager.beacon_schedule().clone());
182 trace!(
183 "Validating block: epoch = {}, weight = {}, key = {}",
184 block.header().epoch,
185 block.header().weight,
186 block.header().cid(),
187 );
188 let chain_store = state_manager.chain_store().clone();
189 let block_cid = block.cid();
190
191 let is_validated = chain_store.is_block_validated(block_cid);
193 if is_validated {
194 return Ok(block);
195 }
196
197 let _timer = metrics::BLOCK_VALIDATION_TIME.start_timer();
198
199 let header = block.header();
200
201 block_sanity_checks(header).map_err(|e| (*block_cid, e))?;
203 block_timestamp_checks(header).map_err(|e| (*block_cid, e))?;
204
205 let base_tipset = chain_store
206 .chain_index()
207 .load_required_tipset(&header.parents)
208 .map_err(|why| (*block_cid, TipsetSyncerError::TipsetParentNotFound(why)))?;
214
215 let lookback_state = ChainStore::get_lookback_tipset_for_round(
217 state_manager.chain_store().chain_index(),
218 state_manager.chain_config(),
219 &base_tipset,
220 block.header().epoch,
221 )
222 .map_err(|e| (*block_cid, e.into()))
223 .map(|(_, s)| Arc::new(s))?;
224
225 let work_addr = state_manager
228 .get_miner_work_addr(*lookback_state, &header.miner_address)
229 .map_err(|e| (*block_cid, e.into()))?;
230
231 let mut validations = JoinSet::new();
233
234 validations.spawn(check_block_messages(
236 state_manager.shallow_clone(),
237 block.shallow_clone(),
238 base_tipset.shallow_clone(),
239 ));
240
241 validations.spawn_blocking({
243 let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
244 let firehorse_height = state_manager.chain_config().epoch(Height::FireHorse);
245 let base_tipset = base_tipset.shallow_clone();
246 let block_store = state_manager.blockstore_owned();
247 let block = block.shallow_clone();
248 move || {
249 let base_fee = crate::chain::compute_base_fee(
250 &block_store,
251 &base_tipset,
252 smoke_height,
253 firehorse_height,
254 )
255 .map_err(|e| {
256 TipsetSyncerError::Validation(format!("Could not compute base fee: {e}"))
257 })?;
258 let parent_base_fee = &block.header.parent_base_fee;
259 if &base_fee != parent_base_fee {
260 return Err(TipsetSyncerError::Validation(format!(
261 "base fee doesn't match: {parent_base_fee} (header), {base_fee} (computed)"
262 )));
263 }
264 Ok(())
265 }
266 });
267
268 validations.spawn_blocking({
270 let block_store = state_manager.blockstore_owned();
271 let base_tipset = base_tipset.shallow_clone();
272 let weight = header.weight.clone();
273 move || {
274 let calc_weight = fil_cns::weight(&block_store, &base_tipset).map_err(|e| {
275 TipsetSyncerError::Calculation(format!("Error calculating weight: {e:#}"))
276 })?;
277 if weight != calc_weight {
278 return Err(TipsetSyncerError::Validation(format!(
279 "Parent weight doesn't match: {weight} (header), {calc_weight} (computed)"
280 )));
281 }
282 Ok(())
283 }
284 });
285
286 validations.spawn({
288 let state_manager = state_manager.clone();
289 let block = block.clone();
290 async move {
291 let header = block.header();
292 let ExecutedTipset {
293 state_root,
294 receipt_root,
295 ..
296 } = state_manager
297 .load_executed_tipset(&base_tipset)
298 .await
299 .map_err(|e| {
300 TipsetSyncerError::Calculation(format!("Failed to calculate state: {e:#}"))
301 })?;
302
303 if state_root != header.state_root {
304 return Err(TipsetSyncerError::Validation(format!(
305 "Parent state root did not match computed state: {} (header), {} (computed)",
306 header.state_root, state_root,
307 )));
308 }
309
310 if receipt_root != header.message_receipts {
311 return Err(TipsetSyncerError::Validation(format!(
312 "Parent receipt root did not match computed root: {} (header), {} (computed)",
313 header.message_receipts, receipt_root
314 )));
315 }
316 Ok(())
317 }
318 });
319
320 validations.spawn_blocking({
322 let block = block.clone();
323 move || {
324 block.header().verify_signature_against(&work_addr)?;
325 Ok(())
326 }
327 });
328
329 validations.spawn({
330 let block = block.clone();
331 async move {
332 consensus
333 .validate_block(state_manager, block)
334 .map_err(|errs| {
335 TipsetSyncerError::concat(
342 errs.into_iter_ne()
343 .map(TipsetSyncerError::ConsensusError)
344 .collect_vec(),
345 )
346 })
347 .await
348 }
349 });
350
351 if let Err(errs) = collect_errs(validations).await {
353 return Err((*block_cid, TipsetSyncerError::concat(errs)));
354 }
355
356 chain_store.mark_block_as_validated(block_cid);
357
358 Ok(block)
359}
360
361async fn check_block_messages<DB: Blockstore + EthMappingsStore + Send + Sync + 'static>(
372 state_manager: Arc<StateManager<DB>>,
373 block: Arc<Block>,
374 base_tipset: Tipset,
375) -> Result<(), TipsetSyncerError> {
376 let network_version = state_manager
377 .chain_config()
378 .network_version(block.header.epoch);
379 let eth_chain_id = state_manager.chain_config().eth_chain_id;
380
381 if let Some(sig) = &block.header().bls_aggregate {
382 let mut pub_keys = Vec::with_capacity(block.bls_msgs().len());
385 let mut cids = Vec::with_capacity(block.bls_msgs().len());
386 let db = state_manager.blockstore();
387 for m in block.bls_msgs() {
388 let pk = StateManager::get_bls_public_key(db, &m.from, *base_tipset.parent_state())?;
389 pub_keys.push(pk);
390 cids.push(m.cid().to_bytes());
391 }
392
393 if !verify_bls_aggregate(
394 &cids.iter().map(|x| x.as_slice()).collect_vec(),
395 &pub_keys,
396 sig,
397 ) {
398 return Err(TipsetSyncerError::BlsAggregateSignatureInvalid(
399 format!("{sig:?}"),
400 format!("{cids:?}"),
401 ));
402 }
403 } else {
404 return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
405 }
406
407 let price_list = price_list_by_network_version(network_version);
408 let mut sum_gas_limit = 0;
409
410 let mut check_msg = |msg: &Message,
412 account_sequences: &mut HashMap<Address, u64>,
413 tree: &StateTree<DB>|
414 -> anyhow::Result<()> {
415 let min_gas = price_list.on_chain_message(to_vec(msg).unwrap().len());
417 valid_for_block_inclusion(msg, min_gas.total(), network_version)
418 .map_err(|e| anyhow::anyhow!("{}", e))?;
419 sum_gas_limit += msg.gas_limit;
420 if sum_gas_limit > BLOCK_GAS_LIMIT {
421 anyhow::bail!("block gas limit exceeded");
422 }
423
424 let sequence: u64 = match account_sequences.get(&msg.from()) {
427 Some(sequence) => *sequence,
428 None => {
429 let actor = tree.get_actor(&msg.from)?.ok_or_else(|| {
430 anyhow::anyhow!(
431 "Failed to retrieve nonce for addr: Actor does not exist in state"
432 )
433 })?;
434 let network_version = state_manager
435 .chain_config()
436 .network_version(block.header.epoch);
437 if !is_valid_for_sending(network_version, &actor) {
438 anyhow::bail!("not valid for sending!");
439 }
440 actor.sequence
441 }
442 };
443
444 if sequence != msg.sequence {
446 anyhow::bail!(
447 "Message has incorrect sequence (exp: {} got: {})",
448 sequence,
449 msg.sequence
450 );
451 }
452 account_sequences.insert(msg.from(), sequence + 1);
453 Ok(())
454 };
455
456 let mut account_sequences: HashMap<Address, u64> = HashMap::default();
457 let ExecutedTipset { state_root, .. } = state_manager
458 .load_executed_tipset(&base_tipset)
459 .await
460 .map_err(|e| TipsetSyncerError::Calculation(format!("Could not update state: {e:#}")))?;
461 let tree =
462 StateTree::new_from_root(state_manager.blockstore_owned(), &state_root).map_err(|e| {
463 TipsetSyncerError::Calculation(format!(
464 "Could not load from new state root in state manager: {e:#}"
465 ))
466 })?;
467
468 for (i, msg) in block.bls_msgs().iter().enumerate() {
470 check_msg(msg, &mut account_sequences, &tree).map_err(|e| {
471 TipsetSyncerError::Validation(format!(
472 "Block had invalid BLS message at index {i}: {e:#}"
473 ))
474 })?;
475 }
476
477 for (i, msg) in block.secp_msgs().iter().enumerate() {
479 if msg.signature().signature_type() == SignatureType::Delegated
480 && !is_valid_eth_tx_for_sending(eth_chain_id, network_version, msg)
481 {
482 return Err(TipsetSyncerError::Validation(
483 "Network version must be at least NV23 for legacy Ethereum transactions".to_owned(),
484 ));
485 }
486 check_msg(msg.message(), &mut account_sequences, &tree).map_err(|e| {
487 TipsetSyncerError::Validation(format!(
488 "block had an invalid secp message at index {i}: {e:#}"
489 ))
490 })?;
491 let key_addr = state_manager
493 .resolve_to_key_addr(&msg.from(), &base_tipset)
494 .await
495 .map_err(|e| TipsetSyncerError::ResolvingAddressFromMessage(e.to_string()))?;
496 msg.signature
498 .authenticate_msg(eth_chain_id, msg, &key_addr)
499 .map_err(|e| TipsetSyncerError::MessageSignatureInvalid(e.to_string()))?;
500 }
501
502 let msg_root = TipsetValidator::compute_msg_root(
504 state_manager.blockstore(),
505 block.bls_msgs(),
506 block.secp_msgs(),
507 )
508 .map_err(|err| TipsetSyncerError::ComputingMessageRoot(err.to_string()))?;
509 if block.header().messages != msg_root {
510 return Err(TipsetSyncerError::BlockMessageRootInvalid(
511 format!("{:?}", block.header().messages),
512 format!("{msg_root:?}"),
513 ));
514 }
515
516 Ok(())
517}
518
519fn block_sanity_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
523 if header.signature.is_none() {
524 return Err(TipsetSyncerError::BlockWithoutSignature);
525 }
526 if header.bls_aggregate.is_none() {
527 return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
528 }
529 Ok(())
530}
531
532fn block_timestamp_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
534 let time_now = chrono::Utc::now().timestamp() as u64;
535 if header.timestamp > time_now.saturating_add(ALLOWABLE_CLOCK_DRIFT) {
536 return Err(TipsetSyncerError::TimeTravellingBlock(
537 time_now,
538 header.timestamp,
539 ));
540 } else if header.timestamp > time_now {
541 warn!(
542 "Got block from the future, but within clock drift threshold, {} > {}",
543 header.timestamp, time_now
544 );
545 }
546 Ok(())
547}