Skip to main content

forest/chain_sync/
tipset_syncer.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::chain_sync::BadBlockCache;
5use crate::db::DbImpl;
6use crate::networks::Height;
7use crate::prelude::*;
8use crate::shim::clock::ALLOWABLE_CLOCK_DRIFT;
9use crate::shim::crypto::SignatureType;
10use crate::shim::message::Message;
11use crate::shim::{
12    address::Address, crypto::verify_bls_aggregate, econ::BLOCK_GAS_LIMIT,
13    gas::price_list_by_network_version, state_tree::StateTree,
14};
15use crate::state_manager::ExecutedTipset;
16use crate::state_manager::{Error as StateManagerError, StateManager, utils::is_valid_for_sending};
17use crate::{
18    blocks::{Block, CachingBlockHeader, Error as ForestBlockError, FullTipset, Tipset},
19    fil_cns::{self, FilecoinConsensus, FilecoinConsensusError},
20};
21use crate::{
22    chain::{ChainStore, Error as ChainStoreError},
23    metrics::HistogramTimerExt,
24};
25use crate::{
26    eth::is_valid_eth_tx_for_sending,
27    message::{MessageRead as _, valid_for_block_inclusion},
28};
29use ahash::HashMap;
30use futures::TryFutureExt;
31use fvm_ipld_encoding::to_vec;
32use nunny::Vec as NonEmpty;
33use thiserror::Error;
34use tokio::task::JoinSet;
35use tracing::{trace, warn};
36
37use crate::chain_sync::{consensus::collect_errs, metrics, validation::TipsetValidator};
38
39#[derive(Debug, Error)]
40pub enum TipsetSyncerError {
41    #[error("Block must have a signature")]
42    BlockWithoutSignature,
43    #[error("Block without BLS aggregate signature")]
44    BlockWithoutBlsAggregate,
45    #[error("Block received from the future: now = {0}, block = {1}")]
46    TimeTravellingBlock(u64, u64),
47    #[error("Validation error: {0}")]
48    Validation(String),
49    #[error("Processing error: {0}")]
50    Calculation(String),
51    #[error("Chain store error: {0}")]
52    ChainStore(#[from] ChainStoreError),
53    #[error("StateManager error: {0}")]
54    StateManager(#[from] StateManagerError),
55    #[error("Block error: {0}")]
56    BlockError(#[from] ForestBlockError),
57    #[error("Querying tipsets from the network failed: {0}")]
58    NetworkTipsetQueryFailed(String),
59    #[error("BLS aggregate signature {0} was invalid for msgs {1}")]
60    BlsAggregateSignatureInvalid(String, String),
61    #[error("Message signature invalid: {0}")]
62    MessageSignatureInvalid(String),
63    #[error("Block message root does not match: expected {0}, computed {1}")]
64    BlockMessageRootInvalid(String, String),
65    #[error("Computing message root failed: {0}")]
66    ComputingMessageRoot(String),
67    #[error("Resolving address from message failed: {0}")]
68    ResolvingAddressFromMessage(String),
69    #[error("Loading tipset parent from the store failed: {0}")]
70    TipsetParentNotFound(ChainStoreError),
71    #[error("Consensus error: {0}")]
72    ConsensusError(FilecoinConsensusError),
73}
74
75impl From<tokio::task::JoinError> for TipsetSyncerError {
76    fn from(err: tokio::task::JoinError) -> Self {
77        TipsetSyncerError::NetworkTipsetQueryFailed(format!("{err}"))
78    }
79}
80
81impl TipsetSyncerError {
82    /// Concatenate all validation error messages into one comma separated
83    /// version.
84    fn concat(errs: NonEmpty<TipsetSyncerError>) -> Self {
85        let msg = errs.iter().map(|e| e.to_string()).collect_vec().join(", ");
86
87        TipsetSyncerError::Validation(msg)
88    }
89}
90
91/// Validates full blocks in the tipset in parallel (since the messages are not
92/// executed), adding the successful ones to the tipset tracker, and the failed
93/// ones to the bad block cache, depending on strategy. Any bad block fails
94/// validation.
95pub async fn validate_tipset(
96    state_manager: &StateManager,
97    full_tipset: FullTipset,
98    bad_block_cache: Option<BadBlockCache>,
99) -> Result<(), TipsetSyncerError> {
100    if full_tipset
101        .key()
102        .eq(state_manager.chain_store().genesis_tipset().key())
103    {
104        trace!("Skipping genesis tipset validation");
105        return Ok(());
106    }
107
108    let timer = metrics::TIPSET_PROCESSING_TIME.start_timer();
109
110    let epoch = full_tipset.epoch();
111    let parent_state = *full_tipset.parent_state();
112    let tipset_key = full_tipset.key();
113    trace!("Tipset keys: {tipset_key}");
114    let blocks = full_tipset.into_blocks();
115    let mut validations = JoinSet::new();
116    for b in blocks {
117        validations.spawn(validate_block(state_manager.shallow_clone(), Arc::new(b)));
118    }
119
120    while let Some(result) = validations.join_next().await {
121        match result? {
122            Ok(block) => {
123                state_manager
124                    .chain_store()
125                    .add_to_tipset_tracker(block.header());
126            }
127            Err((cid, why)) => {
128                warn!(
129                    "Validating block [CID = {cid}, PARENT_STATE = {parent_state}] in EPOCH = {epoch} failed: {why}",
130                );
131                match &why {
132                    TipsetSyncerError::TimeTravellingBlock(_, _) => {
133                        // Do not mark a block as bad for temporary errors.
134                        // See <https://github.com/filecoin-project/lotus/blob/v1.34.1/chain/sync.go#L602> in Lotus
135                    }
136                    _ => {
137                        // Do not mark block as bad if the parent state tree does not exist
138                        if StateTree::new_from_root(state_manager.db(), &parent_state).is_ok()
139                            && let Some(bad_block_cache) = bad_block_cache
140                        {
141                            bad_block_cache.push(cid);
142                        }
143                    }
144                };
145                return Err(why);
146            }
147        }
148    }
149    drop(timer);
150    Ok(())
151}
152
153/// Validate the block according to the rules specific to the consensus being
154/// used, and the common rules that pertain to the assumptions of the
155/// `ChainSync` protocol.
156///
157/// Returns the validated block if `Ok`.
158/// Returns the block CID (for marking bad) and `Error` if invalid (`Err`).
159///
160/// Common validation includes:
161/// * Sanity checks
162/// * Clock drifts
163/// * Signatures
164/// * Message inclusion (fees, sequences)
165/// * Parent related fields: base fee, weight, the state root
166/// * NB: This is where the messages in the *parent* tipset are executed.
167///
168/// Consensus specific validation should include:
169/// * Checking that the messages in the block correspond to the agreed upon
170///   total ordering
171/// * That the block is a deterministic derivative of the underlying consensus
172async fn validate_block(
173    state_manager: StateManager,
174    block: Arc<Block>,
175) -> Result<Arc<Block>, (Cid, TipsetSyncerError)> {
176    let consensus = FilecoinConsensus::new(state_manager.beacon_schedule().clone());
177    trace!(
178        "Validating block: epoch = {}, weight = {}, key = {}",
179        block.header().epoch,
180        block.header().weight,
181        block.header().cid(),
182    );
183    let chain_store = state_manager.chain_store().shallow_clone();
184    let block_cid = block.cid();
185
186    // Check block validation cache in store
187    let is_validated = chain_store.is_block_validated(block_cid);
188    if is_validated {
189        return Ok(block);
190    }
191
192    let _timer = metrics::BLOCK_VALIDATION_TIME.start_timer();
193
194    let header = block.header();
195
196    // Check to ensure all optional values exist
197    block_sanity_checks(header).map_err(|e| (*block_cid, e))?;
198    block_timestamp_checks(header).map_err(|e| (*block_cid, e))?;
199
200    let base_tipset = chain_store
201        .chain_index()
202        .load_required_tipset(&header.parents)
203        // The parent tipset will always be there when calling validate_block
204        // as part of the sync_tipset_range flow because all of the headers in the range
205        // have been committed to the store. When validate_block is called from sync_tipset
206        // this guarantee does not exist, so we create a specific error to inform the caller
207        // not to add this block to the bad blocks cache.
208        .map_err(|why| (*block_cid, TipsetSyncerError::TipsetParentNotFound(why)))?;
209
210    // Retrieve lookback tipset for validation
211    let lookback_state = ChainStore::get_lookback_tipset_for_round(
212        state_manager.chain_store().chain_index().shallow_clone(),
213        state_manager.chain_config().shallow_clone(),
214        base_tipset.shallow_clone(),
215        block.header().epoch,
216    )
217    .await
218    .map_err(|e| (*block_cid, e.into()))
219    .map(|(_, s)| Arc::new(s))?;
220
221    // Work address needed for async validations, so necessary
222    // to do sync to avoid duplication
223    let work_addr = state_manager
224        .get_miner_work_addr(*lookback_state, &header.miner_address)
225        .map_err(|e| (*block_cid, e.into()))?;
226
227    // Async validations
228    let mut validations = JoinSet::new();
229
230    // Check block messages
231    validations.spawn(check_block_messages(
232        state_manager.shallow_clone(),
233        block.shallow_clone(),
234        base_tipset.shallow_clone(),
235    ));
236
237    // Base fee check
238    validations.spawn_blocking({
239        let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
240        let firehorse_height = state_manager.chain_config().epoch(Height::FireHorse);
241        let base_tipset = base_tipset.shallow_clone();
242        let block_store = state_manager.db_owned();
243        let block = block.shallow_clone();
244        move || {
245            let base_fee = crate::chain::compute_base_fee(
246                &block_store,
247                &base_tipset,
248                smoke_height,
249                firehorse_height,
250            )
251            .map_err(|e| {
252                TipsetSyncerError::Validation(format!("Could not compute base fee: {e}"))
253            })?;
254            let parent_base_fee = &block.header.parent_base_fee;
255            if &base_fee != parent_base_fee {
256                return Err(TipsetSyncerError::Validation(format!(
257                    "base fee doesn't match: {parent_base_fee} (header), {base_fee} (computed)"
258                )));
259            }
260            Ok(())
261        }
262    });
263
264    // Parent weight calculation check
265    validations.spawn_blocking({
266        let block_store = state_manager.db_owned();
267        let base_tipset = base_tipset.shallow_clone();
268        let weight = header.weight.clone();
269        move || {
270            let calc_weight = fil_cns::weight(&block_store, &base_tipset).map_err(|e| {
271                TipsetSyncerError::Calculation(format!("Error calculating weight: {e:#}"))
272            })?;
273            if weight != calc_weight {
274                return Err(TipsetSyncerError::Validation(format!(
275                    "Parent weight doesn't match: {weight} (header), {calc_weight} (computed)"
276                )));
277            }
278            Ok(())
279        }
280    });
281
282    // State root and receipt root validations
283    validations.spawn({
284        let state_manager = state_manager.shallow_clone();
285        let block = block.shallow_clone();
286        async move {
287            let header = block.header();
288            let ExecutedTipset {
289                state_root,
290                receipt_root,
291                ..
292            } = state_manager
293                .load_executed_tipset(&base_tipset)
294                .await
295                .map_err(|e| {
296                    TipsetSyncerError::Calculation(format!("Failed to calculate state: {e:#}"))
297                })?;
298
299            if state_root != header.state_root {
300                return Err(TipsetSyncerError::Validation(format!(
301                    "Parent state root did not match computed state: {} (header), {} (computed)",
302                    header.state_root, state_root,
303                )));
304            }
305
306            if receipt_root != header.message_receipts {
307                return Err(TipsetSyncerError::Validation(format!(
308                    "Parent receipt root did not match computed root: {} (header), {} (computed)",
309                    header.message_receipts, receipt_root
310                )));
311            }
312            Ok(())
313        }
314    });
315
316    // Block signature check
317    validations.spawn_blocking({
318        let block = block.shallow_clone();
319        move || {
320            block.header().verify_signature_against(&work_addr)?;
321            Ok(())
322        }
323    });
324
325    validations.spawn({
326        let block = block.shallow_clone();
327        async move {
328            consensus
329                .validate_block(state_manager, block)
330                .map_err(|errs| {
331                    // NOTE: Concatenating errors here means the wrapper type of error
332                    // never surfaces, yet we always pay the cost of the generic argument.
333                    // But there's no reason `validate_block` couldn't return a list of all
334                    // errors instead of a single one that has all the error messages,
335                    // removing the caller's ability to distinguish between them.
336
337                    TipsetSyncerError::concat(
338                        errs.into_iter_ne()
339                            .map(TipsetSyncerError::ConsensusError)
340                            .collect_vec(),
341                    )
342                })
343                .await
344        }
345    });
346
347    // Collect the errors from the async validations
348    if let Err(errs) = collect_errs(validations).await {
349        return Err((*block_cid, TipsetSyncerError::concat(errs)));
350    }
351
352    chain_store.mark_block_as_validated(block_cid);
353
354    Ok(block)
355}
356
357/// Validate messages in a full block, relative to the parent tipset.
358///
359/// This includes:
360/// * signature checks
361/// * gas limits, and prices
362/// * account nonce values
363/// * the message root in the header
364///
365/// NB: This loads/computes the state resulting from the execution of the parent
366/// tipset.
367async fn check_block_messages(
368    state_manager: StateManager,
369    block: Arc<Block>,
370    base_tipset: Tipset,
371) -> Result<(), TipsetSyncerError> {
372    let network_version = state_manager
373        .chain_config()
374        .network_version(block.header.epoch);
375    let eth_chain_id = state_manager.chain_config().eth_chain_id;
376
377    if let Some(sig) = &block.header().bls_aggregate {
378        // Do the initial loop here
379        // check block message and signatures in them
380        let mut pub_keys = Vec::with_capacity(block.bls_msgs().len());
381        let mut cids = Vec::with_capacity(block.bls_msgs().len());
382        let db = state_manager.db();
383        for m in block.bls_msgs() {
384            let pk = StateManager::get_bls_public_key(db, m.from, *base_tipset.parent_state())?;
385            pub_keys.push(pk);
386            cids.push(m.cid().to_bytes());
387        }
388
389        if !verify_bls_aggregate(
390            &cids.iter().map(|x| x.as_slice()).collect_vec(),
391            &pub_keys,
392            sig,
393        ) {
394            return Err(TipsetSyncerError::BlsAggregateSignatureInvalid(
395                format!("{sig:?}"),
396                format!("{cids:?}"),
397            ));
398        }
399    } else {
400        return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
401    }
402
403    let price_list = price_list_by_network_version(network_version);
404    let mut sum_gas_limit = 0;
405
406    // Check messages for validity
407    let mut check_msg = |msg: &Message,
408                         account_sequences: &mut HashMap<Address, u64>,
409                         tree: &StateTree<DbImpl>|
410     -> anyhow::Result<()> {
411        // Phase 1: Syntactic validation
412        let min_gas = price_list.on_chain_message(to_vec(msg).unwrap().len());
413        valid_for_block_inclusion(msg, min_gas.total(), network_version)
414            .map_err(|e| anyhow::anyhow!("{}", e))?;
415        sum_gas_limit += msg.gas_limit;
416        if sum_gas_limit > BLOCK_GAS_LIMIT {
417            anyhow::bail!("block gas limit exceeded");
418        }
419
420        // Phase 2: (Partial) Semantic validation
421        // Send exists and is an account actor, and sequence is correct
422        let sequence: u64 = match account_sequences.get(&msg.from()) {
423            Some(sequence) => *sequence,
424            None => {
425                let actor = tree.get_actor(&msg.from)?.ok_or_else(|| {
426                    anyhow::anyhow!(
427                        "Failed to retrieve nonce for addr: Actor does not exist in state"
428                    )
429                })?;
430                let network_version = state_manager
431                    .chain_config()
432                    .network_version(block.header.epoch);
433                if !is_valid_for_sending(network_version, &actor) {
434                    anyhow::bail!("not valid for sending!");
435                }
436                actor.sequence
437            }
438        };
439
440        // Sequence equality check
441        if sequence != msg.sequence {
442            anyhow::bail!(
443                "Message has incorrect sequence (exp: {} got: {})",
444                sequence,
445                msg.sequence
446            );
447        }
448        account_sequences.insert(msg.from(), sequence + 1);
449        Ok(())
450    };
451
452    let mut account_sequences: HashMap<Address, u64> = HashMap::default();
453    let ExecutedTipset { state_root, .. } = state_manager
454        .load_executed_tipset(&base_tipset)
455        .await
456        .map_err(|e| TipsetSyncerError::Calculation(format!("Could not update state: {e:#}")))?;
457    let tree = StateTree::new_from_root(state_manager.db(), &state_root).map_err(|e| {
458        TipsetSyncerError::Calculation(format!(
459            "Could not load from new state root in state manager: {e:#}"
460        ))
461    })?;
462
463    // Check validity for BLS messages
464    for (i, msg) in block.bls_msgs().iter().enumerate() {
465        check_msg(msg, &mut account_sequences, &tree).map_err(|e| {
466            TipsetSyncerError::Validation(format!(
467                "Block had invalid BLS message at index {i}: {e:#}"
468            ))
469        })?;
470    }
471
472    // Check validity for SECP messages
473    for (i, msg) in block.secp_msgs().iter().enumerate() {
474        if msg.signature().signature_type() == SignatureType::Delegated
475            && !is_valid_eth_tx_for_sending(eth_chain_id, network_version, msg)
476        {
477            return Err(TipsetSyncerError::Validation(
478                "Network version must be at least NV23 for legacy Ethereum transactions".to_owned(),
479            ));
480        }
481        check_msg(msg.message(), &mut account_sequences, &tree).map_err(|e| {
482            TipsetSyncerError::Validation(format!(
483                "block had an invalid secp message at index {i}: {e:#}"
484            ))
485        })?;
486        // Resolve key address for signature verification
487        let key_addr = state_manager
488            .resolve_to_deterministic_address(msg.from(), &base_tipset)
489            .await
490            .map_err(|e| TipsetSyncerError::ResolvingAddressFromMessage(e.to_string()))?;
491        // SecP256K1 Signature validation
492        msg.signature
493            .authenticate_msg(eth_chain_id, msg, &key_addr)
494            .map_err(|e| TipsetSyncerError::MessageSignatureInvalid(e.to_string()))?;
495    }
496
497    // Validate message root from header matches message root
498    let msg_root =
499        TipsetValidator::compute_msg_root(state_manager.db(), block.bls_msgs(), block.secp_msgs())
500            .map_err(|err| TipsetSyncerError::ComputingMessageRoot(err.to_string()))?;
501    if block.header().messages != msg_root {
502        return Err(TipsetSyncerError::BlockMessageRootInvalid(
503            format!("{:?}", block.header().messages),
504            format!("{msg_root:?}"),
505        ));
506    }
507
508    Ok(())
509}
510
511/// Checks optional values in header.
512///
513/// It only looks for fields which are common to all consensus types.
514fn block_sanity_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
515    if header.signature.is_none() {
516        return Err(TipsetSyncerError::BlockWithoutSignature);
517    }
518    if header.bls_aggregate.is_none() {
519        return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
520    }
521    Ok(())
522}
523
524/// Check the clock drift.
525fn block_timestamp_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
526    let time_now = chrono::Utc::now().timestamp() as u64;
527    if header.timestamp > time_now.saturating_add(ALLOWABLE_CLOCK_DRIFT) {
528        return Err(TipsetSyncerError::TimeTravellingBlock(
529            time_now,
530            header.timestamp,
531        ));
532    } else if header.timestamp > time_now {
533        warn!(
534            "Got block from the future, but within clock drift threshold, {} > {}",
535            header.timestamp, time_now
536        );
537    }
538    Ok(())
539}