Skip to main content

forest/chain_sync/
chain_follower.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3//! This module contains the logic for driving Forest forward in the Filecoin
4//! blockchain.
5//!
6//! Forest keeps track of the current heaviest tipset, and receives information
7//! about new blocks and tipsets from peers as well as connected miners. The
8//! state machine has the following rules:
9//! - A tipset is invalid if its parent is invalid.
10//! - If a tipset's parent isn't in our database, request it from the network.
11//! - If a tipset's parent has been validated, validate the tipset.
12//! - If a tipset is 1 day older than the heaviest tipset, the tipset is
13//!   invalid. This prevents Forest from following forks that will never be
14//!   accepted.
15//!
16//! The state machine does not do any network requests or validation. Those are
17//! handled by an external actor.
18
19use super::network_context::SyncNetworkContext;
20use crate::{
21    blocks::{Block, FullTipset, Tipset, TipsetKey},
22    chain::ChainStore,
23    chain_sync::{
24        ForkSyncInfo, ForkSyncStage, SyncStatus, SyncStatusReport, TipsetValidator,
25        bad_block_cache::BadBlockCache,
26        metrics,
27        tipset_syncer::{TipsetSyncerError, validate_tipset},
28    },
29    libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest},
30    message_pool::{MessagePool, MpoolRpcProvider},
31    networks::calculate_expected_epoch,
32    shim::clock::ChainEpoch,
33    state_manager::StateManager,
34    utils::misc::env::is_env_truthy,
35};
36use ahash::{HashMap, HashSet};
37use chrono::Utc;
38use cid::Cid;
39use fvm_ipld_blockstore::Blockstore;
40use itertools::Itertools;
41use libp2p::PeerId;
42use parking_lot::{Mutex, RwLock};
43use std::{sync::Arc, time::Instant};
44use tokio::{sync::Notify, task::JoinSet};
45use tracing::{debug, error, info, trace, warn};
46
/// Long-lived service that drives chain synchronization. Construct with
/// [`ChainFollower::new`] and start with [`ChainFollower::run`], which hands
/// all fields over to the [`chain_follower`] task loop.
pub struct ChainFollower<DB> {
    /// Syncing status of the chain, shared with e.g. RPC consumers.
    pub sync_status: SyncStatus,

    /// Manages retrieving and updating state objects.
    state_manager: Arc<StateManager<DB>>,

    /// Context to be able to send requests to P2P network.
    pub network: SyncNetworkContext<DB>,

    /// Genesis tipset, used when greeting new peers.
    genesis: Tipset,

    /// Bad blocks cache, updates based on invalid state transitions.
    /// Will mark any invalid blocks and all children as bad in this bounded
    /// cache. `None` when disabled via `FOREST_DISABLE_BAD_BLOCK_CACHE`.
    pub bad_blocks: Option<Arc<BadBlockCache>>,

    /// Incoming network events to be handled by synchronizer.
    net_handler: flume::Receiver<NetworkEvent>,

    /// Tipset channel sender; used by miners that propose new tipsets
    /// through Forest.
    pub tipset_sender: flume::Sender<FullTipset>,

    /// Tipset channel receiver; drained by the follower loop.
    tipset_receiver: flume::Receiver<FullTipset>,

    /// When `stateless_mode` is true, forest connects to the P2P network but
    /// does not execute any state transitions. This drastically reduces the
    /// memory and disk footprint of Forest but also means that Forest will not
    /// be able to validate the correctness of the chain.
    stateless_mode: bool,

