1use std::sync::Arc;
5
6use crate::chain_sync::BadBlockCache;
7use crate::networks::Height;
8use crate::shim::clock::ALLOWABLE_CLOCK_DRIFT;
9use crate::shim::crypto::SignatureType;
10use crate::shim::{
11 address::Address, crypto::verify_bls_aggregate, econ::BLOCK_GAS_LIMIT,
12 gas::price_list_by_network_version, message::Message, state_tree::StateTree,
13};
14use crate::state_manager::{Error as StateManagerError, StateManager, is_valid_for_sending};
15use crate::{
16 blocks::{Block, CachingBlockHeader, Error as ForestBlockError, FullTipset, Tipset},
17 fil_cns::{self, FilecoinConsensus, FilecoinConsensusError},
18};
19use crate::{
20 chain::{ChainStore, Error as ChainStoreError},
21 metrics::HistogramTimerExt,
22};
23use crate::{
24 eth::is_valid_eth_tx_for_sending,
25 message::{Message as MessageTrait, valid_for_block_inclusion},
26};
27use ahash::HashMap;
28use cid::Cid;
29use futures::TryFutureExt;
30use fvm_ipld_blockstore::Blockstore;
31use fvm_ipld_encoding::to_vec;
32use itertools::Itertools;
33use nunny::Vec as NonEmpty;
34use thiserror::Error;
35use tokio::task::JoinSet;
36use tracing::{error, trace, warn};
37
38use crate::chain_sync::{consensus::collect_errs, metrics, validation::TipsetValidator};
39
/// Errors produced while validating tipsets and the blocks they contain.
#[derive(Debug, Error)]
pub enum TipsetSyncerError {
    /// The block header has no signature.
    #[error("Block must have a signature")]
    BlockWithoutSignature,
    /// The block header has no BLS aggregate signature.
    #[error("Block without BLS aggregate signature")]
    BlockWithoutBlsAggregate,
    /// The block timestamp lies further in the future than the allowed clock
    /// drift. Fields are `(now, block timestamp)`.
    #[error("Block received from the future: now = {0}, block = {1}")]
    TimeTravellingBlock(u64, u64),
    /// A validation rule was violated; the payload describes which one.
    #[error("Validation error: {0}")]
    Validation(String),
    /// A value needed for validation could not be computed.
    #[error("Processing error: {0}")]
    Calculation(String),
    /// Error converted from the chain store.
    #[error("Chain store error: {0}")]
    ChainStore(#[from] ChainStoreError),
    /// Error converted from the state manager.
    #[error("StateManager error: {0}")]
    StateManager(#[from] StateManagerError),
    /// Error converted from block handling.
    #[error("Block error: {0}")]
    BlockError(#[from] ForestBlockError),
    /// Querying tipsets from the network failed. In this file it is also the
    /// variant a `tokio::task::JoinError` is converted into.
    #[error("Querying tipsets from the network failed: {0}")]
    NetworkTipsetQueryFailed(String),
    /// The BLS aggregate signature did not verify. Fields are the debug
    /// renderings of `(signature, message CIDs)`.
    #[error("BLS aggregate signature {0} was invalid for msgs {1}")]
    BlsAggregateSignatureInvalid(String, String),
    /// A message signature failed authentication.
    #[error("Message signature invalid: {0}")]
    MessageSignatureInvalid(String),
    /// The message root in the header differs from the computed one. Fields
    /// are `(expected, computed)`.
    #[error("Block message root does not match: expected {0}, computed {1}")]
    BlockMessageRootInvalid(String, String),
    /// The message root could not be computed.
    #[error("Computing message root failed: {0}")]
    ComputingMessageRoot(String),
    /// The sender address of a message could not be resolved to a key address.
    #[error("Resolving address from message failed: {0}")]
    ResolvingAddressFromMessage(String),
    /// The parent tipset of a block could not be loaded from the store.
    #[error("Loading tipset parent from the store failed: {0}")]
    TipsetParentNotFound(ChainStoreError),
    /// A consensus check failed.
    #[error("Consensus error: {0}")]
    ConsensusError(FilecoinConsensusError),
}
75
76impl From<tokio::task::JoinError> for TipsetSyncerError {
77 fn from(err: tokio::task::JoinError) -> Self {
78 TipsetSyncerError::NetworkTipsetQueryFailed(format!("{err}"))
79 }
80}
81
82impl TipsetSyncerError {
83 fn concat(errs: NonEmpty<TipsetSyncerError>) -> Self {
86 let msg = errs
87 .iter()
88 .map(|e| e.to_string())
89 .collect::<Vec<_>>()
90 .join(", ");
91
92 TipsetSyncerError::Validation(msg)
93 }
94}
95
96pub async fn validate_tipset<DB: Blockstore + Send + Sync + 'static>(
101 state_manager: &Arc<StateManager<DB>>,
102 chainstore: &ChainStore<DB>,
103 full_tipset: FullTipset,
104 genesis: &Tipset,
105 bad_block_cache: Option<Arc<BadBlockCache>>,
106) -> Result<(), TipsetSyncerError> {
107 if full_tipset.key().eq(genesis.key()) {
108 trace!("Skipping genesis tipset validation");
109 return Ok(());
110 }
111
112 let timer = metrics::TIPSET_PROCESSING_TIME.start_timer();
113
114 let epoch = full_tipset.epoch();
115 let full_tipset_key = full_tipset.key().clone();
116 trace!("Tipset keys: {full_tipset_key}");
117 let blocks = full_tipset.into_blocks();
118 let mut validations = JoinSet::new();
119 for b in blocks {
120 validations.spawn(validate_block(state_manager.clone(), Arc::new(b)));
121 }
122
123 while let Some(result) = validations.join_next().await {
124 match result? {
125 Ok(block) => {
126 chainstore.add_to_tipset_tracker(block.header());
127 }
128 Err((cid, why)) => {
129 warn!("Validating block [CID = {cid}] in EPOCH = {epoch} failed: {why}");
130 match &why {
131 TipsetSyncerError::TimeTravellingBlock(_, _) => {
132 }
135 _ => {
136 if let Some(bad_block_cache) = bad_block_cache {
137 bad_block_cache.push(cid);
138 }
139 }
140 };
141 return Err(why);
142 }
143 }
144 }
145 drop(timer);
146 Ok(())
147}
148
149async fn validate_block<DB: Blockstore + Sync + Send + 'static>(
169 state_manager: Arc<StateManager<DB>>,
170 block: Arc<Block>,
171) -> Result<Arc<Block>, (Cid, TipsetSyncerError)> {
172 let consensus = FilecoinConsensus::new(state_manager.beacon_schedule().clone());
173 trace!(
174 "Validating block: epoch = {}, weight = {}, key = {}",
175 block.header().epoch,
176 block.header().weight,
177 block.header().cid(),
178 );
179 let chain_store = state_manager.chain_store().clone();
180 let block_cid = block.cid();
181
182 let is_validated = chain_store.is_block_validated(block_cid);
184 if is_validated {
185 return Ok(block);
186 }
187
188 let _timer = metrics::BLOCK_VALIDATION_TIME.start_timer();
189
190 let header = block.header();
191
192 block_sanity_checks(header).map_err(|e| (*block_cid, e))?;
194 block_timestamp_checks(header).map_err(|e| (*block_cid, e))?;
195
196 let base_tipset = chain_store
197 .chain_index()
198 .load_required_tipset(&header.parents)
199 .map_err(|why| (*block_cid, TipsetSyncerError::TipsetParentNotFound(why)))?;
205
206 let lookback_state = ChainStore::get_lookback_tipset_for_round(
208 state_manager.chain_store().chain_index(),
209 state_manager.chain_config(),
210 &base_tipset,
211 block.header().epoch,
212 )
213 .map_err(|e| (*block_cid, e.into()))
214 .map(|(_, s)| Arc::new(s))?;
215
216 let work_addr = state_manager
219 .get_miner_work_addr(*lookback_state, &header.miner_address)
220 .map_err(|e| (*block_cid, e.into()))?;
221
222 let mut validations = JoinSet::new();
224
225 validations.spawn(check_block_messages(
227 state_manager.clone(),
228 block.clone(),
229 base_tipset.clone(),
230 ));
231
232 validations.spawn_blocking({
234 let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
235 let base_tipset = base_tipset.clone();
236 let block_store = state_manager.blockstore_owned();
237 let block = Arc::clone(&block);
238 move || {
239 let base_fee = crate::chain::compute_base_fee(&block_store, &base_tipset, smoke_height)
240 .map_err(|e| {
241 TipsetSyncerError::Validation(format!("Could not compute base fee: {e}"))
242 })?;
243 let parent_base_fee = &block.header.parent_base_fee;
244 if &base_fee != parent_base_fee {
245 return Err(TipsetSyncerError::Validation(format!(
246 "base fee doesn't match: {parent_base_fee} (header), {base_fee} (computed)"
247 )));
248 }
249 Ok(())
250 }
251 });
252
253 validations.spawn_blocking({
255 let block_store = state_manager.blockstore_owned();
256 let base_tipset = base_tipset.clone();
257 let weight = header.weight.clone();
258 move || {
259 let calc_weight = fil_cns::weight(&block_store, &base_tipset).map_err(|e| {
260 TipsetSyncerError::Calculation(format!("Error calculating weight: {e}"))
261 })?;
262 if weight != calc_weight {
263 return Err(TipsetSyncerError::Validation(format!(
264 "Parent weight doesn't match: {weight} (header), {calc_weight} (computed)"
265 )));
266 }
267 Ok(())
268 }
269 });
270
271 validations.spawn({
273 let state_manager = state_manager.clone();
274 let block = block.clone();
275 async move {
276 let header = block.header();
277 let (state_root, receipt_root) = state_manager
278 .tipset_state(&base_tipset)
279 .await
280 .map_err(|e| {
281 TipsetSyncerError::Calculation(format!("Failed to calculate state: {e}"))
282 })?;
283
284 if state_root != header.state_root {
285 return Err(TipsetSyncerError::Validation(format!(
286 "Parent state root did not match computed state: {} (header), {} (computed)",
287 header.state_root, state_root,
288 )));
289 }
290
291 if receipt_root != header.message_receipts {
292 return Err(TipsetSyncerError::Validation(format!(
293 "Parent receipt root did not match computed root: {} (header), {} (computed)",
294 header.message_receipts, receipt_root
295 )));
296 }
297 Ok(())
298 }
299 });
300
301 validations.spawn_blocking({
303 let block = block.clone();
304 move || {
305 block.header().verify_signature_against(&work_addr)?;
306 Ok(())
307 }
308 });
309
310 validations.spawn({
311 let block = block.clone();
312 async move {
313 consensus
314 .validate_block(state_manager, block)
315 .map_err(|errs| {
316 TipsetSyncerError::concat(
323 errs.into_iter_ne()
324 .map(TipsetSyncerError::ConsensusError)
325 .collect_vec(),
326 )
327 })
328 .await
329 }
330 });
331
332 if let Err(errs) = collect_errs(validations).await {
334 return Err((*block_cid, TipsetSyncerError::concat(errs)));
335 }
336
337 chain_store.mark_block_as_validated(block_cid);
338
339 Ok(block)
340}
341
342async fn check_block_messages<DB: Blockstore + Send + Sync + 'static>(
353 state_manager: Arc<StateManager<DB>>,
354 block: Arc<Block>,
355 base_tipset: Tipset,
356) -> Result<(), TipsetSyncerError> {
357 let network_version = state_manager
358 .chain_config()
359 .network_version(block.header.epoch);
360 let eth_chain_id = state_manager.chain_config().eth_chain_id;
361
362 if let Some(sig) = &block.header().bls_aggregate {
363 let mut pub_keys = Vec::with_capacity(block.bls_msgs().len());
366 let mut cids = Vec::with_capacity(block.bls_msgs().len());
367 let db = state_manager.blockstore();
368 for m in block.bls_msgs() {
369 let pk = StateManager::get_bls_public_key(db, &m.from, *base_tipset.parent_state())?;
370 pub_keys.push(pk);
371 cids.push(m.cid().to_bytes());
372 }
373
374 if !verify_bls_aggregate(
375 &cids.iter().map(|x| x.as_slice()).collect_vec(),
376 &pub_keys,
377 sig,
378 ) {
379 return Err(TipsetSyncerError::BlsAggregateSignatureInvalid(
380 format!("{sig:?}"),
381 format!("{cids:?}"),
382 ));
383 }
384 } else {
385 return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
386 }
387
388 let price_list = price_list_by_network_version(network_version);
389 let mut sum_gas_limit = 0;
390
391 let mut check_msg = |msg: &Message,
393 account_sequences: &mut HashMap<Address, u64>,
394 tree: &StateTree<DB>|
395 -> Result<(), anyhow::Error> {
396 let min_gas = price_list.on_chain_message(to_vec(msg).unwrap().len());
398 valid_for_block_inclusion(msg, min_gas.total(), network_version)
399 .map_err(|e| anyhow::anyhow!("{}", e))?;
400 sum_gas_limit += msg.gas_limit;
401 if sum_gas_limit > BLOCK_GAS_LIMIT {
402 anyhow::bail!("block gas limit exceeded");
403 }
404
405 let sequence: u64 = match account_sequences.get(&msg.from()) {
408 Some(sequence) => *sequence,
409 None => {
410 let actor = tree.get_actor(&msg.from)?.ok_or_else(|| {
411 anyhow::anyhow!(
412 "Failed to retrieve nonce for addr: Actor does not exist in state"
413 )
414 })?;
415 let network_version = state_manager
416 .chain_config()
417 .network_version(block.header.epoch);
418 if !is_valid_for_sending(network_version, &actor) {
419 anyhow::bail!("not valid for sending!");
420 }
421 actor.sequence
422 }
423 };
424
425 if sequence != msg.sequence {
427 anyhow::bail!(
428 "Message has incorrect sequence (exp: {} got: {})",
429 sequence,
430 msg.sequence
431 );
432 }
433 account_sequences.insert(msg.from(), sequence + 1);
434 Ok(())
435 };
436
437 let mut account_sequences: HashMap<Address, u64> = HashMap::default();
438 let (state_root, _) = state_manager
439 .tipset_state(&base_tipset)
440 .await
441 .map_err(|e| TipsetSyncerError::Calculation(format!("Could not update state: {e}")))?;
442 let tree =
443 StateTree::new_from_root(state_manager.blockstore_owned(), &state_root).map_err(|e| {
444 TipsetSyncerError::Calculation(format!(
445 "Could not load from new state root in state manager: {e}"
446 ))
447 })?;
448
449 for (i, msg) in block.bls_msgs().iter().enumerate() {
451 check_msg(msg, &mut account_sequences, &tree).map_err(|e| {
452 TipsetSyncerError::Validation(format!(
453 "Block had invalid BLS message at index {i}: {e}"
454 ))
455 })?;
456 }
457
458 for (i, msg) in block.secp_msgs().iter().enumerate() {
460 if msg.signature().signature_type() == SignatureType::Delegated
461 && !is_valid_eth_tx_for_sending(eth_chain_id, network_version, msg)
462 {
463 return Err(TipsetSyncerError::Validation(
464 "Network version must be at least NV23 for legacy Ethereum transactions".to_owned(),
465 ));
466 }
467 check_msg(msg.message(), &mut account_sequences, &tree).map_err(|e| {
468 TipsetSyncerError::Validation(format!(
469 "block had an invalid secp message at index {i}: {e}"
470 ))
471 })?;
472 let key_addr = state_manager
474 .resolve_to_key_addr(&msg.from(), &base_tipset)
475 .await
476 .map_err(|e| TipsetSyncerError::ResolvingAddressFromMessage(e.to_string()))?;
477 msg.signature
479 .authenticate_msg(eth_chain_id, msg, &key_addr)
480 .map_err(|e| TipsetSyncerError::MessageSignatureInvalid(e.to_string()))?;
481 }
482
483 let msg_root = TipsetValidator::compute_msg_root(
485 state_manager.blockstore(),
486 block.bls_msgs(),
487 block.secp_msgs(),
488 )
489 .map_err(|err| TipsetSyncerError::ComputingMessageRoot(err.to_string()))?;
490 if block.header().messages != msg_root {
491 return Err(TipsetSyncerError::BlockMessageRootInvalid(
492 format!("{:?}", block.header().messages),
493 format!("{msg_root:?}"),
494 ));
495 }
496
497 Ok(())
498}
499
500fn block_sanity_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
504 if header.signature.is_none() {
505 return Err(TipsetSyncerError::BlockWithoutSignature);
506 }
507 if header.bls_aggregate.is_none() {
508 return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
509 }
510 Ok(())
511}
512
513fn block_timestamp_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
515 let time_now = chrono::Utc::now().timestamp() as u64;
516 if header.timestamp > time_now.saturating_add(ALLOWABLE_CLOCK_DRIFT) {
517 return Err(TipsetSyncerError::TimeTravellingBlock(
518 time_now,
519 header.timestamp,
520 ));
521 } else if header.timestamp > time_now {
522 warn!(
523 "Got block from the future, but within clock drift threshold, {} > {}",
524 header.timestamp, time_now
525 );
526 }
527 Ok(())
528}