Skip to main content

forest/chain_sync/
chain_follower.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3//! This module contains the logic for driving Forest forward in the Filecoin
4//! blockchain.
5//!
6//! Forest keeps track of the current heaviest tipset, and receives information
7//! about new blocks and tipsets from peers as well as connected miners. The
8//! state machine has the following rules:
9//! - A tipset is invalid if its parent is invalid.
10//! - If a tipset's parent isn't in our database, request it from the network.
11//! - If a tipset's parent has been validated, validate the tipset.
12//! - If a tipset is 1 day older than the heaviest tipset, the tipset is
13//!   invalid. This prevents Forest from following forks that will never be
14//!   accepted.
15//!
16//! The state machine does not do any network requests or validation. Those are
17//! handled by an external actor.
18
19use super::network_context::SyncNetworkContext;
20use crate::{
21    blocks::{Block, FullTipset, Tipset, TipsetKey},
22    chain::ChainStore,
23    chain_sync::{
24        ForkSyncInfo, ForkSyncStage, SyncStatus, SyncStatusReport, TipsetValidator,
25        bad_block_cache::{BadBlockCache, SeenBlockCache},
26        metrics,
27        tipset_syncer::{TipsetSyncerError, validate_tipset},
28        validation::GossipBlockValidator,
29    },
30    libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest},
31    message_pool::MessagePool,
32    networks::calculate_expected_epoch,
33    shim::clock::ChainEpoch,
34    state_manager::StateManager,
35    utils::ShallowClone as _,
36};
37use ahash::{HashMap, HashSet};
38use chrono::Utc;
39use cid::Cid;
40use fvm_ipld_blockstore::Blockstore;
41use itertools::Itertools;
42use libp2p::PeerId;
43use parking_lot::{Mutex, RwLock};
44use std::{sync::Arc, time::Instant};
45use tokio::{sync::Notify, task::JoinSet};
46use tracing::{debug, error, info, trace, warn};
47
/// Owns everything needed to follow the chain: the sync state machine, the
/// set of in-flight sync tasks, the channels that deliver new tipsets and the
/// handles (state manager, network, message pool) used while syncing.
///
/// The actual work is performed by [`ChainFollower::run`].
pub struct ChainFollower<DB> {
    /// Sync tasks that have been spawned and have not completed yet. A task
    /// is only spawned if it is not already present in this set.
    tasks: Arc<Mutex<HashSet<SyncTask>>>,

    /// State machine holding the tipsets that still need to be fetched or
    /// validated.
    state_machine: Arc<Mutex<SyncStateMachine<DB>>>,

    /// Syncing status of the chain
    pub sync_status: SyncStatus,

    /// Manages retrieving and updating state objects
    pub state_manager: Arc<StateManager<DB>>,

    /// Context to be able to send requests to P2P network
    pub network: SyncNetworkContext<DB>,

    /// Genesis tipset
    genesis: Tipset,

    /// Bad blocks cache, updates based on invalid state transitions.
    /// Will mark any invalid blocks and all children as bad in this bounded
    /// cache. `None` when the cache has been disabled through the
    /// `FOREST_DISABLE_BAD_BLOCK_CACHE` environment variable.
    pub bad_blocks: Option<Arc<BadBlockCache>>,

    /// Incoming network events to be handled by synchronizer
    net_handler: flume::Receiver<NetworkEvent>,

    /// Sending half of the bounded tipset channel; tipsets pushed here are
    /// forwarded to the state machine.
    pub tipset_sender: flume::Sender<FullTipset>,

    /// Receiving half of the tipset channel, drained by the follower.
    tipset_receiver: flume::Receiver<FullTipset>,

    /// When `stateless_mode` is true, forest connects to the P2P network but
    /// does not execute any state transitions. This drastically reduces the
    /// memory and disk footprint of Forest but also means that Forest will not
    /// be able to validate the correctness of the chain.
    stateless_mode: bool,

