1use std::sync::Arc;
5
6use crate::chain_sync::BadBlockCache;
7use crate::networks::Height;
8use crate::shim::clock::ALLOWABLE_CLOCK_DRIFT;
9use crate::shim::crypto::SignatureType;
10use crate::shim::{
11 address::Address, crypto::verify_bls_aggregate, econ::BLOCK_GAS_LIMIT,
12 gas::price_list_by_network_version, message::Message, state_tree::StateTree,
13};
14use crate::state_manager::ExecutedTipset;
15use crate::state_manager::{Error as StateManagerError, StateManager, utils::is_valid_for_sending};
16use crate::utils::ShallowClone as _;
17use crate::{
18 blocks::{Block, CachingBlockHeader, Error as ForestBlockError, FullTipset, Tipset},
19 fil_cns::{self, FilecoinConsensus, FilecoinConsensusError},
20};
21use crate::{
22 chain::{ChainStore, Error as ChainStoreError},
23 metrics::HistogramTimerExt,
24};
25use crate::{
26 eth::is_valid_eth_tx_for_sending,
27 message::{MessageRead as _, valid_for_block_inclusion},
28};
29use ahash::HashMap;
30use cid::Cid;
31use futures::TryFutureExt;
32use fvm_ipld_blockstore::Blockstore;
33use fvm_ipld_encoding::to_vec;
34use itertools::Itertools;
35use nunny::Vec as NonEmpty;
36use thiserror::Error;
37use tokio::task::JoinSet;
38use tracing::{trace, warn};
39
40use crate::chain_sync::{consensus::collect_errs, metrics, validation::TipsetValidator};
41
42#[derive(Debug, Error)]
43pub enum TipsetSyncerError {
44 #[error("Block must have a signature")]
45 BlockWithoutSignature,
46 #[error("Block without BLS aggregate signature")]
47 BlockWithoutBlsAggregate,
48 #[error("Block received from the future: now = {0}, block = {1}")]
49 TimeTravellingBlock(u64, u64),
50 #[error("Validation error: {0}")]
51 Validation(String),
52 #[error("Processing error: {0}")]
53 Calculation(String),
54 #[error("Chain store error: {0}")]
55 ChainStore(#[from] ChainStoreError),
56 #[error("StateManager error: {0}")]
57 StateManager(#[from] StateManagerError),
58 #[error("Block error: {0}")]
59 BlockError(#[from] ForestBlockError),
60 #[error("Querying tipsets from the network failed: {0}")]
61 NetworkTipsetQueryFailed(String),
62 #[error("BLS aggregate signature {0} was invalid for msgs {1}")]
63 BlsAggregateSignatureInvalid(String, String),
64 #[error("Message signature invalid: {0}")]
65 MessageSignatureInvalid(String),
66 #[error("Block message root does not match: expected {0}, computed {1}")]
67 BlockMessageRootInvalid(String, String),
68 #[error("Computing message root failed: {0}")]
69 ComputingMessageRoot(String),
70 #[error("Resolving address from message failed: {0}")]
71 ResolvingAddressFromMessage(String),
72 #[error("Loading tipset parent from the store failed: {0}")]
73 TipsetParentNotFound(ChainStoreError),
74 #[error("Consensus error: {0}")]
75 ConsensusError(FilecoinConsensusError),
76}
77
78impl From<tokio::task::JoinError> for TipsetSyncerError {
79 fn from(err: tokio::task::JoinError) -> Self {
80 TipsetSyncerError::NetworkTipsetQueryFailed(format!("{err}"))
81 }
82}
83
84impl TipsetSyncerError {
85 fn concat(errs: NonEmpty<TipsetSyncerError>) -> Self {
88 let msg = errs.iter().map(|e| e.to_string()).collect_vec().join(", ");
89
90 TipsetSyncerError::Validation(msg)
91 }
92}
93
94pub async fn validate_tipset<DB: Blockstore + Send + Sync + 'static>(
99 state_manager: &Arc<StateManager<DB>>,
100 full_tipset: FullTipset,
101 bad_block_cache: Option<Arc<BadBlockCache>>,
102) -> Result<(), TipsetSyncerError> {
103 if full_tipset
104 .key()
105 .eq(state_manager.chain_store().genesis_tipset().key())
106 {
107 trace!("Skipping genesis tipset validation");
108 return Ok(());
109 }
110
111 let timer = metrics::TIPSET_PROCESSING_TIME.start_timer();
112
113 let epoch = full_tipset.epoch();
114 let parent_state = *full_tipset.parent_state();
115 let tipset_key = full_tipset.key();
116 trace!("Tipset keys: {tipset_key}");
117 let blocks = full_tipset.into_blocks();
118 let mut validations = JoinSet::new();
119 for b in blocks {
120 validations.spawn(validate_block(state_manager.clone(), Arc::new(b)));
121 }
122
123 while let Some(result) = validations.join_next().await {
124 match result? {
125 Ok(block) => {
126 state_manager
127 .chain_store()
128 .add_to_tipset_tracker(block.header());
129 }
130 Err((cid, why)) => {
131 warn!(
132 "Validating block [CID = {cid}, PARENT_STATE = {parent_state}] in EPOCH = {epoch} failed: {why}",
133 );
134 match &why {
135 TipsetSyncerError::TimeTravellingBlock(_, _) => {
136 }
139 _ => {
140 if StateTree::new_from_root(state_manager.blockstore_owned(), &parent_state)
142 .is_ok()
143 && let Some(bad_block_cache) = bad_block_cache
144 {
145 bad_block_cache.push(cid);
146 }
147 }
148 };
149 return Err(why);
150 }
151 }
152 }
153 drop(timer);
154 Ok(())
155}
156
157async fn validate_block<DB: Blockstore + Sync + Send + 'static>(
177 state_manager: Arc<StateManager<DB>>,
178 block: Arc<Block>,
179) -> Result<Arc<Block>, (Cid, TipsetSyncerError)> {
180 let consensus = FilecoinConsensus::new(state_manager.beacon_schedule().clone());
181 trace!(
182 "Validating block: epoch = {}, weight = {}, key = {}",
183 block.header().epoch,
184 block.header().weight,
185 block.header().cid(),
186 );
187 let chain_store = state_manager.chain_store().clone();
188 let block_cid = block.cid();
189
190 let is_validated = chain_store.is_block_validated(block_cid);
192 if is_validated {
193 return Ok(block);
194 }
195
196 let _timer = metrics::BLOCK_VALIDATION_TIME.start_timer();
197
198 let header = block.header();
199
200 block_sanity_checks(header).map_err(|e| (*block_cid, e))?;
202 block_timestamp_checks(header).map_err(|e| (*block_cid, e))?;
203
204 let base_tipset = chain_store
205 .chain_index()
206 .load_required_tipset(&header.parents)
207 .map_err(|why| (*block_cid, TipsetSyncerError::TipsetParentNotFound(why)))?;
213
214 let lookback_state = ChainStore::get_lookback_tipset_for_round(
216 state_manager.chain_store().chain_index(),
217 state_manager.chain_config(),
218 &base_tipset,
219 block.header().epoch,
220 )
221 .map_err(|e| (*block_cid, e.into()))
222 .map(|(_, s)| Arc::new(s))?;
223
224 let work_addr = state_manager
227 .get_miner_work_addr(*lookback_state, &header.miner_address)
228 .map_err(|e| (*block_cid, e.into()))?;
229
230 let mut validations = JoinSet::new();
232
233 validations.spawn(check_block_messages(
235 state_manager.shallow_clone(),
236 block.shallow_clone(),
237 base_tipset.shallow_clone(),
238 ));
239
240 validations.spawn_blocking({
242 let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
243 let firehorse_height = state_manager.chain_config().epoch(Height::FireHorse);
244 let base_tipset = base_tipset.shallow_clone();
245 let block_store = state_manager.blockstore_owned();
246 let block = block.shallow_clone();
247 move || {
248 let base_fee = crate::chain::compute_base_fee(
249 &block_store,
250 &base_tipset,
251 smoke_height,
252 firehorse_height,
253 )
254 .map_err(|e| {
255 TipsetSyncerError::Validation(format!("Could not compute base fee: {e}"))
256 })?;
257 let parent_base_fee = &block.header.parent_base_fee;
258 if &base_fee != parent_base_fee {
259 return Err(TipsetSyncerError::Validation(format!(
260 "base fee doesn't match: {parent_base_fee} (header), {base_fee} (computed)"
261 )));
262 }
263 Ok(())
264 }
265 });
266
267 validations.spawn_blocking({
269 let block_store = state_manager.blockstore_owned();
270 let base_tipset = base_tipset.shallow_clone();
271 let weight = header.weight.clone();
272 move || {
273 let calc_weight = fil_cns::weight(&block_store, &base_tipset).map_err(|e| {
274 TipsetSyncerError::Calculation(format!("Error calculating weight: {e:#}"))
275 })?;
276 if weight != calc_weight {
277 return Err(TipsetSyncerError::Validation(format!(
278 "Parent weight doesn't match: {weight} (header), {calc_weight} (computed)"
279 )));
280 }
281 Ok(())
282 }
283 });
284
285 validations.spawn({
287 let state_manager = state_manager.clone();
288 let block = block.clone();
289 async move {
290 let header = block.header();
291 let ExecutedTipset {
292 state_root,
293 receipt_root,
294 ..
295 } = state_manager
296 .load_executed_tipset(&base_tipset)
297 .await
298 .map_err(|e| {
299 TipsetSyncerError::Calculation(format!("Failed to calculate state: {e:#}"))
300 })?;
301
302 if state_root != header.state_root {
303 return Err(TipsetSyncerError::Validation(format!(
304 "Parent state root did not match computed state: {} (header), {} (computed)",
305 header.state_root, state_root,
306 )));
307 }
308
309 if receipt_root != header.message_receipts {
310 return Err(TipsetSyncerError::Validation(format!(
311 "Parent receipt root did not match computed root: {} (header), {} (computed)",
312 header.message_receipts, receipt_root
313 )));
314 }
315 Ok(())
316 }
317 });
318
319 validations.spawn_blocking({
321 let block = block.clone();
322 move || {
323 block.header().verify_signature_against(&work_addr)?;
324 Ok(())
325 }
326 });
327
328 validations.spawn({
329 let block = block.clone();
330 async move {
331 consensus
332 .validate_block(state_manager, block)
333 .map_err(|errs| {
334 TipsetSyncerError::concat(
341 errs.into_iter_ne()
342 .map(TipsetSyncerError::ConsensusError)
343 .collect_vec(),
344 )
345 })
346 .await
347 }
348 });
349
350 if let Err(errs) = collect_errs(validations).await {
352 return Err((*block_cid, TipsetSyncerError::concat(errs)));
353 }
354
355 chain_store.mark_block_as_validated(block_cid);
356
357 Ok(block)
358}
359
360async fn check_block_messages<DB: Blockstore + Send + Sync + 'static>(
371 state_manager: Arc<StateManager<DB>>,
372 block: Arc<Block>,
373 base_tipset: Tipset,
374) -> Result<(), TipsetSyncerError> {
375 let network_version = state_manager
376 .chain_config()
377 .network_version(block.header.epoch);
378 let eth_chain_id = state_manager.chain_config().eth_chain_id;
379
380 if let Some(sig) = &block.header().bls_aggregate {
381 let mut pub_keys = Vec::with_capacity(block.bls_msgs().len());
384 let mut cids = Vec::with_capacity(block.bls_msgs().len());
385 let db = state_manager.blockstore();
386 for m in block.bls_msgs() {
387 let pk = StateManager::get_bls_public_key(db, &m.from, *base_tipset.parent_state())?;
388 pub_keys.push(pk);
389 cids.push(m.cid().to_bytes());
390 }
391
392 if !verify_bls_aggregate(
393 &cids.iter().map(|x| x.as_slice()).collect_vec(),
394 &pub_keys,
395 sig,
396 ) {
397 return Err(TipsetSyncerError::BlsAggregateSignatureInvalid(
398 format!("{sig:?}"),
399 format!("{cids:?}"),
400 ));
401 }
402 } else {
403 return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
404 }
405
406 let price_list = price_list_by_network_version(network_version);
407 let mut sum_gas_limit = 0;
408
409 let mut check_msg = |msg: &Message,
411 account_sequences: &mut HashMap<Address, u64>,
412 tree: &StateTree<DB>|
413 -> anyhow::Result<()> {
414 let min_gas = price_list.on_chain_message(to_vec(msg).unwrap().len());
416 valid_for_block_inclusion(msg, min_gas.total(), network_version)
417 .map_err(|e| anyhow::anyhow!("{}", e))?;
418 sum_gas_limit += msg.gas_limit;
419 if sum_gas_limit > BLOCK_GAS_LIMIT {
420 anyhow::bail!("block gas limit exceeded");
421 }
422
423 let sequence: u64 = match account_sequences.get(&msg.from()) {
426 Some(sequence) => *sequence,
427 None => {
428 let actor = tree.get_actor(&msg.from)?.ok_or_else(|| {
429 anyhow::anyhow!(
430 "Failed to retrieve nonce for addr: Actor does not exist in state"
431 )
432 })?;
433 let network_version = state_manager
434 .chain_config()
435 .network_version(block.header.epoch);
436 if !is_valid_for_sending(network_version, &actor) {
437 anyhow::bail!("not valid for sending!");
438 }
439 actor.sequence
440 }
441 };
442
443 if sequence != msg.sequence {
445 anyhow::bail!(
446 "Message has incorrect sequence (exp: {} got: {})",
447 sequence,
448 msg.sequence
449 );
450 }
451 account_sequences.insert(msg.from(), sequence + 1);
452 Ok(())
453 };
454
455 let mut account_sequences: HashMap<Address, u64> = HashMap::default();
456 let ExecutedTipset { state_root, .. } = state_manager
457 .load_executed_tipset(&base_tipset)
458 .await
459 .map_err(|e| TipsetSyncerError::Calculation(format!("Could not update state: {e:#}")))?;
460 let tree =
461 StateTree::new_from_root(state_manager.blockstore_owned(), &state_root).map_err(|e| {
462 TipsetSyncerError::Calculation(format!(
463 "Could not load from new state root in state manager: {e:#}"
464 ))
465 })?;
466
467 for (i, msg) in block.bls_msgs().iter().enumerate() {
469 check_msg(msg, &mut account_sequences, &tree).map_err(|e| {
470 TipsetSyncerError::Validation(format!(
471 "Block had invalid BLS message at index {i}: {e:#}"
472 ))
473 })?;
474 }
475
476 for (i, msg) in block.secp_msgs().iter().enumerate() {
478 if msg.signature().signature_type() == SignatureType::Delegated
479 && !is_valid_eth_tx_for_sending(eth_chain_id, network_version, msg)
480 {
481 return Err(TipsetSyncerError::Validation(
482 "Network version must be at least NV23 for legacy Ethereum transactions".to_owned(),
483 ));
484 }
485 check_msg(msg.message(), &mut account_sequences, &tree).map_err(|e| {
486 TipsetSyncerError::Validation(format!(
487 "block had an invalid secp message at index {i}: {e:#}"
488 ))
489 })?;
490 let key_addr = state_manager
492 .resolve_to_key_addr(&msg.from(), &base_tipset)
493 .await
494 .map_err(|e| TipsetSyncerError::ResolvingAddressFromMessage(e.to_string()))?;
495 msg.signature
497 .authenticate_msg(eth_chain_id, msg, &key_addr)
498 .map_err(|e| TipsetSyncerError::MessageSignatureInvalid(e.to_string()))?;
499 }
500
501 let msg_root = TipsetValidator::compute_msg_root(
503 state_manager.blockstore(),
504 block.bls_msgs(),
505 block.secp_msgs(),
506 )
507 .map_err(|err| TipsetSyncerError::ComputingMessageRoot(err.to_string()))?;
508 if block.header().messages != msg_root {
509 return Err(TipsetSyncerError::BlockMessageRootInvalid(
510 format!("{:?}", block.header().messages),
511 format!("{msg_root:?}"),
512 ));
513 }
514
515 Ok(())
516}
517
518fn block_sanity_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
522 if header.signature.is_none() {
523 return Err(TipsetSyncerError::BlockWithoutSignature);
524 }
525 if header.bls_aggregate.is_none() {
526 return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
527 }
528 Ok(())
529}
530
531fn block_timestamp_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
533 let time_now = chrono::Utc::now().timestamp() as u64;
534 if header.timestamp > time_now.saturating_add(ALLOWABLE_CLOCK_DRIFT) {
535 return Err(TipsetSyncerError::TimeTravellingBlock(
536 time_now,
537 header.timestamp,
538 ));
539 } else if header.timestamp > time_now {
540 warn!(
541 "Got block from the future, but within clock drift threshold, {} > {}",
542 header.timestamp, time_now
543 );
544 }
545 Ok(())
546}