forest/chain_sync/
tipset_syncer.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::sync::Arc;
5
6use crate::chain_sync::BadBlockCache;
7use crate::networks::Height;
8use crate::shim::clock::ALLOWABLE_CLOCK_DRIFT;
9use crate::shim::crypto::SignatureType;
10use crate::shim::{
11    address::Address, crypto::verify_bls_aggregate, econ::BLOCK_GAS_LIMIT,
12    gas::price_list_by_network_version, message::Message, state_tree::StateTree,
13};
14use crate::state_manager::{Error as StateManagerError, StateManager, is_valid_for_sending};
15use crate::{
16    blocks::{Block, CachingBlockHeader, Error as ForestBlockError, FullTipset, Tipset},
17    fil_cns::{self, FilecoinConsensus, FilecoinConsensusError},
18};
19use crate::{
20    chain::{ChainStore, Error as ChainStoreError},
21    metrics::HistogramTimerExt,
22};
23use crate::{
24    eth::is_valid_eth_tx_for_sending,
25    message::{Message as MessageTrait, valid_for_block_inclusion},
26};
27use ahash::HashMap;
28use cid::Cid;
29use futures::TryFutureExt;
30use fvm_ipld_blockstore::Blockstore;
31use fvm_ipld_encoding::to_vec;
32use itertools::Itertools;
33use nunny::Vec as NonEmpty;
34use thiserror::Error;
35use tokio::task::JoinSet;
36use tracing::{error, trace, warn};
37
38use crate::chain_sync::{consensus::collect_errs, metrics, validation::TipsetValidator};
39
40#[derive(Debug, Error)]
41pub enum TipsetSyncerError {
42    #[error("Block must have a signature")]
43    BlockWithoutSignature,
44    #[error("Block without BLS aggregate signature")]
45    BlockWithoutBlsAggregate,
46    #[error("Block received from the future: now = {0}, block = {1}")]
47    TimeTravellingBlock(u64, u64),
48    #[error("Validation error: {0}")]
49    Validation(String),
50    #[error("Processing error: {0}")]
51    Calculation(String),
52    #[error("Chain store error: {0}")]
53    ChainStore(#[from] ChainStoreError),
54    #[error("StateManager error: {0}")]
55    StateManager(#[from] StateManagerError),
56    #[error("Block error: {0}")]
57    BlockError(#[from] ForestBlockError),
58    #[error("Querying tipsets from the network failed: {0}")]
59    NetworkTipsetQueryFailed(String),
60    #[error("BLS aggregate signature {0} was invalid for msgs {1}")]
61    BlsAggregateSignatureInvalid(String, String),
62    #[error("Message signature invalid: {0}")]
63    MessageSignatureInvalid(String),
64    #[error("Block message root does not match: expected {0}, computed {1}")]
65    BlockMessageRootInvalid(String, String),
66    #[error("Computing message root failed: {0}")]
67    ComputingMessageRoot(String),
68    #[error("Resolving address from message failed: {0}")]
69    ResolvingAddressFromMessage(String),
70    #[error("Loading tipset parent from the store failed: {0}")]
71    TipsetParentNotFound(ChainStoreError),
72    #[error("Consensus error: {0}")]
73    ConsensusError(FilecoinConsensusError),
74}
75
76impl From<tokio::task::JoinError> for TipsetSyncerError {
77    fn from(err: tokio::task::JoinError) -> Self {
78        TipsetSyncerError::NetworkTipsetQueryFailed(format!("{err}"))
79    }
80}
81
82impl TipsetSyncerError {
83    /// Concatenate all validation error messages into one comma separated
84    /// version.
85    fn concat(errs: NonEmpty<TipsetSyncerError>) -> Self {
86        let msg = errs
87            .iter()
88            .map(|e| e.to_string())
89            .collect::<Vec<_>>()
90            .join(", ");
91
92        TipsetSyncerError::Validation(msg)
93    }
94}
95
96/// Validates full blocks in the tipset in parallel (since the messages are not
97/// executed), adding the successful ones to the tipset tracker, and the failed
98/// ones to the bad block cache, depending on strategy. Any bad block fails
99/// validation.
100pub async fn validate_tipset<DB: Blockstore + Send + Sync + 'static>(
101    state_manager: &Arc<StateManager<DB>>,
102    chainstore: &ChainStore<DB>,
103    full_tipset: FullTipset,
104    genesis: &Tipset,
105    bad_block_cache: Option<Arc<BadBlockCache>>,
106) -> Result<(), TipsetSyncerError> {
107    if full_tipset.key().eq(genesis.key()) {
108        trace!("Skipping genesis tipset validation");
109        return Ok(());
110    }
111
112    let timer = metrics::TIPSET_PROCESSING_TIME.start_timer();
113
114    let epoch = full_tipset.epoch();
115    let full_tipset_key = full_tipset.key().clone();
116    trace!("Tipset keys: {full_tipset_key}");
117    let blocks = full_tipset.into_blocks();
118    let mut validations = JoinSet::new();
119    for b in blocks {
120        validations.spawn(validate_block(state_manager.clone(), Arc::new(b)));
121    }
122
123    while let Some(result) = validations.join_next().await {
124        match result? {
125            Ok(block) => {
126                chainstore.add_to_tipset_tracker(block.header());
127            }
128            Err((cid, why)) => {
129                warn!("Validating block [CID = {cid}] in EPOCH = {epoch} failed: {why}");
130                match &why {
131                    TipsetSyncerError::TimeTravellingBlock(_, _) => {
132                        // Do not mark a block as bad for temporary errors.
133                        // See <https://github.com/filecoin-project/lotus/blob/v1.34.1/chain/sync.go#L602> in Lotus
134                    }
135                    _ => {
136                        if let Some(bad_block_cache) = bad_block_cache {
137                            bad_block_cache.push(cid);
138                        }
139                    }
140                };
141                return Err(why);
142            }
143        }
144    }
145    drop(timer);
146    Ok(())
147}
148
149/// Validate the block according to the rules specific to the consensus being
150/// used, and the common rules that pertain to the assumptions of the
151/// `ChainSync` protocol.
152///
153/// Returns the validated block if `Ok`.
154/// Returns the block CID (for marking bad) and `Error` if invalid (`Err`).
155///
156/// Common validation includes:
157/// * Sanity checks
158/// * Clock drifts
159/// * Signatures
160/// * Message inclusion (fees, sequences)
161/// * Parent related fields: base fee, weight, the state root
162/// * NB: This is where the messages in the *parent* tipset are executed.
163///
164/// Consensus specific validation should include:
165/// * Checking that the messages in the block correspond to the agreed upon
166///   total ordering
167/// * That the block is a deterministic derivative of the underlying consensus
168async fn validate_block<DB: Blockstore + Sync + Send + 'static>(
169    state_manager: Arc<StateManager<DB>>,
170    block: Arc<Block>,
171) -> Result<Arc<Block>, (Cid, TipsetSyncerError)> {
172    let consensus = FilecoinConsensus::new(state_manager.beacon_schedule().clone());
173    trace!(
174        "Validating block: epoch = {}, weight = {}, key = {}",
175        block.header().epoch,
176        block.header().weight,
177        block.header().cid(),
178    );
179    let chain_store = state_manager.chain_store().clone();
180    let block_cid = block.cid();
181
182    // Check block validation cache in store
183    let is_validated = chain_store.is_block_validated(block_cid);
184    if is_validated {
185        return Ok(block);
186    }
187
188    let _timer = metrics::BLOCK_VALIDATION_TIME.start_timer();
189
190    let header = block.header();
191
192    // Check to ensure all optional values exist
193    block_sanity_checks(header).map_err(|e| (*block_cid, e))?;
194    block_timestamp_checks(header).map_err(|e| (*block_cid, e))?;
195
196    let base_tipset = chain_store
197        .chain_index()
198        .load_required_tipset(&header.parents)
199        // The parent tipset will always be there when calling validate_block
200        // as part of the sync_tipset_range flow because all of the headers in the range
201        // have been committed to the store. When validate_block is called from sync_tipset
202        // this guarantee does not exist, so we create a specific error to inform the caller
203        // not to add this block to the bad blocks cache.
204        .map_err(|why| (*block_cid, TipsetSyncerError::TipsetParentNotFound(why)))?;
205
206    // Retrieve lookback tipset for validation
207    let lookback_state = ChainStore::get_lookback_tipset_for_round(
208        state_manager.chain_store().chain_index(),
209        state_manager.chain_config(),
210        &base_tipset,
211        block.header().epoch,
212    )
213    .map_err(|e| (*block_cid, e.into()))
214    .map(|(_, s)| Arc::new(s))?;
215
216    // Work address needed for async validations, so necessary
217    // to do sync to avoid duplication
218    let work_addr = state_manager
219        .get_miner_work_addr(*lookback_state, &header.miner_address)
220        .map_err(|e| (*block_cid, e.into()))?;
221
222    // Async validations
223    let mut validations = JoinSet::new();
224
225    // Check block messages
226    validations.spawn(check_block_messages(
227        state_manager.clone(),
228        block.clone(),
229        base_tipset.clone(),
230    ));
231
232    // Base fee check
233    validations.spawn_blocking({
234        let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
235        let base_tipset = base_tipset.clone();
236        let block_store = state_manager.blockstore_owned();
237        let block = Arc::clone(&block);
238        move || {
239            let base_fee = crate::chain::compute_base_fee(&block_store, &base_tipset, smoke_height)
240                .map_err(|e| {
241                    TipsetSyncerError::Validation(format!("Could not compute base fee: {e}"))
242                })?;
243            let parent_base_fee = &block.header.parent_base_fee;
244            if &base_fee != parent_base_fee {
245                return Err(TipsetSyncerError::Validation(format!(
246                    "base fee doesn't match: {parent_base_fee} (header), {base_fee} (computed)"
247                )));
248            }
249            Ok(())
250        }
251    });
252
253    // Parent weight calculation check
254    validations.spawn_blocking({
255        let block_store = state_manager.blockstore_owned();
256        let base_tipset = base_tipset.clone();
257        let weight = header.weight.clone();
258        move || {
259            let calc_weight = fil_cns::weight(&block_store, &base_tipset).map_err(|e| {
260                TipsetSyncerError::Calculation(format!("Error calculating weight: {e}"))
261            })?;
262            if weight != calc_weight {
263                return Err(TipsetSyncerError::Validation(format!(
264                    "Parent weight doesn't match: {weight} (header), {calc_weight} (computed)"
265                )));
266            }
267            Ok(())
268        }
269    });
270
271    // State root and receipt root validations
272    validations.spawn({
273        let state_manager = state_manager.clone();
274        let block = block.clone();
275        async move {
276            let header = block.header();
277            let (state_root, receipt_root) = state_manager
278                .tipset_state(&base_tipset)
279                .await
280                .map_err(|e| {
281                    TipsetSyncerError::Calculation(format!("Failed to calculate state: {e}"))
282                })?;
283
284            if state_root != header.state_root {
285                return Err(TipsetSyncerError::Validation(format!(
286                    "Parent state root did not match computed state: {} (header), {} (computed)",
287                    header.state_root, state_root,
288                )));
289            }
290
291            if receipt_root != header.message_receipts {
292                return Err(TipsetSyncerError::Validation(format!(
293                    "Parent receipt root did not match computed root: {} (header), {} (computed)",
294                    header.message_receipts, receipt_root
295                )));
296            }
297            Ok(())
298        }
299    });
300
301    // Block signature check
302    validations.spawn_blocking({
303        let block = block.clone();
304        move || {
305            block.header().verify_signature_against(&work_addr)?;
306            Ok(())
307        }
308    });
309
310    validations.spawn({
311        let block = block.clone();
312        async move {
313            consensus
314                .validate_block(state_manager, block)
315                .map_err(|errs| {
316                    // NOTE: Concatenating errors here means the wrapper type of error
317                    // never surfaces, yet we always pay the cost of the generic argument.
318                    // But there's no reason `validate_block` couldn't return a list of all
319                    // errors instead of a single one that has all the error messages,
320                    // removing the caller's ability to distinguish between them.
321
322                    TipsetSyncerError::concat(
323                        errs.into_iter_ne()
324                            .map(TipsetSyncerError::ConsensusError)
325                            .collect_vec(),
326                    )
327                })
328                .await
329        }
330    });
331
332    // Collect the errors from the async validations
333    if let Err(errs) = collect_errs(validations).await {
334        return Err((*block_cid, TipsetSyncerError::concat(errs)));
335    }
336
337    chain_store.mark_block_as_validated(block_cid);
338
339    Ok(block)
340}
341
342/// Validate messages in a full block, relative to the parent tipset.
343///
344/// This includes:
345/// * signature checks
346/// * gas limits, and prices
347/// * account nonce values
348/// * the message root in the header
349///
350/// NB: This loads/computes the state resulting from the execution of the parent
351/// tipset.
352async fn check_block_messages<DB: Blockstore + Send + Sync + 'static>(
353    state_manager: Arc<StateManager<DB>>,
354    block: Arc<Block>,
355    base_tipset: Tipset,
356) -> Result<(), TipsetSyncerError> {
357    let network_version = state_manager
358        .chain_config()
359        .network_version(block.header.epoch);
360    let eth_chain_id = state_manager.chain_config().eth_chain_id;
361
362    if let Some(sig) = &block.header().bls_aggregate {
363        // Do the initial loop here
364        // check block message and signatures in them
365        let mut pub_keys = Vec::with_capacity(block.bls_msgs().len());
366        let mut cids = Vec::with_capacity(block.bls_msgs().len());
367        let db = state_manager.blockstore();
368        for m in block.bls_msgs() {
369            let pk = StateManager::get_bls_public_key(db, &m.from, *base_tipset.parent_state())?;
370            pub_keys.push(pk);
371            cids.push(m.cid().to_bytes());
372        }
373
374        if !verify_bls_aggregate(
375            &cids.iter().map(|x| x.as_slice()).collect_vec(),
376            &pub_keys,
377            sig,
378        ) {
379            return Err(TipsetSyncerError::BlsAggregateSignatureInvalid(
380                format!("{sig:?}"),
381                format!("{cids:?}"),
382            ));
383        }
384    } else {
385        return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
386    }
387
388    let price_list = price_list_by_network_version(network_version);
389    let mut sum_gas_limit = 0;
390
391    // Check messages for validity
392    let mut check_msg = |msg: &Message,
393                         account_sequences: &mut HashMap<Address, u64>,
394                         tree: &StateTree<DB>|
395     -> Result<(), anyhow::Error> {
396        // Phase 1: Syntactic validation
397        let min_gas = price_list.on_chain_message(to_vec(msg).unwrap().len());
398        valid_for_block_inclusion(msg, min_gas.total(), network_version)
399            .map_err(|e| anyhow::anyhow!("{}", e))?;
400        sum_gas_limit += msg.gas_limit;
401        if sum_gas_limit > BLOCK_GAS_LIMIT {
402            anyhow::bail!("block gas limit exceeded");
403        }
404
405        // Phase 2: (Partial) Semantic validation
406        // Send exists and is an account actor, and sequence is correct
407        let sequence: u64 = match account_sequences.get(&msg.from()) {
408            Some(sequence) => *sequence,
409            None => {
410                let actor = tree.get_actor(&msg.from)?.ok_or_else(|| {
411                    anyhow::anyhow!(
412                        "Failed to retrieve nonce for addr: Actor does not exist in state"
413                    )
414                })?;
415                let network_version = state_manager
416                    .chain_config()
417                    .network_version(block.header.epoch);
418                if !is_valid_for_sending(network_version, &actor) {
419                    anyhow::bail!("not valid for sending!");
420                }
421                actor.sequence
422            }
423        };
424
425        // Sequence equality check
426        if sequence != msg.sequence {
427            anyhow::bail!(
428                "Message has incorrect sequence (exp: {} got: {})",
429                sequence,
430                msg.sequence
431            );
432        }
433        account_sequences.insert(msg.from(), sequence + 1);
434        Ok(())
435    };
436
437    let mut account_sequences: HashMap<Address, u64> = HashMap::default();
438    let (state_root, _) = state_manager
439        .tipset_state(&base_tipset)
440        .await
441        .map_err(|e| TipsetSyncerError::Calculation(format!("Could not update state: {e}")))?;
442    let tree =
443        StateTree::new_from_root(state_manager.blockstore_owned(), &state_root).map_err(|e| {
444            TipsetSyncerError::Calculation(format!(
445                "Could not load from new state root in state manager: {e}"
446            ))
447        })?;
448
449    // Check validity for BLS messages
450    for (i, msg) in block.bls_msgs().iter().enumerate() {
451        check_msg(msg, &mut account_sequences, &tree).map_err(|e| {
452            TipsetSyncerError::Validation(format!(
453                "Block had invalid BLS message at index {i}: {e}"
454            ))
455        })?;
456    }
457
458    // Check validity for SECP messages
459    for (i, msg) in block.secp_msgs().iter().enumerate() {
460        if msg.signature().signature_type() == SignatureType::Delegated
461            && !is_valid_eth_tx_for_sending(eth_chain_id, network_version, msg)
462        {
463            return Err(TipsetSyncerError::Validation(
464                "Network version must be at least NV23 for legacy Ethereum transactions".to_owned(),
465            ));
466        }
467        check_msg(msg.message(), &mut account_sequences, &tree).map_err(|e| {
468            TipsetSyncerError::Validation(format!(
469                "block had an invalid secp message at index {i}: {e}"
470            ))
471        })?;
472        // Resolve key address for signature verification
473        let key_addr = state_manager
474            .resolve_to_key_addr(&msg.from(), &base_tipset)
475            .await
476            .map_err(|e| TipsetSyncerError::ResolvingAddressFromMessage(e.to_string()))?;
477        // SecP256K1 Signature validation
478        msg.signature
479            .authenticate_msg(eth_chain_id, msg, &key_addr)
480            .map_err(|e| TipsetSyncerError::MessageSignatureInvalid(e.to_string()))?;
481    }
482
483    // Validate message root from header matches message root
484    let msg_root = TipsetValidator::compute_msg_root(
485        state_manager.blockstore(),
486        block.bls_msgs(),
487        block.secp_msgs(),
488    )
489    .map_err(|err| TipsetSyncerError::ComputingMessageRoot(err.to_string()))?;
490    if block.header().messages != msg_root {
491        return Err(TipsetSyncerError::BlockMessageRootInvalid(
492            format!("{:?}", block.header().messages),
493            format!("{msg_root:?}"),
494        ));
495    }
496
497    Ok(())
498}
499
500/// Checks optional values in header.
501///
502/// It only looks for fields which are common to all consensus types.
503fn block_sanity_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
504    if header.signature.is_none() {
505        return Err(TipsetSyncerError::BlockWithoutSignature);
506    }
507    if header.bls_aggregate.is_none() {
508        return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
509    }
510    Ok(())
511}
512
513/// Check the clock drift.
514fn block_timestamp_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
515    let time_now = chrono::Utc::now().timestamp() as u64;
516    if header.timestamp > time_now.saturating_add(ALLOWABLE_CLOCK_DRIFT) {
517        return Err(TipsetSyncerError::TimeTravellingBlock(
518            time_now,
519            header.timestamp,
520        ));
521    } else if header.timestamp > time_now {
522        warn!(
523            "Got block from the future, but within clock drift threshold, {} > {}",
524            header.timestamp, time_now
525        );
526    }
527    Ok(())
528}