    /// Message pool that receives messages gossiped over the network.
    mem_pool: Arc<MessagePool<Arc<ChainStore<DB>>>>,
}
90
91impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
92    pub fn new(
93        state_manager: Arc<StateManager<DB>>,
94        network: SyncNetworkContext<DB>,
95        genesis: Tipset,
96        net_handler: flume::Receiver<NetworkEvent>,
97        stateless_mode: bool,
98        mem_pool: Arc<MessagePool<Arc<ChainStore<DB>>>>,
99    ) -> Self {
100        crate::def_is_env_truthy!(cache_disabled, "FOREST_DISABLE_BAD_BLOCK_CACHE");
101        let (tipset_sender, tipset_receiver) = flume::bounded(20);
102        let tasks: Arc<Mutex<HashSet<SyncTask>>> = Arc::new(Mutex::new(HashSet::default()));
103        let bad_blocks = if cache_disabled() {
104            tracing::warn!("bad block cache is disabled by `FOREST_DISABLE_BAD_BLOCK_CACHE`");
105            None
106        } else {
107            Some(Default::default())
108        };
109        let state_machine = Arc::new(Mutex::new(SyncStateMachine::new(
110            state_manager.chain_store().clone(),
111            bad_blocks.clone(),
112            stateless_mode,
113        )));
114        Self {
115            tasks,
116            state_machine,
117            sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
118            state_manager,
119            network,
120            genesis,
121            bad_blocks,
122            net_handler,
123            tipset_sender,
124            tipset_receiver,
125            stateless_mode,
126            mem_pool,
127        }
128    }
129
130    /// Reset inner states
131    pub fn reset(&self) {
132        let start = Instant::now();
133        self.tasks.lock().clear();
134        self.state_manager
135            .chain_store()
136            .validated_blocks
137            .lock()
138            .clear();
139        self.state_machine.lock().tipsets.clear();
140        if let Some(bad_blocks) = &self.bad_blocks {
141            bad_blocks.clear();
142        }
143        tracing::info!(
144            "chain follower reset, took {}",
145            humantime::format_duration(start.elapsed())
146        );
147    }
148
149    pub async fn run(&self) -> anyhow::Result<()> {
150        chain_follower(
151            &self.tasks,
152            &self.state_machine,
153            &self.state_manager,
154            self.bad_blocks.clone(),
155            self.net_handler.clone(),
156            self.tipset_receiver.clone(),
157            &self.network,
158            &self.mem_pool,
159            &self.sync_status,
160            &self.genesis,
161            self.stateless_mode,
162        )
163        .await
164    }
165}
166
#[allow(clippy::too_many_arguments)]
/// Runs the chain-following loop.
///
/// We receive new full tipsets from the p2p swarm, and from miners that use
/// Forest as their frontend. Four tasks are spawned on a [`JoinSet`]:
///
/// 1. Network events: bump metrics, update peer info, turn hello responses
///    and gossiped blocks into full tipsets and hand them to the state
///    machine; gossiped messages are added to the message pool.
/// 2. Tipsets arriving on `tipset_receiver` are handed to the state machine.
/// 3. On every state-change notification, refresh `sync_status` and spawn any
///    sync task returned by the state machine that is not already in `tasks`.
/// 4. Every 10 seconds, log sync progress.
///
/// Returns `Ok(())` only after all four tasks have completed. Tasks 3 and 4
/// loop without a break condition.
async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
    tasks: &Arc<Mutex<HashSet<SyncTask>>>,
    state_machine: &Arc<Mutex<SyncStateMachine<DB>>>,
    state_manager: &Arc<StateManager<DB>>,
    bad_block_cache: Option<Arc<BadBlockCache>>,
    network_rx: flume::Receiver<NetworkEvent>,
    tipset_receiver: flume::Receiver<FullTipset>,
    network: &SyncNetworkContext<DB>,
    mem_pool: &Arc<MessagePool<Arc<ChainStore<DB>>>>,
    sync_status: &SyncStatus,
    genesis: &Tipset,
    stateless_mode: bool,
) -> anyhow::Result<()> {
    // Wakes up the task-spawning loop whenever the state machine was updated.
    let state_changed = Arc::new(Notify::new());

    // Passed to the gossip block pre-fetch validation below.
    let seen_block_cache = SeenBlockCache::default();

    let mut set = JoinSet::new();

    // Increment metrics, update peer information, and forward tipsets to the state machine.
    set.spawn({
        let state_manager = state_manager.shallow_clone();
        let state_changed = state_changed.shallow_clone();
        let state_machine = state_machine.shallow_clone();
        let network = network.shallow_clone();
        let mem_pool = mem_pool.shallow_clone();
        let genesis = genesis.shallow_clone();
        let bad_block_cache = bad_block_cache.shallow_clone();
        let seen_block_cache = seen_block_cache.shallow_clone();
        async move {
            // The loop ends once the network event channel returns an error.
            while let Ok(event) = network_rx.recv_async().await {
                inc_gossipsub_event_metrics(&event);

                update_peer_info(
                    &event,
                    &network,
                    state_manager.chain_store().clone(),
                    &genesis,
                );

                // Resolve the event into a full tipset; events that do not
                // carry one, and failed lookups, skip to the next event.
                let Ok(tipset) = (match event {
                    NetworkEvent::HelloResponseOutbound { request, source } => {
                        // Fetch the heaviest tipset advertised in the hello
                        // request, asking the peer it came from.
                        let tipset_keys = TipsetKey::from(request.heaviest_tip_set.clone());
                        get_full_tipset(
                            &network,
                            state_manager.chain_store(),
                            Some(source),
                            &tipset_keys,
                        )
                        .await
                        .inspect_err(|e| debug!("Querying full tipset failed: {}", e))
                    }
                    NetworkEvent::PubsubMessage { message } => match message {
                        PubsubMessage::Block(b) => {
                            let cs = state_manager.chain_store();
                            let cfg = cs.chain_config();
                            // Validate the gossiped block before fetching
                            // anything for it; rejections are counted per reason.
                            if let Err(reason) = GossipBlockValidator::new(&b).validate_pre_fetch(
                                &genesis,
                                cfg.block_delay_secs,
                                cfg.policy.chain_finality,
                                cs.heaviest_tipset().epoch(),
                                bad_block_cache.as_deref(),
                                &seen_block_cache,
                            ) {
                                metrics::GOSSIP_BLOCK_REJECTED_TOTAL
                                    .get_or_create(&metrics::GossipRejectReasonLabel {
                                        reason: reason.label(),
                                    })
                                    .inc();
                                debug!("Rejected gossip block {}: {reason}", b.header.cid());
                                continue;
                            }
                            // Treat the single gossiped block as a one-block tipset key.
                            let key = TipsetKey::from(nunny::vec![*b.header.cid()]);
                            get_full_tipset(&network, cs, None, &key).await
                        }
                        PubsubMessage::Message(m) => {
                            // Gossiped messages go to the message pool only;
                            // they never produce a tipset.
                            if let Err(why) = mem_pool.add(m) {
                                debug!("Received invalid GossipSub message: {}", why);
                            }
                            continue;
                        }
                    },
                    _ => continue,
                }) else {
                    continue;
                };
                {
                    state_machine
                        .lock()
                        .update(SyncEvent::NewFullTipsets(vec![tipset]));
                    state_changed.notify_one();
                }
            }
        }
    });

    // Forward tipsets from miners into the state machine.
    set.spawn({
        let state_changed = state_changed.clone();
        let state_machine = state_machine.clone();

        async move {
            // The loop ends once the tipset channel returns an error.
            while let Ok(tipset) = tipset_receiver.recv_async().await {
                state_machine
                    .lock()
                    .update(SyncEvent::NewFullTipsets(vec![tipset]));
                state_changed.notify_one();
            }
        }
    });

    // When the state machine is updated, we need to update the sync status and spawn tasks
    set.spawn({
        let state_manager = state_manager.shallow_clone();
        let state_machine = state_machine.shallow_clone();
        let network = network.shallow_clone();
        let sync_status = sync_status.shallow_clone();
        let state_changed = state_changed.shallow_clone();
        let tasks = tasks.shallow_clone();
        let bad_block_cache = bad_block_cache.shallow_clone();
        async move {
            loop {
                state_changed.notified().await;

                // The task set lock is taken first and held for the rest of
                // this iteration; the state machine lock is only held while
                // computing the task list.
                let mut tasks_set = tasks.lock();
                let (task_vec, current_active_forks) = state_machine.lock().tasks();

                // Update the sync states
                {
                    let old_status_report = sync_status.read().clone();
                    let new_status_report = old_status_report.update(
                        &state_manager,
                        current_active_forks,
                        stateless_mode,
                    );

                    sync_status.write().clone_from(&new_status_report);
                }

                for task in task_vec {
                    // insert task into tasks. If task is already in tasks, skip. If it is not, spawn it.
                    let new = tasks_set.insert(task.clone());
                    if new {
                        let action = task.clone().execute(
                            network.shallow_clone(),
                            state_manager.shallow_clone(),
                            stateless_mode,
                            bad_block_cache.shallow_clone(),
                        );
                        // Spawned outside the `JoinSet`: the task feeds its
                        // resulting event (if any) back into the state
                        // machine and then removes itself from the task set.
                        tokio::spawn({
                            let tasks = tasks.clone();
                            let state_machine = state_machine.clone();
                            let state_changed = state_changed.clone();
                            async move {
                                if let Some(event) = action.await {
                                    state_machine.lock().update(event);
                                    state_changed.notify_one();
                                }
                                tasks.lock().remove(&task);
                            }
                        });
                    }
                }
            }
        }
    });

    // Periodically report progress if there are any tipsets left to be fetched.
    // Once we're in steady-state (i.e. caught up to HEAD) and there are no
    // active forks, this will not report anything.
    set.spawn({
        let state_manager = state_manager.clone();
        let state_machine = state_machine.clone();
        async move {
            loop {
                tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
                let (tasks_set, _) = state_machine.lock().tasks();
                let heaviest_tipset = state_manager.chain_store().heaviest_tipset();
                let heaviest_epoch = heaviest_tipset.epoch();

                // Largest epoch distance between a pending fetch task and the
                // current heaviest tipset; 0 when there is no fetch task.
                let to_download = tasks_set
                    .iter()
                    .filter_map(|task| match task {
                        SyncTask::FetchTipset(_, epoch) => Some(epoch - heaviest_epoch),
                        _ => None,
                    })
                    .max()
                    .unwrap_or(0);

                // Epoch the chain is expected to be at right now, derived from
                // the genesis timestamp and the block delay.
                let expected_head = calculate_expected_epoch(
                    Utc::now().timestamp() as u64,
                    state_manager.chain_store().genesis_block_header().timestamp,
                    state_manager.chain_config().block_delay_secs,
                );

                // Only print 'Catching up to HEAD' if we're more than 10 epochs
                // behind. Otherwise it can be too spammy.
                match (expected_head - heaviest_epoch > 10, to_download > 0) {
                    (true, true) => info!(
                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}, downloading {to_download} tipsets"
                        , heaviest_tipset.key()
                    ),
                    (true, false) => info!(
                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}"
                        , heaviest_tipset.key()
                    ),
                    (false, true) => {
                        info!("Downloading {to_download} tipsets")
                    }
                    (false, false) => {}
                }
            }
        }
    });

    // Wait for every task spawned on the `JoinSet` above.
    set.join_all().await;
    Ok(())
}
387
388// Increment the gossipsub event metrics.
389fn inc_gossipsub_event_metrics(event: &NetworkEvent) {
390    let label = match event {
391        NetworkEvent::HelloRequestInbound => metrics::values::HELLO_REQUEST_INBOUND,
392        NetworkEvent::HelloResponseOutbound { .. } => metrics::values::HELLO_RESPONSE_OUTBOUND,
393        NetworkEvent::HelloRequestOutbound => metrics::values::HELLO_REQUEST_OUTBOUND,
394        NetworkEvent::HelloResponseInbound => metrics::values::HELLO_RESPONSE_INBOUND,
395        NetworkEvent::PeerConnected(_) => metrics::values::PEER_CONNECTED,
396        NetworkEvent::PeerDisconnected(_) => metrics::values::PEER_DISCONNECTED,
397        NetworkEvent::PubsubMessage { message } => match message {
398            PubsubMessage::Block(_) => metrics::values::PUBSUB_BLOCK,
399            PubsubMessage::Message(_) => metrics::values::PUBSUB_MESSAGE,
400        },
401        NetworkEvent::ChainExchangeRequestOutbound => {
402            metrics::values::CHAIN_EXCHANGE_REQUEST_OUTBOUND
403        }
404        NetworkEvent::ChainExchangeResponseInbound => {
405            metrics::values::CHAIN_EXCHANGE_RESPONSE_INBOUND
406        }
407        NetworkEvent::ChainExchangeRequestInbound => {
408            metrics::values::CHAIN_EXCHANGE_REQUEST_INBOUND
409        }
410        NetworkEvent::ChainExchangeResponseOutbound => {
411            metrics::values::CHAIN_EXCHANGE_RESPONSE_OUTBOUND
412        }
413    };
414
415    metrics::LIBP2P_MESSAGE_TOTAL.get_or_create(&label).inc();
416}
417
418// Keep our peer manager up to date.
419fn update_peer_info<DB: Blockstore + Sync + Send + 'static>(
420    event: &NetworkEvent,
421    network: &SyncNetworkContext<DB>,
422    chain_store: Arc<ChainStore<DB>>,
423    genesis: &Tipset,
424) {
425    match event {
426        NetworkEvent::PeerConnected(peer_id) => {
427            let genesis_cid = *genesis.block_headers().first().cid();
428            // Spawn and immediately move on to the next event
429            tokio::task::spawn(handle_peer_connected_event(
430                network.shallow_clone(),
431                chain_store,
432                *peer_id,
433                genesis_cid,
434            ));
435        }
436        NetworkEvent::PeerDisconnected(peer_id) => {
437            handle_peer_disconnected_event(network, *peer_id);
438        }
439        _ => {}
440    }
441}
442
443async fn handle_peer_connected_event<DB: Blockstore + Sync + Send + 'static>(
444    network: SyncNetworkContext<DB>,
445    chain_store: Arc<ChainStore<DB>>,
446    peer_id: PeerId,
447    genesis_block_cid: Cid,
448) {
449    // Query the heaviest TipSet from the store
450    if network.peer_manager().is_peer_new(&peer_id) {
451        // Since the peer is new, send them a hello request
452        // Query the heaviest TipSet from the store
453        let heaviest = chain_store.heaviest_tipset();
454        let request = HelloRequest {
455            heaviest_tip_set: heaviest.cids(),
456            heaviest_tipset_height: heaviest.epoch(),
457            heaviest_tipset_weight: heaviest.weight().clone().into(),
458            genesis_cid: genesis_block_cid,
459        };
460        let (peer_id, moment_sent, response) = match network.hello_request(peer_id, request).await {
461            Ok(response) => response,
462            Err(e) => {
463                debug!("Hello request failed: {}", e);
464                return;
465            }
466        };
467        let dur = Instant::now().duration_since(moment_sent);
468
469        // Update the peer metadata based on the response
470        match response {
471            Some(_) => {
472                network.peer_manager().log_success(&peer_id, dur);
473            }
474            None => {
475                network.peer_manager().log_failure(&peer_id, dur);
476            }
477        }
478    }
479}
480
481fn handle_peer_disconnected_event<DB: Blockstore + Sync + Send + 'static>(
482    network: &SyncNetworkContext<DB>,
483    peer_id: PeerId,
484) {
485    network.peer_manager().remove_peer(&peer_id);
486    network.peer_manager().unmark_peer_bad(&peer_id);
487}
488
489pub async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
490    network: &SyncNetworkContext<DB>,
491    chain_store: &ChainStore<DB>,
492    peer_id: Option<PeerId>,
493    tipset_keys: &TipsetKey,
494) -> anyhow::Result<FullTipset> {
495    // Attempt to load from the store
496    if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
497        return Ok(full_tipset);
498    }
499    // Load from the network
500    let tipset = network
501        .chain_exchange_full_tipset(peer_id, tipset_keys)
502        .await
503        .map_err(|e| anyhow::anyhow!(e))?;
504    tipset.persist(chain_store.blockstore())?;
505
506    Ok(tipset)
507}
508
509async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
510    network: &SyncNetworkContext<DB>,
511    chain_store: &ChainStore<DB>,
512    peer_id: Option<PeerId>,
513    tipset_keys: &TipsetKey,
514) -> anyhow::Result<Vec<FullTipset>> {
515    // Attempt to load from the store
516    if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
517        return Ok(vec![full_tipset]);
518    }
519    // Load from the network
520    let tipsets = network
521        .chain_exchange_full_tipsets(peer_id, tipset_keys)
522        .await
523        .map_err(|e| anyhow::anyhow!(e))?;
524
525    for tipset in tipsets.iter() {
526        tipset.persist(chain_store.blockstore())?;
527    }
528
529    Ok(tipsets)
530}
531
532pub fn load_full_tipset<DB: Blockstore>(
533    chain_store: &ChainStore<DB>,
534    tipset_keys: &TipsetKey,
535) -> anyhow::Result<FullTipset> {
536    // Retrieve tipset from store based on passed in TipsetKey
537    let ts = chain_store
538        .chain_index()
539        .load_required_tipset(tipset_keys)?;
540    let blocks: Vec<_> = ts
541        .block_headers()
542        .iter()
543        .map(|header| -> anyhow::Result<Block> {
544            let (bls_msgs, secp_msgs) =
545                crate::chain::block_messages(chain_store.blockstore(), header)?;
546            Ok(Block {
547                header: header.clone(),
548                bls_messages: bls_msgs,
549                secp_messages: secp_msgs,
550            })
551        })
552        .try_collect()?;
553    // Construct FullTipset
554    let fts = FullTipset::new(blocks)?;
555    Ok(fts)
556}
557
/// Events consumed by `SyncStateMachine::update`.
enum SyncEvent {
    /// New full tipsets to be added to the state machine.
    NewFullTipsets(Vec<FullTipset>),
    /// A tipset that turned out to be bad; it and its tracked descendants
    /// are dropped from the state machine.
    BadTipset(FullTipset),
    /// A tipset that has been validated.
    ValidatedTipset {
        /// The validated tipset.
        tipset: FullTipset,
        /// Selects how the chain store is updated outside stateless mode:
        /// `true` stores the tipset via `put_tipset`, `false` sets it as the
        /// heaviest tipset directly.
        is_proposed_head: bool,
    },
}
566
567impl std::fmt::Display for SyncEvent {
568    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
569        fn tss_to_string(tss: &[FullTipset]) -> String {
570            format!(
571                "epoch: {}-{}",
572                tss.first().map(|ts| ts.epoch()).unwrap_or_default(),
573                tss.last().map(|ts| ts.epoch()).unwrap_or_default()
574            )
575        }
576
577        match self {
578            Self::NewFullTipsets(tss) => write!(f, "NewFullTipsets({})", tss_to_string(tss)),
579            Self::BadTipset(ts) => {
580                write!(f, "BadTipset(epoch: {}, key: {})", ts.epoch(), ts.key())
581            }
582            Self::ValidatedTipset {
583                tipset,
584                is_proposed_head,
585            } => write!(
586                f,
587                "ValidatedTipset(epoch: {}, key: {}, is_proposed_head: {is_proposed_head})",
588                tipset.epoch(),
589                tipset.key()
590            ),
591        }
592    }
593}
594
/// Tracks the full tipsets that have been received but are not yet part of
/// the validated chain, and derives the sync tasks needed to make progress.
struct SyncStateMachine<DB> {
    /// Chain store used to look up the heaviest tipset, genesis, chain
    /// configuration and the blockstore.
    cs: Arc<ChainStore<DB>>,
    /// Optional bad block cache handed to tipset validation.
    bad_block_cache: Option<Arc<BadBlockCache>>,
    // Map from TipsetKey to FullTipset
    tipsets: HashMap<TipsetKey, FullTipset>,
    /// When true, parent state checks are skipped and every tipset is
    /// considered ready for validation.
    stateless_mode: bool,
}
602
603impl<DB: Blockstore> SyncStateMachine<DB> {
604    pub fn new(
605        cs: Arc<ChainStore<DB>>,
606        bad_block_cache: Option<Arc<BadBlockCache>>,
607        stateless_mode: bool,
608    ) -> Self {
609        Self {
610            cs,
611            bad_block_cache,
612            tipsets: HashMap::default(),
613            stateless_mode,
614        }
615    }
616
617    // Compute the list of chains from the tipsets map
618    fn chains(&self) -> Vec<Vec<FullTipset>> {
619        let mut chains = Vec::new();
620        let mut remaining_tipsets = self.tipsets.clone();
621
622        while let Some(heaviest) = remaining_tipsets
623            .values()
624            .max_by_key(|ts| ts.weight())
625            .cloned()
626        {
627            // Build chain starting from heaviest
628            let mut chain = Vec::new();
629            let mut current = Some(heaviest);
630
631            while let Some(tipset) = current.take() {
632                remaining_tipsets.remove(tipset.key());
633
634                // Find parent in tipsets map
635                current = self.tipsets.get(tipset.parents()).cloned();
636
637                chain.push(tipset);
638            }
639            chain.reverse();
640            chains.push(chain);
641        }
642
643        chains
644    }
645
646    fn is_parent_validated(&self, tipset: &FullTipset) -> bool {
647        let db = self.cs.blockstore();
648        self.stateless_mode || db.has(tipset.parent_state()).unwrap_or(false)
649    }
650
651    fn is_ready_for_validation(&self, tipset: &FullTipset) -> bool {
652        if self.stateless_mode || tipset.key() == self.cs.genesis_tipset().key() {
653            // Skip validation in stateless mode and for genesis tipset
654            true
655        } else if let Ok(parent_ts) = load_full_tipset(&self.cs, tipset.parents()) {
656            let head_ts = self.cs.heaviest_tipset();
657            // Treat post-head-epoch tipsets as not validated to fix <https://github.com/ChainSafe/forest/issues/5677>
658            // basically, the follow task should always start from the current head which could be manually set
659            // to an old one. When a post-head-epoch tipset is considered validated, it could mess up the state machine
660            // in some edge cases and the node ends up being stuck with ever-empty sync task queue as reported
661            // in <https://github.com/ChainSafe/forest/issues/5679>.
662            if parent_ts.key() == head_ts.key() {
663                true
664            } else if parent_ts.epoch() >= head_ts.epoch() {
665                false
666            } else {
667                self.is_parent_validated(tipset)
668            }
669        } else {
670            false
671        }
672    }
673
674    fn add_full_tipset(&mut self, tipset: FullTipset) {
675        if let Err(why) = TipsetValidator(&tipset).validate(
676            &self.cs,
677            self.bad_block_cache.as_ref().map(AsRef::as_ref),
678            &self.cs.genesis_tipset(),
679            self.cs.chain_config().block_delay_secs,
680        ) {
681            metrics::INVALID_TIPSET_TOTAL.inc();
682            trace!("Skipping invalid tipset: {}", why);
683            self.mark_bad_tipset(tipset);
684            return;
685        }
686
687        // Check if tipset is outside the chain_finality window
688        let heaviest = self.cs.heaviest_tipset();
689        let epoch_diff = heaviest.epoch() - tipset.epoch();
690
691        if epoch_diff > self.cs.chain_config().policy.chain_finality {
692            self.mark_bad_tipset(tipset);
693            return;
694        }
695
696        // Check if tipset already exists
697        if self.tipsets.contains_key(tipset.key()) {
698            return;
699        }
700
701        // Find any existing tipsets with same epoch and parents
702        let mut to_remove = Vec::new();
703        #[allow(clippy::mutable_key_type)]
704        let mut merged_blocks: HashSet<_> = tipset.blocks().iter().cloned().collect();
705
706        // Collect all parent references from existing tipsets
707        let parent_refs: HashSet<_> = self
708            .tipsets
709            .values()
710            .map(|ts| ts.parents().clone())
711            .collect();
712
713        for (key, existing_ts) in self.tipsets.iter() {
714            if existing_ts.epoch() == tipset.epoch() && existing_ts.parents() == tipset.parents() {
715                // Only mark for removal if not referenced as a parent
716                if !parent_refs.contains(key) {
717                    to_remove.push(key.clone());
718                }
719                // Add blocks from existing tipset - HashSet handles deduplication automatically
720                merged_blocks.extend(existing_ts.blocks().iter().cloned());
721            }
722        }
723
724        // Remove old tipsets that were merged and aren't referenced
725        for key in to_remove {
726            self.tipsets.remove(&key);
727        }
728
729        // Create and insert new merged tipset
730        if let Ok(merged_tipset) = FullTipset::new(merged_blocks) {
731            self.tipsets
732                .insert(merged_tipset.key().clone(), merged_tipset);
733        }
734    }
735
736    // Mark blocks in tipset as bad.
737    // Mark all descendants of tipsets as bad.
738    // Remove all bad tipsets from the tipset map.
739    fn mark_bad_tipset(&mut self, tipset: FullTipset) {
740        let mut stack = vec![tipset];
741        while let Some(tipset) = stack.pop() {
742            self.tipsets.remove(tipset.key());
743            // Find all descendant tipsets (tipsets that have this tipset as a parent)
744            let mut to_remove = Vec::new();
745            let mut descendants = Vec::new();
746
747            for (key, ts) in self.tipsets.iter() {
748                if ts.parents() == tipset.key() {
749                    to_remove.push(key.clone());
750                    descendants.push(ts.clone());
751                }
752            }
753
754            // Remove bad tipsets from the map
755            for key in to_remove {
756                self.tipsets.remove(&key);
757            }
758
759            // Mark descendants as bad
760            stack.extend(descendants);
761        }
762    }
763
764    fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) {
765        if !self.is_parent_validated(&tipset) {
766            tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), parent_state = %tipset.parent_state(), "Parent tipset must be validated");
767            return;
768        }
769
770        self.tipsets.remove(tipset.key());
771        let tipset = tipset.into_tipset();
772        // cs.put_tipset requires state and doesn't work in stateless mode
773        if self.stateless_mode {
774            let epoch = tipset.epoch();
775            let terse_key = tipset.key().terse();
776            if self.cs.heaviest_tipset().weight() < tipset.weight() {
777                if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
778                    error!("Error setting heaviest tipset: {}", e);
779                } else {
780                    info!("Heaviest tipset: {} ({})", epoch, terse_key);
781                }
782            }
783        } else if is_proposed_head {
784            if let Err(e) = self.cs.put_tipset(&tipset) {
785                error!("Error putting tipset: {e}");
786            }
787        } else if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
788            error!("Error setting heaviest tipset: {e}");
789        }
790    }
791
792    pub fn update(&mut self, event: SyncEvent) {
793        tracing::trace!("update: {event}");
794        match event {
795            SyncEvent::NewFullTipsets(tipsets) => {
796                for tipset in tipsets {
797                    self.add_full_tipset(tipset);
798                }
799            }
800            SyncEvent::BadTipset(tipset) => self.mark_bad_tipset(tipset),
801            SyncEvent::ValidatedTipset {
802                tipset,
803                is_proposed_head,
804            } => self.mark_validated_tipset(tipset, is_proposed_head),
805        }
806    }
807
808    pub fn tasks(&self) -> (Vec<SyncTask>, Vec<ForkSyncInfo>) {
809        // Get the node's current validated head epoch once, as it's the same for all forks.
810        let current_validated_epoch = self.cs.heaviest_tipset().epoch();
811        let now = Utc::now();
812
813        let mut active_sync_info = Vec::new();
814        let mut tasks = Vec::new();
815        for chain in self.chains() {
816            if let Some(first_ts) = chain.first() {
817                let last_ts = chain.last().expect("Infallible");
818                let stage: ForkSyncStage;
819                let start_time = Some(now);
820
821                if !self.is_ready_for_validation(first_ts) {
822                    stage = ForkSyncStage::FetchingHeaders;
823                    tasks.push(SyncTask::FetchTipset(
824                        first_ts.parents().clone(),
825                        first_ts.epoch(),
826                    ));
827                } else {
828                    stage = ForkSyncStage::ValidatingTipsets;
829                    tasks.push(SyncTask::ValidateTipset {
830                        tipset: first_ts.clone(),
831                        is_proposed_head: chain.len() == 1,
832                    });
833                }
834
835                let fork_info = ForkSyncInfo {
836                    target_tipset_key: last_ts.key().clone(),
837                    target_epoch: last_ts.epoch(),
838                    target_sync_epoch_start: first_ts.epoch(),
839                    stage,
840                    validated_chain_head_epoch: current_validated_epoch,
841                    start_time,
842                    last_updated: Some(now),
843                };
844
845                active_sync_info.push(fork_info);
846            }
847        }
848        (tasks, active_sync_info)
849    }
850}
851
/// A unit of work derived from the current state of the sync state machine.
///
/// Tasks are produced by `SyncStateMachine::tasks` and carried out by
/// `SyncTask::execute`.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
enum SyncTask {
    /// Validate a tipset (validation itself is skipped in stateless mode).
    ValidateTipset {
        /// The full tipset to validate.
        tipset: FullTipset,
        /// Set by `tasks` when the tipset is the only element of its chain;
        /// forwarded unchanged in the resulting `SyncEvent::ValidatedTipset`.
        is_proposed_head: bool,
    },
    /// Fetch the tipset identified by the key (resolved through
    /// `get_full_tipset_batch`). The epoch is the one of the tipset whose
    /// parents are being requested and is only used for display and logging.
    FetchTipset(TipsetKey, ChainEpoch),
}
860
861impl std::fmt::Display for SyncTask {
862    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
863        match self {
864            SyncTask::ValidateTipset {
865                tipset,
866                is_proposed_head,
867            } => write!(
868                f,
869                "ValidateTipset(epoch: {}, is_proposed_head: {is_proposed_head})",
870                tipset.epoch()
871            ),
872            SyncTask::FetchTipset(key, epoch) => {
873                let s = key.to_string();
874                write!(
875                    f,
876                    "FetchTipset({}, epoch: {})",
877                    &s[s.len().saturating_sub(8)..],
878                    epoch
879                )
880            }
881        }
882    }
883}
884
885impl SyncTask {
886    async fn execute<DB: Blockstore + Sync + Send + 'static>(
887        self,
888        network: SyncNetworkContext<DB>,
889        state_manager: Arc<StateManager<DB>>,
890        stateless_mode: bool,
891        bad_block_cache: Option<Arc<BadBlockCache>>,
892    ) -> Option<SyncEvent> {
893        tracing::trace!("SyncTask::execute {self}");
894        match self {
895            SyncTask::ValidateTipset {
896                tipset,
897                is_proposed_head,
898            } if stateless_mode => Some(SyncEvent::ValidatedTipset {
899                tipset,
900                is_proposed_head,
901            }),
902            SyncTask::ValidateTipset {
903                tipset,
904                is_proposed_head,
905            } => match validate_tipset(&state_manager, tipset.clone(), bad_block_cache).await {
906                Ok(()) => Some(SyncEvent::ValidatedTipset {
907                    tipset,
908                    is_proposed_head,
909                }),
910                // If temporal drift error, don't mark as bad, just skip validation and try again
911                // later. This mirrors internal logic where temporal drift doesn't mark a block as
912                // bad permanently, since it could be valid later on. If not done, a single
913                // time-traveling block could cause the node to be stuck without making progress.
914                Err(e) if matches!(e, TipsetSyncerError::TimeTravellingBlock { .. }) => {
915                    warn!("Time travelling block detected, skipping tipset for now: {e}");
916                    None
917                }
918                Err(e) => {
919                    warn!("Error validating tipset: {e}");
920                    Some(SyncEvent::BadTipset(tipset))
921                }
922            },
923            SyncTask::FetchTipset(key, epoch) => {
924                match get_full_tipset_batch(&network, state_manager.chain_store(), None, &key).await
925                {
926                    Ok(parents) => Some(SyncEvent::NewFullTipsets(parents)),
927                    Err(e) => {
928                        tracing::warn!(%key, %epoch, "failed to fetch tipset: {e:#}");
929                        None
930                    }
931                }
932            }
933        }
934    }
935}
936
937#[cfg(test)]
938mod tests {
939    use super::*;
940    use crate::blocks::{Chain4U, HeaderBuilder, chain4u};
941    use crate::db::MemoryDB;
942    use crate::utils::db::CborStoreExt as _;
943    use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
944    use num_bigint::BigInt;
945    use num_traits::ToPrimitive;
946    use std::sync::Arc;
947    use tracing::level_filters::LevelFilter;
948    use tracing_subscriber::EnvFilter;
949
950    fn setup() -> (Arc<ChainStore<MemoryDB>>, Chain4U<Arc<MemoryDB>>) {
951        // Initialize test logger
952        let _ = tracing_subscriber::fmt()
953            .without_time()
954            .with_env_filter(
955                EnvFilter::builder()
956                    .with_default_directive(LevelFilter::DEBUG.into())
957                    .from_env()
958                    .unwrap(),
959            )
960            .try_init();
961
962        let db = Arc::new(MemoryDB::default());
963
964        // Populate DB with message roots used by chain4u
965        {
966            let empty_amt = Amt::<Cid, _>::new(&db).flush().unwrap();
967            db.put_cbor_default(&crate::blocks::TxMeta {
968                bls_message_root: empty_amt,
969                secp_message_root: empty_amt,
970            })
971            .unwrap();
972        }
973
974        // Create a chain of 5 tipsets using Chain4U
975        let c4u = Chain4U::with_blockstore(db.clone());
976        chain4u! {
977            in c4u;
978            [genesis_header = dummy_node(&db, 0)]
979        };
980
981        let cs = Arc::new(
982            ChainStore::new(
983                db.clone(),
984                db.clone(),
985                db.clone(),
986                Default::default(),
987                genesis_header.clone().into(),
988            )
989            .unwrap(),
990        );
991
992        cs.set_heaviest_tipset(cs.genesis_tipset()).unwrap();
993
994        (cs, c4u)
995    }
996
997    fn dummy_state(db: impl Blockstore, i: ChainEpoch) -> Cid {
998        db.put_cbor_default(&i).unwrap()
999    }
1000
1001    fn dummy_node(db: impl Blockstore, i: ChainEpoch) -> HeaderBuilder {
1002        HeaderBuilder {
1003            state_root: dummy_state(db, i).into(),
1004            weight: BigInt::from(i).into(),
1005            epoch: i.into(),
1006            ..Default::default()
1007        }
1008    }
1009
1010    #[test]
1011    fn test_state_machine_validation_order() {
1012        let (cs, c4u) = setup();
1013        let db = cs.blockstore().clone();
1014
1015        chain4u! {
1016            from [genesis_header] in c4u;
1017            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)] -> [c = dummy_node(&db, 3)] -> [d = dummy_node(&db, 4)] -> [e = dummy_node(&db, 5)]
1018        };
1019
1020        // Create the state machine
1021        let mut state_machine = SyncStateMachine::new(cs, Default::default(), true);
1022
1023        // Insert tipsets in random order
1024        let tipsets = vec![e, b, d, c, a];
1025
1026        // Convert each block into a FullTipset and add it to the state machine
1027        for block in tipsets {
1028            let full_tipset = FullTipset::new(vec![Block {
1029                header: block.clone().into(),
1030                bls_messages: vec![],
1031                secp_messages: vec![],
1032            }])
1033            .unwrap();
1034            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
1035        }
1036
1037        // Record validation order by processing all validation tasks in each iteration
1038        let mut validation_tasks = Vec::new();
1039        loop {
1040            let (tasks, _) = state_machine.tasks();
1041
1042            // Find all validation tasks
1043            let validation_tipsets: Vec<_> = tasks
1044                .into_iter()
1045                .filter_map(|task| {
1046                    if let SyncTask::ValidateTipset {
1047                        tipset,
1048                        is_proposed_head,
1049                    } = task
1050                    {
1051                        Some((tipset, is_proposed_head))
1052                    } else {
1053                        None
1054                    }
1055                })
1056                .collect();
1057
1058            if validation_tipsets.is_empty() {
1059                break;
1060            }
1061
1062            // Record and mark all tipsets as validated
1063            for (ts, is_proposed_head) in validation_tipsets {
1064                validation_tasks.push(ts.epoch());
1065                db.put_cbor_default(&ts.epoch()).unwrap();
1066                state_machine.mark_validated_tipset(ts, is_proposed_head);
1067            }
1068        }
1069
1070        // We expect validation tasks for epochs 1 through 5 in order
1071        assert_eq!(validation_tasks, vec![1, 2, 3, 4, 5]);
1072    }
1073
1074    #[test]
1075    fn test_sync_state_machine_chain_fragments() {
1076        let (cs, c4u) = setup();
1077        let db = cs.blockstore().clone();
1078
1079        // Create a forked chain
1080        // genesis -> a -> b
1081        //            \--> c
1082        chain4u! {
1083            in c4u;
1084            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)]
1085        };
1086        chain4u! {
1087            from [a] in c4u;
1088            [c = dummy_node(&db, 3)]
1089        };
1090
1091        // Create the state machine
1092        let mut state_machine = SyncStateMachine::new(cs, Default::default(), false);
1093
1094        // Convert each block into a FullTipset and add it to the state machine
1095        for block in [a, b, c] {
1096            let full_tipset = FullTipset::new(vec![Block {
1097                header: block.clone().into(),
1098                bls_messages: vec![],
1099                secp_messages: vec![],
1100            }])
1101            .unwrap();
1102            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
1103        }
1104
1105        let chains = state_machine
1106            .chains()
1107            .into_iter()
1108            .map(|v| {
1109                v.into_iter()
1110                    .map(|ts| ts.weight().to_i64().unwrap_or(0))
1111                    .collect_vec()
1112            })
1113            .collect_vec();
1114
1115        // Both chains should start at the same tipset
1116        assert_eq!(chains, vec![vec![1, 3], vec![1, 2]]);
1117    }
1118}