Skip to main content

forest/chain_sync/
tipset_syncer.rs

// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::sync::Arc;
5
6use crate::chain_sync::BadBlockCache;
7use crate::networks::Height;
8use crate::shim::clock::ALLOWABLE_CLOCK_DRIFT;
9use crate::shim::crypto::SignatureType;
10use crate::shim::{
11    address::Address, crypto::verify_bls_aggregate, econ::BLOCK_GAS_LIMIT,
12    gas::price_list_by_network_version, message::Message, state_tree::StateTree,
13};
14use crate::state_manager::ExecutedTipset;
15use crate::state_manager::{Error as StateManagerError, StateManager, utils::is_valid_for_sending};
16use crate::utils::ShallowClone as _;
17use crate::{
18    blocks::{Block, CachingBlockHeader, Error as ForestBlockError, FullTipset, Tipset},
19    fil_cns::{self, FilecoinConsensus, FilecoinConsensusError},
20};
21use crate::{
22    chain::{ChainStore, Error as ChainStoreError},
23    metrics::HistogramTimerExt,
24};
25use crate::{
26    eth::is_valid_eth_tx_for_sending,
27    message::{MessageRead as _, valid_for_block_inclusion},
28};
29use ahash::HashMap;
30use cid::Cid;
31use futures::TryFutureExt;
32use fvm_ipld_blockstore::Blockstore;
33use fvm_ipld_encoding::to_vec;
34use itertools::Itertools;
35use nunny::Vec as NonEmpty;
36use thiserror::Error;
37use tokio::task::JoinSet;
38use tracing::{trace, warn};
39
40use crate::chain_sync::{consensus::collect_errs, metrics, validation::TipsetValidator};
41
42#[derive(Debug, Error)]
43pub enum TipsetSyncerError {
44    #[error("Block must have a signature")]
45    BlockWithoutSignature,
46    #[error("Block without BLS aggregate signature")]
47    BlockWithoutBlsAggregate,
48    #[error("Block received from the future: now = {0}, block = {1}")]
49    TimeTravellingBlock(u64, u64),
50    #[error("Validation error: {0}")]
51    Validation(String),
52    #[error("Processing error: {0}")]
53    Calculation(String),
54    #[error("Chain store error: {0}")]
55    ChainStore(#[from] ChainStoreError),
56    #[error("StateManager error: {0}")]
57    StateManager(#[from] StateManagerError),
58    #[error("Block error: {0}")]
59    BlockError(#[from] ForestBlockError),
60    #[error("Querying tipsets from the network failed: {0}")]
61    NetworkTipsetQueryFailed(String),
62    #[error("BLS aggregate signature {0} was invalid for msgs {1}")]
63    BlsAggregateSignatureInvalid(String, String),
64    #[error("Message signature invalid: {0}")]
65    MessageSignatureInvalid(String),
66    #[error("Block message root does not match: expected {0}, computed {1}")]
67    BlockMessageRootInvalid(String, String),
68    #[error("Computing message root failed: {0}")]
69    ComputingMessageRoot(String),
70    #[error("Resolving address from message failed: {0}")]
71    ResolvingAddressFromMessage(String),
72    #[error("Loading tipset parent from the store failed: {0}")]
73    TipsetParentNotFound(ChainStoreError),
74    #[error("Consensus error: {0}")]
75    ConsensusError(FilecoinConsensusError),
76}
77
78impl From<tokio::task::JoinError> for TipsetSyncerError {
79    fn from(err: tokio::task::JoinError) -> Self {
80        TipsetSyncerError::NetworkTipsetQueryFailed(format!("{err}"))
81    }
82}
83
84impl TipsetSyncerError {
85    /// Concatenate all validation error messages into one comma separated
86    /// version.
87    fn concat(errs: NonEmpty<TipsetSyncerError>) -> Self {
88        let msg = errs.iter().map(|e| e.to_string()).collect_vec().join(", ");
89
90        TipsetSyncerError::Validation(msg)
91    }
92}
93
94/// Validates full blocks in the tipset in parallel (since the messages are not
95/// executed), adding the successful ones to the tipset tracker, and the failed
96/// ones to the bad block cache, depending on strategy. Any bad block fails
97/// validation.
98pub async fn validate_tipset<DB: Blockstore + Send + Sync + 'static>(
99    state_manager: &Arc<StateManager<DB>>,
100    full_tipset: FullTipset,
101    bad_block_cache: Option<Arc<BadBlockCache>>,
102) -> Result<(), TipsetSyncerError> {
103    if full_tipset
104        .key()
105        .eq(state_manager.chain_store().genesis_tipset().key())
106    {
107        trace!("Skipping genesis tipset validation");
108        return Ok(());
109    }
110
111    let timer = metrics::TIPSET_PROCESSING_TIME.start_timer();
112
113    let epoch = full_tipset.epoch();
114    let parent_state = *full_tipset.parent_state();
115    let tipset_key = full_tipset.key();
116    trace!("Tipset keys: {tipset_key}");
117    let blocks = full_tipset.into_blocks();
118    let mut validations = JoinSet::new();
119    for b in blocks {
120        validations.spawn(validate_block(state_manager.clone(), Arc::new(b)));
121    }
122
123    while let Some(result) = validations.join_next().await {
124        match result? {
125            Ok(block) => {
126                state_manager
127                    .chain_store()
128                    .add_to_tipset_tracker(block.header());
129            }
130            Err((cid, why)) => {
131                warn!(
132                    "Validating block [CID = {cid}, PARENT_STATE = {parent_state}] in EPOCH = {epoch} failed: {why}",
133                );
134                match &why {
135                    TipsetSyncerError::TimeTravellingBlock(_, _) => {
136                        // Do not mark a block as bad for temporary errors.
137                        // See <https://github.com/filecoin-project/lotus/blob/v1.34.1/chain/sync.go#L602> in Lotus
138                    }
139                    _ => {
140                        // Do not mark block as bad if the parent state tree does not exist
141                        if StateTree::new_from_root(state_manager.blockstore_owned(), &parent_state)
142                            .is_ok()
143                            && let Some(bad_block_cache) = bad_block_cache
144                        {
145                            bad_block_cache.push(cid);
146                        }
147                    }
148                };
149                return Err(why);
150            }
151        }
152    }
153    drop(timer);
154    Ok(())
155}
156
157/// Validate the block according to the rules specific to the consensus being
158/// used, and the common rules that pertain to the assumptions of the
159/// `ChainSync` protocol.
160///
161/// Returns the validated block if `Ok`.
162/// Returns the block CID (for marking bad) and `Error` if invalid (`Err`).
163///
164/// Common validation includes:
165/// * Sanity checks
166/// * Clock drifts
167/// * Signatures
168/// * Message inclusion (fees, sequences)
169/// * Parent related fields: base fee, weight, the state root
170/// * NB: This is where the messages in the *parent* tipset are executed.
171///
172/// Consensus specific validation should include:
173/// * Checking that the messages in the block correspond to the agreed upon
174///   total ordering
175/// * That the block is a deterministic derivative of the underlying consensus
176async fn validate_block<DB: Blockstore + Sync + Send + 'static>(
177    state_manager: Arc<StateManager<DB>>,
178    block: Arc<Block>,
179) -> Result<Arc<Block>, (Cid, TipsetSyncerError)> {
180    let consensus = FilecoinConsensus::new(state_manager.beacon_schedule().clone());
181    trace!(
182        "Validating block: epoch = {}, weight = {}, key = {}",
183        block.header().epoch,
184        block.header().weight,
185        block.header().cid(),
186    );
187    let chain_store = state_manager.chain_store().clone();
188    let block_cid = block.cid();
189
190    // Check block validation cache in store
191    let is_validated = chain_store.is_block_validated(block_cid);
192    if is_validated {
193        return Ok(block);
194    }
195
196    let _timer = metrics::BLOCK_VALIDATION_TIME.start_timer();
197
198    let header = block.header();
199
200    // Check to ensure all optional values exist
201    block_sanity_checks(header).map_err(|e| (*block_cid, e))?;
202    block_timestamp_checks(header).map_err(|e| (*block_cid, e))?;
203
204    let base_tipset = chain_store
205        .chain_index()
206        .load_required_tipset(&header.parents)
207        // The parent tipset will always be there when calling validate_block
208        // as part of the sync_tipset_range flow because all of the headers in the range
209        // have been committed to the store. When validate_block is called from sync_tipset
210        // this guarantee does not exist, so we create a specific error to inform the caller
211        // not to add this block to the bad blocks cache.
212        .map_err(|why| (*block_cid, TipsetSyncerError::TipsetParentNotFound(why)))?;
213
214    // Retrieve lookback tipset for validation
215    let lookback_state = ChainStore::get_lookback_tipset_for_round(
216        state_manager.chain_store().chain_index(),
217        state_manager.chain_config(),
218        &base_tipset,
219        block.header().epoch,
220    )
221    .map_err(|e| (*block_cid, e.into()))
222    .map(|(_, s)| Arc::new(s))?;
223
224    // Work address needed for async validations, so necessary
225    // to do sync to avoid duplication
226    let work_addr = state_manager
227        .get_miner_work_addr(*lookback_state, &header.miner_address)
228        .map_err(|e| (*block_cid, e.into()))?;
229
230    // Async validations
231    let mut validations = JoinSet::new();
232
233    // Check block messages
234    validations.spawn(check_block_messages(
235        state_manager.shallow_clone(),
236        block.shallow_clone(),
237        base_tipset.shallow_clone(),
238    ));
239
240    // Base fee check
241    validations.spawn_blocking({
242        let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
243        let firehorse_height = state_manager.chain_config().epoch(Height::FireHorse);
244        let base_tipset = base_tipset.shallow_clone();
245        let block_store = state_manager.blockstore_owned();
246        let block = block.shallow_clone();
247        move || {
248            let base_fee = crate::chain::compute_base_fee(
249                &block_store,
250                &base_tipset,
251                smoke_height,
252                firehorse_height,
253            )
254            .map_err(|e| {
255                TipsetSyncerError::Validation(format!("Could not compute base fee: {e}"))
256            })?;
257            let parent_base_fee = &block.header.parent_base_fee;
258            if &base_fee != parent_base_fee {
259                return Err(TipsetSyncerError::Validation(format!(
260                    "base fee doesn't match: {parent_base_fee} (header), {base_fee} (computed)"
261                )));
262            }
263            Ok(())
264        }
265    });
266
267    // Parent weight calculation check
268    validations.spawn_blocking({
269        let block_store = state_manager.blockstore_owned();
270        let base_tipset = base_tipset.shallow_clone();
271        let weight = header.weight.clone();
272        move || {
273            let calc_weight = fil_cns::weight(&block_store, &base_tipset).map_err(|e| {
274                TipsetSyncerError::Calculation(format!("Error calculating weight: {e:#}"))
275            })?;
276            if weight != calc_weight {
277                return Err(TipsetSyncerError::Validation(format!(
278                    "Parent weight doesn't match: {weight} (header), {calc_weight} (computed)"
279                )));
280            }
281            Ok(())
282        }
283    });
284
285    // State root and receipt root validations
286    validations.spawn({
287        let state_manager = state_manager.clone();
288        let block = block.clone();
289        async move {
290            let header = block.header();
291            let ExecutedTipset {
292                state_root,
293                receipt_root,
294                ..
295            } = state_manager
296                .load_executed_tipset(&base_tipset)
297                .await
298                .map_err(|e| {
299                    TipsetSyncerError::Calculation(format!("Failed to calculate state: {e:#}"))
300                })?;
301
302            if state_root != header.state_root {
303                return Err(TipsetSyncerError::Validation(format!(
304                    "Parent state root did not match computed state: {} (header), {} (computed)",
305                    header.state_root, state_root,
306                )));
307            }
308
309            if receipt_root != header.message_receipts {
310                return Err(TipsetSyncerError::Validation(format!(
311                    "Parent receipt root did not match computed root: {} (header), {} (computed)",
312                    header.message_receipts, receipt_root
313                )));
314            }
315            Ok(())
316        }
317    });
318
319    // Block signature check
320    validations.spawn_blocking({
321        let block = block.clone();
322        move || {
323            block.header().verify_signature_against(&work_addr)?;
324            Ok(())
325        }
326    });
327
328    validations.spawn({
329        let block = block.clone();
330        async move {
331            consensus
332                .validate_block(state_manager, block)
333                .map_err(|errs| {
334                    // NOTE: Concatenating errors here means the wrapper type of error
335                    // never surfaces, yet we always pay the cost of the generic argument.
336                    // But there's no reason `validate_block` couldn't return a list of all
337                    // errors instead of a single one that has all the error messages,
338                    // removing the caller's ability to distinguish between them.
339
340                    TipsetSyncerError::concat(
341                        errs.into_iter_ne()
342                            .map(TipsetSyncerError::ConsensusError)
343                            .collect_vec(),
344                    )
345                })
346                .await
347        }
348    });
349
350    // Collect the errors from the async validations
351    if let Err(errs) = collect_errs(validations).await {
352        return Err((*block_cid, TipsetSyncerError::concat(errs)));
353    }
354
355    chain_store.mark_block_as_validated(block_cid);
356
357    Ok(block)
358}
359
360/// Validate messages in a full block, relative to the parent tipset.
361///
362/// This includes:
363/// * signature checks
364/// * gas limits, and prices
365/// * account nonce values
366/// * the message root in the header
367///
368/// NB: This loads/computes the state resulting from the execution of the parent
369/// tipset.
370async fn check_block_messages<DB: Blockstore + Send + Sync + 'static>(
371    state_manager: Arc<StateManager<DB>>,
372    block: Arc<Block>,
373    base_tipset: Tipset,
374) -> Result<(), TipsetSyncerError> {
375    let network_version = state_manager
376        .chain_config()
377        .network_version(block.header.epoch);
378    let eth_chain_id = state_manager.chain_config().eth_chain_id;
379
380    if let Some(sig) = &block.header().bls_aggregate {
381        // Do the initial loop here
382        // check block message and signatures in them
383        let mut pub_keys = Vec::with_capacity(block.bls_msgs().len());
384        let mut cids = Vec::with_capacity(block.bls_msgs().len());
385        let db = state_manager.blockstore();
386        for m in block.bls_msgs() {
387            let pk = StateManager::get_bls_public_key(db, &m.from, *base_tipset.parent_state())?;
388            pub_keys.push(pk);
389            cids.push(m.cid().to_bytes());
390        }
391
392        if !verify_bls_aggregate(
393            &cids.iter().map(|x| x.as_slice()).collect_vec(),
394            &pub_keys,
395            sig,
396        ) {
397            return Err(TipsetSyncerError::BlsAggregateSignatureInvalid(
398                format!("{sig:?}"),
399                format!("{cids:?}"),
400            ));
401        }
402    } else {
403        return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
404    }
405
406    let price_list = price_list_by_network_version(network_version);
407    let mut sum_gas_limit = 0;
408
409    // Check messages for validity
410    let mut check_msg = |msg: &Message,
411                         account_sequences: &mut HashMap<Address, u64>,
412                         tree: &StateTree<DB>|
413     -> anyhow::Result<()> {
414        // Phase 1: Syntactic validation
415        let min_gas = price_list.on_chain_message(to_vec(msg).unwrap().len());
416        valid_for_block_inclusion(msg, min_gas.total(), network_version)
417            .map_err(|e| anyhow::anyhow!("{}", e))?;
418        sum_gas_limit += msg.gas_limit;
419        if sum_gas_limit > BLOCK_GAS_LIMIT {
420            anyhow::bail!("block gas limit exceeded");
421        }
422
423        // Phase 2: (Partial) Semantic validation
424        // Send exists and is an account actor, and sequence is correct
425        let sequence: u64 = match account_sequences.get(&msg.from()) {
426            Some(sequence) => *sequence,
427            None => {
428                let actor = tree.get_actor(&msg.from)?.ok_or_else(|| {
429                    anyhow::anyhow!(
430                        "Failed to retrieve nonce for addr: Actor does not exist in state"
431                    )
432                })?;
433                let network_version = state_manager
434                    .chain_config()
435                    .network_version(block.header.epoch);
436                if !is_valid_for_sending(network_version, &actor) {
437                    anyhow::bail!("not valid for sending!");
438                }
439                actor.sequence
440            }
441        };
442
443        // Sequence equality check
444        if sequence != msg.sequence {
445            anyhow::bail!(
446                "Message has incorrect sequence (exp: {} got: {})",
447                sequence,
448                msg.sequence
449            );
450        }
451        account_sequences.insert(msg.from(), sequence + 1);
452        Ok(())
453    };
454
455    let mut account_sequences: HashMap<Address, u64> = HashMap::default();
456    let ExecutedTipset { state_root, .. } = state_manager
457        .load_executed_tipset(&base_tipset)
458        .await
459        .map_err(|e| TipsetSyncerError::Calculation(format!("Could not update state: {e:#}")))?;
460    let tree =
461        StateTree::new_from_root(state_manager.blockstore_owned(), &state_root).map_err(|e| {
462            TipsetSyncerError::Calculation(format!(
463                "Could not load from new state root in state manager: {e:#}"
464            ))
465        })?;
466
467    // Check validity for BLS messages
468    for (i, msg) in block.bls_msgs().iter().enumerate() {
469        check_msg(msg, &mut account_sequences, &tree).map_err(|e| {
470            TipsetSyncerError::Validation(format!(
471                "Block had invalid BLS message at index {i}: {e:#}"
472            ))
473        })?;
474    }
475
476    // Check validity for SECP messages
477    for (i, msg) in block.secp_msgs().iter().enumerate() {
478        if msg.signature().signature_type() == SignatureType::Delegated
479            && !is_valid_eth_tx_for_sending(eth_chain_id, network_version, msg)
480        {
481            return Err(TipsetSyncerError::Validation(
482                "Network version must be at least NV23 for legacy Ethereum transactions".to_owned(),
483            ));
484        }
485        check_msg(msg.message(), &mut account_sequences, &tree).map_err(|e| {
486            TipsetSyncerError::Validation(format!(
487                "block had an invalid secp message at index {i}: {e:#}"
488            ))
489        })?;
490        // Resolve key address for signature verification
491        let key_addr = state_manager
492            .resolve_to_key_addr(&msg.from(), &base_tipset)
493            .await
494            .map_err(|e| TipsetSyncerError::ResolvingAddressFromMessage(e.to_string()))?;
495        // SecP256K1 Signature validation
496        msg.signature
497            .authenticate_msg(eth_chain_id, msg, &key_addr)
498            .map_err(|e| TipsetSyncerError::MessageSignatureInvalid(e.to_string()))?;
499    }
500
501    // Validate message root from header matches message root
502    let msg_root = TipsetValidator::compute_msg_root(
503        state_manager.blockstore(),
504        block.bls_msgs(),
505        block.secp_msgs(),
506    )
507    .map_err(|err| TipsetSyncerError::ComputingMessageRoot(err.to_string()))?;
508    if block.header().messages != msg_root {
509        return Err(TipsetSyncerError::BlockMessageRootInvalid(
510            format!("{:?}", block.header().messages),
511            format!("{msg_root:?}"),
512        ));
513    }
514
515    Ok(())
516}
517
518/// Checks optional values in header.
519///
520/// It only looks for fields which are common to all consensus types.
521fn block_sanity_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
522    if header.signature.is_none() {
523        return Err(TipsetSyncerError::BlockWithoutSignature);
524    }
525    if header.bls_aggregate.is_none() {
526        return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
527    }
528    Ok(())
529}
530
531/// Check the clock drift.
532fn block_timestamp_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
533    let time_now = chrono::Utc::now().timestamp() as u64;
534    if header.timestamp > time_now.saturating_add(ALLOWABLE_CLOCK_DRIFT) {
535        return Err(TipsetSyncerError::TimeTravellingBlock(
536            time_now,
537            header.timestamp,
538        ));
539    } else if header.timestamp > time_now {
540        warn!(
541            "Got block from the future, but within clock drift threshold, {} > {}",
542            header.timestamp, time_now
543        );
544    }
545    Ok(())
546}