// forest/chain_sync/chain_follower.rs
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT
//! This module contains the logic for driving Forest forward in the Filecoin
//! blockchain.
//!
//! Forest keeps track of the current heaviest tipset, and receives information
//! about new blocks and tipsets from peers as well as connected miners. The
//! state machine has the following rules:
//! - A tipset is invalid if its parent is invalid.
//! - If a tipset's parent isn't in our database, request it from the network.
//! - If a tipset's parent has been validated, validate the tipset.
//! - If a tipset is 1 day older than the heaviest tipset, the tipset is
//!   invalid. This prevents Forest from following forks that will never be
//!   accepted.
//!
//! The state machine does not do any network requests or validation. Those are
//! handled by an external actor.
19use super::network_context::SyncNetworkContext;
20use crate::{
21    blocks::{Block, FullTipset, Tipset, TipsetKey},
22    chain::{ChainStore, index::ResolveNullTipset},
23    chain_sync::{
24        ForkSyncInfo, ForkSyncStage, SyncStatus, SyncStatusReport, TipsetValidator,
25        bad_block_cache::{BadBlockCache, SeenBlockCache},
26        metrics,
27        tipset_syncer::{TipsetSyncerError, validate_tipset},
28        validation::GossipBlockValidator,
29    },
30    libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest},
31    message_pool::MessagePool,
32    networks::calculate_expected_epoch,
33    shim::clock::ChainEpoch,
34    state_manager::StateManager,
35    utils::ShallowClone as _,
36};
37use ahash::{HashMap, HashSet};
38use chrono::Utc;
39use cid::Cid;
40use fvm_ipld_blockstore::Blockstore;
41use itertools::Itertools;
42use libp2p::PeerId;
43use parking_lot::{Mutex, RwLock};
44use std::{sync::Arc, time::Instant};
45use tokio::{sync::Notify, task::JoinSet};
46use tracing::{debug, error, info, trace, warn};
47
/// Drives Forest's chain following: collects new tipsets from the P2P swarm
/// and from connected miners, feeds them into a [`SyncStateMachine`], and
/// tracks the fetch/validation tasks spawned as a result.
pub struct ChainFollower<DB> {
    /// Currently running sync tasks; used to avoid spawning duplicates.
    tasks: Arc<Mutex<HashSet<SyncTask>>>,

    /// State machine deciding what to fetch and what to validate next.
    state_machine: Arc<Mutex<SyncStateMachine<DB>>>,

    /// Syncing status of the chain.
    pub sync_status: SyncStatus,

    /// Manages retrieving and updating state objects.
    pub state_manager: Arc<StateManager<DB>>,

    /// Context to be able to send requests to P2P network
    pub network: SyncNetworkContext<DB>,

    /// Genesis tipset of the network being followed.
    genesis: Tipset,

    /// Bad blocks cache, updates based on invalid state transitions.
    /// Will mark any invalid blocks and all children as bad in this bounded
    /// cache. `None` when disabled via `FOREST_DISABLE_BAD_BLOCK_CACHE`.
    pub bad_blocks: Option<Arc<BadBlockCache>>,

    /// Incoming network events to be handled by synchronizer
    net_handler: flume::Receiver<NetworkEvent>,

    /// Tipset channel sender; miners submit proposed tipsets through this.
    pub tipset_sender: flume::Sender<FullTipset>,

    /// Tipset channel receiver, drained by the follower loop.
    tipset_receiver: flume::Receiver<FullTipset>,

    /// When `stateless_mode` is true, forest connects to the P2P network but
    /// does not execute any state transitions. This drastically reduces the
    /// memory and disk footprint of Forest but also means that Forest will not
    /// be able to validate the correctness of the chain.
    stateless_mode: bool,

