Skip to main content

forest/chain_sync/
chain_follower.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3//! This module contains the logic for driving Forest forward in the Filecoin
4//! blockchain.
5//!
6//! Forest keeps track of the current heaviest tipset, and receives information
7//! about new blocks and tipsets from peers as well as connected miners. The
8//! state machine has the following rules:
9//! - A tipset is invalid if its parent is invalid.
10//! - If a tipset's parent isn't in our database, request it from the network.
11//! - If a tipset's parent has been validated, validate the tipset.
//! - If a tipset is more than `chain_finality` epochs older than the heaviest
//!   tipset, it is discarded together with any tracked descendants. This
//!   prevents Forest from following forks that will never be accepted.
15//!
16//! The state machine does not do any network requests or validation. Those are
17//! handled by an external actor.
18
19use super::network_context::SyncNetworkContext;
20use crate::{
21    blocks::{Block, FullTipset, Tipset, TipsetKey},
22    chain::{ChainStore, index::ResolveNullTipset},
23    chain_sync::{
24        ForkSyncInfo, ForkSyncStage, SyncStatus, SyncStatusReport, TipsetValidator,
25        bad_block_cache::{BadBlockCache, SeenBlockCache},
26        metrics,
27        tipset_syncer::{TipsetSyncerError, validate_tipset},
28        validation::GossipBlockValidator,
29    },
30    db::EthMappingsStore,
31    libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest},
32    message_pool::MessagePool,
33    networks::calculate_expected_epoch,
34    shim::clock::ChainEpoch,
35    state_manager::StateManager,
36    utils::ShallowClone as _,
37};
38use ahash::{HashMap, HashSet};
39use chrono::Utc;
40use cid::Cid;
41use fvm_ipld_blockstore::Blockstore;
42use itertools::Itertools;
43use libp2p::PeerId;
44use parking_lot::{Mutex, RwLock};
45use std::{sync::Arc, time::Instant};
46use tokio::{sync::Notify, task::JoinSet};
47use tracing::{debug, error, info, trace, warn};
48
49pub struct ChainFollower<DB> {
50    /// Tasks
51    tasks: Arc<Mutex<HashSet<SyncTask>>>,
52
53    /// State machine
54    state_machine: Arc<Mutex<SyncStateMachine<DB>>>,
55
56    /// Syncing status of the chain
57    pub sync_status: SyncStatus,
58
59    /// manages retrieving and updates state objects
60    pub state_manager: Arc<StateManager<DB>>,
61
62    /// Context to be able to send requests to P2P network
63    pub network: SyncNetworkContext<DB>,
64
65    /// Genesis tipset
66    genesis: Tipset,
67
68    /// Bad blocks cache, updates based on invalid state transitions.
69    /// Will mark any invalid blocks and all children as bad in this bounded
70    /// cache
71    pub bad_blocks: Option<Arc<BadBlockCache>>,
72
73    /// Incoming network events to be handled by synchronizer
74    net_handler: flume::Receiver<NetworkEvent>,
75
76    /// Tipset channel sender
77    pub tipset_sender: flume::Sender<FullTipset>,
78
79    /// Tipset channel receiver
80    tipset_receiver: flume::Receiver<FullTipset>,
81
82    /// When `stateless_mode` is true, forest connects to the P2P network but
83    /// does not execute any state transitions. This drastically reduces the
84    /// memory and disk footprint of Forest but also means that Forest will not
85    /// be able to validate the correctness of the chain.
86    stateless_mode: bool,
87
88    /// Message pool
89    mem_pool: Arc<MessagePool<Arc<ChainStore<DB>>>>,
90}
91
92impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
93    pub fn new(
94        state_manager: Arc<StateManager<DB>>,
95        network: SyncNetworkContext<DB>,
96        genesis: Tipset,
97        net_handler: flume::Receiver<NetworkEvent>,
98        stateless_mode: bool,
99        mem_pool: Arc<MessagePool<Arc<ChainStore<DB>>>>,
100    ) -> Self {
101        crate::def_is_env_truthy!(cache_disabled, "FOREST_DISABLE_BAD_BLOCK_CACHE");
102        let (tipset_sender, tipset_receiver) = flume::bounded(20);
103        let tasks: Arc<Mutex<HashSet<SyncTask>>> = Arc::new(Mutex::new(HashSet::default()));
104        let bad_blocks = if cache_disabled() {
105            tracing::warn!("bad block cache is disabled by `FOREST_DISABLE_BAD_BLOCK_CACHE`");
106            None
107        } else {
108            Some(Default::default())
109        };
110        let state_machine = Arc::new(Mutex::new(SyncStateMachine::new(
111            state_manager.chain_store().clone(),
112            bad_blocks.clone(),
113            stateless_mode,
114        )));
115        Self {
116            tasks,
117            state_machine,
118            sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
119            state_manager,
120            network,
121            genesis,
122            bad_blocks,
123            net_handler,
124            tipset_sender,
125            tipset_receiver,
126            stateless_mode,
127            mem_pool,
128        }
129    }
130
131    /// Reset inner states
132    pub fn reset(&self) {
133        let start = Instant::now();
134        self.tasks.lock().clear();
135        self.state_manager
136            .chain_store()
137            .validated_blocks
138            .lock()
139            .clear();
140        self.state_machine.lock().tipsets.clear();
141        if let Some(bad_blocks) = &self.bad_blocks {
142            bad_blocks.clear();
143        }
144        tracing::info!(
145            "chain follower reset, took {}",
146            humantime::format_duration(start.elapsed())
147        );
148    }
149
150    pub async fn run(&self) -> anyhow::Result<()>
151    where
152        DB: EthMappingsStore,
153    {
154        chain_follower(
155            &self.tasks,
156            &self.state_machine,
157            &self.state_manager,
158            self.bad_blocks.clone(),
159            self.net_handler.clone(),
160            self.tipset_receiver.clone(),
161            &self.network,
162            &self.mem_pool,
163            &self.sync_status,
164            &self.genesis,
165            self.stateless_mode,
166        )
167        .await
168    }
169}
170
171#[allow(clippy::too_many_arguments)]
172// We receive new full tipsets from the p2p swarm, and from miners that use Forest as their frontend.
173async fn chain_follower<DB: Blockstore + EthMappingsStore + Sync + Send + 'static>(
174    tasks: &Arc<Mutex<HashSet<SyncTask>>>,
175    state_machine: &Arc<Mutex<SyncStateMachine<DB>>>,
176    state_manager: &Arc<StateManager<DB>>,
177    bad_block_cache: Option<Arc<BadBlockCache>>,
178    network_rx: flume::Receiver<NetworkEvent>,
179    tipset_receiver: flume::Receiver<FullTipset>,
180    network: &SyncNetworkContext<DB>,
181    mem_pool: &Arc<MessagePool<Arc<ChainStore<DB>>>>,
182    sync_status: &SyncStatus,
183    genesis: &Tipset,
184    stateless_mode: bool,
185) -> anyhow::Result<()> {
186    let state_changed = Arc::new(Notify::new());
187
188    let seen_block_cache = SeenBlockCache::default();
189
190    let mut set = JoinSet::new();
191
192    // Increment metrics, update peer information, and forward tipsets to the state machine.
193    set.spawn({
194        let state_manager = state_manager.shallow_clone();
195        let state_changed = state_changed.shallow_clone();
196        let state_machine = state_machine.shallow_clone();
197        let network = network.shallow_clone();
198        let mem_pool = mem_pool.shallow_clone();
199        let genesis = genesis.shallow_clone();
200        let bad_block_cache = bad_block_cache.shallow_clone();
201        let seen_block_cache = seen_block_cache.shallow_clone();
202        async move {
203            while let Ok(event) = network_rx.recv_async().await {
204                inc_gossipsub_event_metrics(&event);
205
206                update_peer_info(
207                    &event,
208                    &network,
209                    state_manager.chain_store().clone(),
210                    &genesis,
211                );
212
213                let Ok(tipset) = (match event {
214                    NetworkEvent::HelloResponseOutbound { request, source } => {
215                        let tipset_keys = TipsetKey::from(request.heaviest_tip_set.clone());
216                        get_full_tipset(
217                            &network,
218                            state_manager.chain_store(),
219                            Some(source),
220                            &tipset_keys,
221                        )
222                        .await
223                        .inspect_err(|e| debug!("Querying full tipset failed: {}", e))
224                    }
225                    NetworkEvent::PubsubMessage { message } => match message {
226                        PubsubMessage::Block(b) => {
227                            let cs = state_manager.chain_store();
228                            let cfg = cs.chain_config();
229                            if let Err(reason) = GossipBlockValidator::new(&b).validate_pre_fetch(
230                                &genesis,
231                                cfg.block_delay_secs,
232                                cfg.policy.chain_finality,
233                                cs.heaviest_tipset().epoch(),
234                                bad_block_cache.as_deref(),
235                                &seen_block_cache,
236                            ) {
237                                metrics::GOSSIP_BLOCK_REJECTED_TOTAL
238                                    .get_or_create(&metrics::GossipRejectReasonLabel {
239                                        reason: reason.label(),
240                                    })
241                                    .inc();
242                                debug!("Rejected gossip block {}: {reason}", b.header.cid());
243                                continue;
244                            }
245                            let key = TipsetKey::from(nunny::vec![*b.header.cid()]);
246                            get_full_tipset(&network, cs, None, &key).await
247                        }
248                        PubsubMessage::Message(m) => {
249                            if let Err(why) = mem_pool.add(m) {
250                                debug!("Received invalid GossipSub message: {}", why);
251                            }
252                            continue;
253                        }
254                    },
255                    _ => continue,
256                }) else {
257                    continue;
258                };
259                {
260                    state_machine
261                        .lock()
262                        .update(SyncEvent::NewFullTipsets(vec![tipset]));
263                    state_changed.notify_one();
264                }
265            }
266        }
267    });
268
269    // Forward tipsets from miners into the state machine.
270    set.spawn({
271        let state_changed = state_changed.clone();
272        let state_machine = state_machine.clone();
273
274        async move {
275            while let Ok(tipset) = tipset_receiver.recv_async().await {
276                state_machine
277                    .lock()
278                    .update(SyncEvent::NewFullTipsets(vec![tipset]));
279                state_changed.notify_one();
280            }
281        }
282    });
283
284    // When the state machine is updated, we need to update the sync status and spawn tasks
285    set.spawn({
286        let state_manager = state_manager.shallow_clone();
287        let state_machine = state_machine.shallow_clone();
288        let network = network.shallow_clone();
289        let sync_status = sync_status.shallow_clone();
290        let state_changed = state_changed.shallow_clone();
291        let tasks = tasks.shallow_clone();
292        let bad_block_cache = bad_block_cache.shallow_clone();
293        async move {
294            loop {
295                state_changed.notified().await;
296
297                let mut tasks_set = tasks.lock();
298                let (task_vec, current_active_forks) = state_machine.lock().tasks();
299
300                // Update the sync states
301                {
302                    let old_status_report = sync_status.read().clone();
303                    let new_status_report = old_status_report.update(
304                        &state_manager,
305                        current_active_forks,
306                        stateless_mode,
307                    );
308
309                    sync_status.write().clone_from(&new_status_report);
310                }
311
312                for task in task_vec {
313                    // insert task into tasks. If task is already in tasks, skip. If it is not, spawn it.
314                    let new = tasks_set.insert(task.clone());
315                    if new {
316                        let action = task.clone().execute(
317                            network.shallow_clone(),
318                            state_manager.shallow_clone(),
319                            stateless_mode,
320                            bad_block_cache.shallow_clone(),
321                        );
322                        tokio::spawn({
323                            let tasks = tasks.clone();
324                            let state_machine = state_machine.clone();
325                            let state_changed = state_changed.clone();
326                            async move {
327                                if let Some(event) = action.await {
328                                    state_machine.lock().update(event);
329                                    state_changed.notify_one();
330                                }
331                                tasks.lock().remove(&task);
332                            }
333                        });
334                    }
335                }
336            }
337        }
338    });
339
340    // Periodically report progress if there are any tipsets left to be fetched.
341    // Once we're in steady-state (i.e. caught up to HEAD) and there are no
342    // active forks, this will not report anything.
343    set.spawn({
344        let state_manager = state_manager.clone();
345        let state_machine = state_machine.clone();
346        async move {
347            loop {
348                tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
349                let (tasks_set, _) = state_machine.lock().tasks();
350                let heaviest_tipset = state_manager.chain_store().heaviest_tipset();
351                let heaviest_epoch = heaviest_tipset.epoch();
352
353                let to_download = tasks_set
354                    .iter()
355                    .filter_map(|task| match task {
356                        SyncTask::FetchTipset(_, epoch) => Some(epoch - heaviest_epoch),
357                        _ => None,
358                    })
359                    .max()
360                    .unwrap_or(0);
361
362                let expected_head = calculate_expected_epoch(
363                    Utc::now().timestamp() as u64,
364                    state_manager.chain_store().genesis_block_header().timestamp,
365                    state_manager.chain_config().block_delay_secs,
366                );
367
368                // Only print 'Catching up to HEAD' if we're more than 10 epochs
369                // behind. Otherwise it can be too spammy.
370                match (expected_head - heaviest_epoch > 10, to_download > 0) {
371                    (true, true) => info!(
372                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}, downloading {to_download} tipsets"
373                        , heaviest_tipset.key()
374                    ),
375                    (true, false) => info!(
376                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}"
377                        , heaviest_tipset.key()
378                    ),
379                    (false, true) => {
380                        info!("Downloading {to_download} tipsets")
381                    }
382                    (false, false) => {}
383                }
384            }
385        }
386    });
387
388    set.join_all().await;
389    Ok(())
390}
391
392// Increment the gossipsub event metrics.
393fn inc_gossipsub_event_metrics(event: &NetworkEvent) {
394    let label = match event {
395        NetworkEvent::HelloRequestInbound => metrics::values::HELLO_REQUEST_INBOUND,
396        NetworkEvent::HelloResponseOutbound { .. } => metrics::values::HELLO_RESPONSE_OUTBOUND,
397        NetworkEvent::HelloRequestOutbound => metrics::values::HELLO_REQUEST_OUTBOUND,
398        NetworkEvent::HelloResponseInbound => metrics::values::HELLO_RESPONSE_INBOUND,
399        NetworkEvent::PeerConnected(_) => metrics::values::PEER_CONNECTED,
400        NetworkEvent::PeerDisconnected(_) => metrics::values::PEER_DISCONNECTED,
401        NetworkEvent::PubsubMessage { message } => match message {
402            PubsubMessage::Block(_) => metrics::values::PUBSUB_BLOCK,
403            PubsubMessage::Message(_) => metrics::values::PUBSUB_MESSAGE,
404        },
405        NetworkEvent::ChainExchangeRequestOutbound => {
406            metrics::values::CHAIN_EXCHANGE_REQUEST_OUTBOUND
407        }
408        NetworkEvent::ChainExchangeResponseInbound => {
409            metrics::values::CHAIN_EXCHANGE_RESPONSE_INBOUND
410        }
411        NetworkEvent::ChainExchangeRequestInbound => {
412            metrics::values::CHAIN_EXCHANGE_REQUEST_INBOUND
413        }
414        NetworkEvent::ChainExchangeResponseOutbound => {
415            metrics::values::CHAIN_EXCHANGE_RESPONSE_OUTBOUND
416        }
417    };
418
419    metrics::LIBP2P_MESSAGE_TOTAL.get_or_create(&label).inc();
420}
421
422// Keep our peer manager up to date.
423fn update_peer_info<DB: Blockstore + Sync + Send + 'static>(
424    event: &NetworkEvent,
425    network: &SyncNetworkContext<DB>,
426    chain_store: Arc<ChainStore<DB>>,
427    genesis: &Tipset,
428) {
429    match event {
430        NetworkEvent::PeerConnected(peer_id) => {
431            let genesis_cid = *genesis.block_headers().first().cid();
432            // Spawn and immediately move on to the next event
433            tokio::task::spawn(handle_peer_connected_event(
434                network.shallow_clone(),
435                chain_store,
436                *peer_id,
437                genesis_cid,
438            ));
439        }
440        NetworkEvent::PeerDisconnected(peer_id) => {
441            handle_peer_disconnected_event(network, *peer_id);
442        }
443        _ => {}
444    }
445}
446
447async fn handle_peer_connected_event<DB: Blockstore + Sync + Send + 'static>(
448    network: SyncNetworkContext<DB>,
449    chain_store: Arc<ChainStore<DB>>,
450    peer_id: PeerId,
451    genesis_block_cid: Cid,
452) {
453    // Query the heaviest TipSet from the store
454    if network.peer_manager().is_peer_new(&peer_id) {
455        // Since the peer is new, send them a hello request
456        // Query the heaviest TipSet from the store
457        let heaviest = chain_store.heaviest_tipset();
458        let request = HelloRequest {
459            heaviest_tip_set: heaviest.cids(),
460            heaviest_tipset_height: heaviest.epoch(),
461            heaviest_tipset_weight: heaviest.weight().clone().into(),
462            genesis_cid: genesis_block_cid,
463        };
464        let (peer_id, moment_sent, response) = match network.hello_request(peer_id, request).await {
465            Ok(response) => response,
466            Err(e) => {
467                debug!("Hello request failed: {}", e);
468                return;
469            }
470        };
471        let dur = Instant::now().duration_since(moment_sent);
472
473        // Update the peer metadata based on the response
474        match response {
475            Some(_) => {
476                network.peer_manager().log_success(&peer_id, dur);
477            }
478            None => {
479                network.peer_manager().log_failure(&peer_id, dur);
480            }
481        }
482    }
483}
484
485fn handle_peer_disconnected_event<DB: Blockstore + Sync + Send + 'static>(
486    network: &SyncNetworkContext<DB>,
487    peer_id: PeerId,
488) {
489    network.peer_manager().remove_peer(&peer_id);
490    network.peer_manager().unmark_peer_bad(&peer_id);
491}
492
493pub async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
494    network: &SyncNetworkContext<DB>,
495    chain_store: &ChainStore<DB>,
496    peer_id: Option<PeerId>,
497    tipset_keys: &TipsetKey,
498) -> anyhow::Result<FullTipset> {
499    // Attempt to load from the store
500    if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
501        return Ok(full_tipset);
502    }
503    // Load from the network
504    let tipset = network
505        .chain_exchange_full_tipset(peer_id, tipset_keys)
506        .await
507        .map_err(|e| anyhow::anyhow!(e))?;
508    tipset.persist(chain_store.blockstore())?;
509
510    Ok(tipset)
511}
512
513async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
514    network: &SyncNetworkContext<DB>,
515    chain_store: &ChainStore<DB>,
516    peer_id: Option<PeerId>,
517    tipset_keys: &TipsetKey,
518) -> anyhow::Result<Vec<FullTipset>> {
519    // Attempt to load from the store
520    if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
521        return Ok(vec![full_tipset]);
522    }
523    // Load from the network
524    let tipsets = network
525        .chain_exchange_full_tipsets(peer_id, tipset_keys)
526        .await
527        .map_err(|e| anyhow::anyhow!(e))?;
528
529    for tipset in tipsets.iter() {
530        tipset.persist(chain_store.blockstore())?;
531    }
532
533    Ok(tipsets)
534}
535
536pub fn load_full_tipset<DB: Blockstore>(
537    chain_store: &ChainStore<DB>,
538    tipset_keys: &TipsetKey,
539) -> anyhow::Result<FullTipset> {
540    // Retrieve tipset from store based on passed in TipsetKey
541    let ts = chain_store
542        .chain_index()
543        .load_required_tipset(tipset_keys)?;
544    let blocks: Vec<_> = ts
545        .block_headers()
546        .iter()
547        .map(|header| -> anyhow::Result<Block> {
548            let (bls_msgs, secp_msgs) =
549                crate::chain::block_messages(chain_store.blockstore(), header)?;
550            Ok(Block {
551                header: header.clone(),
552                bls_messages: bls_msgs,
553                secp_messages: secp_msgs,
554            })
555        })
556        .try_collect()?;
557    // Construct FullTipset
558    let fts = FullTipset::new(blocks)?;
559    Ok(fts)
560}
561
562enum SyncEvent {
563    NewFullTipsets(Vec<FullTipset>),
564    BadTipset(FullTipset),
565    ValidatedTipset {
566        tipset: FullTipset,
567        is_proposed_head: bool,
568    },
569}
570
571impl std::fmt::Display for SyncEvent {
572    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
573        fn tss_to_string(tss: &[FullTipset]) -> String {
574            format!(
575                "epoch: {}-{}",
576                tss.first().map(|ts| ts.epoch()).unwrap_or_default(),
577                tss.last().map(|ts| ts.epoch()).unwrap_or_default()
578            )
579        }
580
581        match self {
582            Self::NewFullTipsets(tss) => write!(f, "NewFullTipsets({})", tss_to_string(tss)),
583            Self::BadTipset(ts) => {
584                write!(f, "BadTipset(epoch: {}, key: {})", ts.epoch(), ts.key())
585            }
586            Self::ValidatedTipset {
587                tipset,
588                is_proposed_head,
589            } => write!(
590                f,
591                "ValidatedTipset(epoch: {}, key: {}, is_proposed_head: {is_proposed_head})",
592                tipset.epoch(),
593                tipset.key()
594            ),
595        }
596    }
597}
598
599struct SyncStateMachine<DB> {
600    cs: Arc<ChainStore<DB>>,
601    bad_block_cache: Option<Arc<BadBlockCache>>,
602    // Map from TipsetKey to FullTipset
603    tipsets: HashMap<TipsetKey, FullTipset>,
604    stateless_mode: bool,
605}
606
607impl<DB: Blockstore> SyncStateMachine<DB> {
608    pub fn new(
609        cs: Arc<ChainStore<DB>>,
610        bad_block_cache: Option<Arc<BadBlockCache>>,
611        stateless_mode: bool,
612    ) -> Self {
613        Self {
614            cs,
615            bad_block_cache,
616            tipsets: HashMap::default(),
617            stateless_mode,
618        }
619    }
620
621    // Compute the list of chains from the tipsets map
622    fn chains(&self) -> Vec<Vec<FullTipset>> {
623        let mut chains = Vec::new();
624        let mut remaining_tipsets = self.tipsets.clone();
625
626        while let Some(heaviest) = remaining_tipsets
627            .values()
628            .max_by_key(|ts| ts.weight())
629            .cloned()
630        {
631            // Build chain starting from heaviest
632            let mut chain = Vec::new();
633            let mut current = Some(heaviest);
634
635            while let Some(tipset) = current.take() {
636                remaining_tipsets.remove(tipset.key());
637
638                // Find parent in tipsets map
639                current = self.tipsets.get(tipset.parents()).cloned();
640
641                chain.push(tipset);
642            }
643            chain.reverse();
644            chains.push(chain);
645        }
646
647        chains
648    }
649
650    fn is_parent_validated(&self, tipset: &FullTipset) -> bool {
651        let db = self.cs.blockstore();
652        self.stateless_mode || db.has(tipset.parent_state()).unwrap_or(false)
653    }
654
655    fn is_ready_for_validation(&self, tipset: &FullTipset) -> bool {
656        if self.stateless_mode || tipset.key() == self.cs.genesis_tipset().key() {
657            // Skip validation in stateless mode and for genesis tipset
658            true
659        } else if let Ok(parent_ts) = load_full_tipset(&self.cs, tipset.parents()) {
660            let head_ts = self.cs.heaviest_tipset();
661            // Treat post-head-epoch tipsets as not validated to fix <https://github.com/ChainSafe/forest/issues/5677>
662            // basically, the follow task should always start from the current head which could be manually set
663            // to an old one. When a post-head-epoch tipset is considered validated, it could mess up the state machine
664            // in some edge cases and the node ends up being stuck with ever-empty sync task queue as reported
665            // in <https://github.com/ChainSafe/forest/issues/5679>.
666            if parent_ts.key() == head_ts.key() {
667                true
668            } else if parent_ts.epoch() >= head_ts.epoch() {
669                false
670            } else {
671                self.is_parent_validated(tipset)
672            }
673        } else {
674            false
675        }
676    }
677
678    fn add_full_tipset(&mut self, tipset: FullTipset)
679    where
680        DB: EthMappingsStore,
681    {
682        if let Err(why) = TipsetValidator(&tipset).validate(
683            &self.cs,
684            self.bad_block_cache.as_ref().map(AsRef::as_ref),
685            &self.cs.genesis_tipset(),
686            self.cs.chain_config().block_delay_secs,
687        ) {
688            metrics::INVALID_TIPSET_TOTAL.inc();
689            trace!("Skipping invalid tipset: {}", why);
690            self.mark_bad_tipset(tipset);
691            return;
692        }
693
694        // Check if tipset is outside the chain_finality window
695        let heaviest = self.cs.heaviest_tipset();
696        let epoch_diff = heaviest.epoch() - tipset.epoch();
697
698        if epoch_diff > self.cs.chain_config().policy.chain_finality {
699            self.mark_bad_tipset(tipset);
700            return;
701        }
702
703        // Check if tipset already exists
704        if self.tipsets.contains_key(tipset.key()) {
705            return;
706        }
707
708        // Skip if tipset is part of the current chain.
709        if let Ok(Some(ts)) = self.cs.chain_index().tipset_by_height(
710            tipset.epoch(),
711            self.cs.heaviest_tipset(),
712            ResolveNullTipset::TakeOlder,
713        ) && ts.key() == tipset.key()
714        {
715            return;
716        }
717
718        // Find any existing tipsets with same epoch and parents
719        let mut to_remove = Vec::new();
720        #[allow(clippy::mutable_key_type)]
721        let mut merged_blocks: HashSet<_> = tipset.blocks().iter().cloned().collect();
722
723        // Collect all parent references from existing tipsets
724        let parent_refs: HashSet<_> = self
725            .tipsets
726            .values()
727            .map(|ts| ts.parents().clone())
728            .collect();
729
730        for (key, existing_ts) in self.tipsets.iter() {
731            if existing_ts.epoch() == tipset.epoch() && existing_ts.parents() == tipset.parents() {
732                // Only mark for removal if not referenced as a parent
733                if !parent_refs.contains(key) {
734                    to_remove.push(key.clone());
735                }
736                // Add blocks from existing tipset - HashSet handles deduplication automatically
737                merged_blocks.extend(existing_ts.blocks().iter().cloned());
738            }
739        }
740
741        // Remove old tipsets that were merged and aren't referenced
742        for key in to_remove {
743            self.tipsets.remove(&key);
744        }
745
746        // Create and insert new merged tipset
747        if let Ok(merged_tipset) = FullTipset::new(merged_blocks) {
748            self.tipsets
749                .insert(merged_tipset.key().clone(), merged_tipset);
750        }
751    }
752
753    // Mark blocks in tipset as bad.
754    // Mark all descendants of tipsets as bad.
755    // Remove all bad tipsets from the tipset map.
756    fn mark_bad_tipset(&mut self, tipset: FullTipset) {
757        let mut stack = vec![tipset];
758        while let Some(tipset) = stack.pop() {
759            self.tipsets.remove(tipset.key());
760            // Find all descendant tipsets (tipsets that have this tipset as a parent)
761            let mut to_remove = Vec::new();
762            let mut descendants = Vec::new();
763
764            for (key, ts) in self.tipsets.iter() {
765                if ts.parents() == tipset.key() {
766                    to_remove.push(key.clone());
767                    descendants.push(ts.clone());
768                }
769            }
770
771            // Remove bad tipsets from the map
772            for key in to_remove {
773                self.tipsets.remove(&key);
774            }
775
776            // Mark descendants as bad
777            stack.extend(descendants);
778        }
779    }
780
781    fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) {
782        if !self.is_parent_validated(&tipset) {
783            tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), parent_state = %tipset.parent_state(), "Parent tipset must be validated");
784            return;
785        }
786
787        self.tipsets.remove(tipset.key());
788        let tipset = tipset.into_tipset();
789        // cs.put_tipset requires state and doesn't work in stateless mode
790        if self.stateless_mode {
791            let epoch = tipset.epoch();
792            let terse_key = tipset.key().terse();
793            if self.cs.heaviest_tipset().weight() < tipset.weight() {
794                if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
795                    error!("Error setting heaviest tipset: {}", e);
796                } else {
797                    info!("Heaviest tipset: {} ({})", epoch, terse_key);
798                }
799            }
800        } else if is_proposed_head {
801            if let Err(e) = self.cs.put_tipset(&tipset) {
802                error!("Error putting tipset: {e}");
803            }
804        } else if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
805            error!("Error setting heaviest tipset: {e}");
806        }
807    }
808
809    pub fn update(&mut self, event: SyncEvent)
810    where
811        DB: EthMappingsStore,
812    {
813        tracing::trace!("update: {event}");
814        match event {
815            SyncEvent::NewFullTipsets(tipsets) => {
816                for tipset in tipsets {
817                    self.add_full_tipset(tipset);
818                }
819            }
820            SyncEvent::BadTipset(tipset) => self.mark_bad_tipset(tipset),
821            SyncEvent::ValidatedTipset {
822                tipset,
823                is_proposed_head,
824            } => self.mark_validated_tipset(tipset, is_proposed_head),
825        }
826    }
827
828    pub fn tasks(&self) -> (Vec<SyncTask>, Vec<ForkSyncInfo>) {
829        // Get the node's current validated head epoch once, as it's the same for all forks.
830        let current_validated_epoch = self.cs.heaviest_tipset().epoch();
831        let now = Utc::now();
832
833        let mut active_sync_info = Vec::new();
834        let mut tasks = Vec::new();
835        for chain in self.chains() {
836            if let Some(first_ts) = chain.first() {
837                let last_ts = chain.last().expect("Infallible");
838                let stage: ForkSyncStage;
839                let start_time = Some(now);
840
841                if !self.is_ready_for_validation(first_ts) {
842                    stage = ForkSyncStage::FetchingHeaders;
843                    tasks.push(SyncTask::FetchTipset(
844                        first_ts.parents().clone(),
845                        first_ts.epoch(),
846                    ));
847                } else {
848                    stage = ForkSyncStage::ValidatingTipsets;
849                    tasks.push(SyncTask::ValidateTipset {
850                        tipset: first_ts.clone(),
851                        is_proposed_head: chain.len() == 1,
852                    });
853                }
854
855                let fork_info = ForkSyncInfo {
856                    target_tipset_key: last_ts.key().clone(),
857                    target_epoch: last_ts.epoch(),
858                    target_sync_epoch_start: first_ts.epoch(),
859                    stage,
860                    validated_chain_head_epoch: current_validated_epoch,
861                    start_time,
862                    last_updated: Some(now),
863                };
864
865                active_sync_info.push(fork_info);
866            }
867        }
868        (tasks, active_sync_info)
869    }
870}
871
872#[derive(PartialEq, Eq, Hash, Clone, Debug)]
873enum SyncTask {
874    ValidateTipset {
875        tipset: FullTipset,
876        is_proposed_head: bool,
877    },
878    FetchTipset(TipsetKey, ChainEpoch),
879}
880
881impl std::fmt::Display for SyncTask {
882    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
883        match self {
884            SyncTask::ValidateTipset {
885                tipset,
886                is_proposed_head,
887            } => write!(
888                f,
889                "ValidateTipset(epoch: {}, is_proposed_head: {is_proposed_head})",
890                tipset.epoch()
891            ),
892            SyncTask::FetchTipset(key, epoch) => {
893                let s = key.to_string();
894                write!(
895                    f,
896                    "FetchTipset({}, epoch: {})",
897                    &s[s.len().saturating_sub(8)..],
898                    epoch
899                )
900            }
901        }
902    }
903}
904
905impl SyncTask {
906    async fn execute<DB: Blockstore + EthMappingsStore + Sync + Send + 'static>(
907        self,
908        network: SyncNetworkContext<DB>,
909        state_manager: Arc<StateManager<DB>>,
910        stateless_mode: bool,
911        bad_block_cache: Option<Arc<BadBlockCache>>,
912    ) -> Option<SyncEvent> {
913        tracing::trace!("SyncTask::execute {self}");
914        match self {
915            SyncTask::ValidateTipset {
916                tipset,
917                is_proposed_head,
918            } if stateless_mode => Some(SyncEvent::ValidatedTipset {
919                tipset,
920                is_proposed_head,
921            }),
922            SyncTask::ValidateTipset {
923                tipset,
924                is_proposed_head,
925            } => match validate_tipset(&state_manager, tipset.clone(), bad_block_cache).await {
926                Ok(()) => Some(SyncEvent::ValidatedTipset {
927                    tipset,
928                    is_proposed_head,
929                }),
930                // If temporal drift error, don't mark as bad, just skip validation and try again
931                // later. This mirrors internal logic where temporal drift doesn't mark a block as
932                // bad permanently, since it could be valid later on. If not done, a single
933                // time-traveling block could cause the node to be stuck without making progress.
934                Err(e) if matches!(e, TipsetSyncerError::TimeTravellingBlock { .. }) => {
935                    warn!("Time travelling block detected, skipping tipset for now: {e}");
936                    None
937                }
938                Err(e) => {
939                    warn!("Error validating tipset: {e}");
940                    Some(SyncEvent::BadTipset(tipset))
941                }
942            },
943            SyncTask::FetchTipset(key, epoch) => {
944                match get_full_tipset_batch(&network, state_manager.chain_store(), None, &key).await
945                {
946                    Ok(parents) => Some(SyncEvent::NewFullTipsets(parents)),
947                    Err(e) => {
948                        tracing::warn!(%key, %epoch, "failed to fetch tipset: {e:#}");
949                        None
950                    }
951                }
952            }
953        }
954    }
955}
956
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blocks::{Chain4U, HeaderBuilder, chain4u};
    use crate::db::MemoryDB;
    use crate::utils::db::CborStoreExt as _;
    use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
    use num_bigint::BigInt;
    use num_traits::ToPrimitive;
    use std::sync::Arc;
    use tracing::level_filters::LevelFilter;
    use tracing_subscriber::EnvFilter;

    /// Builds an in-memory `ChainStore` whose genesis tipset is a single dummy
    /// block at epoch 0. That tipset is also set as the initial heaviest tipset.
    ///
    /// Returns the store together with the `Chain4U` builder that created the
    /// genesis header, so tests can attach further blocks to `genesis_header`.
    fn setup() -> (Arc<ChainStore<MemoryDB>>, Chain4U<Arc<MemoryDB>>) {
        // Initialize test logger
        let _ = tracing_subscriber::fmt()
            .without_time()
            .with_env_filter(
                EnvFilter::builder()
                    .with_default_directive(LevelFilter::DEBUG.into())
                    .from_env()
                    .unwrap(),
            )
            .try_init();

        let db = Arc::new(MemoryDB::default());

        // Populate DB with message roots used by chain4u: a `TxMeta` whose BLS and
        // secp message roots are both an empty AMT.
        {
            let empty_amt = Amt::<Cid, _>::new(&db).flush().unwrap();
            db.put_cbor_default(&crate::blocks::TxMeta {
                bls_message_root: empty_amt,
                secp_message_root: empty_amt,
            })
            .unwrap();
        }

        // Create the genesis block (epoch 0) using Chain4U; tests extend it later.
        let c4u = Chain4U::with_blockstore(db.clone());
        chain4u! {
            in c4u;
            [genesis_header = dummy_node(&db, 0)]
        };

        let cs = Arc::new(
            ChainStore::new(
                db.clone(),
                db.clone(),
                db.clone(),
                Default::default(),
                genesis_header.clone().into(),
            )
            .unwrap(),
        );

        cs.set_heaviest_tipset(cs.genesis_tipset()).unwrap();

        (cs, c4u)
    }

    /// Stores `i` as CBOR and returns its CID, giving each epoch a distinct
    /// placeholder state root.
    fn dummy_state(db: impl Blockstore, i: ChainEpoch) -> Cid {
        db.put_cbor_default(&i).unwrap()
    }

    /// Header template for a dummy block at epoch `i`, with weight `i` and a
    /// placeholder state root from `dummy_state`.
    fn dummy_node(db: impl Blockstore, i: ChainEpoch) -> HeaderBuilder {
        HeaderBuilder {
            state_root: dummy_state(db, i).into(),
            weight: BigInt::from(i).into(),
            epoch: i.into(),
            ..Default::default()
        }
    }

    /// Tipsets inserted out of order must still be handed out for validation
    /// parent-first, i.e. in epoch order 1 through 5.
    #[test]
    fn test_state_machine_validation_order() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        chain4u! {
            from [genesis_header] in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)] -> [c = dummy_node(&db, 3)] -> [d = dummy_node(&db, 4)] -> [e = dummy_node(&db, 5)]
        };

        // Create the state machine
        let mut state_machine = SyncStateMachine::new(cs, Default::default(), true);

        // Insert tipsets in random order
        let tipsets = vec![e, b, d, c, a];

        // Convert each block into a FullTipset and add it to the state machine
        for block in tipsets {
            let full_tipset = FullTipset::new(vec![Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            }])
            .unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
        }

        // Record validation order by processing all validation tasks in each iteration
        let mut validation_tasks = Vec::new();
        loop {
            let (tasks, _) = state_machine.tasks();

            // Find all validation tasks
            let validation_tipsets: Vec<_> = tasks
                .into_iter()
                .filter_map(|task| {
                    if let SyncTask::ValidateTipset {
                        tipset,
                        is_proposed_head,
                    } = task
                    {
                        Some((tipset, is_proposed_head))
                    } else {
                        None
                    }
                })
                .collect();

            if validation_tipsets.is_empty() {
                break;
            }

            // Record and mark all tipsets as validated
            for (ts, is_proposed_head) in validation_tipsets {
                validation_tasks.push(ts.epoch());
                db.put_cbor_default(&ts.epoch()).unwrap();
                state_machine.mark_validated_tipset(ts, is_proposed_head);
            }
        }

        // We expect validation tasks for epochs 1 through 5 in order
        assert_eq!(validation_tasks, vec![1, 2, 3, 4, 5]);
    }

    /// A fork at `a` should be reported as two chain fragments that share their
    /// first tipset.
    #[test]
    fn test_sync_state_machine_chain_fragments() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        // Create a forked chain
        // genesis -> a -> b
        //            \--> c
        chain4u! {
            in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)]
        };
        chain4u! {
            from [a] in c4u;
            [c = dummy_node(&db, 3)]
        };

        // Create the state machine
        let mut state_machine = SyncStateMachine::new(cs, Default::default(), false);

        // Convert each block into a FullTipset and add it to the state machine
        for block in [a, b, c] {
            let full_tipset = FullTipset::new(vec![Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            }])
            .unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
        }

        let chains = state_machine
            .chains()
            .into_iter()
            .map(|v| {
                v.into_iter()
                    .map(|ts| ts.weight().to_i64().unwrap_or(0))
                    .collect_vec()
            })
            .collect_vec();

        // Both chains should start at the same tipset
        assert_eq!(chains, vec![vec![1, 3], vec![1, 2]]);
    }
}