    /// Message pool receiving gossiped messages.
    mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
}
83
84impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
85    pub fn new(
86        state_manager: Arc<StateManager<DB>>,
87        network: SyncNetworkContext<DB>,
88        genesis: Tipset,
89        net_handler: flume::Receiver<NetworkEvent>,
90        stateless_mode: bool,
91        mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
92    ) -> Self {
93        let (tipset_sender, tipset_receiver) = flume::bounded(20);
94        let disable_bad_block_cache = is_env_truthy("FOREST_DISABLE_BAD_BLOCK_CACHE");
95        Self {
96            sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
97            state_manager,
98            network,
99            genesis,
100            bad_blocks: if disable_bad_block_cache {
101                tracing::warn!("bad block cache is disabled by `FOREST_DISABLE_BAD_BLOCK_CACHE`");
102                None
103            } else {
104                Some(Default::default())
105            },
106            net_handler,
107            tipset_sender,
108            tipset_receiver,
109            stateless_mode,
110            mem_pool,
111        }
112    }
113
114    pub async fn run(self) -> anyhow::Result<()> {
115        chain_follower(
116            self.state_manager,
117            self.bad_blocks,
118            self.net_handler,
119            self.tipset_receiver,
120            self.network,
121            self.mem_pool,
122            self.sync_status,
123            self.genesis,
124            self.stateless_mode,
125        )
126        .await
127    }
128}
129
/// We receive new full tipsets from the p2p swarm, and from miners that use
/// Forest as their frontend.
///
/// Spawns four long-lived tasks on a [`JoinSet`]:
/// 1. drain p2p [`NetworkEvent`]s, resolve them to full tipsets, and feed
///    them into the [`SyncStateMachine`];
/// 2. drain miner-submitted tipsets into the state machine;
/// 3. on every state-machine change, refresh the sync status report and
///    spawn any newly required [`SyncTask`]s;
/// 4. periodically log download/catch-up progress.
///
/// Returns only once all of the above tasks have exited.
#[allow(clippy::too_many_arguments)]
pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
    state_manager: Arc<StateManager<DB>>,
    bad_block_cache: Option<Arc<BadBlockCache>>,
    network_rx: flume::Receiver<NetworkEvent>,
    tipset_receiver: flume::Receiver<FullTipset>,
    network: SyncNetworkContext<DB>,
    mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
    sync_status: SyncStatus,
    genesis: Tipset,
    stateless_mode: bool,
) -> anyhow::Result<()> {
    // Signalled whenever the state machine is updated; wakes the scheduler task (3).
    let state_changed = Arc::new(Notify::new());
    let state_machine = Arc::new(Mutex::new(SyncStateMachine::new(
        state_manager.chain_store().clone(),
        bad_block_cache.clone(),
        stateless_mode,
    )));
    // In-flight task set; membership prevents spawning the same task twice.
    let tasks: Arc<Mutex<HashSet<SyncTask>>> = Arc::new(Mutex::new(HashSet::default()));

    let mut set = JoinSet::new();

    // Increment metrics, update peer information, and forward tipsets to the state machine.
    set.spawn({
        let state_manager = state_manager.clone();
        let state_changed = state_changed.clone();
        let state_machine = state_machine.clone();
        let network = network.clone();
        async move {
            while let Ok(event) = network_rx.recv_async().await {
                inc_gossipsub_event_metrics(&event);

                update_peer_info(
                    &event,
                    &network,
                    state_manager.chain_store().clone(),
                    &genesis,
                );

                // Resolve the event to a full tipset; any failure or
                // non-tipset event just moves on to the next event.
                let Ok(tipset) = (match event {
                    NetworkEvent::HelloResponseOutbound { request, source } => {
                        // A peer greeted us: try to fetch their claimed heaviest tipset.
                        let tipset_keys = TipsetKey::from(request.heaviest_tip_set.clone());
                        get_full_tipset(
                            &network,
                            state_manager.chain_store(),
                            Some(source),
                            &tipset_keys,
                        )
                        .await
                        .inspect_err(|e| debug!("Querying full tipset failed: {}", e))
                    }
                    NetworkEvent::PubsubMessage { message } => match message {
                        PubsubMessage::Block(b) => {
                            // A gossiped block is treated as a single-block tipset.
                            let key = TipsetKey::from(nunny::vec![*b.header.cid()]);
                            get_full_tipset(&network, state_manager.chain_store(), None, &key).await
                        }
                        PubsubMessage::Message(m) => {
                            // Gossiped messages go to the mempool, not the state machine.
                            if let Err(why) = mem_pool.add(m) {
                                debug!("Received invalid GossipSub message: {}", why);
                            }
                            continue;
                        }
                    },
                    _ => continue,
                }) else {
                    continue;
                };
                {
                    state_machine
                        .lock()
                        .update(SyncEvent::NewFullTipsets(vec![tipset]));
                    state_changed.notify_one();
                }
            }
        }
    });

    // Forward tipsets from miners into the state machine.
    set.spawn({
        let state_changed = state_changed.clone();
        let state_machine = state_machine.clone();

        async move {
            while let Ok(tipset) = tipset_receiver.recv_async().await {
                state_machine
                    .lock()
                    .update(SyncEvent::NewFullTipsets(vec![tipset]));
                state_changed.notify_one();
            }
        }
    });

    // When the state machine is updated, we need to update the sync status and spawn tasks
    set.spawn({
        let state_manager = state_manager.clone();
        let state_machine = state_machine.clone();
        let state_changed = state_changed.clone();
        let tasks = tasks.clone();
        let bad_block_cache = bad_block_cache.clone();
        async move {
            loop {
                state_changed.notified().await;

                // Lock order: `tasks` before `state_machine`. Neither guard is
                // held across an await point in this loop body.
                let mut tasks_set = tasks.lock();
                let (task_vec, current_active_forks) = state_machine.lock().tasks();

                // Update the sync states
                {
                    let old_status_report = sync_status.read().clone();
                    let new_status_report = old_status_report.update(
                        &state_manager,
                        current_active_forks,
                        stateless_mode,
                    );

                    sync_status.write().clone_from(&new_status_report);
                }

                for task in task_vec {
                    // insert task into tasks. If task is already in tasks, skip. If it is not, spawn it.
                    let new = tasks_set.insert(task.clone());
                    if new {
                        let action = task.clone().execute(
                            network.clone(),
                            state_manager.clone(),
                            stateless_mode,
                            bad_block_cache.clone(),
                        );
                        tokio::spawn({
                            let tasks = tasks.clone();
                            let state_machine = state_machine.clone();
                            let state_changed = state_changed.clone();
                            async move {
                                // Feed the task's outcome (if any) back into
                                // the state machine, then retire the task.
                                if let Some(event) = action.await {
                                    state_machine.lock().update(event);
                                    state_changed.notify_one();
                                }
                                tasks.lock().remove(&task);
                            }
                        });
                    }
                }
            }
        }
    });

    // Periodically report progress if there are any tipsets left to be fetched.
    // Once we're in steady-state (i.e. caught up to HEAD) and there are no
    // active forks, this will not report anything.
    set.spawn({
        let state_manager = state_manager.clone();
        let state_machine = state_machine.clone();
        async move {
            loop {
                tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
                let (tasks_set, _) = state_machine.lock().tasks();
                let heaviest_tipset = state_manager.chain_store().heaviest_tipset();
                let heaviest_epoch = heaviest_tipset.epoch();

                // Largest epoch gap among pending fetch tasks; 0 when none.
                let to_download = tasks_set
                    .iter()
                    .filter_map(|task| match task {
                        SyncTask::FetchTipset(_, epoch) => Some(epoch - heaviest_epoch),
                        _ => None,
                    })
                    .max()
                    .unwrap_or(0);

                // Epoch the network head should be at right now, derived from
                // wall-clock time, genesis timestamp, and block delay.
                let expected_head = calculate_expected_epoch(
                    Utc::now().timestamp() as u64,
                    state_manager.chain_store().genesis_block_header().timestamp,
                    state_manager.chain_config().block_delay_secs,
                );

                // Only print 'Catching up to HEAD' if we're more than 10 epochs
                // behind. Otherwise it can be too spammy.
                match (expected_head - heaviest_epoch > 10, to_download > 0) {
                    (true, true) => info!(
                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}, downloading {to_download} tipsets"
                        , heaviest_tipset.key()
                    ),
                    (true, false) => info!(
                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}"
                        , heaviest_tipset.key()
                    ),
                    (false, true) => {
                        info!("Downloading {to_download} tipsets")
                    }
                    (false, false) => {}
                }
            }
        }
    });

    set.join_all().await;
    Ok(())
}
328
329// Increment the gossipsub event metrics.
330fn inc_gossipsub_event_metrics(event: &NetworkEvent) {
331    let label = match event {
332        NetworkEvent::HelloRequestInbound => metrics::values::HELLO_REQUEST_INBOUND,
333        NetworkEvent::HelloResponseOutbound { .. } => metrics::values::HELLO_RESPONSE_OUTBOUND,
334        NetworkEvent::HelloRequestOutbound => metrics::values::HELLO_REQUEST_OUTBOUND,
335        NetworkEvent::HelloResponseInbound => metrics::values::HELLO_RESPONSE_INBOUND,
336        NetworkEvent::PeerConnected(_) => metrics::values::PEER_CONNECTED,
337        NetworkEvent::PeerDisconnected(_) => metrics::values::PEER_DISCONNECTED,
338        NetworkEvent::PubsubMessage { message } => match message {
339            PubsubMessage::Block(_) => metrics::values::PUBSUB_BLOCK,
340            PubsubMessage::Message(_) => metrics::values::PUBSUB_MESSAGE,
341        },
342        NetworkEvent::ChainExchangeRequestOutbound => {
343            metrics::values::CHAIN_EXCHANGE_REQUEST_OUTBOUND
344        }
345        NetworkEvent::ChainExchangeResponseInbound => {
346            metrics::values::CHAIN_EXCHANGE_RESPONSE_INBOUND
347        }
348        NetworkEvent::ChainExchangeRequestInbound => {
349            metrics::values::CHAIN_EXCHANGE_REQUEST_INBOUND
350        }
351        NetworkEvent::ChainExchangeResponseOutbound => {
352            metrics::values::CHAIN_EXCHANGE_RESPONSE_OUTBOUND
353        }
354    };
355
356    metrics::LIBP2P_MESSAGE_TOTAL.get_or_create(&label).inc();
357}
358
359// Keep our peer manager up to date.
360fn update_peer_info<DB: Blockstore + Sync + Send + 'static>(
361    event: &NetworkEvent,
362    network: &SyncNetworkContext<DB>,
363    chain_store: Arc<ChainStore<DB>>,
364    genesis: &Tipset,
365) {
366    match event {
367        NetworkEvent::PeerConnected(peer_id) => {
368            let genesis_cid = *genesis.block_headers().first().cid();
369            // Spawn and immediately move on to the next event
370            tokio::task::spawn(handle_peer_connected_event(
371                network.clone(),
372                chain_store,
373                *peer_id,
374                genesis_cid,
375            ));
376        }
377        NetworkEvent::PeerDisconnected(peer_id) => {
378            handle_peer_disconnected_event(network, *peer_id);
379        }
380        _ => {}
381    }
382}
383
384async fn handle_peer_connected_event<DB: Blockstore + Sync + Send + 'static>(
385    network: SyncNetworkContext<DB>,
386    chain_store: Arc<ChainStore<DB>>,
387    peer_id: PeerId,
388    genesis_block_cid: Cid,
389) {
390    // Query the heaviest TipSet from the store
391    if network.peer_manager().is_peer_new(&peer_id) {
392        // Since the peer is new, send them a hello request
393        // Query the heaviest TipSet from the store
394        let heaviest = chain_store.heaviest_tipset();
395        let request = HelloRequest {
396            heaviest_tip_set: heaviest.cids(),
397            heaviest_tipset_height: heaviest.epoch(),
398            heaviest_tipset_weight: heaviest.weight().clone().into(),
399            genesis_cid: genesis_block_cid,
400        };
401        let (peer_id, moment_sent, response) = match network.hello_request(peer_id, request).await {
402            Ok(response) => response,
403            Err(e) => {
404                debug!("Hello request failed: {}", e);
405                return;
406            }
407        };
408        let dur = Instant::now().duration_since(moment_sent);
409
410        // Update the peer metadata based on the response
411        match response {
412            Some(_) => {
413                network.peer_manager().log_success(&peer_id, dur);
414            }
415            None => {
416                network.peer_manager().log_failure(&peer_id, dur);
417            }
418        }
419    }
420}
421
422fn handle_peer_disconnected_event<DB: Blockstore + Sync + Send + 'static>(
423    network: &SyncNetworkContext<DB>,
424    peer_id: PeerId,
425) {
426    network.peer_manager().remove_peer(&peer_id);
427    network.peer_manager().unmark_peer_bad(&peer_id);
428}
429
430pub async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
431    network: &SyncNetworkContext<DB>,
432    chain_store: &ChainStore<DB>,
433    peer_id: Option<PeerId>,
434    tipset_keys: &TipsetKey,
435) -> anyhow::Result<FullTipset> {
436    // Attempt to load from the store
437    if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
438        return Ok(full_tipset);
439    }
440    // Load from the network
441    let tipset = network
442        .chain_exchange_full_tipset(peer_id, tipset_keys)
443        .await
444        .map_err(|e| anyhow::anyhow!(e))?;
445    tipset.persist(chain_store.blockstore())?;
446
447    Ok(tipset)
448}
449
450async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
451    network: &SyncNetworkContext<DB>,
452    chain_store: &ChainStore<DB>,
453    peer_id: Option<PeerId>,
454    tipset_keys: &TipsetKey,
455) -> anyhow::Result<Vec<FullTipset>> {
456    // Attempt to load from the store
457    if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
458        return Ok(vec![full_tipset]);
459    }
460    // Load from the network
461    let tipsets = network
462        .chain_exchange_full_tipsets(peer_id, tipset_keys)
463        .await
464        .map_err(|e| anyhow::anyhow!(e))?;
465
466    for tipset in tipsets.iter() {
467        tipset.persist(chain_store.blockstore())?;
468    }
469
470    Ok(tipsets)
471}
472
473pub fn load_full_tipset<DB: Blockstore>(
474    chain_store: &ChainStore<DB>,
475    tipset_keys: &TipsetKey,
476) -> anyhow::Result<FullTipset> {
477    // Retrieve tipset from store based on passed in TipsetKey
478    let ts = chain_store
479        .chain_index()
480        .load_required_tipset(tipset_keys)?;
481    let blocks: Vec<_> = ts
482        .block_headers()
483        .iter()
484        .map(|header| -> anyhow::Result<Block> {
485            let (bls_msgs, secp_msgs) =
486                crate::chain::block_messages(chain_store.blockstore(), header)?;
487            Ok(Block {
488                header: header.clone(),
489                bls_messages: bls_msgs,
490                secp_messages: secp_msgs,
491            })
492        })
493        .try_collect()?;
494    // Construct FullTipset
495    let fts = FullTipset::new(blocks)?;
496    Ok(fts)
497}
498
/// Events fed into [`SyncStateMachine::update`] by the follower tasks.
enum SyncEvent {
    /// One or more full tipsets arrived (from gossip, a hello response, or a
    /// miner submission).
    NewFullTipsets(Vec<FullTipset>),
    /// A tipset failed validation; it and all its descendants are purged.
    BadTipset(FullTipset),
    /// A tipset passed validation and can be committed to the chain store.
    ValidatedTipset {
        tipset: FullTipset,
        /// Whether this tipset was the tip of its chain when scheduled;
        /// determines whether it is stored via `put_tipset` or
        /// `set_heaviest_tipset` (see `mark_validated_tipset`).
        is_proposed_head: bool,
    },
}
507
508impl std::fmt::Display for SyncEvent {
509    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
510        fn tss_to_string(tss: &[FullTipset]) -> String {
511            format!(
512                "epoch: {}-{}",
513                tss.first().map(|ts| ts.epoch()).unwrap_or_default(),
514                tss.last().map(|ts| ts.epoch()).unwrap_or_default()
515            )
516        }
517
518        match self {
519            Self::NewFullTipsets(tss) => write!(f, "NewFullTipsets({})", tss_to_string(tss)),
520            Self::BadTipset(ts) => {
521                write!(f, "BadTipset(epoch: {}, key: {})", ts.epoch(), ts.key())
522            }
523            Self::ValidatedTipset {
524                tipset,
525                is_proposed_head,
526            } => write!(
527                f,
528                "ValidatedTipset(epoch: {}, key: {}, is_proposed_head: {is_proposed_head})",
529                tipset.epoch(),
530                tipset.key()
531            ),
532        }
533    }
534}
535
/// Bookkeeping core of the chain follower.
///
/// Holds the set of pending tipsets and decides which sync tasks should run
/// next. Per the module docs, it performs no network requests or state
/// validation itself — those are done by the task executor.
struct SyncStateMachine<DB> {
    /// Chain store, consulted for the heaviest tipset, genesis, and parents.
    cs: Arc<ChainStore<DB>>,
    /// Known-bad blocks; `None` when the cache is disabled.
    bad_block_cache: Option<Arc<BadBlockCache>>,
    // Map from TipsetKey to FullTipset: tipsets received but not yet
    // validated (or discarded).
    tipsets: HashMap<TipsetKey, FullTipset>,
    /// When true, tipsets are accepted without executing state transitions.
    stateless_mode: bool,
}
543
impl<DB: Blockstore> SyncStateMachine<DB> {
    /// Creates an empty state machine backed by the given chain store.
    pub fn new(
        cs: Arc<ChainStore<DB>>,
        bad_block_cache: Option<Arc<BadBlockCache>>,
        stateless_mode: bool,
    ) -> Self {
        Self {
            cs,
            bad_block_cache,
            tipsets: HashMap::default(),
            stateless_mode,
        }
    }