    /// Message pool; receives messages gossiped over pubsub.
    mem_pool: Arc<MessagePool<Arc<ChainStore<DB>>>>,
}
90
91impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
92    pub fn new(
93        state_manager: Arc<StateManager<DB>>,
94        network: SyncNetworkContext<DB>,
95        genesis: Tipset,
96        net_handler: flume::Receiver<NetworkEvent>,
97        stateless_mode: bool,
98        mem_pool: Arc<MessagePool<Arc<ChainStore<DB>>>>,
99    ) -> Self {
100        crate::def_is_env_truthy!(cache_disabled, "FOREST_DISABLE_BAD_BLOCK_CACHE");
101        let (tipset_sender, tipset_receiver) = flume::bounded(20);
102        let tasks: Arc<Mutex<HashSet<SyncTask>>> = Arc::new(Mutex::new(HashSet::default()));
103        let bad_blocks = if cache_disabled() {
104            tracing::warn!("bad block cache is disabled by `FOREST_DISABLE_BAD_BLOCK_CACHE`");
105            None
106        } else {
107            Some(Default::default())
108        };
109        let state_machine = Arc::new(Mutex::new(SyncStateMachine::new(
110            state_manager.chain_store().clone(),
111            bad_blocks.clone(),
112            stateless_mode,
113        )));
114        Self {
115            tasks,
116            state_machine,
117            sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
118            state_manager,
119            network,
120            genesis,
121            bad_blocks,
122            net_handler,
123            tipset_sender,
124            tipset_receiver,
125            stateless_mode,
126            mem_pool,
127        }
128    }
129
130    /// Reset inner states
131    pub fn reset(&self) {
132        let start = Instant::now();
133        self.tasks.lock().clear();
134        self.state_manager
135            .chain_store()
136            .validated_blocks
137            .lock()
138            .clear();
139        self.state_machine.lock().tipsets.clear();
140        if let Some(bad_blocks) = &self.bad_blocks {
141            bad_blocks.clear();
142        }
143        tracing::info!(
144            "chain follower reset, took {}",
145            humantime::format_duration(start.elapsed())
146        );
147    }
148
149    pub async fn run(&self) -> anyhow::Result<()> {
150        chain_follower(
151            &self.tasks,
152            &self.state_machine,
153            &self.state_manager,
154            self.bad_blocks.clone(),
155            self.net_handler.clone(),
156            self.tipset_receiver.clone(),
157            &self.network,
158            &self.mem_pool,
159            &self.sync_status,
160            &self.genesis,
161            self.stateless_mode,
162        )
163        .await
164    }
165}
166
#[allow(clippy::too_many_arguments)]
// We receive new full tipsets from the p2p swarm, and from miners that use Forest as their frontend.
//
// Wires four long-lived tasks around the shared `SyncStateMachine`:
//   1. network-event loop: metrics, peer bookkeeping, and gossiped blocks/messages;
//   2. miner loop: tipsets arriving on `tipset_receiver`;
//   3. scheduler: turns state-machine output into spawned `SyncTask`s and
//      refreshes the sync status report;
//   4. progress reporter: periodic catch-up / download logging.
// Returns once every spawned task has completed.
async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
    tasks: &Arc<Mutex<HashSet<SyncTask>>>,
    state_machine: &Arc<Mutex<SyncStateMachine<DB>>>,
    state_manager: &Arc<StateManager<DB>>,
    bad_block_cache: Option<Arc<BadBlockCache>>,
    network_rx: flume::Receiver<NetworkEvent>,
    tipset_receiver: flume::Receiver<FullTipset>,
    network: &SyncNetworkContext<DB>,
    mem_pool: &Arc<MessagePool<Arc<ChainStore<DB>>>>,
    sync_status: &SyncStatus,
    genesis: &Tipset,
    stateless_mode: bool,
) -> anyhow::Result<()> {
    // Notified whenever the state machine is updated so the scheduler re-runs.
    let state_changed = Arc::new(Notify::new());

    // Cache of recently seen gossip blocks, consulted in `validate_pre_fetch`.
    let seen_block_cache = SeenBlockCache::default();

    let mut set = JoinSet::new();

    // Increment metrics, update peer information, and forward tipsets to the state machine.
    set.spawn({
        let state_manager = state_manager.shallow_clone();
        let state_changed = state_changed.shallow_clone();
        let state_machine = state_machine.shallow_clone();
        let network = network.shallow_clone();
        let mem_pool = mem_pool.shallow_clone();
        let genesis = genesis.shallow_clone();
        let bad_block_cache = bad_block_cache.shallow_clone();
        let seen_block_cache = seen_block_cache.shallow_clone();
        async move {
            while let Ok(event) = network_rx.recv_async().await {
                inc_gossipsub_event_metrics(&event);

                update_peer_info(
                    &event,
                    &network,
                    state_manager.chain_store().clone(),
                    &genesis,
                );

                // Turn the event into a full tipset, or skip to the next event.
                let Ok(tipset) = (match event {
                    NetworkEvent::HelloResponseOutbound { request, source } => {
                        let tipset_keys = TipsetKey::from(request.heaviest_tip_set.clone());
                        get_full_tipset(
                            &network,
                            state_manager.chain_store(),
                            Some(source),
                            &tipset_keys,
                        )
                        .await
                        .inspect_err(|e| debug!("Querying full tipset failed: {}", e))
                    }
                    NetworkEvent::PubsubMessage { message } => match message {
                        PubsubMessage::Block(b) => {
                            let cs = state_manager.chain_store();
                            let cfg = cs.chain_config();
                            // Cheap pre-fetch checks (bad/seen caches, finality
                            // window, etc.) before any network round-trip.
                            if let Err(reason) = GossipBlockValidator::new(&b).validate_pre_fetch(
                                &genesis,
                                cfg.block_delay_secs,
                                cfg.policy.chain_finality,
                                cs.heaviest_tipset().epoch(),
                                bad_block_cache.as_deref(),
                                &seen_block_cache,
                            ) {
                                metrics::GOSSIP_BLOCK_REJECTED_TOTAL
                                    .get_or_create(&metrics::GossipRejectReasonLabel {
                                        reason: reason.label(),
                                    })
                                    .inc();
                                debug!("Rejected gossip block {}: {reason}", b.header.cid());
                                continue;
                            }
                            let key = TipsetKey::from(nunny::vec![*b.header.cid()]);
                            get_full_tipset(&network, cs, None, &key).await
                        }
                        PubsubMessage::Message(m) => {
                            // Gossiped messages feed the mempool, not the
                            // state machine.
                            if let Err(why) = mem_pool.add(m) {
                                debug!("Received invalid GossipSub message: {}", why);
                            }
                            continue;
                        }
                    },
                    _ => continue,
                }) else {
                    continue;
                };
                {
                    state_machine
                        .lock()
                        .update(SyncEvent::NewFullTipsets(vec![tipset]));
                    state_changed.notify_one();
                }
            }
        }
    });

    // Forward tipsets from miners into the state machine.
    set.spawn({
        let state_changed = state_changed.clone();
        let state_machine = state_machine.clone();

        async move {
            while let Ok(tipset) = tipset_receiver.recv_async().await {
                state_machine
                    .lock()
                    .update(SyncEvent::NewFullTipsets(vec![tipset]));
                state_changed.notify_one();
            }
        }
    });

    // When the state machine is updated, we need to update the sync status and spawn tasks
    set.spawn({
        let state_manager = state_manager.shallow_clone();
        let state_machine = state_machine.shallow_clone();
        let network = network.shallow_clone();
        let sync_status = sync_status.shallow_clone();
        let state_changed = state_changed.shallow_clone();
        let tasks = tasks.shallow_clone();
        let bad_block_cache = bad_block_cache.shallow_clone();
        async move {
            loop {
                state_changed.notified().await;

                // The `tasks` lock is held for the rest of the iteration so a
                // task cannot be spawned twice; note there is no `.await`
                // before the guard is dropped at the end of the loop body.
                let mut tasks_set = tasks.lock();
                let (task_vec, current_active_forks) = state_machine.lock().tasks();

                // Update the sync states
                {
                    let old_status_report = sync_status.read().clone();
                    let new_status_report = old_status_report.update(
                        &state_manager,
                        current_active_forks,
                        stateless_mode,
                    );

                    sync_status.write().clone_from(&new_status_report);
                }

                for task in task_vec {
                    // insert task into tasks. If task is already in tasks, skip. If it is not, spawn it.
                    let new = tasks_set.insert(task.clone());
                    if new {
                        let action = task.clone().execute(
                            network.shallow_clone(),
                            state_manager.shallow_clone(),
                            stateless_mode,
                            bad_block_cache.shallow_clone(),
                        );
                        tokio::spawn({
                            let tasks = tasks.clone();
                            let state_machine = state_machine.clone();
                            let state_changed = state_changed.clone();
                            async move {
                                if let Some(event) = action.await {
                                    state_machine.lock().update(event);
                                    state_changed.notify_one();
                                }
                                // Task finished; allow it to be scheduled again.
                                tasks.lock().remove(&task);
                            }
                        });
                    }
                }
            }
        }
    });

    // Periodically report progress if there are any tipsets left to be fetched.
    // Once we're in steady-state (i.e. caught up to HEAD) and there are no
    // active forks, this will not report anything.
    set.spawn({
        let state_manager = state_manager.clone();
        let state_machine = state_machine.clone();
        async move {
            loop {
                tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
                let (tasks_set, _) = state_machine.lock().tasks();
                let heaviest_tipset = state_manager.chain_store().heaviest_tipset();
                let heaviest_epoch = heaviest_tipset.epoch();

                // Largest epoch distance among pending fetch tasks.
                let to_download = tasks_set
                    .iter()
                    .filter_map(|task| match task {
                        SyncTask::FetchTipset(_, epoch) => Some(epoch - heaviest_epoch),
                        _ => None,
                    })
                    .max()
                    .unwrap_or(0);

                // Where the network head should be, judging by wall-clock time.
                let expected_head = calculate_expected_epoch(
                    Utc::now().timestamp() as u64,
                    state_manager.chain_store().genesis_block_header().timestamp,
                    state_manager.chain_config().block_delay_secs,
                );

                // Only print 'Catching up to HEAD' if we're more than 10 epochs
                // behind. Otherwise it can be too spammy.
                match (expected_head - heaviest_epoch > 10, to_download > 0) {
                    (true, true) => info!(
                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}, downloading {to_download} tipsets"
                        , heaviest_tipset.key()
                    ),
                    (true, false) => info!(
                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}"
                        , heaviest_tipset.key()
                    ),
                    (false, true) => {
                        info!("Downloading {to_download} tipsets")
                    }
                    (false, false) => {}
                }
            }
        }
    });

    set.join_all().await;
    Ok(())
}
387
388// Increment the gossipsub event metrics.
389fn inc_gossipsub_event_metrics(event: &NetworkEvent) {
390    let label = match event {
391        NetworkEvent::HelloRequestInbound => metrics::values::HELLO_REQUEST_INBOUND,
392        NetworkEvent::HelloResponseOutbound { .. } => metrics::values::HELLO_RESPONSE_OUTBOUND,
393        NetworkEvent::HelloRequestOutbound => metrics::values::HELLO_REQUEST_OUTBOUND,
394        NetworkEvent::HelloResponseInbound => metrics::values::HELLO_RESPONSE_INBOUND,
395        NetworkEvent::PeerConnected(_) => metrics::values::PEER_CONNECTED,
396        NetworkEvent::PeerDisconnected(_) => metrics::values::PEER_DISCONNECTED,
397        NetworkEvent::PubsubMessage { message } => match message {
398            PubsubMessage::Block(_) => metrics::values::PUBSUB_BLOCK,
399            PubsubMessage::Message(_) => metrics::values::PUBSUB_MESSAGE,
400        },
401        NetworkEvent::ChainExchangeRequestOutbound => {
402            metrics::values::CHAIN_EXCHANGE_REQUEST_OUTBOUND
403        }
404        NetworkEvent::ChainExchangeResponseInbound => {
405            metrics::values::CHAIN_EXCHANGE_RESPONSE_INBOUND
406        }
407        NetworkEvent::ChainExchangeRequestInbound => {
408            metrics::values::CHAIN_EXCHANGE_REQUEST_INBOUND
409        }
410        NetworkEvent::ChainExchangeResponseOutbound => {
411            metrics::values::CHAIN_EXCHANGE_RESPONSE_OUTBOUND
412        }
413    };
414
415    metrics::LIBP2P_MESSAGE_TOTAL.get_or_create(&label).inc();
416}
417
418// Keep our peer manager up to date.
419fn update_peer_info<DB: Blockstore + Sync + Send + 'static>(
420    event: &NetworkEvent,
421    network: &SyncNetworkContext<DB>,
422    chain_store: Arc<ChainStore<DB>>,
423    genesis: &Tipset,
424) {
425    match event {
426        NetworkEvent::PeerConnected(peer_id) => {
427            let genesis_cid = *genesis.block_headers().first().cid();
428            // Spawn and immediately move on to the next event
429            tokio::task::spawn(handle_peer_connected_event(
430                network.shallow_clone(),
431                chain_store,
432                *peer_id,
433                genesis_cid,
434            ));
435        }
436        NetworkEvent::PeerDisconnected(peer_id) => {
437            handle_peer_disconnected_event(network, *peer_id);
438        }
439        _ => {}
440    }
441}
442
443async fn handle_peer_connected_event<DB: Blockstore + Sync + Send + 'static>(
444    network: SyncNetworkContext<DB>,
445    chain_store: Arc<ChainStore<DB>>,
446    peer_id: PeerId,
447    genesis_block_cid: Cid,
448) {
449    // Query the heaviest TipSet from the store
450    if network.peer_manager().is_peer_new(&peer_id) {
451        // Since the peer is new, send them a hello request
452        // Query the heaviest TipSet from the store
453        let heaviest = chain_store.heaviest_tipset();
454        let request = HelloRequest {
455            heaviest_tip_set: heaviest.cids(),
456            heaviest_tipset_height: heaviest.epoch(),
457            heaviest_tipset_weight: heaviest.weight().clone().into(),
458            genesis_cid: genesis_block_cid,
459        };
460        let (peer_id, moment_sent, response) = match network.hello_request(peer_id, request).await {
461            Ok(response) => response,
462            Err(e) => {
463                debug!("Hello request failed: {}", e);
464                return;
465            }
466        };
467        let dur = Instant::now().duration_since(moment_sent);
468
469        // Update the peer metadata based on the response
470        match response {
471            Some(_) => {
472                network.peer_manager().log_success(&peer_id, dur);
473            }
474            None => {
475                network.peer_manager().log_failure(&peer_id, dur);
476            }
477        }
478    }
479}
480
481fn handle_peer_disconnected_event<DB: Blockstore + Sync + Send + 'static>(
482    network: &SyncNetworkContext<DB>,
483    peer_id: PeerId,
484) {
485    network.peer_manager().remove_peer(&peer_id);
486    network.peer_manager().unmark_peer_bad(&peer_id);
487}
488
489pub async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
490    network: &SyncNetworkContext<DB>,
491    chain_store: &ChainStore<DB>,
492    peer_id: Option<PeerId>,
493    tipset_keys: &TipsetKey,
494) -> anyhow::Result<FullTipset> {
495    // Attempt to load from the store
496    if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
497        return Ok(full_tipset);
498    }
499    // Load from the network
500    let tipset = network
501        .chain_exchange_full_tipset(peer_id, tipset_keys)
502        .await
503        .map_err(|e| anyhow::anyhow!(e))?;
504    tipset.persist(chain_store.blockstore())?;
505
506    Ok(tipset)
507}
508
509async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
510    network: &SyncNetworkContext<DB>,
511    chain_store: &ChainStore<DB>,
512    peer_id: Option<PeerId>,
513    tipset_keys: &TipsetKey,
514) -> anyhow::Result<Vec<FullTipset>> {
515    // Attempt to load from the store
516    if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
517        return Ok(vec![full_tipset]);
518    }
519    // Load from the network
520    let tipsets = network
521        .chain_exchange_full_tipsets(peer_id, tipset_keys)
522        .await
523        .map_err(|e| anyhow::anyhow!(e))?;
524
525    for tipset in tipsets.iter() {
526        tipset.persist(chain_store.blockstore())?;
527    }
528
529    Ok(tipsets)
530}
531
532pub fn load_full_tipset<DB: Blockstore>(
533    chain_store: &ChainStore<DB>,
534    tipset_keys: &TipsetKey,
535) -> anyhow::Result<FullTipset> {
536    // Retrieve tipset from store based on passed in TipsetKey
537    let ts = chain_store
538        .chain_index()
539        .load_required_tipset(tipset_keys)?;
540    let blocks: Vec<_> = ts
541        .block_headers()
542        .iter()
543        .map(|header| -> anyhow::Result<Block> {
544            let (bls_msgs, secp_msgs) =
545                crate::chain::block_messages(chain_store.blockstore(), header)?;
546            Ok(Block {
547                header: header.clone(),
548                bls_messages: bls_msgs,
549                secp_messages: secp_msgs,
550            })
551        })
552        .try_collect()?;
553    // Construct FullTipset
554    let fts = FullTipset::new(blocks)?;
555    Ok(fts)
556}
557
/// Events consumed by `SyncStateMachine::update`.
enum SyncEvent {
    /// Newly received tipsets (from gossip, hello responses, or miners).
    NewFullTipsets(Vec<FullTipset>),
    /// A tipset found to be bad; it and all its descendants are purged.
    BadTipset(FullTipset),
    /// A tipset that passed validation and may advance the chain head.
    ValidatedTipset {
        tipset: FullTipset,
        /// Set when this tipset was the sole member of its candidate chain
        /// (see how `tasks` constructs it).
        is_proposed_head: bool,
    },
}
566
567impl std::fmt::Display for SyncEvent {
568    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
569        fn tss_to_string(tss: &[FullTipset]) -> String {
570            format!(
571                "epoch: {}-{}",
572                tss.first().map(|ts| ts.epoch()).unwrap_or_default(),
573                tss.last().map(|ts| ts.epoch()).unwrap_or_default()
574            )
575        }
576
577        match self {
578            Self::NewFullTipsets(tss) => write!(f, "NewFullTipsets({})", tss_to_string(tss)),
579            Self::BadTipset(ts) => {
580                write!(f, "BadTipset(epoch: {}, key: {})", ts.epoch(), ts.key())
581            }
582            Self::ValidatedTipset {
583                tipset,
584                is_proposed_head,
585            } => write!(
586                f,
587                "ValidatedTipset(epoch: {}, key: {}, is_proposed_head: {is_proposed_head})",
588                tipset.epoch(),
589                tipset.key()
590            ),
591        }
592    }
593}
594
/// Bookkeeping core of the chain follower: tracks candidate tipsets and
/// decides what should be fetched or validated next. It performs no network
/// requests or validation itself (see the module docs).
struct SyncStateMachine<DB> {
    /// Chain store, consulted for the current head, genesis, and blockstore.
    cs: Arc<ChainStore<DB>>,
    /// Optional cache of known-bad blocks (`None` when disabled).
    bad_block_cache: Option<Arc<BadBlockCache>>,
    // Map from TipsetKey to FullTipset
    tipsets: HashMap<TipsetKey, FullTipset>,
    /// See `ChainFollower::stateless_mode`.
    stateless_mode: bool,
}
602
603impl<DB: Blockstore> SyncStateMachine<DB> {
604    pub fn new(
605        cs: Arc<ChainStore<DB>>,
606        bad_block_cache: Option<Arc<BadBlockCache>>,
607        stateless_mode: bool,
608    ) -> Self {
609        Self {
610            cs,
611            bad_block_cache,
612            tipsets: HashMap::default(),
613            stateless_mode,
614        }
615    }
616
    // Compute the list of chains from the tipsets map
    //
    // Repeatedly picks the heaviest not-yet-consumed tipset and walks its
    // parent links, producing one oldest-first `Vec` per walk. Note the
    // parent lookup uses the full `self.tipsets` map while `remaining_tipsets`
    // only controls how many walks are started, so a tipset can appear in
    // more than one returned chain.
    fn chains(&self) -> Vec<Vec<FullTipset>> {
        let mut chains = Vec::new();
        let mut remaining_tipsets = self.tipsets.clone();

        while let Some(heaviest) = remaining_tipsets
            .values()
            .max_by_key(|ts| ts.weight())
            .cloned()
        {
            // Build chain starting from heaviest
            let mut chain = Vec::new();
            let mut current = Some(heaviest);

            while let Some(tipset) = current.take() {
                remaining_tipsets.remove(tipset.key());

                // Find parent in tipsets map
                current = self.tipsets.get(tipset.parents()).cloned();

                chain.push(tipset);
            }
            // The walk went child-to-parent; reverse to oldest-first order.
            chain.reverse();
            chains.push(chain);
        }

        chains
    }
645
646    fn is_parent_validated(&self, tipset: &FullTipset) -> bool {
647        let db = self.cs.blockstore();
648        self.stateless_mode || db.has(tipset.parent_state()).unwrap_or(false)
649    }
650
    /// Whether `tipset` can be validated right now: its parent must be
    /// available locally and count as validated, with head-epoch caveats
    /// described inline.
    fn is_ready_for_validation(&self, tipset: &FullTipset) -> bool {
        if self.stateless_mode || tipset.key() == self.cs.genesis_tipset().key() {
            // Skip validation in stateless mode and for genesis tipset
            true
        } else if let Ok(parent_ts) = load_full_tipset(&self.cs, tipset.parents()) {
            let head_ts = self.cs.heaviest_tipset();
            // Treat post-head-epoch tipsets as not validated to fix <https://github.com/ChainSafe/forest/issues/5677>
            // basically, the follow task should always start from the current head which could be manually set
            // to an old one. When a post-head-epoch tipset is considered validated, it could mess up the state machine
            // in some edge cases and the node ends up being stuck with ever-empty sync task queue as reported
            // in <https://github.com/ChainSafe/forest/issues/5679>.
            if parent_ts.key() == head_ts.key() {
                true
            } else if parent_ts.epoch() >= head_ts.epoch() {
                false
            } else {
                self.is_parent_validated(tipset)
            }
        } else {
            // Parent is not in our store yet; it must be fetched first.
            false
        }
    }
673
    /// Ingest a newly received tipset into the pending map.
    ///
    /// The tipset is purged (with descendants) if it fails the lightweight
    /// `TipsetValidator` checks or falls outside the finality window; it is
    /// ignored if already tracked or already part of the current chain.
    /// Otherwise it is merged with any tracked tipsets sharing its epoch and
    /// parents, so concurrent gossip of sibling blocks converges on one entry.
    fn add_full_tipset(&mut self, tipset: FullTipset) {
        if let Err(why) = TipsetValidator(&tipset).validate(
            &self.cs,
            self.bad_block_cache.as_ref().map(AsRef::as_ref),
            &self.cs.genesis_tipset(),
            self.cs.chain_config().block_delay_secs,
        ) {
            metrics::INVALID_TIPSET_TOTAL.inc();
            trace!("Skipping invalid tipset: {}", why);
            self.mark_bad_tipset(tipset);
            return;
        }

        // Check if tipset is outside the chain_finality window
        let heaviest = self.cs.heaviest_tipset();
        let epoch_diff = heaviest.epoch() - tipset.epoch();

        if epoch_diff > self.cs.chain_config().policy.chain_finality {
            // Too far behind our head; a fork this old can never be accepted.
            self.mark_bad_tipset(tipset);
            return;
        }

        // Check if tipset already exists
        if self.tipsets.contains_key(tipset.key()) {
            return;
        }

        // Skip if tipset is part of the current chain.
        if let Ok(Some(ts)) = self.cs.chain_index().tipset_by_height(
            tipset.epoch(),
            self.cs.heaviest_tipset(),
            ResolveNullTipset::TakeOlder,
        ) && ts.key() == tipset.key()
        {
            return;
        }

        // Find any existing tipsets with same epoch and parents
        let mut to_remove = Vec::new();
        #[allow(clippy::mutable_key_type)]
        let mut merged_blocks: HashSet<_> = tipset.blocks().iter().cloned().collect();

        // Collect all parent references from existing tipsets
        let parent_refs: HashSet<_> = self
            .tipsets
            .values()
            .map(|ts| ts.parents().clone())
            .collect();

        for (key, existing_ts) in self.tipsets.iter() {
            if existing_ts.epoch() == tipset.epoch() && existing_ts.parents() == tipset.parents() {
                // Only mark for removal if not referenced as a parent
                if !parent_refs.contains(key) {
                    to_remove.push(key.clone());
                }
                // Add blocks from existing tipset - HashSet handles deduplication automatically
                merged_blocks.extend(existing_ts.blocks().iter().cloned());
            }
        }

        // Remove old tipsets that were merged and aren't referenced
        for key in to_remove {
            self.tipsets.remove(&key);
        }

        // Create and insert new merged tipset
        if let Ok(merged_tipset) = FullTipset::new(merged_blocks) {
            self.tipsets
                .insert(merged_tipset.key().clone(), merged_tipset);
        }
    }
745
746    // Mark blocks in tipset as bad.
747    // Mark all descendants of tipsets as bad.
748    // Remove all bad tipsets from the tipset map.
749    fn mark_bad_tipset(&mut self, tipset: FullTipset) {
750        let mut stack = vec![tipset];
751        while let Some(tipset) = stack.pop() {
752            self.tipsets.remove(tipset.key());
753            // Find all descendant tipsets (tipsets that have this tipset as a parent)
754            let mut to_remove = Vec::new();
755            let mut descendants = Vec::new();
756
757            for (key, ts) in self.tipsets.iter() {
758                if ts.parents() == tipset.key() {
759                    to_remove.push(key.clone());
760                    descendants.push(ts.clone());
761                }
762            }
763
764            // Remove bad tipsets from the map
765            for key in to_remove {
766                self.tipsets.remove(&key);
767            }
768
769            // Mark descendants as bad
770            stack.extend(descendants);
771        }
772    }
773
    /// Record that `tipset` passed validation: drop it from the pending map
    /// and advance the chain store head where appropriate.
    fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) {
        // Defensive check: callers should only mark tipsets whose parent has
        // already been validated.
        if !self.is_parent_validated(&tipset) {
            tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), parent_state = %tipset.parent_state(), "Parent tipset must be validated");
            return;
        }

        self.tipsets.remove(tipset.key());
        let tipset = tipset.into_tipset();
        // cs.put_tipset requires state and doesn't work in stateless mode
        if self.stateless_mode {
            let epoch = tipset.epoch();
            let terse_key = tipset.key().terse();
            // In stateless mode just track the heaviest head we have seen.
            if self.cs.heaviest_tipset().weight() < tipset.weight() {
                if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
                    error!("Error setting heaviest tipset: {}", e);
                } else {
                    info!("Heaviest tipset: {} ({})", epoch, terse_key);
                }
            }
        } else if is_proposed_head {
            if let Err(e) = self.cs.put_tipset(&tipset) {
                error!("Error putting tipset: {e}");
            }
        } else if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
            error!("Error setting heaviest tipset: {e}");
        }
    }
