1use std::sync::Arc;
5
6use crate::chain_sync::BadBlockCache;
7use crate::networks::Height;
8use crate::shim::clock::ALLOWABLE_CLOCK_DRIFT;
9use crate::shim::crypto::SignatureType;
10use crate::shim::{
11 address::Address, crypto::verify_bls_aggregate, econ::BLOCK_GAS_LIMIT,
12 gas::price_list_by_network_version, message::Message, state_tree::StateTree,
13};
14use crate::state_manager::ExecutedTipset;
15use crate::state_manager::{Error as StateManagerError, StateManager, utils::is_valid_for_sending};
16use crate::utils::ShallowClone as _;
17use crate::{
18 blocks::{Block, CachingBlockHeader, Error as ForestBlockError, FullTipset, Tipset},
19 fil_cns::{self, FilecoinConsensus, FilecoinConsensusError},
20};
21use crate::{
22 chain::{ChainStore, Error as ChainStoreError},
23 metrics::HistogramTimerExt,
24};
25use crate::{
26 eth::is_valid_eth_tx_for_sending,
27 message::{MessageRead as _, valid_for_block_inclusion},
28};
29use ahash::HashMap;
30use cid::Cid;
31use futures::TryFutureExt;
32use fvm_ipld_blockstore::Blockstore;
33use fvm_ipld_encoding::to_vec;
34use itertools::Itertools;
35use nunny::Vec as NonEmpty;
36use thiserror::Error;
37use tokio::task::JoinSet;
38use tracing::{trace, warn};
39
40use crate::chain_sync::{consensus::collect_errs, metrics, validation::TipsetValidator};
41
42#[derive(Debug, Error)]
43pub enum TipsetSyncerError {
44 #[error("Block must have a signature")]
45 BlockWithoutSignature,
46 #[error("Block without BLS aggregate signature")]
47 BlockWithoutBlsAggregate,
48 #[error("Block received from the future: now = {0}, block = {1}")]
49 TimeTravellingBlock(u64, u64),
50 #[error("Validation error: {0}")]
51 Validation(String),
52 #[error("Processing error: {0}")]
53 Calculation(String),
54 #[error("Chain store error: {0}")]
55 ChainStore(#[from] ChainStoreError),
56 #[error("StateManager error: {0}")]
57 StateManager(#[from] StateManagerError),
58 #[error("Block error: {0}")]
59 BlockError(#[from] ForestBlockError),
60 #[error("Querying tipsets from the network failed: {0}")]
61 NetworkTipsetQueryFailed(String),
62 #[error("BLS aggregate signature {0} was invalid for msgs {1}")]
63 BlsAggregateSignatureInvalid(String, String),
64 #[error("Message signature invalid: {0}")]
65 MessageSignatureInvalid(String),
66 #[error("Block message root does not match: expected {0}, computed {1}")]
67 BlockMessageRootInvalid(String, String),
68 #[error("Computing message root failed: {0}")]
69 ComputingMessageRoot(String),
70 #[error("Resolving address from message failed: {0}")]
71 ResolvingAddressFromMessage(String),
72 #[error("Loading tipset parent from the store failed: {0}")]
73 TipsetParentNotFound(ChainStoreError),
74 #[error("Consensus error: {0}")]
75 ConsensusError(FilecoinConsensusError),
76}
77
78impl From<tokio::task::JoinError> for TipsetSyncerError {
79 fn from(err: tokio::task::JoinError) -> Self {
80 TipsetSyncerError::NetworkTipsetQueryFailed(format!("{err}"))
81 }
82}
83
84impl TipsetSyncerError {
85 fn concat(errs: NonEmpty<TipsetSyncerError>) -> Self {
88 let msg = errs.iter().map(|e| e.to_string()).collect_vec().join(", ");
89
90 TipsetSyncerError::Validation(msg)
91 }
92}
93
94pub async fn validate_tipset<DB: Blockstore + Send + Sync + 'static>(
99 state_manager: &Arc<StateManager<DB>>,
100 full_tipset: FullTipset,
101 bad_block_cache: Option<Arc<BadBlockCache>>,
102) -> Result<(), TipsetSyncerError> {
103 if full_tipset
104 .key()
105 .eq(state_manager.chain_store().genesis_tipset().key())
106 {
107 trace!("Skipping genesis tipset validation");
108 return Ok(());
109 }
110
111 let timer = metrics::TIPSET_PROCESSING_TIME.start_timer();
112
113 let epoch = full_tipset.epoch();
114 let parent_state = *full_tipset.parent_state();
115 let tipset_key = full_tipset.key();
116 trace!("Tipset keys: {tipset_key}");
117 let blocks = full_tipset.into_blocks();
118 let mut validations = JoinSet::new();
119 for b in blocks {
120 validations.spawn(validate_block(state_manager.clone(), Arc::new(b)));
121 }
122
123 while let Some(result) = validations.join_next().await {
124 match result? {
125 Ok(block) => {
126 state_manager
127 .chain_store()
128 .add_to_tipset_tracker(block.header());
129 }
130 Err((cid, why)) => {
131 warn!(
132 "Validating block [CID = {cid}, PARENT_STATE = {parent_state}] in EPOCH = {epoch} failed: {why}",
133 );
134 match &why {
135 TipsetSyncerError::TimeTravellingBlock(_, _) => {
136 }
139 _ => {
140 if StateTree::new_from_root(state_manager.blockstore_owned(), &parent_state)
142 .is_ok()
143 && let Some(bad_block_cache) = bad_block_cache
144 {
145 bad_block_cache.push(cid);
146 }
147 }
148 };
149 return Err(why);
150 }
151 }
152 }
153 drop(timer);
154 Ok(())
155}
156
157async fn validate_block<DB: Blockstore + Sync + Send + 'static>(
177 state_manager: Arc<StateManager<DB>>,
178 block: Arc<Block>,
179) -> Result<Arc<Block>, (Cid, TipsetSyncerError)> {
180 let consensus = FilecoinConsensus::new(state_manager.beacon_schedule().clone());
181 trace!(
182 "Validating block: epoch = {}, weight = {}, key = {}",
183 block.header().epoch,
184 block.header().weight,
185 block.header().cid(),
186 );
187 let chain_store = state_manager.chain_store().clone();
188 let block_cid = block.cid();
189
190 let is_validated = chain_store.is_block_validated(block_cid);
192 if is_validated {
193 return Ok(block);
194 }
195
196 let _timer = metrics::BLOCK_VALIDATION_TIME.start_timer();
197
198 let header = block.header();
199
200 block_sanity_checks(header).map_err(|e| (*block_cid, e))?;
202 block_timestamp_checks(header).map_err(|e| (*block_cid, e))?;
203
204 let base_tipset = chain_store
205 .chain_index()
206 .load_required_tipset(&header.parents)
207 .map_err(|why| (*block_cid, TipsetSyncerError::TipsetParentNotFound(why)))?;
213
214 let lookback_state = ChainStore::get_lookback_tipset_for_round(
216 state_manager.chain_store().chain_index(),
217 state_manager.chain_config(),
218 &base_tipset,
219 block.header().epoch,
220 )
221 .map_err(|e| (*block_cid, e.into()))
222 .map(|(_, s)| Arc::new(s))?;
223
224 let work_addr = state_manager
227 .get_miner_work_addr(*lookback_state, &header.miner_address)
228 .map_err(|e| (*block_cid, e.into()))?;
229
230 let mut validations = JoinSet::new();
232
233 validations.spawn(check_block_messages(
235 state_manager.shallow_clone(),
236 block.shallow_clone(),
237 base_tipset.shallow_clone(),
238 ));
239
240 validations.spawn_blocking({
242 let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
243 let base_tipset = base_tipset.shallow_clone();
244 let block_store = state_manager.blockstore_owned();
245 let block = block.shallow_clone();
246 move || {
247 let base_fee = crate::chain::compute_base_fee(&block_store, &base_tipset, smoke_height)
248 .map_err(|e| {
249 TipsetSyncerError::Validation(format!("Could not compute base fee: {e}"))
250 })?;
251 let parent_base_fee = &block.header.parent_base_fee;
252 if &base_fee != parent_base_fee {
253 return Err(TipsetSyncerError::Validation(format!(
254 "base fee doesn't match: {parent_base_fee} (header), {base_fee} (computed)"
255 )));
256 }
257 Ok(())
258 }
259 });
260
261 validations.spawn_blocking({
263 let block_store = state_manager.blockstore_owned();
264 let base_tipset = base_tipset.shallow_clone();
265 let weight = header.weight.clone();
266 move || {
267 let calc_weight = fil_cns::weight(&block_store, &base_tipset).map_err(|e| {
268 TipsetSyncerError::Calculation(format!("Error calculating weight: {e:#}"))
269 })?;
270 if weight != calc_weight {
271 return Err(TipsetSyncerError::Validation(format!(
272 "Parent weight doesn't match: {weight} (header), {calc_weight} (computed)"
273 )));
274 }
275 Ok(())
276 }
277 });
278
279 validations.spawn({
281 let state_manager = state_manager.clone();
282 let block = block.clone();
283 async move {
284 let header = block.header();
285 let ExecutedTipset {
286 state_root,
287 receipt_root,
288 ..
289 } = state_manager
290 .load_executed_tipset(&base_tipset)
291 .await
292 .map_err(|e| {
293 TipsetSyncerError::Calculation(format!("Failed to calculate state: {e:#}"))
294 })?;
295
296 if state_root != header.state_root {
297 return Err(TipsetSyncerError::Validation(format!(
298 "Parent state root did not match computed state: {} (header), {} (computed)",
299 header.state_root, state_root,
300 )));
301 }
302
303 if receipt_root != header.message_receipts {
304 return Err(TipsetSyncerError::Validation(format!(
305 "Parent receipt root did not match computed root: {} (header), {} (computed)",
306 header.message_receipts, receipt_root
307 )));
308 }
309 Ok(())
310 }
311 });
312
313 validations.spawn_blocking({
315 let block = block.clone();
316 move || {
317 block.header().verify_signature_against(&work_addr)?;
318 Ok(())
319 }
320 });
321
322 validations.spawn({
323 let block = block.clone();
324 async move {
325 consensus
326 .validate_block(state_manager, block)
327 .map_err(|errs| {
328 TipsetSyncerError::concat(
335 errs.into_iter_ne()
336 .map(TipsetSyncerError::ConsensusError)
337 .collect_vec(),
338 )
339 })
340 .await
341 }
342 });
343
344 if let Err(errs) = collect_errs(validations).await {
346 return Err((*block_cid, TipsetSyncerError::concat(errs)));
347 }
348
349 chain_store.mark_block_as_validated(block_cid);
350
351 Ok(block)
352}
353
354async fn check_block_messages<DB: Blockstore + Send + Sync + 'static>(
365 state_manager: Arc<StateManager<DB>>,
366 block: Arc<Block>,
367 base_tipset: Tipset,
368) -> Result<(), TipsetSyncerError> {
369 let network_version = state_manager
370 .chain_config()
371 .network_version(block.header.epoch);
372 let eth_chain_id = state_manager.chain_config().eth_chain_id;
373
374 if let Some(sig) = &block.header().bls_aggregate {
375 let mut pub_keys = Vec::with_capacity(block.bls_msgs().len());
378 let mut cids = Vec::with_capacity(block.bls_msgs().len());
379 let db = state_manager.blockstore();
380 for m in block.bls_msgs() {
381 let pk = StateManager::get_bls_public_key(db, &m.from, *base_tipset.parent_state())?;
382 pub_keys.push(pk);
383 cids.push(m.cid().to_bytes());
384 }
385
386 if !verify_bls_aggregate(
387 &cids.iter().map(|x| x.as_slice()).collect_vec(),
388 &pub_keys,
389 sig,
390 ) {
391 return Err(TipsetSyncerError::BlsAggregateSignatureInvalid(
392 format!("{sig:?}"),
393 format!("{cids:?}"),
394 ));
395 }
396 } else {
397 return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
398 }
399
400 let price_list = price_list_by_network_version(network_version);
401 let mut sum_gas_limit = 0;
402
403 let mut check_msg = |msg: &Message,
405 account_sequences: &mut HashMap<Address, u64>,
406 tree: &StateTree<DB>|
407 -> anyhow::Result<()> {
408 let min_gas = price_list.on_chain_message(to_vec(msg).unwrap().len());
410 valid_for_block_inclusion(msg, min_gas.total(), network_version)
411 .map_err(|e| anyhow::anyhow!("{}", e))?;
412 sum_gas_limit += msg.gas_limit;
413 if sum_gas_limit > BLOCK_GAS_LIMIT {
414 anyhow::bail!("block gas limit exceeded");
415 }
416
417 let sequence: u64 = match account_sequences.get(&msg.from()) {
420 Some(sequence) => *sequence,
421 None => {
422 let actor = tree.get_actor(&msg.from)?.ok_or_else(|| {
423 anyhow::anyhow!(
424 "Failed to retrieve nonce for addr: Actor does not exist in state"
425 )
426 })?;
427 let network_version = state_manager
428 .chain_config()
429 .network_version(block.header.epoch);
430 if !is_valid_for_sending(network_version, &actor) {
431 anyhow::bail!("not valid for sending!");
432 }
433 actor.sequence
434 }
435 };
436
437 if sequence != msg.sequence {
439 anyhow::bail!(
440 "Message has incorrect sequence (exp: {} got: {})",
441 sequence,
442 msg.sequence
443 );
444 }
445 account_sequences.insert(msg.from(), sequence + 1);
446 Ok(())
447 };
448
449 let mut account_sequences: HashMap<Address, u64> = HashMap::default();
450 let ExecutedTipset { state_root, .. } = state_manager
451 .load_executed_tipset(&base_tipset)
452 .await
453 .map_err(|e| TipsetSyncerError::Calculation(format!("Could not update state: {e:#}")))?;
454 let tree =
455 StateTree::new_from_root(state_manager.blockstore_owned(), &state_root).map_err(|e| {
456 TipsetSyncerError::Calculation(format!(
457 "Could not load from new state root in state manager: {e:#}"
458 ))
459 })?;
460
461 for (i, msg) in block.bls_msgs().iter().enumerate() {
463 check_msg(msg, &mut account_sequences, &tree).map_err(|e| {
464 TipsetSyncerError::Validation(format!(
465 "Block had invalid BLS message at index {i}: {e:#}"
466 ))
467 })?;
468 }
469
470 for (i, msg) in block.secp_msgs().iter().enumerate() {
472 if msg.signature().signature_type() == SignatureType::Delegated
473 && !is_valid_eth_tx_for_sending(eth_chain_id, network_version, msg)
474 {
475 return Err(TipsetSyncerError::Validation(
476 "Network version must be at least NV23 for legacy Ethereum transactions".to_owned(),
477 ));
478 }
479 check_msg(msg.message(), &mut account_sequences, &tree).map_err(|e| {
480 TipsetSyncerError::Validation(format!(
481 "block had an invalid secp message at index {i}: {e:#}"
482 ))
483 })?;
484 let key_addr = state_manager
486 .resolve_to_key_addr(&msg.from(), &base_tipset)
487 .await
488 .map_err(|e| TipsetSyncerError::ResolvingAddressFromMessage(e.to_string()))?;
489 msg.signature
491 .authenticate_msg(eth_chain_id, msg, &key_addr)
492 .map_err(|e| TipsetSyncerError::MessageSignatureInvalid(e.to_string()))?;
493 }
494
495 let msg_root = TipsetValidator::compute_msg_root(
497 state_manager.blockstore(),
498 block.bls_msgs(),
499 block.secp_msgs(),
500 )
501 .map_err(|err| TipsetSyncerError::ComputingMessageRoot(err.to_string()))?;
502 if block.header().messages != msg_root {
503 return Err(TipsetSyncerError::BlockMessageRootInvalid(
504 format!("{:?}", block.header().messages),
505 format!("{msg_root:?}"),
506 ));
507 }
508
509 Ok(())
510}
511
512fn block_sanity_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
516 if header.signature.is_none() {
517 return Err(TipsetSyncerError::BlockWithoutSignature);
518 }
519 if header.bls_aggregate.is_none() {
520 return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
521 }
522 Ok(())
523}
524
525fn block_timestamp_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
527 let time_now = chrono::Utc::now().timestamp() as u64;
528 if header.timestamp > time_now.saturating_add(ALLOWABLE_CLOCK_DRIFT) {
529 return Err(TipsetSyncerError::TimeTravellingBlock(
530 time_now,
531 header.timestamp,
532 ));
533 } else if header.timestamp > time_now {
534 warn!(
535 "Got block from the future, but within clock drift threshold, {} > {}",
536 header.timestamp, time_now
537 );
538 }
539 Ok(())
540}