1use std::sync::Arc;
5
6use crate::chain_sync::BadBlockCache;
7use crate::networks::Height;
8use crate::shim::clock::ALLOWABLE_CLOCK_DRIFT;
9use crate::shim::crypto::SignatureType;
10use crate::shim::{
11 address::Address, crypto::verify_bls_aggregate, econ::BLOCK_GAS_LIMIT,
12 gas::price_list_by_network_version, message::Message, state_tree::StateTree,
13};
14use crate::state_manager::{Error as StateManagerError, StateManager, is_valid_for_sending};
15use crate::{
16 blocks::{Block, CachingBlockHeader, Error as ForestBlockError, FullTipset, Tipset},
17 fil_cns::{self, FilecoinConsensus, FilecoinConsensusError},
18};
19use crate::{
20 chain::{ChainStore, Error as ChainStoreError},
21 metrics::HistogramTimerExt,
22};
23use crate::{
24 eth::is_valid_eth_tx_for_sending,
25 message::{Message as MessageTrait, valid_for_block_inclusion},
26};
27use ahash::HashMap;
28use cid::Cid;
29use futures::TryFutureExt;
30use fvm_ipld_blockstore::Blockstore;
31use fvm_ipld_encoding::to_vec;
32use itertools::Itertools;
33use nunny::Vec as NonEmpty;
34use thiserror::Error;
35use tokio::task::JoinSet;
36use tracing::{error, trace, warn};
37
38use crate::chain_sync::{consensus::collect_errs, metrics, validation::TipsetValidator};
39
40#[derive(Debug, Error)]
41pub enum TipsetSyncerError {
42 #[error("Block must have a signature")]
43 BlockWithoutSignature,
44 #[error("Block without BLS aggregate signature")]
45 BlockWithoutBlsAggregate,
46 #[error("Block received from the future: now = {0}, block = {1}")]
47 TimeTravellingBlock(u64, u64),
48 #[error("Validation error: {0}")]
49 Validation(String),
50 #[error("Processing error: {0}")]
51 Calculation(String),
52 #[error("Chain store error: {0}")]
53 ChainStore(#[from] ChainStoreError),
54 #[error("StateManager error: {0}")]
55 StateManager(#[from] StateManagerError),
56 #[error("Block error: {0}")]
57 BlockError(#[from] ForestBlockError),
58 #[error("Querying tipsets from the network failed: {0}")]
59 NetworkTipsetQueryFailed(String),
60 #[error("BLS aggregate signature {0} was invalid for msgs {1}")]
61 BlsAggregateSignatureInvalid(String, String),
62 #[error("Message signature invalid: {0}")]
63 MessageSignatureInvalid(String),
64 #[error("Block message root does not match: expected {0}, computed {1}")]
65 BlockMessageRootInvalid(String, String),
66 #[error("Computing message root failed: {0}")]
67 ComputingMessageRoot(String),
68 #[error("Resolving address from message failed: {0}")]
69 ResolvingAddressFromMessage(String),
70 #[error("Loading tipset parent from the store failed: {0}")]
71 TipsetParentNotFound(ChainStoreError),
72 #[error("Consensus error: {0}")]
73 ConsensusError(FilecoinConsensusError),
74}
75
76impl From<tokio::task::JoinError> for TipsetSyncerError {
77 fn from(err: tokio::task::JoinError) -> Self {
78 TipsetSyncerError::NetworkTipsetQueryFailed(format!("{err}"))
79 }
80}
81
82impl TipsetSyncerError {
83 fn concat(errs: NonEmpty<TipsetSyncerError>) -> Self {
86 let msg = errs
87 .iter()
88 .map(|e| e.to_string())
89 .collect::<Vec<_>>()
90 .join(", ");
91
92 TipsetSyncerError::Validation(msg)
93 }
94}
95
96pub async fn validate_tipset<DB: Blockstore + Send + Sync + 'static>(
101 state_manager: Arc<StateManager<DB>>,
102 chainstore: &ChainStore<DB>,
103 full_tipset: FullTipset,
104 genesis: &Tipset,
105 bad_block_cache: Option<Arc<BadBlockCache>>,
106) -> Result<(), TipsetSyncerError> {
107 if full_tipset.key().eq(genesis.key()) {
108 trace!("Skipping genesis tipset validation");
109 return Ok(());
110 }
111
112 let timer = metrics::TIPSET_PROCESSING_TIME.start_timer();
113
114 let epoch = full_tipset.epoch();
115 let full_tipset_key = full_tipset.key().clone();
116 trace!("Tipset keys: {full_tipset_key}");
117 let blocks = full_tipset.into_blocks();
118 let mut validations = JoinSet::new();
119 for b in blocks {
120 validations.spawn(validate_block(state_manager.clone(), Arc::new(b)));
121 }
122
123 while let Some(result) = validations.join_next().await {
124 match result? {
125 Ok(block) => {
126 chainstore.add_to_tipset_tracker(block.header());
127 }
128 Err((cid, why)) => {
129 warn!("Validating block [CID = {cid}] in EPOCH = {epoch} failed: {why}");
130 match &why {
131 TipsetSyncerError::TimeTravellingBlock(_, _) => {
132 }
135 _ => {
136 if let Some(bad_block_cache) = bad_block_cache {
137 bad_block_cache.push(cid);
138 }
139 }
140 };
141 return Err(why);
142 }
143 }
144 }
145 drop(timer);
146 Ok(())
147}
148
149async fn validate_block<DB: Blockstore + Sync + Send + 'static>(
169 state_manager: Arc<StateManager<DB>>,
170 block: Arc<Block>,
171) -> Result<Arc<Block>, (Cid, TipsetSyncerError)> {
172 let consensus = FilecoinConsensus::new(state_manager.beacon_schedule().clone());
173 trace!(
174 "Validating block: epoch = {}, weight = {}, key = {}",
175 block.header().epoch,
176 block.header().weight,
177 block.header().cid(),
178 );
179 let chain_store = state_manager.chain_store().clone();
180 let block_cid = block.cid();
181
182 let is_validated = chain_store.is_block_validated(block_cid);
184 if is_validated {
185 return Ok(block);
186 }
187
188 let _timer = metrics::BLOCK_VALIDATION_TIME.start_timer();
189
190 let header = block.header();
191
192 block_sanity_checks(header).map_err(|e| (*block_cid, e))?;
194 block_timestamp_checks(header).map_err(|e| (*block_cid, e))?;
195
196 let base_tipset = chain_store
197 .chain_index()
198 .load_required_tipset(&header.parents)
199 .map_err(|why| (*block_cid, TipsetSyncerError::TipsetParentNotFound(why)))?;
205
206 let lookback_state = ChainStore::get_lookback_tipset_for_round(
208 state_manager.chain_store().chain_index().clone(),
209 state_manager.chain_config().clone(),
210 base_tipset.clone(),
211 block.header().epoch,
212 )
213 .map_err(|e| (*block_cid, e.into()))
214 .map(|(_, s)| Arc::new(s))?;
215
216 let work_addr = state_manager
219 .get_miner_work_addr(*lookback_state, &header.miner_address)
220 .map_err(|e| (*block_cid, e.into()))?;
221
222 let mut validations = JoinSet::new();
224
225 validations.spawn(check_block_messages(
227 Arc::clone(&state_manager),
228 Arc::clone(&block),
229 Arc::clone(&base_tipset),
230 ));
231
232 let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
234 let v_base_tipset = Arc::clone(&base_tipset);
235 let v_block_store = state_manager.blockstore_owned();
236 let v_block = Arc::clone(&block);
237 validations.spawn_blocking(move || {
238 let base_fee = crate::chain::compute_base_fee(&v_block_store, &v_base_tipset, smoke_height)
239 .map_err(|e| {
240 TipsetSyncerError::Validation(format!("Could not compute base fee: {e}"))
241 })?;
242 let parent_base_fee = &v_block.header.parent_base_fee;
243 if &base_fee != parent_base_fee {
244 return Err(TipsetSyncerError::Validation(format!(
245 "base fee doesn't match: {parent_base_fee} (header), {base_fee} (computed)"
246 )));
247 }
248 Ok(())
249 });
250
251 let v_block_store = state_manager.blockstore_owned();
253 let v_base_tipset = Arc::clone(&base_tipset);
254 let weight = header.weight.clone();
255 validations.spawn_blocking(move || {
256 let calc_weight = fil_cns::weight(&v_block_store, &v_base_tipset).map_err(|e| {
257 TipsetSyncerError::Calculation(format!("Error calculating weight: {e}"))
258 })?;
259 if weight != calc_weight {
260 return Err(TipsetSyncerError::Validation(format!(
261 "Parent weight doesn't match: {weight} (header), {calc_weight} (computed)"
262 )));
263 }
264 Ok(())
265 });
266
267 let v_state_manager = Arc::clone(&state_manager);
269 let v_base_tipset = Arc::clone(&base_tipset);
270 let v_block = Arc::clone(&block);
271 validations.spawn(async move {
272 let header = v_block.header();
273 let (state_root, receipt_root) = v_state_manager
274 .tipset_state(&v_base_tipset)
275 .await
276 .map_err(|e| {
277 TipsetSyncerError::Calculation(format!("Failed to calculate state: {e}"))
278 })?;
279
280 if state_root != header.state_root {
281 return Err(TipsetSyncerError::Validation(format!(
282 "Parent state root did not match computed state: {} (header), {} (computed)",
283 header.state_root, state_root,
284 )));
285 }
286
287 if receipt_root != header.message_receipts {
288 return Err(TipsetSyncerError::Validation(format!(
289 "Parent receipt root did not match computed root: {} (header), {} (computed)",
290 header.message_receipts, receipt_root
291 )));
292 }
293 Ok(())
294 });
295
296 let v_block = block.clone();
298 validations.spawn_blocking(move || {
299 v_block.header().verify_signature_against(&work_addr)?;
300 Ok(())
301 });
302
303 let v_block = block.clone();
304 validations.spawn(async move {
305 consensus
306 .validate_block(state_manager, v_block)
307 .map_err(|errs| {
308 TipsetSyncerError::concat(
315 errs.into_iter_ne()
316 .map(TipsetSyncerError::ConsensusError)
317 .collect_vec(),
318 )
319 })
320 .await
321 });
322
323 if let Err(errs) = collect_errs(validations).await {
325 return Err((*block_cid, TipsetSyncerError::concat(errs)));
326 }
327
328 chain_store.mark_block_as_validated(block_cid);
329
330 Ok(block)
331}
332
333async fn check_block_messages<DB: Blockstore + Send + Sync + 'static>(
344 state_manager: Arc<StateManager<DB>>,
345 block: Arc<Block>,
346 base_tipset: Arc<Tipset>,
347) -> Result<(), TipsetSyncerError> {
348 let network_version = state_manager
349 .chain_config()
350 .network_version(block.header.epoch);
351 let eth_chain_id = state_manager.chain_config().eth_chain_id;
352
353 if let Some(sig) = &block.header().bls_aggregate {
354 let mut pub_keys = Vec::with_capacity(block.bls_msgs().len());
357 let mut cids = Vec::with_capacity(block.bls_msgs().len());
358 let db = state_manager.blockstore_owned();
359 for m in block.bls_msgs() {
360 let pk = StateManager::get_bls_public_key(&db, &m.from, *base_tipset.parent_state())?;
361 pub_keys.push(pk);
362 cids.push(m.cid().to_bytes());
363 }
364
365 if !verify_bls_aggregate(
366 &cids.iter().map(|x| x.as_slice()).collect_vec(),
367 &pub_keys,
368 sig,
369 ) {
370 return Err(TipsetSyncerError::BlsAggregateSignatureInvalid(
371 format!("{sig:?}"),
372 format!("{cids:?}"),
373 ));
374 }
375 } else {
376 return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
377 }
378
379 let price_list = price_list_by_network_version(network_version);
380 let mut sum_gas_limit = 0;
381
382 let mut check_msg = |msg: &Message,
384 account_sequences: &mut HashMap<Address, u64>,
385 tree: &StateTree<DB>|
386 -> Result<(), anyhow::Error> {
387 let min_gas = price_list.on_chain_message(to_vec(msg).unwrap().len());
389 valid_for_block_inclusion(msg, min_gas.total(), network_version)
390 .map_err(|e| anyhow::anyhow!("{}", e))?;
391 sum_gas_limit += msg.gas_limit;
392 if sum_gas_limit > BLOCK_GAS_LIMIT {
393 anyhow::bail!("block gas limit exceeded");
394 }
395
396 let sequence: u64 = match account_sequences.get(&msg.from()) {
399 Some(sequence) => *sequence,
400 None => {
401 let actor = tree.get_actor(&msg.from)?.ok_or_else(|| {
402 anyhow::anyhow!(
403 "Failed to retrieve nonce for addr: Actor does not exist in state"
404 )
405 })?;
406 let network_version = state_manager
407 .chain_config()
408 .network_version(block.header.epoch);
409 if !is_valid_for_sending(network_version, &actor) {
410 anyhow::bail!("not valid for sending!");
411 }
412 actor.sequence
413 }
414 };
415
416 if sequence != msg.sequence {
418 anyhow::bail!(
419 "Message has incorrect sequence (exp: {} got: {})",
420 sequence,
421 msg.sequence
422 );
423 }
424 account_sequences.insert(msg.from(), sequence + 1);
425 Ok(())
426 };
427
428 let mut account_sequences: HashMap<Address, u64> = HashMap::default();
429 let (state_root, _) = state_manager
430 .tipset_state(&base_tipset)
431 .await
432 .map_err(|e| TipsetSyncerError::Calculation(format!("Could not update state: {e}")))?;
433 let tree =
434 StateTree::new_from_root(state_manager.blockstore_owned(), &state_root).map_err(|e| {
435 TipsetSyncerError::Calculation(format!(
436 "Could not load from new state root in state manager: {e}"
437 ))
438 })?;
439
440 for (i, msg) in block.bls_msgs().iter().enumerate() {
442 check_msg(msg, &mut account_sequences, &tree).map_err(|e| {
443 TipsetSyncerError::Validation(format!(
444 "Block had invalid BLS message at index {i}: {e}"
445 ))
446 })?;
447 }
448
449 for (i, msg) in block.secp_msgs().iter().enumerate() {
451 if msg.signature().signature_type() == SignatureType::Delegated
452 && !is_valid_eth_tx_for_sending(eth_chain_id, network_version, msg)
453 {
454 return Err(TipsetSyncerError::Validation(
455 "Network version must be at least NV23 for legacy Ethereum transactions".to_owned(),
456 ));
457 }
458 check_msg(msg.message(), &mut account_sequences, &tree).map_err(|e| {
459 TipsetSyncerError::Validation(format!(
460 "block had an invalid secp message at index {i}: {e}"
461 ))
462 })?;
463 let key_addr = state_manager
465 .resolve_to_key_addr(&msg.from(), &base_tipset)
466 .await
467 .map_err(|e| TipsetSyncerError::ResolvingAddressFromMessage(e.to_string()))?;
468 msg.signature
470 .authenticate_msg(eth_chain_id, msg, &key_addr)
471 .map_err(|e| TipsetSyncerError::MessageSignatureInvalid(e.to_string()))?;
472 }
473
474 let msg_root = TipsetValidator::compute_msg_root(
476 state_manager.blockstore(),
477 block.bls_msgs(),
478 block.secp_msgs(),
479 )
480 .map_err(|err| TipsetSyncerError::ComputingMessageRoot(err.to_string()))?;
481 if block.header().messages != msg_root {
482 return Err(TipsetSyncerError::BlockMessageRootInvalid(
483 format!("{:?}", block.header().messages),
484 format!("{msg_root:?}"),
485 ));
486 }
487
488 Ok(())
489}
490
491fn block_sanity_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
495 if header.signature.is_none() {
496 return Err(TipsetSyncerError::BlockWithoutSignature);
497 }
498 if header.bls_aggregate.is_none() {
499 return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
500 }
501 Ok(())
502}
503
504fn block_timestamp_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
506 let time_now = chrono::Utc::now().timestamp() as u64;
507 if header.timestamp > time_now.saturating_add(ALLOWABLE_CLOCK_DRIFT) {
508 return Err(TipsetSyncerError::TimeTravellingBlock(
509 time_now,
510 header.timestamp,
511 ));
512 } else if header.timestamp > time_now {
513 warn!(
514 "Got block from the future, but within clock drift threshold, {} > {}",
515 header.timestamp, time_now
516 );
517 }
518 Ok(())
519}