    // Compute the list of chains from the tipsets map.
    //
    // Repeatedly picks the heaviest not-yet-consumed tipset and walks its
    // parent links, producing chains ordered oldest-first. Note that parents
    // are looked up in the full `self.tipsets` map (not `remaining_tipsets`),
    // so a shared ancestor can appear in more than one returned chain.
    fn chains(&self) -> Vec<Vec<FullTipset>> {
        let mut chains = Vec::new();
        let mut remaining_tipsets = self.tipsets.clone();

        while let Some(heaviest) = remaining_tipsets
            .values()
            .max_by_key(|ts| ts.weight())
            .cloned()
        {
            // Build chain starting from heaviest
            let mut chain = Vec::new();
            let mut current = Some(heaviest);

            while let Some(tipset) = current.take() {
                remaining_tipsets.remove(tipset.key());

                // Find parent in tipsets map
                current = self.tipsets.get(tipset.parents()).cloned();

                chain.push(tipset);
            }
            // Walked tip-to-parent; reverse so the chain is oldest-first.
            chain.reverse();
            chains.push(chain);
        }

        chains
    }

    /// True when the tipset's parent state root is present in the blockstore
    /// (always true in stateless mode, where no state is ever computed).
    fn is_parent_validated(&self, tipset: &FullTipset) -> bool {
        let db = self.cs.blockstore();
        self.stateless_mode || db.has(tipset.parent_state()).unwrap_or(false)
    }

