Skip to main content

forest/chain_sync/
tipset_syncer.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::sync::Arc;
5
6use crate::chain_sync::BadBlockCache;
7use crate::db::EthMappingsStore;
8use crate::networks::Height;
9use crate::shim::clock::ALLOWABLE_CLOCK_DRIFT;
10use crate::shim::crypto::SignatureType;
11use crate::shim::{
12    address::Address, crypto::verify_bls_aggregate, econ::BLOCK_GAS_LIMIT,
13    gas::price_list_by_network_version, message::Message, state_tree::StateTree,
14};
15use crate::state_manager::ExecutedTipset;
16use crate::state_manager::{Error as StateManagerError, StateManager, utils::is_valid_for_sending};
17use crate::utils::ShallowClone as _;
18use crate::{
19    blocks::{Block, CachingBlockHeader, Error as ForestBlockError, FullTipset, Tipset},
20    fil_cns::{self, FilecoinConsensus, FilecoinConsensusError},
21};
22use crate::{
23    chain::{ChainStore, Error as ChainStoreError},
24    metrics::HistogramTimerExt,
25};
26use crate::{
27    eth::is_valid_eth_tx_for_sending,
28    message::{MessageRead as _, valid_for_block_inclusion},
29};
30use ahash::HashMap;
31use cid::Cid;
32use futures::TryFutureExt;
33use fvm_ipld_blockstore::Blockstore;
34use fvm_ipld_encoding::to_vec;
35use itertools::Itertools;
36use nunny::Vec as NonEmpty;
37use thiserror::Error;
38use tokio::task::JoinSet;
39use tracing::{trace, warn};
40
41use crate::chain_sync::{consensus::collect_errs, metrics, validation::TipsetValidator};
42
43#[derive(Debug, Error)]
44pub enum TipsetSyncerError {
45    #[error("Block must have a signature")]
46    BlockWithoutSignature,
47    #[error("Block without BLS aggregate signature")]
48    BlockWithoutBlsAggregate,
49    #[error("Block received from the future: now = {0}, block = {1}")]
50    TimeTravellingBlock(u64, u64),
51    #[error("Validation error: {0}")]
52    Validation(String),
53    #[error("Processing error: {0}")]
54    Calculation(String),
55    #[error("Chain store error: {0}")]
56    ChainStore(#[from] ChainStoreError),
57    #[error("StateManager error: {0}")]
58    StateManager(#[from] StateManagerError),
59    #[error("Block error: {0}")]
60    BlockError(#[from] ForestBlockError),
61    #[error("Querying tipsets from the network failed: {0}")]
62    NetworkTipsetQueryFailed(String),
63    #[error("BLS aggregate signature {0} was invalid for msgs {1}")]
64    BlsAggregateSignatureInvalid(String, String),
65    #[error("Message signature invalid: {0}")]
66    MessageSignatureInvalid(String),
67    #[error("Block message root does not match: expected {0}, computed {1}")]
68    BlockMessageRootInvalid(String, String),
69    #[error("Computing message root failed: {0}")]
70    ComputingMessageRoot(String),
71    #[error("Resolving address from message failed: {0}")]
72    ResolvingAddressFromMessage(String),
73    #[error("Loading tipset parent from the store failed: {0}")]
74    TipsetParentNotFound(ChainStoreError),
75    #[error("Consensus error: {0}")]
76    ConsensusError(FilecoinConsensusError),
77}
78
79impl From<tokio::task::JoinError> for TipsetSyncerError {
80    fn from(err: tokio::task::JoinError) -> Self {
81        TipsetSyncerError::NetworkTipsetQueryFailed(format!("{err}"))
82    }
83}
84
85impl TipsetSyncerError {
86    /// Concatenate all validation error messages into one comma separated
87    /// version.
88    fn concat(errs: NonEmpty<TipsetSyncerError>) -> Self {
89        let msg = errs.iter().map(|e| e.to_string()).collect_vec().join(", ");
90
91        TipsetSyncerError::Validation(msg)
92    }
93}
94
95/// Validates full blocks in the tipset in parallel (since the messages are not
96/// executed), adding the successful ones to the tipset tracker, and the failed
97/// ones to the bad block cache, depending on strategy. Any bad block fails
98/// validation.
99pub async fn validate_tipset<DB: Blockstore + EthMappingsStore + Send + Sync + 'static>(
100    state_manager: &Arc<StateManager<DB>>,
101    full_tipset: FullTipset,
102    bad_block_cache: Option<Arc<BadBlockCache>>,
103) -> Result<(), TipsetSyncerError> {
104    if full_tipset
105        .key()
106        .eq(state_manager.chain_store().genesis_tipset().key())
107    {
108        trace!("Skipping genesis tipset validation");
109        return Ok(());
110    }
111
112    let timer = metrics::TIPSET_PROCESSING_TIME.start_timer();
113
114    let epoch = full_tipset.epoch();
115    let parent_state = *full_tipset.parent_state();
116    let tipset_key = full_tipset.key();
117    trace!("Tipset keys: {tipset_key}");
118    let blocks = full_tipset.into_blocks();
119    let mut validations = JoinSet::new();
120    for b in blocks {
121        validations.spawn(validate_block(state_manager.clone(), Arc::new(b)));
122    }
123
124    while let Some(result) = validations.join_next().await {
125        match result? {
126            Ok(block) => {
127                state_manager
128                    .chain_store()
129                    .add_to_tipset_tracker(block.header());
130            }
131            Err((cid, why)) => {
132                warn!(
133                    "Validating block [CID = {cid}, PARENT_STATE = {parent_state}] in EPOCH = {epoch} failed: {why}",
134                );
135                match &why {
136                    TipsetSyncerError::TimeTravellingBlock(_, _) => {
137                        // Do not mark a block as bad for temporary errors.
138                        // See <https://github.com/filecoin-project/lotus/blob/v1.34.1/chain/sync.go#L602> in Lotus
139                    }
140                    _ => {
141                        // Do not mark block as bad if the parent state tree does not exist
142                        if StateTree::new_from_root(state_manager.blockstore_owned(), &parent_state)
143                            .is_ok()
144                            && let Some(bad_block_cache) = bad_block_cache
145                        {
146                            bad_block_cache.push(cid);
147                        }
148                    }
149                };
150                return Err(why);
151            }
152        }
153    }
154    drop(timer);
155    Ok(())
156}
157
158/// Validate the block according to the rules specific to the consensus being
159/// used, and the common rules that pertain to the assumptions of the
160/// `ChainSync` protocol.
161///
162/// Returns the validated block if `Ok`.
163/// Returns the block CID (for marking bad) and `Error` if invalid (`Err`).
164///
165/// Common validation includes:
166/// * Sanity checks
167/// * Clock drifts
168/// * Signatures
169/// * Message inclusion (fees, sequences)
170/// * Parent related fields: base fee, weight, the state root
171/// * NB: This is where the messages in the *parent* tipset are executed.
172///
173/// Consensus specific validation should include:
174/// * Checking that the messages in the block correspond to the agreed upon
175///   total ordering
176/// * That the block is a deterministic derivative of the underlying consensus
177async fn validate_block<DB: Blockstore + EthMappingsStore + Sync + Send + 'static>(
178    state_manager: Arc<StateManager<DB>>,
179    block: Arc<Block>,
180) -> Result<Arc<Block>, (Cid, TipsetSyncerError)> {
181    let consensus = FilecoinConsensus::new(state_manager.beacon_schedule().clone());
182    trace!(
183        "Validating block: epoch = {}, weight = {}, key = {}",
184        block.header().epoch,
185        block.header().weight,
186        block.header().cid(),
187    );
188    let chain_store = state_manager.chain_store().clone();
189    let block_cid = block.cid();
190
191    // Check block validation cache in store
192    let is_validated = chain_store.is_block_validated(block_cid);
193    if is_validated {
194        return Ok(block);
195    }
196
197    let _timer = metrics::BLOCK_VALIDATION_TIME.start_timer();
198
199    let header = block.header();
200
201    // Check to ensure all optional values exist
202    block_sanity_checks(header).map_err(|e| (*block_cid, e))?;
203    block_timestamp_checks(header).map_err(|e| (*block_cid, e))?;
204
205    let base_tipset = chain_store
206        .chain_index()
207        .load_required_tipset(&header.parents)
208        // The parent tipset will always be there when calling validate_block
209        // as part of the sync_tipset_range flow because all of the headers in the range
210        // have been committed to the store. When validate_block is called from sync_tipset
211        // this guarantee does not exist, so we create a specific error to inform the caller
212        // not to add this block to the bad blocks cache.
213        .map_err(|why| (*block_cid, TipsetSyncerError::TipsetParentNotFound(why)))?;
214
215    // Retrieve lookback tipset for validation
216    let lookback_state = ChainStore::get_lookback_tipset_for_round(
217        state_manager.chain_store().chain_index(),
218        state_manager.chain_config(),
219        &base_tipset,
220        block.header().epoch,
221    )
222    .map_err(|e| (*block_cid, e.into()))
223    .map(|(_, s)| Arc::new(s))?;
224
225    // Work address needed for async validations, so necessary
226    // to do sync to avoid duplication
227    let work_addr = state_manager
228        .get_miner_work_addr(*lookback_state, &header.miner_address)
229        .map_err(|e| (*block_cid, e.into()))?;
230
231    // Async validations
232    let mut validations = JoinSet::new();
233
234    // Check block messages
235    validations.spawn(check_block_messages(
236        state_manager.shallow_clone(),
237        block.shallow_clone(),
238        base_tipset.shallow_clone(),
239    ));
240
241    // Base fee check
242    validations.spawn_blocking({
243        let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
244        let firehorse_height = state_manager.chain_config().epoch(Height::FireHorse);
245        let base_tipset = base_tipset.shallow_clone();
246        let block_store = state_manager.blockstore_owned();
247        let block = block.shallow_clone();
248        move || {
249            let base_fee = crate::chain::compute_base_fee(
250                &block_store,
251                &base_tipset,
252                smoke_height,
253                firehorse_height,
254            )
255            .map_err(|e| {
256                TipsetSyncerError::Validation(format!("Could not compute base fee: {e}"))
257            })?;
258            let parent_base_fee = &block.header.parent_base_fee;
259            if &base_fee != parent_base_fee {
260                return Err(TipsetSyncerError::Validation(format!(
261                    "base fee doesn't match: {parent_base_fee} (header), {base_fee} (computed)"
262                )));
263            }
264            Ok(())
265        }
266    });
267
268    // Parent weight calculation check
269    validations.spawn_blocking({
270        let block_store = state_manager.blockstore_owned();
271        let base_tipset = base_tipset.shallow_clone();
272        let weight = header.weight.clone();
273        move || {
274            let calc_weight = fil_cns::weight(&block_store, &base_tipset).map_err(|e| {
275                TipsetSyncerError::Calculation(format!("Error calculating weight: {e:#}"))
276            })?;
277            if weight != calc_weight {
278                return Err(TipsetSyncerError::Validation(format!(
279                    "Parent weight doesn't match: {weight} (header), {calc_weight} (computed)"
280                )));
281            }
282            Ok(())
283        }
284    });
285
286    // State root and receipt root validations
287    validations.spawn({
288        let state_manager = state_manager.clone();
289        let block = block.clone();
290        async move {
291            let header = block.header();
292            let ExecutedTipset {
293                state_root,
294                receipt_root,
295                ..
296            } = state_manager
297                .load_executed_tipset(&base_tipset)
298                .await
299                .map_err(|e| {
300                    TipsetSyncerError::Calculation(format!("Failed to calculate state: {e:#}"))
301                })?;
302
303            if state_root != header.state_root {
304                return Err(TipsetSyncerError::Validation(format!(
305                    "Parent state root did not match computed state: {} (header), {} (computed)",
306                    header.state_root, state_root,
307                )));
308            }
309
310            if receipt_root != header.message_receipts {
311                return Err(TipsetSyncerError::Validation(format!(
312                    "Parent receipt root did not match computed root: {} (header), {} (computed)",
313                    header.message_receipts, receipt_root
314                )));
315            }
316            Ok(())
317        }
318    });
319
320    // Block signature check
321    validations.spawn_blocking({
322        let block = block.clone();
323        move || {
324            block.header().verify_signature_against(&work_addr)?;
325            Ok(())
326        }
327    });
328
329    validations.spawn({
330        let block = block.clone();
331        async move {
332            consensus
333                .validate_block(state_manager, block)
334                .map_err(|errs| {
335                    // NOTE: Concatenating errors here means the wrapper type of error
336                    // never surfaces, yet we always pay the cost of the generic argument.
337                    // But there's no reason `validate_block` couldn't return a list of all
338                    // errors instead of a single one that has all the error messages,
339                    // removing the caller's ability to distinguish between them.
340
341                    TipsetSyncerError::concat(
342                        errs.into_iter_ne()
343                            .map(TipsetSyncerError::ConsensusError)
344                            .collect_vec(),
345                    )
346                })
347                .await
348        }
349    });
350
351    // Collect the errors from the async validations
352    if let Err(errs) = collect_errs(validations).await {
353        return Err((*block_cid, TipsetSyncerError::concat(errs)));
354    }
355
356    chain_store.mark_block_as_validated(block_cid);
357
358    Ok(block)
359}
360
361/// Validate messages in a full block, relative to the parent tipset.
362///
363/// This includes:
364/// * signature checks
365/// * gas limits, and prices
366/// * account nonce values
367/// * the message root in the header
368///
369/// NB: This loads/computes the state resulting from the execution of the parent
370/// tipset.
371async fn check_block_messages<DB: Blockstore + EthMappingsStore + Send + Sync + 'static>(
372    state_manager: Arc<StateManager<DB>>,
373    block: Arc<Block>,
374    base_tipset: Tipset,
375) -> Result<(), TipsetSyncerError> {
376    let network_version = state_manager
377        .chain_config()
378        .network_version(block.header.epoch);
379    let eth_chain_id = state_manager.chain_config().eth_chain_id;
380
381    if let Some(sig) = &block.header().bls_aggregate {
382        // Do the initial loop here
383        // check block message and signatures in them
384        let mut pub_keys = Vec::with_capacity(block.bls_msgs().len());
385        let mut cids = Vec::with_capacity(block.bls_msgs().len());
386        let db = state_manager.blockstore();
387        for m in block.bls_msgs() {
388            let pk = StateManager::get_bls_public_key(db, &m.from, *base_tipset.parent_state())?;
389            pub_keys.push(pk);
390            cids.push(m.cid().to_bytes());
391        }
392
393        if !verify_bls_aggregate(
394            &cids.iter().map(|x| x.as_slice()).collect_vec(),
395            &pub_keys,
396            sig,
397        ) {
398            return Err(TipsetSyncerError::BlsAggregateSignatureInvalid(
399                format!("{sig:?}"),
400                format!("{cids:?}"),
401            ));
402        }
403    } else {
404        return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
405    }
406
407    let price_list = price_list_by_network_version(network_version);
408    let mut sum_gas_limit = 0;
409
410    // Check messages for validity
411    let mut check_msg = |msg: &Message,
412                         account_sequences: &mut HashMap<Address, u64>,
413                         tree: &StateTree<DB>|
414     -> anyhow::Result<()> {
415        // Phase 1: Syntactic validation
416        let min_gas = price_list.on_chain_message(to_vec(msg).unwrap().len());
417        valid_for_block_inclusion(msg, min_gas.total(), network_version)
418            .map_err(|e| anyhow::anyhow!("{}", e))?;
419        sum_gas_limit += msg.gas_limit;
420        if sum_gas_limit > BLOCK_GAS_LIMIT {
421            anyhow::bail!("block gas limit exceeded");
422        }
423
424        // Phase 2: (Partial) Semantic validation
425        // Send exists and is an account actor, and sequence is correct
426        let sequence: u64 = match account_sequences.get(&msg.from()) {
427            Some(sequence) => *sequence,
428            None => {
429                let actor = tree.get_actor(&msg.from)?.ok_or_else(|| {
430                    anyhow::anyhow!(
431                        "Failed to retrieve nonce for addr: Actor does not exist in state"
432                    )
433                })?;
434                let network_version = state_manager
435                    .chain_config()
436                    .network_version(block.header.epoch);
437                if !is_valid_for_sending(network_version, &actor) {
438                    anyhow::bail!("not valid for sending!");
439                }
440                actor.sequence
441            }
442        };
443
444        // Sequence equality check
445        if sequence != msg.sequence {
446            anyhow::bail!(
447                "Message has incorrect sequence (exp: {} got: {})",
448                sequence,
449                msg.sequence
450            );
451        }
452        account_sequences.insert(msg.from(), sequence + 1);
453        Ok(())
454    };
455
456    let mut account_sequences: HashMap<Address, u64> = HashMap::default();
457    let ExecutedTipset { state_root, .. } = state_manager
458        .load_executed_tipset(&base_tipset)
459        .await
460        .map_err(|e| TipsetSyncerError::Calculation(format!("Could not update state: {e:#}")))?;
461    let tree =
462        StateTree::new_from_root(state_manager.blockstore_owned(), &state_root).map_err(|e| {
463            TipsetSyncerError::Calculation(format!(
464                "Could not load from new state root in state manager: {e:#}"
465            ))
466        })?;
467
468    // Check validity for BLS messages
469    for (i, msg) in block.bls_msgs().iter().enumerate() {
470        check_msg(msg, &mut account_sequences, &tree).map_err(|e| {
471            TipsetSyncerError::Validation(format!(
472                "Block had invalid BLS message at index {i}: {e:#}"
473            ))
474        })?;
475    }
476
477    // Check validity for SECP messages
478    for (i, msg) in block.secp_msgs().iter().enumerate() {
479        if msg.signature().signature_type() == SignatureType::Delegated
480            && !is_valid_eth_tx_for_sending(eth_chain_id, network_version, msg)
481        {
482            return Err(TipsetSyncerError::Validation(
483                "Network version must be at least NV23 for legacy Ethereum transactions".to_owned(),
484            ));
485        }
486        check_msg(msg.message(), &mut account_sequences, &tree).map_err(|e| {
487            TipsetSyncerError::Validation(format!(
488                "block had an invalid secp message at index {i}: {e:#}"
489            ))
490        })?;
491        // Resolve key address for signature verification
492        let key_addr = state_manager
493            .resolve_to_key_addr(&msg.from(), &base_tipset)
494            .await
495            .map_err(|e| TipsetSyncerError::ResolvingAddressFromMessage(e.to_string()))?;
496        // SecP256K1 Signature validation
497        msg.signature
498            .authenticate_msg(eth_chain_id, msg, &key_addr)
499            .map_err(|e| TipsetSyncerError::MessageSignatureInvalid(e.to_string()))?;
500    }
501
502    // Validate message root from header matches message root
503    let msg_root = TipsetValidator::compute_msg_root(
504        state_manager.blockstore(),
505        block.bls_msgs(),
506        block.secp_msgs(),
507    )
508    .map_err(|err| TipsetSyncerError::ComputingMessageRoot(err.to_string()))?;
509    if block.header().messages != msg_root {
510        return Err(TipsetSyncerError::BlockMessageRootInvalid(
511            format!("{:?}", block.header().messages),
512            format!("{msg_root:?}"),
513        ));
514    }
515
516    Ok(())
517}
518
519/// Checks optional values in header.
520///
521/// It only looks for fields which are common to all consensus types.
522fn block_sanity_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
523    if header.signature.is_none() {
524        return Err(TipsetSyncerError::BlockWithoutSignature);
525    }
526    if header.bls_aggregate.is_none() {
527        return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
528    }
529    Ok(())
530}
531
532/// Check the clock drift.
533fn block_timestamp_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
534    let time_now = chrono::Utc::now().timestamp() as u64;
535    if header.timestamp > time_now.saturating_add(ALLOWABLE_CLOCK_DRIFT) {
536        return Err(TipsetSyncerError::TimeTravellingBlock(
537            time_now,
538            header.timestamp,
539        ));
540    } else if header.timestamp > time_now {
541        warn!(
542            "Got block from the future, but within clock drift threshold, {} > {}",
543            header.timestamp, time_now
544        );
545    }
546    Ok(())
547}