Skip to main content

forest/chain_sync/
tipset_syncer.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::chain_sync::BadBlockCache;
5use crate::db::DbImpl;
6use crate::networks::Height;
7use crate::prelude::*;
8use crate::shim::clock::ALLOWABLE_CLOCK_DRIFT;
9use crate::shim::crypto::SignatureType;
10use crate::shim::message::Message;
11use crate::shim::{
12    address::Address, crypto::verify_bls_aggregate, econ::BLOCK_GAS_LIMIT,
13    gas::price_list_by_network_version, state_tree::StateTree,
14};
15use crate::state_manager::ExecutedTipset;
16use crate::state_manager::{Error as StateManagerError, StateManager, utils::is_valid_for_sending};
17use crate::{
18    blocks::{Block, CachingBlockHeader, Error as ForestBlockError, FullTipset, Tipset},
19    fil_cns::{self, FilecoinConsensus, FilecoinConsensusError},
20};
21use crate::{
22    chain::{ChainStore, Error as ChainStoreError},
23    metrics::HistogramTimerExt,
24};
25use crate::{
26    eth::is_valid_eth_tx_for_sending,
27    message::{MessageRead as _, valid_for_block_inclusion},
28};
29use ahash::HashMap;
30use futures::TryFutureExt;
31use fvm_ipld_encoding::to_vec;
32use nunny::Vec as NonEmpty;
33use thiserror::Error;
34use tokio::task::JoinSet;
35use tracing::{trace, warn};
36
37use crate::chain_sync::{consensus::collect_errs, metrics, validation::TipsetValidator};
38
39#[derive(Debug, Error)]
40pub enum TipsetSyncerError {
41    #[error("Block must have a signature")]
42    BlockWithoutSignature,
43    #[error("Block without BLS aggregate signature")]
44    BlockWithoutBlsAggregate,
45    #[error("Block received from the future: now = {0}, block = {1}")]
46    TimeTravellingBlock(u64, u64),
47    #[error("Validation error: {0}")]
48    Validation(String),
49    #[error("Processing error: {0}")]
50    Calculation(String),
51    #[error("Chain store error: {0}")]
52    ChainStore(#[from] ChainStoreError),
53    #[error("StateManager error: {0}")]
54    StateManager(#[from] StateManagerError),
55    #[error("Block error: {0}")]
56    BlockError(#[from] ForestBlockError),
57    #[error("Querying tipsets from the network failed: {0}")]
58    NetworkTipsetQueryFailed(String),
59    #[error("BLS aggregate signature {0} was invalid for msgs {1}")]
60    BlsAggregateSignatureInvalid(String, String),
61    #[error("Message signature invalid: {0}")]
62    MessageSignatureInvalid(String),
63    #[error("Block message root does not match: expected {0}, computed {1}")]
64    BlockMessageRootInvalid(String, String),
65    #[error("Computing message root failed: {0}")]
66    ComputingMessageRoot(String),
67    #[error("Resolving address from message failed: {0}")]
68    ResolvingAddressFromMessage(String),
69    #[error("Loading tipset parent from the store failed: {0}")]
70    TipsetParentNotFound(ChainStoreError),
71    #[error("Consensus error: {0}")]
72    ConsensusError(FilecoinConsensusError),
73}
74
75impl From<tokio::task::JoinError> for TipsetSyncerError {
76    fn from(err: tokio::task::JoinError) -> Self {
77        TipsetSyncerError::NetworkTipsetQueryFailed(format!("{err}"))
78    }
79}
80
81impl TipsetSyncerError {
82    /// Concatenate all validation error messages into one comma separated
83    /// version.
84    fn concat(errs: NonEmpty<TipsetSyncerError>) -> Self {
85        let msg = errs.iter().map(|e| e.to_string()).collect_vec().join(", ");
86
87        TipsetSyncerError::Validation(msg)
88    }
89}
90
91/// Validates full blocks in the tipset in parallel (since the messages are not
92/// executed), adding the successful ones to the tipset tracker, and the failed
93/// ones to the bad block cache, depending on strategy. Any bad block fails
94/// validation.
95pub async fn validate_tipset(
96    state_manager: &StateManager,
97    full_tipset: FullTipset,
98    bad_block_cache: Option<BadBlockCache>,
99) -> Result<(), TipsetSyncerError> {
100    if full_tipset
101        .key()
102        .eq(state_manager.chain_store().genesis_tipset().key())
103    {
104        trace!("Skipping genesis tipset validation");
105        return Ok(());
106    }
107
108    let timer = metrics::TIPSET_PROCESSING_TIME.start_timer();
109
110    let epoch = full_tipset.epoch();
111    let parent_state = *full_tipset.parent_state();
112    let tipset_key = full_tipset.key();
113    trace!("Tipset keys: {tipset_key}");
114    let blocks = full_tipset.into_blocks();
115    let mut validations = JoinSet::new();
116    for b in blocks {
117        validations.spawn(validate_block(state_manager.shallow_clone(), Arc::new(b)));
118    }
119
120    while let Some(result) = validations.join_next().await {
121        match result? {
122            Ok(block) => {
123                state_manager
124                    .chain_store()
125                    .add_to_tipset_tracker(block.header());
126            }
127            Err((cid, why)) => {
128                warn!(
129                    "Validating block [CID = {cid}, PARENT_STATE = {parent_state}] in EPOCH = {epoch} failed: {why}",
130                );
131                match &why {
132                    TipsetSyncerError::TimeTravellingBlock(_, _) => {
133                        // Do not mark a block as bad for temporary errors.
134                        // See <https://github.com/filecoin-project/lotus/blob/v1.34.1/chain/sync.go#L602> in Lotus
135                    }
136                    _ => {
137                        // Do not mark block as bad if the parent state tree does not exist
138                        if StateTree::new_from_root(state_manager.db(), &parent_state).is_ok()
139                            && let Some(bad_block_cache) = bad_block_cache
140                        {
141                            bad_block_cache.push(cid);
142                        }
143                    }
144                };
145                return Err(why);
146            }
147        }
148    }
149    drop(timer);
150    Ok(())
151}
152
153/// Validate the block according to the rules specific to the consensus being
154/// used, and the common rules that pertain to the assumptions of the
155/// `ChainSync` protocol.
156///
157/// Returns the validated block if `Ok`.
158/// Returns the block CID (for marking bad) and `Error` if invalid (`Err`).
159///
160/// Common validation includes:
161/// * Sanity checks
162/// * Clock drifts
163/// * Signatures
164/// * Message inclusion (fees, sequences)
165/// * Parent related fields: base fee, weight, the state root
166/// * NB: This is where the messages in the *parent* tipset are executed.
167///
168/// Consensus specific validation should include:
169/// * Checking that the messages in the block correspond to the agreed upon
170///   total ordering
171/// * That the block is a deterministic derivative of the underlying consensus
172async fn validate_block(
173    state_manager: StateManager,
174    block: Arc<Block>,
175) -> Result<Arc<Block>, (Cid, TipsetSyncerError)> {
176    let consensus = FilecoinConsensus::new(state_manager.beacon_schedule().clone());
177    trace!(
178        "Validating block: epoch = {}, weight = {}, key = {}",
179        block.header().epoch,
180        block.header().weight,
181        block.header().cid(),
182    );
183    let chain_store = state_manager.chain_store().shallow_clone();
184    let block_cid = block.cid();
185
186    // Check block validation cache in store
187    let is_validated = chain_store.is_block_validated(block_cid);
188    if is_validated {
189        return Ok(block);
190    }
191
192    let _timer = metrics::BLOCK_VALIDATION_TIME.start_timer();
193
194    let header = block.header();
195
196    // Check to ensure all optional values exist
197    block_sanity_checks(header).map_err(|e| (*block_cid, e))?;
198    block_timestamp_checks(header).map_err(|e| (*block_cid, e))?;
199
200    let base_tipset = chain_store
201        .chain_index()
202        .load_required_tipset(&header.parents)
203        // The parent tipset will always be there when calling validate_block
204        // as part of the sync_tipset_range flow because all of the headers in the range
205        // have been committed to the store. When validate_block is called from sync_tipset
206        // this guarantee does not exist, so we create a specific error to inform the caller
207        // not to add this block to the bad blocks cache.
208        .map_err(|why| (*block_cid, TipsetSyncerError::TipsetParentNotFound(why)))?;
209
210    // Retrieve lookback tipset for validation
211    let lookback_state = ChainStore::get_lookback_tipset_for_round(
212        state_manager.chain_store().chain_index(),
213        state_manager.chain_config(),
214        &base_tipset,
215        block.header().epoch,
216    )
217    .map_err(|e| (*block_cid, e.into()))
218    .map(|(_, s)| Arc::new(s))?;
219
220    // Work address needed for async validations, so necessary
221    // to do sync to avoid duplication
222    let work_addr = state_manager
223        .get_miner_work_addr(*lookback_state, &header.miner_address)
224        .map_err(|e| (*block_cid, e.into()))?;
225
226    // Async validations
227    let mut validations = JoinSet::new();
228
229    // Check block messages
230    validations.spawn(check_block_messages(
231        state_manager.shallow_clone(),
232        block.shallow_clone(),
233        base_tipset.shallow_clone(),
234    ));
235
236    // Base fee check
237    validations.spawn_blocking({
238        let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
239        let firehorse_height = state_manager.chain_config().epoch(Height::FireHorse);
240        let base_tipset = base_tipset.shallow_clone();
241        let block_store = state_manager.db_owned();
242        let block = block.shallow_clone();
243        move || {
244            let base_fee = crate::chain::compute_base_fee(
245                &block_store,
246                &base_tipset,
247                smoke_height,
248                firehorse_height,
249            )
250            .map_err(|e| {
251                TipsetSyncerError::Validation(format!("Could not compute base fee: {e}"))
252            })?;
253            let parent_base_fee = &block.header.parent_base_fee;
254            if &base_fee != parent_base_fee {
255                return Err(TipsetSyncerError::Validation(format!(
256                    "base fee doesn't match: {parent_base_fee} (header), {base_fee} (computed)"
257                )));
258            }
259            Ok(())
260        }
261    });
262
263    // Parent weight calculation check
264    validations.spawn_blocking({
265        let block_store = state_manager.db_owned();
266        let base_tipset = base_tipset.shallow_clone();
267        let weight = header.weight.clone();
268        move || {
269            let calc_weight = fil_cns::weight(&block_store, &base_tipset).map_err(|e| {
270                TipsetSyncerError::Calculation(format!("Error calculating weight: {e:#}"))
271            })?;
272            if weight != calc_weight {
273                return Err(TipsetSyncerError::Validation(format!(
274                    "Parent weight doesn't match: {weight} (header), {calc_weight} (computed)"
275                )));
276            }
277            Ok(())
278        }
279    });
280
281    // State root and receipt root validations
282    validations.spawn({
283        let state_manager = state_manager.shallow_clone();
284        let block = block.shallow_clone();
285        async move {
286            let header = block.header();
287            let ExecutedTipset {
288                state_root,
289                receipt_root,
290                ..
291            } = state_manager
292                .load_executed_tipset(&base_tipset)
293                .await
294                .map_err(|e| {
295                    TipsetSyncerError::Calculation(format!("Failed to calculate state: {e:#}"))
296                })?;
297
298            if state_root != header.state_root {
299                return Err(TipsetSyncerError::Validation(format!(
300                    "Parent state root did not match computed state: {} (header), {} (computed)",
301                    header.state_root, state_root,
302                )));
303            }
304
305            if receipt_root != header.message_receipts {
306                return Err(TipsetSyncerError::Validation(format!(
307                    "Parent receipt root did not match computed root: {} (header), {} (computed)",
308                    header.message_receipts, receipt_root
309                )));
310            }
311            Ok(())
312        }
313    });
314
315    // Block signature check
316    validations.spawn_blocking({
317        let block = block.shallow_clone();
318        move || {
319            block.header().verify_signature_against(&work_addr)?;
320            Ok(())
321        }
322    });
323
324    validations.spawn({
325        let block = block.shallow_clone();
326        async move {
327            consensus
328                .validate_block(state_manager, block)
329                .map_err(|errs| {
330                    // NOTE: Concatenating errors here means the wrapper type of error
331                    // never surfaces, yet we always pay the cost of the generic argument.
332                    // But there's no reason `validate_block` couldn't return a list of all
333                    // errors instead of a single one that has all the error messages,
334                    // removing the caller's ability to distinguish between them.
335
336                    TipsetSyncerError::concat(
337                        errs.into_iter_ne()
338                            .map(TipsetSyncerError::ConsensusError)
339                            .collect_vec(),
340                    )
341                })
342                .await
343        }
344    });
345
346    // Collect the errors from the async validations
347    if let Err(errs) = collect_errs(validations).await {
348        return Err((*block_cid, TipsetSyncerError::concat(errs)));
349    }
350
351    chain_store.mark_block_as_validated(block_cid);
352
353    Ok(block)
354}
355
356/// Validate messages in a full block, relative to the parent tipset.
357///
358/// This includes:
359/// * signature checks
360/// * gas limits, and prices
361/// * account nonce values
362/// * the message root in the header
363///
364/// NB: This loads/computes the state resulting from the execution of the parent
365/// tipset.
366async fn check_block_messages(
367    state_manager: StateManager,
368    block: Arc<Block>,
369    base_tipset: Tipset,
370) -> Result<(), TipsetSyncerError> {
371    let network_version = state_manager
372        .chain_config()
373        .network_version(block.header.epoch);
374    let eth_chain_id = state_manager.chain_config().eth_chain_id;
375
376    if let Some(sig) = &block.header().bls_aggregate {
377        // Do the initial loop here
378        // check block message and signatures in them
379        let mut pub_keys = Vec::with_capacity(block.bls_msgs().len());
380        let mut cids = Vec::with_capacity(block.bls_msgs().len());
381        let db = state_manager.db();
382        for m in block.bls_msgs() {
383            let pk = StateManager::get_bls_public_key(db, m.from, *base_tipset.parent_state())?;
384            pub_keys.push(pk);
385            cids.push(m.cid().to_bytes());
386        }
387
388        if !verify_bls_aggregate(
389            &cids.iter().map(|x| x.as_slice()).collect_vec(),
390            &pub_keys,
391            sig,
392        ) {
393            return Err(TipsetSyncerError::BlsAggregateSignatureInvalid(
394                format!("{sig:?}"),
395                format!("{cids:?}"),
396            ));
397        }
398    } else {
399        return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
400    }
401
402    let price_list = price_list_by_network_version(network_version);
403    let mut sum_gas_limit = 0;
404
405    // Check messages for validity
406    let mut check_msg = |msg: &Message,
407                         account_sequences: &mut HashMap<Address, u64>,
408                         tree: &StateTree<DbImpl>|
409     -> anyhow::Result<()> {
410        // Phase 1: Syntactic validation
411        let min_gas = price_list.on_chain_message(to_vec(msg).unwrap().len());
412        valid_for_block_inclusion(msg, min_gas.total(), network_version)
413            .map_err(|e| anyhow::anyhow!("{}", e))?;
414        sum_gas_limit += msg.gas_limit;
415        if sum_gas_limit > BLOCK_GAS_LIMIT {
416            anyhow::bail!("block gas limit exceeded");
417        }
418
419        // Phase 2: (Partial) Semantic validation
420        // Send exists and is an account actor, and sequence is correct
421        let sequence: u64 = match account_sequences.get(&msg.from()) {
422            Some(sequence) => *sequence,
423            None => {
424                let actor = tree.get_actor(&msg.from)?.ok_or_else(|| {
425                    anyhow::anyhow!(
426                        "Failed to retrieve nonce for addr: Actor does not exist in state"
427                    )
428                })?;
429                let network_version = state_manager
430                    .chain_config()
431                    .network_version(block.header.epoch);
432                if !is_valid_for_sending(network_version, &actor) {
433                    anyhow::bail!("not valid for sending!");
434                }
435                actor.sequence
436            }
437        };
438
439        // Sequence equality check
440        if sequence != msg.sequence {
441            anyhow::bail!(
442                "Message has incorrect sequence (exp: {} got: {})",
443                sequence,
444                msg.sequence
445            );
446        }
447        account_sequences.insert(msg.from(), sequence + 1);
448        Ok(())
449    };
450
451    let mut account_sequences: HashMap<Address, u64> = HashMap::default();
452    let ExecutedTipset { state_root, .. } = state_manager
453        .load_executed_tipset(&base_tipset)
454        .await
455        .map_err(|e| TipsetSyncerError::Calculation(format!("Could not update state: {e:#}")))?;
456    let tree = StateTree::new_from_root(state_manager.db(), &state_root).map_err(|e| {
457        TipsetSyncerError::Calculation(format!(
458            "Could not load from new state root in state manager: {e:#}"
459        ))
460    })?;
461
462    // Check validity for BLS messages
463    for (i, msg) in block.bls_msgs().iter().enumerate() {
464        check_msg(msg, &mut account_sequences, &tree).map_err(|e| {
465            TipsetSyncerError::Validation(format!(
466                "Block had invalid BLS message at index {i}: {e:#}"
467            ))
468        })?;
469    }
470
471    // Check validity for SECP messages
472    for (i, msg) in block.secp_msgs().iter().enumerate() {
473        if msg.signature().signature_type() == SignatureType::Delegated
474            && !is_valid_eth_tx_for_sending(eth_chain_id, network_version, msg)
475        {
476            return Err(TipsetSyncerError::Validation(
477                "Network version must be at least NV23 for legacy Ethereum transactions".to_owned(),
478            ));
479        }
480        check_msg(msg.message(), &mut account_sequences, &tree).map_err(|e| {
481            TipsetSyncerError::Validation(format!(
482                "block had an invalid secp message at index {i}: {e:#}"
483            ))
484        })?;
485        // Resolve key address for signature verification
486        let key_addr = state_manager
487            .resolve_to_deterministic_address(msg.from(), &base_tipset)
488            .await
489            .map_err(|e| TipsetSyncerError::ResolvingAddressFromMessage(e.to_string()))?;
490        // SecP256K1 Signature validation
491        msg.signature
492            .authenticate_msg(eth_chain_id, msg, &key_addr)
493            .map_err(|e| TipsetSyncerError::MessageSignatureInvalid(e.to_string()))?;
494    }
495
496    // Validate message root from header matches message root
497    let msg_root =
498        TipsetValidator::compute_msg_root(state_manager.db(), block.bls_msgs(), block.secp_msgs())
499            .map_err(|err| TipsetSyncerError::ComputingMessageRoot(err.to_string()))?;
500    if block.header().messages != msg_root {
501        return Err(TipsetSyncerError::BlockMessageRootInvalid(
502            format!("{:?}", block.header().messages),
503            format!("{msg_root:?}"),
504        ));
505    }
506
507    Ok(())
508}
509
510/// Checks optional values in header.
511///
512/// It only looks for fields which are common to all consensus types.
513fn block_sanity_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
514    if header.signature.is_none() {
515        return Err(TipsetSyncerError::BlockWithoutSignature);
516    }
517    if header.bls_aggregate.is_none() {
518        return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
519    }
520    Ok(())
521}
522
523/// Check the clock drift.
524fn block_timestamp_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
525    let time_now = chrono::Utc::now().timestamp() as u64;
526    if header.timestamp > time_now.saturating_add(ALLOWABLE_CLOCK_DRIFT) {
527        return Err(TipsetSyncerError::TimeTravellingBlock(
528            time_now,
529            header.timestamp,
530        ));
531    } else if header.timestamp > time_now {
532        warn!(
533            "Got block from the future, but within clock drift threshold, {} > {}",
534            header.timestamp, time_now
535        );
536    }
537    Ok(())
538}