    /// Decides whether `tipset` can be validated now: its parent must be
    /// loadable locally, at or before the current head epoch, and validated.
    fn is_ready_for_validation(&self, tipset: &FullTipset) -> bool {
        if self.stateless_mode || tipset.key() == self.cs.genesis_tipset().key() {
            // Skip validation in stateless mode and for genesis tipset
            true
        } else if let Ok(parent_ts) = load_full_tipset(&self.cs, tipset.parents()) {
            let head_ts = self.cs.heaviest_tipset();
            // Treat post-head-epoch tipsets as not validated to fix <https://github.com/ChainSafe/forest/issues/5677>
            // basically, the follow task should always start from the current head which could be manually set
            // to an old one. When a post-head-epoch tipset is considered validated, it could mess up the state machine
            // in some edge cases and the node ends up being stuck with ever-empty sync task queue as reported
            // in <https://github.com/ChainSafe/forest/issues/5679>.
            if parent_ts.key() == head_ts.key() {
                true
            } else if parent_ts.epoch() >= head_ts.epoch() {
                false
            } else {
                self.is_parent_validated(tipset)
            }
        } else {
            false
        }
    }

    /// Ingests a newly received tipset: rejects invalid or finality-expired
    /// tipsets, then merges its blocks with any pending tipset sharing the
    /// same epoch and parents before inserting the result.
    fn add_full_tipset(&mut self, tipset: FullTipset) {
        // Cheap structural/consensus checks (not full state validation).
        if let Err(why) = TipsetValidator(&tipset).validate(
            &self.cs,
            self.bad_block_cache.as_ref().map(AsRef::as_ref),
            &self.cs.genesis_tipset(),
            self.cs.chain_config().block_delay_secs,
        ) {
            metrics::INVALID_TIPSET_TOTAL.inc();
            trace!("Skipping invalid tipset: {}", why);
            self.mark_bad_tipset(tipset);
            return;
        }

        // Check if tipset is outside the chain_finality window
        // (a negative diff — tipset ahead of head — never exceeds finality).
        let heaviest = self.cs.heaviest_tipset();
        let epoch_diff = heaviest.epoch() - tipset.epoch();

        if epoch_diff > self.cs.chain_config().policy.chain_finality {
            self.mark_bad_tipset(tipset);
            return;
        }

        // Check if tipset already exists
        if self.tipsets.contains_key(tipset.key()) {
            return;
        }

        // Find any existing tipsets with same epoch and parents
        let mut to_remove = Vec::new();
        #[allow(clippy::mutable_key_type)]
        let mut merged_blocks: HashSet<_> = tipset.blocks().iter().cloned().collect();

        // Collect all parent references from existing tipsets
        let parent_refs: HashSet<_> = self
            .tipsets
            .values()
            .map(|ts| ts.parents().clone())
            .collect();

        for (key, existing_ts) in self.tipsets.iter() {
            if existing_ts.epoch() == tipset.epoch() && existing_ts.parents() == tipset.parents() {
                // Only mark for removal if not referenced as a parent
                if !parent_refs.contains(key) {
                    to_remove.push(key.clone());
                }
                // Add blocks from existing tipset - HashSet handles deduplication automatically
                merged_blocks.extend(existing_ts.blocks().iter().cloned());
            }
        }

        // Remove old tipsets that were merged and aren't referenced
        for key in to_remove {
            self.tipsets.remove(&key);
        }

        // Create and insert new merged tipset
        // (assumes FullTipset::new imposes a canonical block order, since
        // HashSet iteration order is arbitrary — TODO confirm)
        if let Ok(merged_tipset) = FullTipset::new(merged_blocks) {
            self.tipsets
                .insert(merged_tipset.key().clone(), merged_tipset);
        }
    }

