forest/chain_sync/
tipset_syncer.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::sync::Arc;
5
6use crate::chain_sync::BadBlockCache;
7use crate::networks::Height;
8use crate::shim::clock::ALLOWABLE_CLOCK_DRIFT;
9use crate::shim::crypto::SignatureType;
10use crate::shim::{
11    address::Address, crypto::verify_bls_aggregate, econ::BLOCK_GAS_LIMIT,
12    gas::price_list_by_network_version, message::Message, state_tree::StateTree,
13};
14use crate::state_manager::{Error as StateManagerError, StateManager, is_valid_for_sending};
15use crate::{
16    blocks::{Block, CachingBlockHeader, Error as ForestBlockError, FullTipset, Tipset},
17    fil_cns::{self, FilecoinConsensus, FilecoinConsensusError},
18};
19use crate::{
20    chain::{ChainStore, Error as ChainStoreError},
21    metrics::HistogramTimerExt,
22};
23use crate::{
24    eth::is_valid_eth_tx_for_sending,
25    message::{Message as MessageTrait, valid_for_block_inclusion},
26};
27use ahash::HashMap;
28use cid::Cid;
29use futures::TryFutureExt;
30use fvm_ipld_blockstore::Blockstore;
31use fvm_ipld_encoding::to_vec;
32use itertools::Itertools;
33use nunny::Vec as NonEmpty;
34use thiserror::Error;
35use tokio::task::JoinSet;
36use tracing::{error, trace, warn};
37
38use crate::chain_sync::{consensus::collect_errs, metrics, validation::TipsetValidator};
39
40#[derive(Debug, Error)]
41pub enum TipsetSyncerError {
42    #[error("Block must have a signature")]
43    BlockWithoutSignature,
44    #[error("Block without BLS aggregate signature")]
45    BlockWithoutBlsAggregate,
46    #[error("Block received from the future: now = {0}, block = {1}")]
47    TimeTravellingBlock(u64, u64),
48    #[error("Validation error: {0}")]
49    Validation(String),
50    #[error("Processing error: {0}")]
51    Calculation(String),
52    #[error("Chain store error: {0}")]
53    ChainStore(#[from] ChainStoreError),
54    #[error("StateManager error: {0}")]
55    StateManager(#[from] StateManagerError),
56    #[error("Block error: {0}")]
57    BlockError(#[from] ForestBlockError),
58    #[error("Querying tipsets from the network failed: {0}")]
59    NetworkTipsetQueryFailed(String),
60    #[error("BLS aggregate signature {0} was invalid for msgs {1}")]
61    BlsAggregateSignatureInvalid(String, String),
62    #[error("Message signature invalid: {0}")]
63    MessageSignatureInvalid(String),
64    #[error("Block message root does not match: expected {0}, computed {1}")]
65    BlockMessageRootInvalid(String, String),
66    #[error("Computing message root failed: {0}")]
67    ComputingMessageRoot(String),
68    #[error("Resolving address from message failed: {0}")]
69    ResolvingAddressFromMessage(String),
70    #[error("Loading tipset parent from the store failed: {0}")]
71    TipsetParentNotFound(ChainStoreError),
72    #[error("Consensus error: {0}")]
73    ConsensusError(FilecoinConsensusError),
74}
75
76impl From<tokio::task::JoinError> for TipsetSyncerError {
77    fn from(err: tokio::task::JoinError) -> Self {
78        TipsetSyncerError::NetworkTipsetQueryFailed(format!("{err}"))
79    }
80}
81
82impl TipsetSyncerError {
83    /// Concatenate all validation error messages into one comma separated
84    /// version.
85    fn concat(errs: NonEmpty<TipsetSyncerError>) -> Self {
86        let msg = errs
87            .iter()
88            .map(|e| e.to_string())
89            .collect::<Vec<_>>()
90            .join(", ");
91
92        TipsetSyncerError::Validation(msg)
93    }
94}
95
96/// Validates full blocks in the tipset in parallel (since the messages are not
97/// executed), adding the successful ones to the tipset tracker, and the failed
98/// ones to the bad block cache, depending on strategy. Any bad block fails
99/// validation.
100pub async fn validate_tipset<DB: Blockstore + Send + Sync + 'static>(
101    state_manager: Arc<StateManager<DB>>,
102    chainstore: &ChainStore<DB>,
103    full_tipset: FullTipset,
104    genesis: &Tipset,
105    bad_block_cache: Option<Arc<BadBlockCache>>,
106) -> Result<(), TipsetSyncerError> {
107    if full_tipset.key().eq(genesis.key()) {
108        trace!("Skipping genesis tipset validation");
109        return Ok(());
110    }
111
112    let timer = metrics::TIPSET_PROCESSING_TIME.start_timer();
113
114    let epoch = full_tipset.epoch();
115    let full_tipset_key = full_tipset.key().clone();
116    trace!("Tipset keys: {full_tipset_key}");
117    let blocks = full_tipset.into_blocks();
118    let mut validations = JoinSet::new();
119    for b in blocks {
120        validations.spawn(validate_block(state_manager.clone(), Arc::new(b)));
121    }
122
123    while let Some(result) = validations.join_next().await {
124        match result? {
125            Ok(block) => {
126                chainstore.add_to_tipset_tracker(block.header());
127            }
128            Err((cid, why)) => {
129                warn!("Validating block [CID = {cid}] in EPOCH = {epoch} failed: {why}");
130                match &why {
131                    TipsetSyncerError::TimeTravellingBlock(_, _) => {
132                        // Do not mark a block as bad for temporary errors.
133                        // See <https://github.com/filecoin-project/lotus/blob/v1.34.1/chain/sync.go#L602> in Lotus
134                    }
135                    _ => {
136                        if let Some(bad_block_cache) = bad_block_cache {
137                            bad_block_cache.push(cid);
138                        }
139                    }
140                };
141                return Err(why);
142            }
143        }
144    }
145    drop(timer);
146    Ok(())
147}
148
149/// Validate the block according to the rules specific to the consensus being
150/// used, and the common rules that pertain to the assumptions of the
151/// `ChainSync` protocol.
152///
153/// Returns the validated block if `Ok`.
154/// Returns the block CID (for marking bad) and `Error` if invalid (`Err`).
155///
156/// Common validation includes:
157/// * Sanity checks
158/// * Clock drifts
159/// * Signatures
160/// * Message inclusion (fees, sequences)
161/// * Parent related fields: base fee, weight, the state root
162/// * NB: This is where the messages in the *parent* tipset are executed.
163///
164/// Consensus specific validation should include:
165/// * Checking that the messages in the block correspond to the agreed upon
166///   total ordering
167/// * That the block is a deterministic derivative of the underlying consensus
168async fn validate_block<DB: Blockstore + Sync + Send + 'static>(
169    state_manager: Arc<StateManager<DB>>,
170    block: Arc<Block>,
171) -> Result<Arc<Block>, (Cid, TipsetSyncerError)> {
172    let consensus = FilecoinConsensus::new(state_manager.beacon_schedule().clone());
173    trace!(
174        "Validating block: epoch = {}, weight = {}, key = {}",
175        block.header().epoch,
176        block.header().weight,
177        block.header().cid(),
178    );
179    let chain_store = state_manager.chain_store().clone();
180    let block_cid = block.cid();
181
182    // Check block validation cache in store
183    let is_validated = chain_store.is_block_validated(block_cid);
184    if is_validated {
185        return Ok(block);
186    }
187
188    let _timer = metrics::BLOCK_VALIDATION_TIME.start_timer();
189
190    let header = block.header();
191
192    // Check to ensure all optional values exist
193    block_sanity_checks(header).map_err(|e| (*block_cid, e))?;
194    block_timestamp_checks(header).map_err(|e| (*block_cid, e))?;
195
196    let base_tipset = chain_store
197        .chain_index()
198        .load_required_tipset(&header.parents)
199        // The parent tipset will always be there when calling validate_block
200        // as part of the sync_tipset_range flow because all of the headers in the range
201        // have been committed to the store. When validate_block is called from sync_tipset
202        // this guarantee does not exist, so we create a specific error to inform the caller
203        // not to add this block to the bad blocks cache.
204        .map_err(|why| (*block_cid, TipsetSyncerError::TipsetParentNotFound(why)))?;
205
206    // Retrieve lookback tipset for validation
207    let lookback_state = ChainStore::get_lookback_tipset_for_round(
208        state_manager.chain_store().chain_index().clone(),
209        state_manager.chain_config().clone(),
210        base_tipset.clone(),
211        block.header().epoch,
212    )
213    .map_err(|e| (*block_cid, e.into()))
214    .map(|(_, s)| Arc::new(s))?;
215
216    // Work address needed for async validations, so necessary
217    // to do sync to avoid duplication
218    let work_addr = state_manager
219        .get_miner_work_addr(*lookback_state, &header.miner_address)
220        .map_err(|e| (*block_cid, e.into()))?;
221
222    // Async validations
223    let mut validations = JoinSet::new();
224
225    // Check block messages
226    validations.spawn(check_block_messages(
227        Arc::clone(&state_manager),
228        Arc::clone(&block),
229        Arc::clone(&base_tipset),
230    ));
231
232    // Base fee check
233    let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
234    let v_base_tipset = Arc::clone(&base_tipset);
235    let v_block_store = state_manager.blockstore_owned();
236    let v_block = Arc::clone(&block);
237    validations.spawn_blocking(move || {
238        let base_fee = crate::chain::compute_base_fee(&v_block_store, &v_base_tipset, smoke_height)
239            .map_err(|e| {
240                TipsetSyncerError::Validation(format!("Could not compute base fee: {e}"))
241            })?;
242        let parent_base_fee = &v_block.header.parent_base_fee;
243        if &base_fee != parent_base_fee {
244            return Err(TipsetSyncerError::Validation(format!(
245                "base fee doesn't match: {parent_base_fee} (header), {base_fee} (computed)"
246            )));
247        }
248        Ok(())
249    });
250
251    // Parent weight calculation check
252    let v_block_store = state_manager.blockstore_owned();
253    let v_base_tipset = Arc::clone(&base_tipset);
254    let weight = header.weight.clone();
255    validations.spawn_blocking(move || {
256        let calc_weight = fil_cns::weight(&v_block_store, &v_base_tipset).map_err(|e| {
257            TipsetSyncerError::Calculation(format!("Error calculating weight: {e}"))
258        })?;
259        if weight != calc_weight {
260            return Err(TipsetSyncerError::Validation(format!(
261                "Parent weight doesn't match: {weight} (header), {calc_weight} (computed)"
262            )));
263        }
264        Ok(())
265    });
266
267    // State root and receipt root validations
268    let v_state_manager = Arc::clone(&state_manager);
269    let v_base_tipset = Arc::clone(&base_tipset);
270    let v_block = Arc::clone(&block);
271    validations.spawn(async move {
272        let header = v_block.header();
273        let (state_root, receipt_root) = v_state_manager
274            .tipset_state(&v_base_tipset)
275            .await
276            .map_err(|e| {
277                TipsetSyncerError::Calculation(format!("Failed to calculate state: {e}"))
278            })?;
279
280        if state_root != header.state_root {
281            return Err(TipsetSyncerError::Validation(format!(
282                "Parent state root did not match computed state: {} (header), {} (computed)",
283                header.state_root, state_root,
284            )));
285        }
286
287        if receipt_root != header.message_receipts {
288            return Err(TipsetSyncerError::Validation(format!(
289                "Parent receipt root did not match computed root: {} (header), {} (computed)",
290                header.message_receipts, receipt_root
291            )));
292        }
293        Ok(())
294    });
295
296    // Block signature check
297    let v_block = block.clone();
298    validations.spawn_blocking(move || {
299        v_block.header().verify_signature_against(&work_addr)?;
300        Ok(())
301    });
302
303    let v_block = block.clone();
304    validations.spawn(async move {
305        consensus
306            .validate_block(state_manager, v_block)
307            .map_err(|errs| {
308                // NOTE: Concatenating errors here means the wrapper type of error
309                // never surfaces, yet we always pay the cost of the generic argument.
310                // But there's no reason `validate_block` couldn't return a list of all
311                // errors instead of a single one that has all the error messages,
312                // removing the caller's ability to distinguish between them.
313
314                TipsetSyncerError::concat(
315                    errs.into_iter_ne()
316                        .map(TipsetSyncerError::ConsensusError)
317                        .collect_vec(),
318                )
319            })
320            .await
321    });
322
323    // Collect the errors from the async validations
324    if let Err(errs) = collect_errs(validations).await {
325        return Err((*block_cid, TipsetSyncerError::concat(errs)));
326    }
327
328    chain_store.mark_block_as_validated(block_cid);
329
330    Ok(block)
331}
332
333/// Validate messages in a full block, relative to the parent tipset.
334///
335/// This includes:
336/// * signature checks
337/// * gas limits, and prices
338/// * account nonce values
339/// * the message root in the header
340///
341/// NB: This loads/computes the state resulting from the execution of the parent
342/// tipset.
343async fn check_block_messages<DB: Blockstore + Send + Sync + 'static>(
344    state_manager: Arc<StateManager<DB>>,
345    block: Arc<Block>,
346    base_tipset: Arc<Tipset>,
347) -> Result<(), TipsetSyncerError> {
348    let network_version = state_manager
349        .chain_config()
350        .network_version(block.header.epoch);
351    let eth_chain_id = state_manager.chain_config().eth_chain_id;
352
353    if let Some(sig) = &block.header().bls_aggregate {
354        // Do the initial loop here
355        // check block message and signatures in them
356        let mut pub_keys = Vec::with_capacity(block.bls_msgs().len());
357        let mut cids = Vec::with_capacity(block.bls_msgs().len());
358        let db = state_manager.blockstore_owned();
359        for m in block.bls_msgs() {
360            let pk = StateManager::get_bls_public_key(&db, &m.from, *base_tipset.parent_state())?;
361            pub_keys.push(pk);
362            cids.push(m.cid().to_bytes());
363        }
364
365        if !verify_bls_aggregate(
366            &cids.iter().map(|x| x.as_slice()).collect_vec(),
367            &pub_keys,
368            sig,
369        ) {
370            return Err(TipsetSyncerError::BlsAggregateSignatureInvalid(
371                format!("{sig:?}"),
372                format!("{cids:?}"),
373            ));
374        }
375    } else {
376        return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
377    }
378
379    let price_list = price_list_by_network_version(network_version);
380    let mut sum_gas_limit = 0;
381
382    // Check messages for validity
383    let mut check_msg = |msg: &Message,
384                         account_sequences: &mut HashMap<Address, u64>,
385                         tree: &StateTree<DB>|
386     -> Result<(), anyhow::Error> {
387        // Phase 1: Syntactic validation
388        let min_gas = price_list.on_chain_message(to_vec(msg).unwrap().len());
389        valid_for_block_inclusion(msg, min_gas.total(), network_version)
390            .map_err(|e| anyhow::anyhow!("{}", e))?;
391        sum_gas_limit += msg.gas_limit;
392        if sum_gas_limit > BLOCK_GAS_LIMIT {
393            anyhow::bail!("block gas limit exceeded");
394        }
395
396        // Phase 2: (Partial) Semantic validation
397        // Send exists and is an account actor, and sequence is correct
398        let sequence: u64 = match account_sequences.get(&msg.from()) {
399            Some(sequence) => *sequence,
400            None => {
401                let actor = tree.get_actor(&msg.from)?.ok_or_else(|| {
402                    anyhow::anyhow!(
403                        "Failed to retrieve nonce for addr: Actor does not exist in state"
404                    )
405                })?;
406                let network_version = state_manager
407                    .chain_config()
408                    .network_version(block.header.epoch);
409                if !is_valid_for_sending(network_version, &actor) {
410                    anyhow::bail!("not valid for sending!");
411                }
412                actor.sequence
413            }
414        };
415
416        // Sequence equality check
417        if sequence != msg.sequence {
418            anyhow::bail!(
419                "Message has incorrect sequence (exp: {} got: {})",
420                sequence,
421                msg.sequence
422            );
423        }
424        account_sequences.insert(msg.from(), sequence + 1);
425        Ok(())
426    };
427
428    let mut account_sequences: HashMap<Address, u64> = HashMap::default();
429    let (state_root, _) = state_manager
430        .tipset_state(&base_tipset)
431        .await
432        .map_err(|e| TipsetSyncerError::Calculation(format!("Could not update state: {e}")))?;
433    let tree =
434        StateTree::new_from_root(state_manager.blockstore_owned(), &state_root).map_err(|e| {
435            TipsetSyncerError::Calculation(format!(
436                "Could not load from new state root in state manager: {e}"
437            ))
438        })?;
439
440    // Check validity for BLS messages
441    for (i, msg) in block.bls_msgs().iter().enumerate() {
442        check_msg(msg, &mut account_sequences, &tree).map_err(|e| {
443            TipsetSyncerError::Validation(format!(
444                "Block had invalid BLS message at index {i}: {e}"
445            ))
446        })?;
447    }
448
449    // Check validity for SECP messages
450    for (i, msg) in block.secp_msgs().iter().enumerate() {
451        if msg.signature().signature_type() == SignatureType::Delegated
452            && !is_valid_eth_tx_for_sending(eth_chain_id, network_version, msg)
453        {
454            return Err(TipsetSyncerError::Validation(
455                "Network version must be at least NV23 for legacy Ethereum transactions".to_owned(),
456            ));
457        }
458        check_msg(msg.message(), &mut account_sequences, &tree).map_err(|e| {
459            TipsetSyncerError::Validation(format!(
460                "block had an invalid secp message at index {i}: {e}"
461            ))
462        })?;
463        // Resolve key address for signature verification
464        let key_addr = state_manager
465            .resolve_to_key_addr(&msg.from(), &base_tipset)
466            .await
467            .map_err(|e| TipsetSyncerError::ResolvingAddressFromMessage(e.to_string()))?;
468        // SecP256K1 Signature validation
469        msg.signature
470            .authenticate_msg(eth_chain_id, msg, &key_addr)
471            .map_err(|e| TipsetSyncerError::MessageSignatureInvalid(e.to_string()))?;
472    }
473
474    // Validate message root from header matches message root
475    let msg_root = TipsetValidator::compute_msg_root(
476        state_manager.blockstore(),
477        block.bls_msgs(),
478        block.secp_msgs(),
479    )
480    .map_err(|err| TipsetSyncerError::ComputingMessageRoot(err.to_string()))?;
481    if block.header().messages != msg_root {
482        return Err(TipsetSyncerError::BlockMessageRootInvalid(
483            format!("{:?}", block.header().messages),
484            format!("{msg_root:?}"),
485        ));
486    }
487
488    Ok(())
489}
490
491/// Checks optional values in header.
492///
493/// It only looks for fields which are common to all consensus types.
494fn block_sanity_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
495    if header.signature.is_none() {
496        return Err(TipsetSyncerError::BlockWithoutSignature);
497    }
498    if header.bls_aggregate.is_none() {
499        return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
500    }
501    Ok(())
502}
503
504/// Check the clock drift.
505fn block_timestamp_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
506    let time_now = chrono::Utc::now().timestamp() as u64;
507    if header.timestamp > time_now.saturating_add(ALLOWABLE_CLOCK_DRIFT) {
508        return Err(TipsetSyncerError::TimeTravellingBlock(
509            time_now,
510            header.timestamp,
511        ));
512    } else if header.timestamp > time_now {
513        warn!(
514            "Got block from the future, but within clock drift threshold, {} > {}",
515            header.timestamp, time_now
516        );
517    }
518    Ok(())
519}