Skip to main content

forest/chain_sync/
tipset_syncer.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::sync::Arc;
5
6use crate::chain_sync::BadBlockCache;
7use crate::networks::Height;
8use crate::shim::clock::ALLOWABLE_CLOCK_DRIFT;
9use crate::shim::crypto::SignatureType;
10use crate::shim::{
11    address::Address, crypto::verify_bls_aggregate, econ::BLOCK_GAS_LIMIT,
12    gas::price_list_by_network_version, message::Message, state_tree::StateTree,
13};
14use crate::state_manager::ExecutedTipset;
15use crate::state_manager::{Error as StateManagerError, StateManager, utils::is_valid_for_sending};
16use crate::utils::ShallowClone as _;
17use crate::{
18    blocks::{Block, CachingBlockHeader, Error as ForestBlockError, FullTipset, Tipset},
19    fil_cns::{self, FilecoinConsensus, FilecoinConsensusError},
20};
21use crate::{
22    chain::{ChainStore, Error as ChainStoreError},
23    metrics::HistogramTimerExt,
24};
25use crate::{
26    eth::is_valid_eth_tx_for_sending,
27    message::{MessageRead as _, valid_for_block_inclusion},
28};
29use ahash::HashMap;
30use cid::Cid;
31use futures::TryFutureExt;
32use fvm_ipld_blockstore::Blockstore;
33use fvm_ipld_encoding::to_vec;
34use itertools::Itertools;
35use nunny::Vec as NonEmpty;
36use thiserror::Error;
37use tokio::task::JoinSet;
38use tracing::{trace, warn};
39
40use crate::chain_sync::{consensus::collect_errs, metrics, validation::TipsetValidator};
41
42#[derive(Debug, Error)]
43pub enum TipsetSyncerError {
44    #[error("Block must have a signature")]
45    BlockWithoutSignature,
46    #[error("Block without BLS aggregate signature")]
47    BlockWithoutBlsAggregate,
48    #[error("Block received from the future: now = {0}, block = {1}")]
49    TimeTravellingBlock(u64, u64),
50    #[error("Validation error: {0}")]
51    Validation(String),
52    #[error("Processing error: {0}")]
53    Calculation(String),
54    #[error("Chain store error: {0}")]
55    ChainStore(#[from] ChainStoreError),
56    #[error("StateManager error: {0}")]
57    StateManager(#[from] StateManagerError),
58    #[error("Block error: {0}")]
59    BlockError(#[from] ForestBlockError),
60    #[error("Querying tipsets from the network failed: {0}")]
61    NetworkTipsetQueryFailed(String),
62    #[error("BLS aggregate signature {0} was invalid for msgs {1}")]
63    BlsAggregateSignatureInvalid(String, String),
64    #[error("Message signature invalid: {0}")]
65    MessageSignatureInvalid(String),
66    #[error("Block message root does not match: expected {0}, computed {1}")]
67    BlockMessageRootInvalid(String, String),
68    #[error("Computing message root failed: {0}")]
69    ComputingMessageRoot(String),
70    #[error("Resolving address from message failed: {0}")]
71    ResolvingAddressFromMessage(String),
72    #[error("Loading tipset parent from the store failed: {0}")]
73    TipsetParentNotFound(ChainStoreError),
74    #[error("Consensus error: {0}")]
75    ConsensusError(FilecoinConsensusError),
76}
77
78impl From<tokio::task::JoinError> for TipsetSyncerError {
79    fn from(err: tokio::task::JoinError) -> Self {
80        TipsetSyncerError::NetworkTipsetQueryFailed(format!("{err}"))
81    }
82}
83
84impl TipsetSyncerError {
85    /// Concatenate all validation error messages into one comma separated
86    /// version.
87    fn concat(errs: NonEmpty<TipsetSyncerError>) -> Self {
88        let msg = errs.iter().map(|e| e.to_string()).collect_vec().join(", ");
89
90        TipsetSyncerError::Validation(msg)
91    }
92}
93
94/// Validates full blocks in the tipset in parallel (since the messages are not
95/// executed), adding the successful ones to the tipset tracker, and the failed
96/// ones to the bad block cache, depending on strategy. Any bad block fails
97/// validation.
98pub async fn validate_tipset<DB: Blockstore + Send + Sync + 'static>(
99    state_manager: &Arc<StateManager<DB>>,
100    full_tipset: FullTipset,
101    bad_block_cache: Option<Arc<BadBlockCache>>,
102) -> Result<(), TipsetSyncerError> {
103    if full_tipset
104        .key()
105        .eq(state_manager.chain_store().genesis_tipset().key())
106    {
107        trace!("Skipping genesis tipset validation");
108        return Ok(());
109    }
110
111    let timer = metrics::TIPSET_PROCESSING_TIME.start_timer();
112
113    let epoch = full_tipset.epoch();
114    let parent_state = *full_tipset.parent_state();
115    let tipset_key = full_tipset.key();
116    trace!("Tipset keys: {tipset_key}");
117    let blocks = full_tipset.into_blocks();
118    let mut validations = JoinSet::new();
119    for b in blocks {
120        validations.spawn(validate_block(state_manager.clone(), Arc::new(b)));
121    }
122
123    while let Some(result) = validations.join_next().await {
124        match result? {
125            Ok(block) => {
126                state_manager
127                    .chain_store()
128                    .add_to_tipset_tracker(block.header());
129            }
130            Err((cid, why)) => {
131                warn!(
132                    "Validating block [CID = {cid}, PARENT_STATE = {parent_state}] in EPOCH = {epoch} failed: {why}",
133                );
134                match &why {
135                    TipsetSyncerError::TimeTravellingBlock(_, _) => {
136                        // Do not mark a block as bad for temporary errors.
137                        // See <https://github.com/filecoin-project/lotus/blob/v1.34.1/chain/sync.go#L602> in Lotus
138                    }
139                    _ => {
140                        // Do not mark block as bad if the parent state tree does not exist
141                        if StateTree::new_from_root(state_manager.blockstore_owned(), &parent_state)
142                            .is_ok()
143                            && let Some(bad_block_cache) = bad_block_cache
144                        {
145                            bad_block_cache.push(cid);
146                        }
147                    }
148                };
149                return Err(why);
150            }
151        }
152    }
153    drop(timer);
154    Ok(())
155}
156
157/// Validate the block according to the rules specific to the consensus being
158/// used, and the common rules that pertain to the assumptions of the
159/// `ChainSync` protocol.
160///
161/// Returns the validated block if `Ok`.
162/// Returns the block CID (for marking bad) and `Error` if invalid (`Err`).
163///
164/// Common validation includes:
165/// * Sanity checks
166/// * Clock drifts
167/// * Signatures
168/// * Message inclusion (fees, sequences)
169/// * Parent related fields: base fee, weight, the state root
170/// * NB: This is where the messages in the *parent* tipset are executed.
171///
172/// Consensus specific validation should include:
173/// * Checking that the messages in the block correspond to the agreed upon
174///   total ordering
175/// * That the block is a deterministic derivative of the underlying consensus
176async fn validate_block<DB: Blockstore + Sync + Send + 'static>(
177    state_manager: Arc<StateManager<DB>>,
178    block: Arc<Block>,
179) -> Result<Arc<Block>, (Cid, TipsetSyncerError)> {
180    let consensus = FilecoinConsensus::new(state_manager.beacon_schedule().clone());
181    trace!(
182        "Validating block: epoch = {}, weight = {}, key = {}",
183        block.header().epoch,
184        block.header().weight,
185        block.header().cid(),
186    );
187    let chain_store = state_manager.chain_store().clone();
188    let block_cid = block.cid();
189
190    // Check block validation cache in store
191    let is_validated = chain_store.is_block_validated(block_cid);
192    if is_validated {
193        return Ok(block);
194    }
195
196    let _timer = metrics::BLOCK_VALIDATION_TIME.start_timer();
197
198    let header = block.header();
199
200    // Check to ensure all optional values exist
201    block_sanity_checks(header).map_err(|e| (*block_cid, e))?;
202    block_timestamp_checks(header).map_err(|e| (*block_cid, e))?;
203
204    let base_tipset = chain_store
205        .chain_index()
206        .load_required_tipset(&header.parents)
207        // The parent tipset will always be there when calling validate_block
208        // as part of the sync_tipset_range flow because all of the headers in the range
209        // have been committed to the store. When validate_block is called from sync_tipset
210        // this guarantee does not exist, so we create a specific error to inform the caller
211        // not to add this block to the bad blocks cache.
212        .map_err(|why| (*block_cid, TipsetSyncerError::TipsetParentNotFound(why)))?;
213
214    // Retrieve lookback tipset for validation
215    let lookback_state = ChainStore::get_lookback_tipset_for_round(
216        state_manager.chain_store().chain_index(),
217        state_manager.chain_config(),
218        &base_tipset,
219        block.header().epoch,
220    )
221    .map_err(|e| (*block_cid, e.into()))
222    .map(|(_, s)| Arc::new(s))?;
223
224    // Work address needed for async validations, so necessary
225    // to do sync to avoid duplication
226    let work_addr = state_manager
227        .get_miner_work_addr(*lookback_state, &header.miner_address)
228        .map_err(|e| (*block_cid, e.into()))?;
229
230    // Async validations
231    let mut validations = JoinSet::new();
232
233    // Check block messages
234    validations.spawn(check_block_messages(
235        state_manager.shallow_clone(),
236        block.shallow_clone(),
237        base_tipset.shallow_clone(),
238    ));
239
240    // Base fee check
241    validations.spawn_blocking({
242        let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
243        let base_tipset = base_tipset.shallow_clone();
244        let block_store = state_manager.blockstore_owned();
245        let block = block.shallow_clone();
246        move || {
247            let base_fee = crate::chain::compute_base_fee(&block_store, &base_tipset, smoke_height)
248                .map_err(|e| {
249                    TipsetSyncerError::Validation(format!("Could not compute base fee: {e}"))
250                })?;
251            let parent_base_fee = &block.header.parent_base_fee;
252            if &base_fee != parent_base_fee {
253                return Err(TipsetSyncerError::Validation(format!(
254                    "base fee doesn't match: {parent_base_fee} (header), {base_fee} (computed)"
255                )));
256            }
257            Ok(())
258        }
259    });
260
261    // Parent weight calculation check
262    validations.spawn_blocking({
263        let block_store = state_manager.blockstore_owned();
264        let base_tipset = base_tipset.shallow_clone();
265        let weight = header.weight.clone();
266        move || {
267            let calc_weight = fil_cns::weight(&block_store, &base_tipset).map_err(|e| {
268                TipsetSyncerError::Calculation(format!("Error calculating weight: {e:#}"))
269            })?;
270            if weight != calc_weight {
271                return Err(TipsetSyncerError::Validation(format!(
272                    "Parent weight doesn't match: {weight} (header), {calc_weight} (computed)"
273                )));
274            }
275            Ok(())
276        }
277    });
278
279    // State root and receipt root validations
280    validations.spawn({
281        let state_manager = state_manager.clone();
282        let block = block.clone();
283        async move {
284            let header = block.header();
285            let ExecutedTipset {
286                state_root,
287                receipt_root,
288                ..
289            } = state_manager
290                .load_executed_tipset(&base_tipset)
291                .await
292                .map_err(|e| {
293                    TipsetSyncerError::Calculation(format!("Failed to calculate state: {e:#}"))
294                })?;
295
296            if state_root != header.state_root {
297                return Err(TipsetSyncerError::Validation(format!(
298                    "Parent state root did not match computed state: {} (header), {} (computed)",
299                    header.state_root, state_root,
300                )));
301            }
302
303            if receipt_root != header.message_receipts {
304                return Err(TipsetSyncerError::Validation(format!(
305                    "Parent receipt root did not match computed root: {} (header), {} (computed)",
306                    header.message_receipts, receipt_root
307                )));
308            }
309            Ok(())
310        }
311    });
312
313    // Block signature check
314    validations.spawn_blocking({
315        let block = block.clone();
316        move || {
317            block.header().verify_signature_against(&work_addr)?;
318            Ok(())
319        }
320    });
321
322    validations.spawn({
323        let block = block.clone();
324        async move {
325            consensus
326                .validate_block(state_manager, block)
327                .map_err(|errs| {
328                    // NOTE: Concatenating errors here means the wrapper type of error
329                    // never surfaces, yet we always pay the cost of the generic argument.
330                    // But there's no reason `validate_block` couldn't return a list of all
331                    // errors instead of a single one that has all the error messages,
332                    // removing the caller's ability to distinguish between them.
333
334                    TipsetSyncerError::concat(
335                        errs.into_iter_ne()
336                            .map(TipsetSyncerError::ConsensusError)
337                            .collect_vec(),
338                    )
339                })
340                .await
341        }
342    });
343
344    // Collect the errors from the async validations
345    if let Err(errs) = collect_errs(validations).await {
346        return Err((*block_cid, TipsetSyncerError::concat(errs)));
347    }
348
349    chain_store.mark_block_as_validated(block_cid);
350
351    Ok(block)
352}
353
354/// Validate messages in a full block, relative to the parent tipset.
355///
356/// This includes:
357/// * signature checks
358/// * gas limits, and prices
359/// * account nonce values
360/// * the message root in the header
361///
362/// NB: This loads/computes the state resulting from the execution of the parent
363/// tipset.
364async fn check_block_messages<DB: Blockstore + Send + Sync + 'static>(
365    state_manager: Arc<StateManager<DB>>,
366    block: Arc<Block>,
367    base_tipset: Tipset,
368) -> Result<(), TipsetSyncerError> {
369    let network_version = state_manager
370        .chain_config()
371        .network_version(block.header.epoch);
372    let eth_chain_id = state_manager.chain_config().eth_chain_id;
373
374    if let Some(sig) = &block.header().bls_aggregate {
375        // Do the initial loop here
376        // check block message and signatures in them
377        let mut pub_keys = Vec::with_capacity(block.bls_msgs().len());
378        let mut cids = Vec::with_capacity(block.bls_msgs().len());
379        let db = state_manager.blockstore();
380        for m in block.bls_msgs() {
381            let pk = StateManager::get_bls_public_key(db, &m.from, *base_tipset.parent_state())?;
382            pub_keys.push(pk);
383            cids.push(m.cid().to_bytes());
384        }
385
386        if !verify_bls_aggregate(
387            &cids.iter().map(|x| x.as_slice()).collect_vec(),
388            &pub_keys,
389            sig,
390        ) {
391            return Err(TipsetSyncerError::BlsAggregateSignatureInvalid(
392                format!("{sig:?}"),
393                format!("{cids:?}"),
394            ));
395        }
396    } else {
397        return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
398    }
399
400    let price_list = price_list_by_network_version(network_version);
401    let mut sum_gas_limit = 0;
402
403    // Check messages for validity
404    let mut check_msg = |msg: &Message,
405                         account_sequences: &mut HashMap<Address, u64>,
406                         tree: &StateTree<DB>|
407     -> anyhow::Result<()> {
408        // Phase 1: Syntactic validation
409        let min_gas = price_list.on_chain_message(to_vec(msg).unwrap().len());
410        valid_for_block_inclusion(msg, min_gas.total(), network_version)
411            .map_err(|e| anyhow::anyhow!("{}", e))?;
412        sum_gas_limit += msg.gas_limit;
413        if sum_gas_limit > BLOCK_GAS_LIMIT {
414            anyhow::bail!("block gas limit exceeded");
415        }
416
417        // Phase 2: (Partial) Semantic validation
418        // Send exists and is an account actor, and sequence is correct
419        let sequence: u64 = match account_sequences.get(&msg.from()) {
420            Some(sequence) => *sequence,
421            None => {
422                let actor = tree.get_actor(&msg.from)?.ok_or_else(|| {
423                    anyhow::anyhow!(
424                        "Failed to retrieve nonce for addr: Actor does not exist in state"
425                    )
426                })?;
427                let network_version = state_manager
428                    .chain_config()
429                    .network_version(block.header.epoch);
430                if !is_valid_for_sending(network_version, &actor) {
431                    anyhow::bail!("not valid for sending!");
432                }
433                actor.sequence
434            }
435        };
436
437        // Sequence equality check
438        if sequence != msg.sequence {
439            anyhow::bail!(
440                "Message has incorrect sequence (exp: {} got: {})",
441                sequence,
442                msg.sequence
443            );
444        }
445        account_sequences.insert(msg.from(), sequence + 1);
446        Ok(())
447    };
448
449    let mut account_sequences: HashMap<Address, u64> = HashMap::default();
450    let ExecutedTipset { state_root, .. } = state_manager
451        .load_executed_tipset(&base_tipset)
452        .await
453        .map_err(|e| TipsetSyncerError::Calculation(format!("Could not update state: {e:#}")))?;
454    let tree =
455        StateTree::new_from_root(state_manager.blockstore_owned(), &state_root).map_err(|e| {
456            TipsetSyncerError::Calculation(format!(
457                "Could not load from new state root in state manager: {e:#}"
458            ))
459        })?;
460
461    // Check validity for BLS messages
462    for (i, msg) in block.bls_msgs().iter().enumerate() {
463        check_msg(msg, &mut account_sequences, &tree).map_err(|e| {
464            TipsetSyncerError::Validation(format!(
465                "Block had invalid BLS message at index {i}: {e:#}"
466            ))
467        })?;
468    }
469
470    // Check validity for SECP messages
471    for (i, msg) in block.secp_msgs().iter().enumerate() {
472        if msg.signature().signature_type() == SignatureType::Delegated
473            && !is_valid_eth_tx_for_sending(eth_chain_id, network_version, msg)
474        {
475            return Err(TipsetSyncerError::Validation(
476                "Network version must be at least NV23 for legacy Ethereum transactions".to_owned(),
477            ));
478        }
479        check_msg(msg.message(), &mut account_sequences, &tree).map_err(|e| {
480            TipsetSyncerError::Validation(format!(
481                "block had an invalid secp message at index {i}: {e:#}"
482            ))
483        })?;
484        // Resolve key address for signature verification
485        let key_addr = state_manager
486            .resolve_to_key_addr(&msg.from(), &base_tipset)
487            .await
488            .map_err(|e| TipsetSyncerError::ResolvingAddressFromMessage(e.to_string()))?;
489        // SecP256K1 Signature validation
490        msg.signature
491            .authenticate_msg(eth_chain_id, msg, &key_addr)
492            .map_err(|e| TipsetSyncerError::MessageSignatureInvalid(e.to_string()))?;
493    }
494
495    // Validate message root from header matches message root
496    let msg_root = TipsetValidator::compute_msg_root(
497        state_manager.blockstore(),
498        block.bls_msgs(),
499        block.secp_msgs(),
500    )
501    .map_err(|err| TipsetSyncerError::ComputingMessageRoot(err.to_string()))?;
502    if block.header().messages != msg_root {
503        return Err(TipsetSyncerError::BlockMessageRootInvalid(
504            format!("{:?}", block.header().messages),
505            format!("{msg_root:?}"),
506        ));
507    }
508
509    Ok(())
510}
511
512/// Checks optional values in header.
513///
514/// It only looks for fields which are common to all consensus types.
515fn block_sanity_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
516    if header.signature.is_none() {
517        return Err(TipsetSyncerError::BlockWithoutSignature);
518    }
519    if header.bls_aggregate.is_none() {
520        return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
521    }
522    Ok(())
523}
524
525/// Check the clock drift.
526fn block_timestamp_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
527    let time_now = chrono::Utc::now().timestamp() as u64;
528    if header.timestamp > time_now.saturating_add(ALLOWABLE_CLOCK_DRIFT) {
529        return Err(TipsetSyncerError::TimeTravellingBlock(
530            time_now,
531            header.timestamp,
532        ));
533    } else if header.timestamp > time_now {
534        warn!(
535            "Got block from the future, but within clock drift threshold, {} > {}",
536            header.timestamp, time_now
537        );
538    }
539    Ok(())
540}