    // Mark blocks in tipset as bad.
    // Mark all descendants of tipsets as bad.
    // Remove all bad tipsets from the tipset map.
    fn mark_bad_tipset(&mut self, tipset: FullTipset) {
        // Iterative DFS over the descendant relation in the pending map.
        let mut stack = vec![tipset];
        while let Some(tipset) = stack.pop() {
            self.tipsets.remove(tipset.key());
            // Find all descendant tipsets (tipsets that have this tipset as a parent)
            let mut to_remove = Vec::new();
            let mut descendants = Vec::new();

            for (key, ts) in self.tipsets.iter() {
                if ts.parents() == tipset.key() {
                    to_remove.push(key.clone());
                    descendants.push(ts.clone());
                }
            }

            // Remove bad tipsets from the map
            for key in to_remove {
                self.tipsets.remove(&key);
            }

            // Mark descendants as bad
            stack.extend(descendants);
        }
    }

    /// Commits a validated tipset: removes it from the pending map and
    /// records it in the chain store. In stateless mode only the heaviest
    /// pointer is advanced (no state is available for `put_tipset`).
    fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) {
        // Defensive check: committing an unvalidated tipset is a logic bug.
        if !self.is_parent_validated(&tipset) {
            tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), "Tipset must be validated");
            return;
        }

        self.tipsets.remove(tipset.key());
        let tipset = tipset.into_tipset();
        // cs.put_tipset requires state and doesn't work in stateless mode
        if self.stateless_mode {
            let epoch = tipset.epoch();
            let terse_key = tipset.key().terse();
            if self.cs.heaviest_tipset().weight() < tipset.weight() {
                if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
                    error!("Error setting heaviest tipset: {}", e);
                } else {
                    info!("Heaviest tipset: {} ({})", epoch, terse_key);
                }
            }
        } else if is_proposed_head {
            if let Err(e) = self.cs.put_tipset(&tipset) {
                error!("Error putting tipset: {e}");
            }
        } else if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
            error!("Error setting heaviest tipset: {e}");
        }
    }

    /// Applies one [`SyncEvent`] to the state machine.
    pub fn update(&mut self, event: SyncEvent) {
        tracing::trace!("update: {event}");
        match event {
            SyncEvent::NewFullTipsets(tipsets) => {
                for tipset in tipsets {
                    self.add_full_tipset(tipset);
                }
            }
            SyncEvent::BadTipset(tipset) => self.mark_bad_tipset(tipset),
            SyncEvent::ValidatedTipset {
                tipset,
                is_proposed_head,
            } => self.mark_validated_tipset(tipset, is_proposed_head),
        }
    }

    /// Derives the next round of work: for each pending chain, either fetch
    /// its missing parent or validate its oldest tipset. Also returns
    /// per-fork progress info for the sync status report.
    pub fn tasks(&self) -> (Vec<SyncTask>, Vec<ForkSyncInfo>) {
        // Get the node's current validated head epoch once, as it's the same for all forks.
        let current_validated_epoch = self.cs.heaviest_tipset().epoch();
        let now = Utc::now();

        let mut active_sync_info = Vec::new();
        let mut tasks = Vec::new();
        for chain in self.chains() {
            if let Some(first_ts) = chain.first() {
                let last_ts = chain.last().expect("Infallible");
                let stage: ForkSyncStage;
                let start_time = Some(now);

                if !self.is_ready_for_validation(first_ts) {
                    // Parent chain incomplete: request the missing parent.
                    stage = ForkSyncStage::FetchingHeaders;
                    tasks.push(SyncTask::FetchTipset(
                        first_ts.parents().clone(),
                        first_ts.epoch(),
                    ));
                } else {
                    // A single-tipset chain is a proposed new head.
                    stage = ForkSyncStage::ValidatingTipsets;
                    tasks.push(SyncTask::ValidateTipset {
                        tipset: first_ts.clone(),
                        is_proposed_head: chain.len() == 1,
                    });
                }

                let fork_info = ForkSyncInfo {
                    target_tipset_key: last_ts.key().clone(),
                    target_epoch: last_ts.epoch(),
                    target_sync_epoch_start: first_ts.epoch(),
                    stage,
                    validated_chain_head_epoch: current_validated_epoch,
                    start_time,
                    last_updated: Some(now),
                };

                active_sync_info.push(fork_info);
            }
        }
        (tasks, active_sync_info)
    }
}
792
/// A unit of work scheduled by `SyncStateMachine::tasks` and executed by
/// `SyncTask::execute`. `Hash`/`Eq` allow the follower to deduplicate
/// in-flight tasks.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
enum SyncTask {
    /// Validate a tipset whose parent chain is ready.
    ValidateTipset {
        tipset: FullTipset,
        /// Whether the tipset was the tip of its chain when scheduled.
        is_proposed_head: bool,
    },
    /// Fetch the tipset with this key (a missing parent) from the network;
    /// the epoch is that of the child requesting it, used for progress
    /// reporting.
    FetchTipset(TipsetKey, ChainEpoch),
}
801
802impl std::fmt::Display for SyncTask {
803    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
804        match self {
805            SyncTask::ValidateTipset {
806                tipset,
807                is_proposed_head,
808            } => write!(
809                f,
810                "ValidateTipset(epoch: {}, is_proposed_head: {is_proposed_head})",
811                tipset.epoch()
812            ),
813            SyncTask::FetchTipset(key, epoch) => {
814                let s = key.to_string();
815                write!(
816                    f,
817                    "FetchTipset({}, epoch: {})",
818                    &s[s.len().saturating_sub(8)..],
819                    epoch
820                )
821            }
822        }
823    }
824}
825
826impl SyncTask {
827    async fn execute<DB: Blockstore + Sync + Send + 'static>(
828        self,
829        network: SyncNetworkContext<DB>,
830        state_manager: Arc<StateManager<DB>>,
831        stateless_mode: bool,
832        bad_block_cache: Option<Arc<BadBlockCache>>,
833    ) -> Option<SyncEvent> {
834        tracing::trace!("SyncTask::execute {self}");
835        match self {
836            SyncTask::ValidateTipset {
837                tipset,
838                is_proposed_head,
839            } if stateless_mode => Some(SyncEvent::ValidatedTipset {
840                tipset,
841                is_proposed_head,
842            }),
843            SyncTask::ValidateTipset {
844                tipset,
845                is_proposed_head,
846            } => match validate_tipset(&state_manager, tipset.clone(), bad_block_cache).await {
847                Ok(()) => Some(SyncEvent::ValidatedTipset {
848                    tipset,
849                    is_proposed_head,
850                }),
851                // If temporal drift error, don't mark as bad, just skip validation and try again
852                // later. This mirrors internal logic where temporal drift doesn't mark a block as
853                // bad permanently, since it could be valid later on. If not done, a single
854                // time-traveling block could cause the node to be stuck without making progress.
855                Err(e) if matches!(e, TipsetSyncerError::TimeTravellingBlock { .. }) => {
856                    warn!("Time travelling block detected, skipping tipset for now: {e}");
857                    None
858                }
859                Err(e) => {
860                    warn!("Error validating tipset: {e}");
861                    Some(SyncEvent::BadTipset(tipset))
862                }
863            },
864            SyncTask::FetchTipset(key, epoch) => {
865                match get_full_tipset_batch(&network, state_manager.chain_store(), None, &key).await
866                {
867                    Ok(parents) => Some(SyncEvent::NewFullTipsets(parents)),
868                    Err(e) => {
869                        tracing::warn!(%key, %epoch, "failed to fetch tipset: {e}");
870                        None
871                    }
872                }
873            }
874        }
875    }
876}
877
878#[cfg(test)]
879mod tests {
880    use super::*;
881    use crate::blocks::{Chain4U, HeaderBuilder, chain4u};
882    use crate::db::MemoryDB;
883    use crate::utils::db::CborStoreExt as _;
884    use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
885    use num_bigint::BigInt;
886    use num_traits::ToPrimitive;
887    use std::sync::Arc;
888    use tracing::level_filters::LevelFilter;
889    use tracing_subscriber::EnvFilter;
890
    /// Builds an in-memory [`ChainStore`] containing only a genesis tipset,
    /// plus the [`Chain4U`] helper used by tests to grow chains on top of it.
    fn setup() -> (Arc<ChainStore<MemoryDB>>, Chain4U<Arc<MemoryDB>>) {
        // Initialize test logger; `try_init` is used (and the error ignored)
        // so repeated calls across tests don't panic.
        let _ = tracing_subscriber::fmt()
            .without_time()
            .with_env_filter(
                EnvFilter::builder()
                    .with_default_directive(LevelFilter::DEBUG.into())
                    .from_env()
                    .unwrap(),
            )
            .try_init();

        let db = Arc::new(MemoryDB::default());

        // Populate DB with message roots used by chain4u: an empty AMT serves
        // as both the BLS and secp message root.
        {
            let empty_amt = Amt::<Cid, _>::new(&db).flush().unwrap();
            db.put_cbor_default(&crate::blocks::TxMeta {
                bls_message_root: empty_amt,
                secp_message_root: empty_amt,
            })
            .unwrap();
        }

        // Create the genesis header (epoch 0) using Chain4U.
        let c4u = Chain4U::with_blockstore(db.clone());
        chain4u! {
            in c4u;
            [genesis_header = dummy_node(&db, 0)]
        };

        // The same MemoryDB backs all three store roles here.
        let cs = Arc::new(
            ChainStore::new(
                db.clone(),
                db.clone(),
                db.clone(),
                Default::default(),
                genesis_header.clone().into(),
            )
            .unwrap(),
        );

        // Start with genesis as the heaviest (current head) tipset.
        cs.set_heaviest_tipset(cs.genesis_tipset()).unwrap();

        (cs, c4u)
    }
937
    /// Stores the epoch number itself as a CBOR object and returns its CID,
    /// giving each test header a distinct, deterministic state root.
    fn dummy_state(db: impl Blockstore, i: ChainEpoch) -> Cid {
        db.put_cbor_default(&i).unwrap()
    }
941
942    fn dummy_node(db: impl Blockstore, i: ChainEpoch) -> HeaderBuilder {
943        HeaderBuilder {
944            state_root: dummy_state(db, i).into(),
945            weight: BigInt::from(i).into(),
946            epoch: i.into(),
947            ..Default::default()
948        }
949    }
950
951    #[test]
952    fn test_state_machine_validation_order() {
953        let (cs, c4u) = setup();
954        let db = cs.blockstore().clone();
955
956        chain4u! {
957            from [genesis_header] in c4u;
958            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)] -> [c = dummy_node(&db, 3)] -> [d = dummy_node(&db, 4)] -> [e = dummy_node(&db, 5)]
959        };
960
961        // Create the state machine
962        let mut state_machine = SyncStateMachine::new(cs, Default::default(), true);
963
964        // Insert tipsets in random order
965        let tipsets = vec![e, b, d, c, a];
966
967        // Convert each block into a FullTipset and add it to the state machine
968        for block in tipsets {
969            let full_tipset = FullTipset::new(vec![Block {
970                header: block.clone().into(),
971                bls_messages: vec![],
972                secp_messages: vec![],
973            }])
974            .unwrap();
975            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
976        }
977
978        // Record validation order by processing all validation tasks in each iteration
979        let mut validation_tasks = Vec::new();
980        loop {
981            let (tasks, _) = state_machine.tasks();
982
983            // Find all validation tasks
984            let validation_tipsets: Vec<_> = tasks
985                .into_iter()
986                .filter_map(|task| {
987                    if let SyncTask::ValidateTipset {
988                        tipset,
989                        is_proposed_head,
990                    } = task
991                    {
992                        Some((tipset, is_proposed_head))
993                    } else {
994                        None
995                    }
996                })
997                .collect();
998
999            if validation_tipsets.is_empty() {
1000                break;
1001            }
1002
1003            // Record and mark all tipsets as validated
1004            for (ts, is_proposed_head) in validation_tipsets {
1005                validation_tasks.push(ts.epoch());
1006                db.put_cbor_default(&ts.epoch()).unwrap();
1007                state_machine.mark_validated_tipset(ts, is_proposed_head);
1008            }
1009        }
1010
1011        // We expect validation tasks for epochs 1 through 5 in order
1012        assert_eq!(validation_tasks, vec![1, 2, 3, 4, 5]);
1013    }
1014
1015    #[test]
1016    fn test_sync_state_machine_chain_fragments() {
1017        let (cs, c4u) = setup();
1018        let db = cs.blockstore().clone();
1019
1020        // Create a forked chain
1021        // genesis -> a -> b
1022        //            \--> c
1023        chain4u! {
1024            in c4u;
1025            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)]
1026        };
1027        chain4u! {
1028            from [a] in c4u;
1029            [c = dummy_node(&db, 3)]
1030        };
1031
1032        // Create the state machine
1033        let mut state_machine = SyncStateMachine::new(cs, Default::default(), false);
1034
1035        // Convert each block into a FullTipset and add it to the state machine
1036        for block in [a, b, c] {
1037            let full_tipset = FullTipset::new(vec![Block {
1038                header: block.clone().into(),
1039                bls_messages: vec![],
1040                secp_messages: vec![],
1041            }])
1042            .unwrap();
1043            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
1044        }
1045
1046        let chains = state_machine
1047            .chains()
1048            .into_iter()
1049            .map(|v| {
1050                v.into_iter()
1051                    .map(|ts| ts.weight().to_i64().unwrap_or(0))
1052                    .collect_vec()
1053            })
1054            .collect_vec();
1055
1056        // Both chains should start at the same tipset
1057        assert_eq!(chains, vec![vec![1, 3], vec![1, 2]]);
1058    }
1059}