1use crate::chain_sync::BadBlockCache;
5use crate::db::DbImpl;
6use crate::networks::Height;
7use crate::prelude::*;
8use crate::shim::clock::ALLOWABLE_CLOCK_DRIFT;
9use crate::shim::crypto::SignatureType;
10use crate::shim::message::Message;
11use crate::shim::{
12 address::Address, crypto::verify_bls_aggregate, econ::BLOCK_GAS_LIMIT,
13 gas::price_list_by_network_version, state_tree::StateTree,
14};
15use crate::state_manager::ExecutedTipset;
16use crate::state_manager::{Error as StateManagerError, StateManager, utils::is_valid_for_sending};
17use crate::{
18 blocks::{Block, CachingBlockHeader, Error as ForestBlockError, FullTipset, Tipset},
19 fil_cns::{self, FilecoinConsensus, FilecoinConsensusError},
20};
21use crate::{
22 chain::{ChainStore, Error as ChainStoreError},
23 metrics::HistogramTimerExt,
24};
25use crate::{
26 eth::is_valid_eth_tx_for_sending,
27 message::{MessageRead as _, valid_for_block_inclusion},
28};
29use ahash::HashMap;
30use futures::TryFutureExt;
31use fvm_ipld_encoding::to_vec;
32use nunny::Vec as NonEmpty;
33use thiserror::Error;
34use tokio::task::JoinSet;
35use tracing::{trace, warn};
36
37use crate::chain_sync::{consensus::collect_errs, metrics, validation::TipsetValidator};
38
39#[derive(Debug, Error)]
40pub enum TipsetSyncerError {
41 #[error("Block must have a signature")]
42 BlockWithoutSignature,
43 #[error("Block without BLS aggregate signature")]
44 BlockWithoutBlsAggregate,
45 #[error("Block received from the future: now = {0}, block = {1}")]
46 TimeTravellingBlock(u64, u64),
47 #[error("Validation error: {0}")]
48 Validation(String),
49 #[error("Processing error: {0}")]
50 Calculation(String),
51 #[error("Chain store error: {0}")]
52 ChainStore(#[from] ChainStoreError),
53 #[error("StateManager error: {0}")]
54 StateManager(#[from] StateManagerError),
55 #[error("Block error: {0}")]
56 BlockError(#[from] ForestBlockError),
57 #[error("Querying tipsets from the network failed: {0}")]
58 NetworkTipsetQueryFailed(String),
59 #[error("BLS aggregate signature {0} was invalid for msgs {1}")]
60 BlsAggregateSignatureInvalid(String, String),
61 #[error("Message signature invalid: {0}")]
62 MessageSignatureInvalid(String),
63 #[error("Block message root does not match: expected {0}, computed {1}")]
64 BlockMessageRootInvalid(String, String),
65 #[error("Computing message root failed: {0}")]
66 ComputingMessageRoot(String),
67 #[error("Resolving address from message failed: {0}")]
68 ResolvingAddressFromMessage(String),
69 #[error("Loading tipset parent from the store failed: {0}")]
70 TipsetParentNotFound(ChainStoreError),
71 #[error("Consensus error: {0}")]
72 ConsensusError(FilecoinConsensusError),
73}
74
75impl From<tokio::task::JoinError> for TipsetSyncerError {
76 fn from(err: tokio::task::JoinError) -> Self {
77 TipsetSyncerError::NetworkTipsetQueryFailed(format!("{err}"))
78 }
79}
80
81impl TipsetSyncerError {
82 fn concat(errs: NonEmpty<TipsetSyncerError>) -> Self {
85 let msg = errs.iter().map(|e| e.to_string()).collect_vec().join(", ");
86
87 TipsetSyncerError::Validation(msg)
88 }
89}
90
91pub async fn validate_tipset(
96 state_manager: &StateManager,
97 full_tipset: FullTipset,
98 bad_block_cache: Option<BadBlockCache>,
99) -> Result<(), TipsetSyncerError> {
100 if full_tipset
101 .key()
102 .eq(state_manager.chain_store().genesis_tipset().key())
103 {
104 trace!("Skipping genesis tipset validation");
105 return Ok(());
106 }
107
108 let timer = metrics::TIPSET_PROCESSING_TIME.start_timer();
109
110 let epoch = full_tipset.epoch();
111 let parent_state = *full_tipset.parent_state();
112 let tipset_key = full_tipset.key();
113 trace!("Tipset keys: {tipset_key}");
114 let blocks = full_tipset.into_blocks();
115 let mut validations = JoinSet::new();
116 for b in blocks {
117 validations.spawn(validate_block(state_manager.shallow_clone(), Arc::new(b)));
118 }
119
120 while let Some(result) = validations.join_next().await {
121 match result? {
122 Ok(block) => {
123 state_manager
124 .chain_store()
125 .add_to_tipset_tracker(block.header());
126 }
127 Err((cid, why)) => {
128 warn!(
129 "Validating block [CID = {cid}, PARENT_STATE = {parent_state}] in EPOCH = {epoch} failed: {why}",
130 );
131 match &why {
132 TipsetSyncerError::TimeTravellingBlock(_, _) => {
133 }
136 _ => {
137 if StateTree::new_from_root(state_manager.db(), &parent_state).is_ok()
139 && let Some(bad_block_cache) = bad_block_cache
140 {
141 bad_block_cache.push(cid);
142 }
143 }
144 };
145 return Err(why);
146 }
147 }
148 }
149 drop(timer);
150 Ok(())
151}
152
153async fn validate_block(
173 state_manager: StateManager,
174 block: Arc<Block>,
175) -> Result<Arc<Block>, (Cid, TipsetSyncerError)> {
176 let consensus = FilecoinConsensus::new(state_manager.beacon_schedule().clone());
177 trace!(
178 "Validating block: epoch = {}, weight = {}, key = {}",
179 block.header().epoch,
180 block.header().weight,
181 block.header().cid(),
182 );
183 let chain_store = state_manager.chain_store().shallow_clone();
184 let block_cid = block.cid();
185
186 let is_validated = chain_store.is_block_validated(block_cid);
188 if is_validated {
189 return Ok(block);
190 }
191
192 let _timer = metrics::BLOCK_VALIDATION_TIME.start_timer();
193
194 let header = block.header();
195
196 block_sanity_checks(header).map_err(|e| (*block_cid, e))?;
198 block_timestamp_checks(header).map_err(|e| (*block_cid, e))?;
199
200 let base_tipset = chain_store
201 .chain_index()
202 .load_required_tipset(&header.parents)
203 .map_err(|why| (*block_cid, TipsetSyncerError::TipsetParentNotFound(why)))?;
209
210 let lookback_state = ChainStore::get_lookback_tipset_for_round(
212 state_manager.chain_store().chain_index().shallow_clone(),
213 state_manager.chain_config().shallow_clone(),
214 base_tipset.shallow_clone(),
215 block.header().epoch,
216 )
217 .await
218 .map_err(|e| (*block_cid, e.into()))
219 .map(|(_, s)| Arc::new(s))?;
220
221 let work_addr = state_manager
224 .get_miner_work_addr(*lookback_state, &header.miner_address)
225 .map_err(|e| (*block_cid, e.into()))?;
226
227 let mut validations = JoinSet::new();
229
230 validations.spawn(check_block_messages(
232 state_manager.shallow_clone(),
233 block.shallow_clone(),
234 base_tipset.shallow_clone(),
235 ));
236
237 validations.spawn_blocking({
239 let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
240 let firehorse_height = state_manager.chain_config().epoch(Height::FireHorse);
241 let base_tipset = base_tipset.shallow_clone();
242 let block_store = state_manager.db_owned();
243 let block = block.shallow_clone();
244 move || {
245 let base_fee = crate::chain::compute_base_fee(
246 &block_store,
247 &base_tipset,
248 smoke_height,
249 firehorse_height,
250 )
251 .map_err(|e| {
252 TipsetSyncerError::Validation(format!("Could not compute base fee: {e}"))
253 })?;
254 let parent_base_fee = &block.header.parent_base_fee;
255 if &base_fee != parent_base_fee {
256 return Err(TipsetSyncerError::Validation(format!(
257 "base fee doesn't match: {parent_base_fee} (header), {base_fee} (computed)"
258 )));
259 }
260 Ok(())
261 }
262 });
263
264 validations.spawn_blocking({
266 let block_store = state_manager.db_owned();
267 let base_tipset = base_tipset.shallow_clone();
268 let weight = header.weight.clone();
269 move || {
270 let calc_weight = fil_cns::weight(&block_store, &base_tipset).map_err(|e| {
271 TipsetSyncerError::Calculation(format!("Error calculating weight: {e:#}"))
272 })?;
273 if weight != calc_weight {
274 return Err(TipsetSyncerError::Validation(format!(
275 "Parent weight doesn't match: {weight} (header), {calc_weight} (computed)"
276 )));
277 }
278 Ok(())
279 }
280 });
281
282 validations.spawn({
284 let state_manager = state_manager.shallow_clone();
285 let block = block.shallow_clone();
286 async move {
287 let header = block.header();
288 let ExecutedTipset {
289 state_root,
290 receipt_root,
291 ..
292 } = state_manager
293 .load_executed_tipset(&base_tipset)
294 .await
295 .map_err(|e| {
296 TipsetSyncerError::Calculation(format!("Failed to calculate state: {e:#}"))
297 })?;
298
299 if state_root != header.state_root {
300 return Err(TipsetSyncerError::Validation(format!(
301 "Parent state root did not match computed state: {} (header), {} (computed)",
302 header.state_root, state_root,
303 )));
304 }
305
306 if receipt_root != header.message_receipts {
307 return Err(TipsetSyncerError::Validation(format!(
308 "Parent receipt root did not match computed root: {} (header), {} (computed)",
309 header.message_receipts, receipt_root
310 )));
311 }
312 Ok(())
313 }
314 });
315
316 validations.spawn_blocking({
318 let block = block.shallow_clone();
319 move || {
320 block.header().verify_signature_against(&work_addr)?;
321 Ok(())
322 }
323 });
324
325 validations.spawn({
326 let block = block.shallow_clone();
327 async move {
328 consensus
329 .validate_block(state_manager, block)
330 .map_err(|errs| {
331 TipsetSyncerError::concat(
338 errs.into_iter_ne()
339 .map(TipsetSyncerError::ConsensusError)
340 .collect_vec(),
341 )
342 })
343 .await
344 }
345 });
346
347 if let Err(errs) = collect_errs(validations).await {
349 return Err((*block_cid, TipsetSyncerError::concat(errs)));
350 }
351
352 chain_store.mark_block_as_validated(block_cid);
353
354 Ok(block)
355}
356
357async fn check_block_messages(
368 state_manager: StateManager,
369 block: Arc<Block>,
370 base_tipset: Tipset,
371) -> Result<(), TipsetSyncerError> {
372 let network_version = state_manager
373 .chain_config()
374 .network_version(block.header.epoch);
375 let eth_chain_id = state_manager.chain_config().eth_chain_id;
376
377 if let Some(sig) = &block.header().bls_aggregate {
378 let mut pub_keys = Vec::with_capacity(block.bls_msgs().len());
381 let mut cids = Vec::with_capacity(block.bls_msgs().len());
382 let db = state_manager.db();
383 for m in block.bls_msgs() {
384 let pk = StateManager::get_bls_public_key(db, m.from, *base_tipset.parent_state())?;
385 pub_keys.push(pk);
386 cids.push(m.cid().to_bytes());
387 }
388
389 if !verify_bls_aggregate(
390 &cids.iter().map(|x| x.as_slice()).collect_vec(),
391 &pub_keys,
392 sig,
393 ) {
394 return Err(TipsetSyncerError::BlsAggregateSignatureInvalid(
395 format!("{sig:?}"),
396 format!("{cids:?}"),
397 ));
398 }
399 } else {
400 return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
401 }
402
403 let price_list = price_list_by_network_version(network_version);
404 let mut sum_gas_limit = 0;
405
406 let mut check_msg = |msg: &Message,
408 account_sequences: &mut HashMap<Address, u64>,
409 tree: &StateTree<DbImpl>|
410 -> anyhow::Result<()> {
411 let min_gas = price_list.on_chain_message(to_vec(msg).unwrap().len());
413 valid_for_block_inclusion(msg, min_gas.total(), network_version)
414 .map_err(|e| anyhow::anyhow!("{}", e))?;
415 sum_gas_limit += msg.gas_limit;
416 if sum_gas_limit > BLOCK_GAS_LIMIT {
417 anyhow::bail!("block gas limit exceeded");
418 }
419
420 let sequence: u64 = match account_sequences.get(&msg.from()) {
423 Some(sequence) => *sequence,
424 None => {
425 let actor = tree.get_actor(&msg.from)?.ok_or_else(|| {
426 anyhow::anyhow!(
427 "Failed to retrieve nonce for addr: Actor does not exist in state"
428 )
429 })?;
430 let network_version = state_manager
431 .chain_config()
432 .network_version(block.header.epoch);
433 if !is_valid_for_sending(network_version, &actor) {
434 anyhow::bail!("not valid for sending!");
435 }
436 actor.sequence
437 }
438 };
439
440 if sequence != msg.sequence {
442 anyhow::bail!(
443 "Message has incorrect sequence (exp: {} got: {})",
444 sequence,
445 msg.sequence
446 );
447 }
448 account_sequences.insert(msg.from(), sequence + 1);
449 Ok(())
450 };
451
452 let mut account_sequences: HashMap<Address, u64> = HashMap::default();
453 let ExecutedTipset { state_root, .. } = state_manager
454 .load_executed_tipset(&base_tipset)
455 .await
456 .map_err(|e| TipsetSyncerError::Calculation(format!("Could not update state: {e:#}")))?;
457 let tree = StateTree::new_from_root(state_manager.db(), &state_root).map_err(|e| {
458 TipsetSyncerError::Calculation(format!(
459 "Could not load from new state root in state manager: {e:#}"
460 ))
461 })?;
462
463 for (i, msg) in block.bls_msgs().iter().enumerate() {
465 check_msg(msg, &mut account_sequences, &tree).map_err(|e| {
466 TipsetSyncerError::Validation(format!(
467 "Block had invalid BLS message at index {i}: {e:#}"
468 ))
469 })?;
470 }
471
472 for (i, msg) in block.secp_msgs().iter().enumerate() {
474 if msg.signature().signature_type() == SignatureType::Delegated
475 && !is_valid_eth_tx_for_sending(eth_chain_id, network_version, msg)
476 {
477 return Err(TipsetSyncerError::Validation(
478 "Network version must be at least NV23 for legacy Ethereum transactions".to_owned(),
479 ));
480 }
481 check_msg(msg.message(), &mut account_sequences, &tree).map_err(|e| {
482 TipsetSyncerError::Validation(format!(
483 "block had an invalid secp message at index {i}: {e:#}"
484 ))
485 })?;
486 let key_addr = state_manager
488 .resolve_to_deterministic_address(msg.from(), &base_tipset)
489 .await
490 .map_err(|e| TipsetSyncerError::ResolvingAddressFromMessage(e.to_string()))?;
491 msg.signature
493 .authenticate_msg(eth_chain_id, msg, &key_addr)
494 .map_err(|e| TipsetSyncerError::MessageSignatureInvalid(e.to_string()))?;
495 }
496
497 let msg_root =
499 TipsetValidator::compute_msg_root(state_manager.db(), block.bls_msgs(), block.secp_msgs())
500 .map_err(|err| TipsetSyncerError::ComputingMessageRoot(err.to_string()))?;
501 if block.header().messages != msg_root {
502 return Err(TipsetSyncerError::BlockMessageRootInvalid(
503 format!("{:?}", block.header().messages),
504 format!("{msg_root:?}"),
505 ));
506 }
507
508 Ok(())
509}
510
511fn block_sanity_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
515 if header.signature.is_none() {
516 return Err(TipsetSyncerError::BlockWithoutSignature);
517 }
518 if header.bls_aggregate.is_none() {
519 return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
520 }
521 Ok(())
522}
523
524fn block_timestamp_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
526 let time_now = chrono::Utc::now().timestamp() as u64;
527 if header.timestamp > time_now.saturating_add(ALLOWABLE_CLOCK_DRIFT) {
528 return Err(TipsetSyncerError::TimeTravellingBlock(
529 time_now,
530 header.timestamp,
531 ));
532 } else if header.timestamp > time_now {
533 warn!(
534 "Got block from the future, but within clock drift threshold, {} > {}",
535 header.timestamp, time_now
536 );
537 }
538 Ok(())
539}