801
802    pub fn update(&mut self, event: SyncEvent) {
803        tracing::trace!("update: {event}");
804        match event {
805            SyncEvent::NewFullTipsets(tipsets) => {
806                for tipset in tipsets {
807                    self.add_full_tipset(tipset);
808                }
809            }
810            SyncEvent::BadTipset(tipset) => self.mark_bad_tipset(tipset),
811            SyncEvent::ValidatedTipset {
812                tipset,
813                is_proposed_head,
814            } => self.mark_validated_tipset(tipset, is_proposed_head),
815        }
816    }
817
818    pub fn tasks(&self) -> (Vec<SyncTask>, Vec<ForkSyncInfo>) {
819        // Get the node's current validated head epoch once, as it's the same for all forks.
820        let current_validated_epoch = self.cs.heaviest_tipset().epoch();
821        let now = Utc::now();
822
823        let mut active_sync_info = Vec::new();
824        let mut tasks = Vec::new();
825        for chain in self.chains() {
826            if let Some(first_ts) = chain.first() {
827                let last_ts = chain.last().expect("Infallible");
828                let stage: ForkSyncStage;
829                let start_time = Some(now);
830
831                if !self.is_ready_for_validation(first_ts) {
832                    stage = ForkSyncStage::FetchingHeaders;
833                    tasks.push(SyncTask::FetchTipset(
834                        first_ts.parents().clone(),
835                        first_ts.epoch(),
836                    ));
837                } else {
838                    stage = ForkSyncStage::ValidatingTipsets;
839                    tasks.push(SyncTask::ValidateTipset {
840                        tipset: first_ts.clone(),
841                        is_proposed_head: chain.len() == 1,
842                    });
843                }
844
845                let fork_info = ForkSyncInfo {
846                    target_tipset_key: last_ts.key().clone(),
847                    target_epoch: last_ts.epoch(),
848                    target_sync_epoch_start: first_ts.epoch(),
849                    stage,
850                    validated_chain_head_epoch: current_validated_epoch,
851                    start_time,
852                    last_updated: Some(now),
853                };
854
855                active_sync_info.push(fork_info);
856            }
857        }
858        (tasks, active_sync_info)
859    }
860}
861
/// A unit of work produced by the sync state machine and carried out by an
/// external actor (see [`SyncTask::execute`]).
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
enum SyncTask {
    /// Validate a tipset that is ready for validation (its parent has been
    /// validated — see `is_ready_for_validation` in `tasks()`).
    ValidateTipset {
        tipset: FullTipset,
        /// True when this tipset is the sole member of its chain fragment,
        /// i.e. a proposed new chain head rather than an intermediate tipset.
        is_proposed_head: bool,
    },
    /// Fetch the tipset with the given key from the network. The epoch is the
    /// epoch of the child requesting its parent; it is only used for logging.
    FetchTipset(TipsetKey, ChainEpoch),
}
870
871impl std::fmt::Display for SyncTask {
872    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
873        match self {
874            SyncTask::ValidateTipset {
875                tipset,
876                is_proposed_head,
877            } => write!(
878                f,
879                "ValidateTipset(epoch: {}, is_proposed_head: {is_proposed_head})",
880                tipset.epoch()
881            ),
882            SyncTask::FetchTipset(key, epoch) => {
883                let s = key.to_string();
884                write!(
885                    f,
886                    "FetchTipset({}, epoch: {})",
887                    &s[s.len().saturating_sub(8)..],
888                    epoch
889                )
890            }
891        }
892    }
893}
894
895impl SyncTask {
896    async fn execute<DB: Blockstore + Sync + Send + 'static>(
897        self,
898        network: SyncNetworkContext<DB>,
899        state_manager: Arc<StateManager<DB>>,
900        stateless_mode: bool,
901        bad_block_cache: Option<Arc<BadBlockCache>>,
902    ) -> Option<SyncEvent> {
903        tracing::trace!("SyncTask::execute {self}");
904        match self {
905            SyncTask::ValidateTipset {
906                tipset,
907                is_proposed_head,
908            } if stateless_mode => Some(SyncEvent::ValidatedTipset {
909                tipset,
910                is_proposed_head,
911            }),
912            SyncTask::ValidateTipset {
913                tipset,
914                is_proposed_head,
915            } => match validate_tipset(&state_manager, tipset.clone(), bad_block_cache).await {
916                Ok(()) => Some(SyncEvent::ValidatedTipset {
917                    tipset,
918                    is_proposed_head,
919                }),
920                // If temporal drift error, don't mark as bad, just skip validation and try again
921                // later. This mirrors internal logic where temporal drift doesn't mark a block as
922                // bad permanently, since it could be valid later on. If not done, a single
923                // time-traveling block could cause the node to be stuck without making progress.
924                Err(e) if matches!(e, TipsetSyncerError::TimeTravellingBlock { .. }) => {
925                    warn!("Time travelling block detected, skipping tipset for now: {e}");
926                    None
927                }
928                Err(e) => {
929                    warn!("Error validating tipset: {e}");
930                    Some(SyncEvent::BadTipset(tipset))
931                }
932            },
933            SyncTask::FetchTipset(key, epoch) => {
934                match get_full_tipset_batch(&network, state_manager.chain_store(), None, &key).await
935                {
936                    Ok(parents) => Some(SyncEvent::NewFullTipsets(parents)),
937                    Err(e) => {
938                        tracing::warn!(%key, %epoch, "failed to fetch tipset: {e:#}");
939                        None
940                    }
941                }
942            }
943        }
944    }
945}
946
947#[cfg(test)]
948mod tests {
949    use super::*;
950    use crate::blocks::{Chain4U, HeaderBuilder, chain4u};
951    use crate::db::MemoryDB;
952    use crate::utils::db::CborStoreExt as _;
953    use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
954    use num_bigint::BigInt;
955    use num_traits::ToPrimitive;
956    use std::sync::Arc;
957    use tracing::level_filters::LevelFilter;
958    use tracing_subscriber::EnvFilter;
959
960    fn setup() -> (Arc<ChainStore<MemoryDB>>, Chain4U<Arc<MemoryDB>>) {
961        // Initialize test logger
962        let _ = tracing_subscriber::fmt()
963            .without_time()
964            .with_env_filter(
965                EnvFilter::builder()
966                    .with_default_directive(LevelFilter::DEBUG.into())
967                    .from_env()
968                    .unwrap(),
969            )
970            .try_init();
971
972        let db = Arc::new(MemoryDB::default());
973
974        // Populate DB with message roots used by chain4u
975        {
976            let empty_amt = Amt::<Cid, _>::new(&db).flush().unwrap();
977            db.put_cbor_default(&crate::blocks::TxMeta {
978                bls_message_root: empty_amt,
979                secp_message_root: empty_amt,
980            })
981            .unwrap();
982        }
983
984        // Create a chain of 5 tipsets using Chain4U
985        let c4u = Chain4U::with_blockstore(db.clone());
986        chain4u! {
987            in c4u;
988            [genesis_header = dummy_node(&db, 0)]
989        };
990
991        let cs = Arc::new(
992            ChainStore::new(
993                db.clone(),
994                db.clone(),
995                db.clone(),
996                Default::default(),
997                genesis_header.clone().into(),
998            )
999            .unwrap(),
1000        );
1001
1002        cs.set_heaviest_tipset(cs.genesis_tipset()).unwrap();
1003
1004        (cs, c4u)
1005    }
1006
    /// Stores a dummy "state" (just the epoch number, CBOR-encoded) and
    /// returns its CID, giving each epoch a distinct state root.
    fn dummy_state(db: impl Blockstore, i: ChainEpoch) -> Cid {
        db.put_cbor_default(&i).unwrap()
    }
1010
    /// Builds a header template for epoch `i` whose weight equals its epoch,
    /// so later tipsets are heavier and chain comparisons are predictable.
    fn dummy_node(db: impl Blockstore, i: ChainEpoch) -> HeaderBuilder {
        HeaderBuilder {
            state_root: dummy_state(db, i).into(),
            weight: BigInt::from(i).into(),
            epoch: i.into(),
            ..Default::default()
        }
    }
1019
1020    #[test]
1021    fn test_state_machine_validation_order() {
1022        let (cs, c4u) = setup();
1023        let db = cs.blockstore().clone();
1024
1025        chain4u! {
1026            from [genesis_header] in c4u;
1027            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)] -> [c = dummy_node(&db, 3)] -> [d = dummy_node(&db, 4)] -> [e = dummy_node(&db, 5)]
1028        };
1029
1030        // Create the state machine
1031        let mut state_machine = SyncStateMachine::new(cs, Default::default(), true);
1032
1033        // Insert tipsets in random order
1034        let tipsets = vec![e, b, d, c, a];
1035
1036        // Convert each block into a FullTipset and add it to the state machine
1037        for block in tipsets {
1038            let full_tipset = FullTipset::new(vec![Block {
1039                header: block.clone().into(),
1040                bls_messages: vec![],
1041                secp_messages: vec![],
1042            }])
1043            .unwrap();
1044            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
1045        }
1046
1047        // Record validation order by processing all validation tasks in each iteration
1048        let mut validation_tasks = Vec::new();
1049        loop {
1050            let (tasks, _) = state_machine.tasks();
1051
1052            // Find all validation tasks
1053            let validation_tipsets: Vec<_> = tasks
1054                .into_iter()
1055                .filter_map(|task| {
1056                    if let SyncTask::ValidateTipset {
1057                        tipset,
1058                        is_proposed_head,
1059                    } = task
1060                    {
1061                        Some((tipset, is_proposed_head))
1062                    } else {
1063                        None
1064                    }
1065                })
1066                .collect();
1067
1068            if validation_tipsets.is_empty() {
1069                break;
1070            }
1071
1072            // Record and mark all tipsets as validated
1073            for (ts, is_proposed_head) in validation_tipsets {
1074                validation_tasks.push(ts.epoch());
1075                db.put_cbor_default(&ts.epoch()).unwrap();
1076                state_machine.mark_validated_tipset(ts, is_proposed_head);
1077            }
1078        }
1079
1080        // We expect validation tasks for epochs 1 through 5 in order
1081        assert_eq!(validation_tasks, vec![1, 2, 3, 4, 5]);
1082    }
1083
1084    #[test]
1085    fn test_sync_state_machine_chain_fragments() {
1086        let (cs, c4u) = setup();
1087        let db = cs.blockstore().clone();
1088
1089        // Create a forked chain
1090        // genesis -> a -> b
1091        //            \--> c
1092        chain4u! {
1093            in c4u;
1094            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)]
1095        };
1096        chain4u! {
1097            from [a] in c4u;
1098            [c = dummy_node(&db, 3)]
1099        };
1100
1101        // Create the state machine
1102        let mut state_machine = SyncStateMachine::new(cs, Default::default(), false);
1103
1104        // Convert each block into a FullTipset and add it to the state machine
1105        for block in [a, b, c] {
1106            let full_tipset = FullTipset::new(vec![Block {
1107                header: block.clone().into(),
1108                bls_messages: vec![],
1109                secp_messages: vec![],
1110            }])
1111            .unwrap();
1112            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
1113        }
1114
1115        let chains = state_machine
1116            .chains()
1117            .into_iter()
1118            .map(|v| {
1119                v.into_iter()
1120                    .map(|ts| ts.weight().to_i64().unwrap_or(0))
1121                    .collect_vec()
1122            })
1123            .collect_vec();
1124
1125        // Both chains should start at the same tipset
1126        assert_eq!(chains, vec![vec![1, 3], vec![1, 2]]);
1127    }
1128}