
forest/chain_sync/chain_follower.rs

// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT
//! This module contains the logic for driving Forest forward in the Filecoin
//! blockchain.
//!
//! Forest keeps track of the current heaviest tipset, and receives information
//! about new blocks and tipsets from peers as well as connected miners. The
//! state machine has the following rules:
//! - A tipset is invalid if its parent is invalid.
//! - If a tipset's parent isn't in our database, request it from the network.
//! - If a tipset's parent has been validated, validate the tipset.
//! - If a tipset is 1 day older than the heaviest tipset, the tipset is
//!   invalid. This prevents Forest from following forks that will never be
//!   accepted.
//!
//! The state machine does not do any network requests or validation. Those are
//! handled by an external actor.
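//!
//! At a high level, the flow implemented below is: network events and
//! miner-submitted tipsets are turned into `SyncEvent`s, the `SyncStateMachine`
//! folds those events into its set of candidate tipsets, and the resulting
//! `SyncTask`s (fetching missing parents or validating tipsets) are spawned.
//! Each finished task feeds its result back into the state machine as another
//! event.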

use super::network_context::SyncNetworkContext;
use crate::{
    blocks::{Block, FullTipset, Tipset, TipsetKey},
    chain::ChainStore,
    chain_sync::{
        ForkSyncInfo, ForkSyncStage, SyncStatus, SyncStatusReport, TipsetValidator,
        bad_block_cache::BadBlockCache, metrics, tipset_syncer::validate_tipset,
    },
    libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest},
    message_pool::{MessagePool, MpoolRpcProvider},
    networks::calculate_expected_epoch,
    shim::clock::ChainEpoch,
    state_manager::StateManager,
    utils::misc::env::is_env_truthy,
};
use ahash::{HashMap, HashSet};
use chrono::Utc;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
use libp2p::PeerId;
use parking_lot::{Mutex, RwLock};
use std::{sync::Arc, time::Instant};
use tokio::{sync::Notify, task::JoinSet};
use tracing::{debug, error, info, trace, warn};

pub struct ChainFollower<DB> {
    /// Syncing status of the chain
    pub sync_status: SyncStatus,

    /// Manages retrieving and updating state objects
    state_manager: Arc<StateManager<DB>>,

    /// Context to be able to send requests to the P2P network
    pub network: SyncNetworkContext<DB>,

    /// Genesis tipset
    genesis: Tipset,

    /// Bad block cache, updated based on invalid state transitions.
    /// Any invalid block and all of its children will be marked as bad in this
    /// bounded cache.
    pub bad_blocks: Option<Arc<BadBlockCache>>,

    /// Incoming network events to be handled by the synchronizer
    net_handler: flume::Receiver<NetworkEvent>,

    /// Tipset channel sender
    pub tipset_sender: flume::Sender<FullTipset>,

    /// Tipset channel receiver
    tipset_receiver: flume::Receiver<FullTipset>,

    /// When `stateless_mode` is true, Forest connects to the P2P network but
    /// does not execute any state transitions. This drastically reduces the
    /// memory and disk footprint of Forest but also means that Forest will not
    /// be able to validate the correctness of the chain.
    stateless_mode: bool,

    /// Message pool
    mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
}

impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
    pub fn new(
        state_manager: Arc<StateManager<DB>>,
        network: SyncNetworkContext<DB>,
        genesis: Tipset,
        net_handler: flume::Receiver<NetworkEvent>,
        stateless_mode: bool,
        mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
    ) -> Self {
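        // Tipsets submitted through `tipset_sender` (e.g. by miners using Forest as
        // their frontend) arrive over this small bounded channel, which applies
        // backpressure to submitters; gossip traffic arrives via `net_handler` instead.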
        let (tipset_sender, tipset_receiver) = flume::bounded(20);
        let disable_bad_block_cache = is_env_truthy("FOREST_DISABLE_BAD_BLOCK_CACHE");
        Self {
            sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
            state_manager,
            network,
            genesis,
            bad_blocks: if disable_bad_block_cache {
                tracing::warn!("bad block cache is disabled by `FOREST_DISABLE_BAD_BLOCK_CACHE`");
                None
            } else {
                Some(Default::default())
            },
            net_handler,
            tipset_sender,
            tipset_receiver,
            stateless_mode,
            mem_pool,
        }
    }

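    /// Drives the follower until its internal tasks exit, handing all stored
    /// state (state manager, channels, caches) over to `chain_follower`.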
    pub async fn run(self) -> anyhow::Result<()> {
        chain_follower(
            self.state_manager,
            self.bad_blocks,
            self.net_handler,
            self.tipset_receiver,
            self.network,
            self.mem_pool,
            self.sync_status,
            self.genesis,
            self.stateless_mode,
        )
        .await
    }
}

#[allow(clippy::too_many_arguments)]
// We receive new full tipsets from the p2p swarm, and from miners that use Forest as their frontend.
pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
    state_manager: Arc<StateManager<DB>>,
    bad_block_cache: Option<Arc<BadBlockCache>>,
    network_rx: flume::Receiver<NetworkEvent>,
    tipset_receiver: flume::Receiver<FullTipset>,
    network: SyncNetworkContext<DB>,
    mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
    sync_status: SyncStatus,
    genesis: Tipset,
    stateless_mode: bool,
) -> anyhow::Result<()> {
    let state_changed = Arc::new(Notify::new());
    let state_machine = Arc::new(Mutex::new(SyncStateMachine::new(
        state_manager.chain_store().clone(),
        bad_block_cache.clone(),
        stateless_mode,
    )));
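    // Set of sync tasks currently in flight. A task is inserted here before being
    // spawned; duplicates are skipped so the same fetch/validate work is never spawned
    // twice, and entries are removed again once the task completes.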
    let tasks: Arc<Mutex<HashSet<SyncTask>>> = Arc::new(Mutex::new(HashSet::default()));

    let mut set = JoinSet::new();

    // Increment metrics, update peer information, and forward tipsets to the state machine.
    set.spawn({
        let state_manager = state_manager.clone();
        let state_changed = state_changed.clone();
        let state_machine = state_machine.clone();
        let network = network.clone();
        async move {
            while let Ok(event) = network_rx.recv_async().await {
                inc_gossipsub_event_metrics(&event);

                update_peer_info(
                    &event,
                    &network,
                    state_manager.chain_store().clone(),
                    &genesis,
                );

                let Ok(tipset) = (match event {
                    NetworkEvent::HelloResponseOutbound { request, source } => {
                        let tipset_keys = TipsetKey::from(request.heaviest_tip_set.clone());
                        get_full_tipset(
                            &network,
                            state_manager.chain_store(),
                            Some(source),
                            &tipset_keys,
                        )
                        .await
                        .inspect_err(|e| debug!("Querying full tipset failed: {}", e))
                    }
                    NetworkEvent::PubsubMessage { message } => match message {
                        PubsubMessage::Block(b) => {
                            let key = TipsetKey::from(nunny::vec![*b.header.cid()]);
                            get_full_tipset(&network, state_manager.chain_store(), None, &key).await
                        }
                        PubsubMessage::Message(m) => {
                            if let Err(why) = mem_pool.add(m) {
                                debug!("Received invalid GossipSub message: {}", why);
                            }
                            continue;
                        }
                    },
                    _ => continue,
                }) else {
                    continue;
                };
                {
                    state_machine
                        .lock()
                        .update(SyncEvent::NewFullTipsets(vec![tipset]));
                    state_changed.notify_one();
                }
            }
        }
    });

    // Forward tipsets from miners into the state machine.
    set.spawn({
        let state_changed = state_changed.clone();
        let state_machine = state_machine.clone();

        async move {
            while let Ok(tipset) = tipset_receiver.recv_async().await {
                state_machine
                    .lock()
                    .update(SyncEvent::NewFullTipsets(vec![tipset]));
                state_changed.notify_one();
            }
        }
    });

    // When the state machine is updated, we need to update the sync status and spawn tasks.
    set.spawn({
        let state_manager = state_manager.clone();
        let state_machine = state_machine.clone();
        let state_changed = state_changed.clone();
        let tasks = tasks.clone();
        let bad_block_cache = bad_block_cache.clone();
        async move {
            loop {
                state_changed.notified().await;

                let mut tasks_set = tasks.lock();
                let (task_vec, current_active_forks) = state_machine.lock().tasks();

                // Update the sync status report
                {
                    let old_status_report = sync_status.read().clone();
                    let new_status_report = old_status_report.update(
                        &state_manager,
                        current_active_forks,
                        stateless_mode,
                    );

                    sync_status.write().clone_from(&new_status_report);
                }

                for task in task_vec {
                    // Insert the task into the in-flight set. If it was already present,
                    // skip it; otherwise, spawn it.
                    let new = tasks_set.insert(task.clone());
                    if new {
                        let action = task.clone().execute(
                            network.clone(),
                            state_manager.clone(),
                            stateless_mode,
                            bad_block_cache.clone(),
                        );
                        tokio::spawn({
                            let tasks = tasks.clone();
                            let state_machine = state_machine.clone();
                            let state_changed = state_changed.clone();
                            async move {
                                if let Some(event) = action.await {
                                    state_machine.lock().update(event);
                                    state_changed.notify_one();
                                }
                                tasks.lock().remove(&task);
                            }
                        });
                    }
                }
            }
        }
    });

    // Periodically report progress if there are any tipsets left to be fetched.
    // Once we're in steady-state (i.e. caught up to HEAD) and there are no
    // active forks, this will not report anything.
    set.spawn({
        let state_manager = state_manager.clone();
        let state_machine = state_machine.clone();
        async move {
            loop {
                tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
                let (tasks_set, _) = state_machine.lock().tasks();
                let heaviest_tipset = state_manager.chain_store().heaviest_tipset();
                let heaviest_epoch = heaviest_tipset.epoch();

                let to_download = tasks_set
                    .iter()
                    .filter_map(|task| match task {
                        SyncTask::FetchTipset(_, epoch) => Some(epoch - heaviest_epoch),
                        _ => None,
                    })
                    .max()
                    .unwrap_or(0);

                let expected_head = calculate_expected_epoch(
                    Utc::now().timestamp() as u64,
                    state_manager.chain_store().genesis_block_header().timestamp,
                    state_manager.chain_config().block_delay_secs,
                );

                // Only print 'Catching up to HEAD' if we're more than 10 epochs
                // behind. Otherwise it can be too spammy.
                match (expected_head - heaviest_epoch > 10, to_download > 0) {
                    (true, true) => info!(
                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}, downloading {to_download} tipsets",
                        heaviest_tipset.key()
                    ),
                    (true, false) => info!(
                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}",
                        heaviest_tipset.key()
                    ),
                    (false, true) => {
                        info!("Downloading {to_download} tipsets")
                    }
                    (false, false) => {}
                }
            }
        }
    });

    set.join_all().await;
    Ok(())
}

// Increment the gossipsub event metrics.
fn inc_gossipsub_event_metrics(event: &NetworkEvent) {
    let label = match event {
        NetworkEvent::HelloRequestInbound => metrics::values::HELLO_REQUEST_INBOUND,
        NetworkEvent::HelloResponseOutbound { .. } => metrics::values::HELLO_RESPONSE_OUTBOUND,
        NetworkEvent::HelloRequestOutbound => metrics::values::HELLO_REQUEST_OUTBOUND,
        NetworkEvent::HelloResponseInbound => metrics::values::HELLO_RESPONSE_INBOUND,
        NetworkEvent::PeerConnected(_) => metrics::values::PEER_CONNECTED,
        NetworkEvent::PeerDisconnected(_) => metrics::values::PEER_DISCONNECTED,
        NetworkEvent::PubsubMessage { message } => match message {
            PubsubMessage::Block(_) => metrics::values::PUBSUB_BLOCK,
            PubsubMessage::Message(_) => metrics::values::PUBSUB_MESSAGE,
        },
        NetworkEvent::ChainExchangeRequestOutbound => {
            metrics::values::CHAIN_EXCHANGE_REQUEST_OUTBOUND
        }
        NetworkEvent::ChainExchangeResponseInbound => {
            metrics::values::CHAIN_EXCHANGE_RESPONSE_INBOUND
        }
        NetworkEvent::ChainExchangeRequestInbound => {
            metrics::values::CHAIN_EXCHANGE_REQUEST_INBOUND
        }
        NetworkEvent::ChainExchangeResponseOutbound => {
            metrics::values::CHAIN_EXCHANGE_RESPONSE_OUTBOUND
        }
    };

    metrics::LIBP2P_MESSAGE_TOTAL.get_or_create(&label).inc();
}

// Keep our peer manager up to date.
fn update_peer_info<DB: Blockstore + Sync + Send + 'static>(
    event: &NetworkEvent,
    network: &SyncNetworkContext<DB>,
    chain_store: Arc<ChainStore<DB>>,
    genesis: &Tipset,
) {
    match event {
        NetworkEvent::PeerConnected(peer_id) => {
            let genesis_cid = *genesis.block_headers().first().cid();
            // Spawn and immediately move on to the next event
            tokio::task::spawn(handle_peer_connected_event(
                network.clone(),
                chain_store,
                *peer_id,
                genesis_cid,
            ));
        }
        NetworkEvent::PeerDisconnected(peer_id) => {
            handle_peer_disconnected_event(network, *peer_id);
        }
        _ => {}
    }
}

async fn handle_peer_connected_event<DB: Blockstore + Sync + Send + 'static>(
    network: SyncNetworkContext<DB>,
    chain_store: Arc<ChainStore<DB>>,
    peer_id: PeerId,
    genesis_block_cid: Cid,
) {
    if network.peer_manager().is_peer_new(&peer_id) {
        // Since the peer is new, send them a hello request
        // Query the heaviest TipSet from the store
        let heaviest = chain_store.heaviest_tipset();
        let request = HelloRequest {
            heaviest_tip_set: heaviest.cids(),
            heaviest_tipset_height: heaviest.epoch(),
            heaviest_tipset_weight: heaviest.weight().clone().into(),
            genesis_cid: genesis_block_cid,
        };
        let (peer_id, moment_sent, response) = match network.hello_request(peer_id, request).await {
            Ok(response) => response,
            Err(e) => {
                debug!("Hello request failed: {}", e);
                return;
            }
        };
        let dur = Instant::now().duration_since(moment_sent);

        // Update the peer metadata based on the response
        match response {
            Some(_) => {
                network.peer_manager().log_success(&peer_id, dur);
            }
            None => {
                network.peer_manager().log_failure(&peer_id, dur);
            }
        }
    }
}

fn handle_peer_disconnected_event<DB: Blockstore + Sync + Send + 'static>(
    network: &SyncNetworkContext<DB>,
    peer_id: PeerId,
) {
    network.peer_manager().remove_peer(&peer_id);
    network.peer_manager().unmark_peer_bad(&peer_id);
}

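/// Returns the `FullTipset` identified by `tipset_keys`, serving it from the local
/// store when possible and otherwise requesting it from the network (optionally from a
/// specific peer) and persisting the fetched blocks.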
pub async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
    network: &SyncNetworkContext<DB>,
    chain_store: &ChainStore<DB>,
    peer_id: Option<PeerId>,
    tipset_keys: &TipsetKey,
) -> anyhow::Result<FullTipset> {
    // Attempt to load from the store
    if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
        return Ok(full_tipset);
    }
    // Load from the network
    let tipset = network
        .chain_exchange_full_tipset(peer_id, tipset_keys)
        .await
        .map_err(|e| anyhow::anyhow!(e))?;
    tipset.persist(chain_store.blockstore())?;

    Ok(tipset)
}

async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
    network: &SyncNetworkContext<DB>,
    chain_store: &ChainStore<DB>,
    peer_id: Option<PeerId>,
    tipset_keys: &TipsetKey,
) -> anyhow::Result<Vec<FullTipset>> {
    // Attempt to load from the store
    if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
        return Ok(vec![full_tipset]);
    }
    // Load from the network
    let tipsets = network
        .chain_exchange_full_tipsets(peer_id, tipset_keys)
        .await
        .map_err(|e| anyhow::anyhow!(e))?;

    for tipset in tipsets.iter() {
        tipset.persist(chain_store.blockstore())?;
    }

    Ok(tipsets)
}

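/// Loads a tipset and its BLS/secp messages from the local store, failing if the
/// headers or any of the referenced messages are missing.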
pub fn load_full_tipset<DB: Blockstore>(
    chain_store: &ChainStore<DB>,
    tipset_keys: &TipsetKey,
) -> anyhow::Result<FullTipset> {
    // Retrieve tipset from store based on passed in TipsetKey
    let ts = chain_store
        .chain_index()
        .load_required_tipset(tipset_keys)?;
    let blocks: Vec<_> = ts
        .block_headers()
        .iter()
        .map(|header| -> anyhow::Result<Block> {
            let (bls_msgs, secp_msgs) =
                crate::chain::block_messages(chain_store.blockstore(), header)?;
            Ok(Block {
                header: header.clone(),
                bls_messages: bls_msgs,
                secp_messages: secp_msgs,
            })
        })
        .try_collect()?;
    // Construct FullTipset
    let fts = FullTipset::new(blocks)?;
    Ok(fts)
}

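/// Events fed into the `SyncStateMachine`: newly discovered tipsets, tipsets that failed
/// validation, and tipsets that have been successfully validated.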
enum SyncEvent {
    NewFullTipsets(Vec<FullTipset>),
    BadTipset(FullTipset),
    ValidatedTipset {
        tipset: FullTipset,
        is_proposed_head: bool,
    },
}

impl std::fmt::Display for SyncEvent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        fn tss_to_string(tss: &[FullTipset]) -> String {
            format!(
                "epoch: {}-{}",
                tss.first().map(|ts| ts.epoch()).unwrap_or_default(),
                tss.last().map(|ts| ts.epoch()).unwrap_or_default()
            )
        }

        match self {
            Self::NewFullTipsets(tss) => write!(f, "NewFullTipsets({})", tss_to_string(tss)),
            Self::BadTipset(ts) => {
                write!(f, "BadTipset(epoch: {}, key: {})", ts.epoch(), ts.key())
            }
            Self::ValidatedTipset {
                tipset,
                is_proposed_head,
            } => write!(
                f,
                "ValidatedTipset(epoch: {}, key: {}, is_proposed_head: {is_proposed_head})",
                tipset.epoch(),
                tipset.key()
            ),
        }
    }
}

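/// In-memory bookkeeping for the follower: a map of candidate tipsets keyed by
/// `TipsetKey`. The state machine itself performs no network requests and no state
/// validation; it only decides which `SyncTask`s should run next.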
struct SyncStateMachine<DB> {
    cs: Arc<ChainStore<DB>>,
    bad_block_cache: Option<Arc<BadBlockCache>>,
    // Map from TipsetKey to FullTipset
    tipsets: HashMap<TipsetKey, FullTipset>,
    stateless_mode: bool,
}

impl<DB: Blockstore> SyncStateMachine<DB> {
    pub fn new(
        cs: Arc<ChainStore<DB>>,
        bad_block_cache: Option<Arc<BadBlockCache>>,
        stateless_mode: bool,
    ) -> Self {
        Self {
            cs,
            bad_block_cache,
            tipsets: HashMap::default(),
            stateless_mode,
        }
    }

    // Compute the list of chains from the tipsets map
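    // Chains are built by repeatedly taking the heaviest remaining tipset and walking
    // back through its parents, so heavier forks come first and forks may share a
    // common prefix (e.g. with tipsets a <- b and a <- c where c is heavier, this
    // returns [[a, c], [a, b]]; see `test_sync_state_machine_chain_fragments`).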
    fn chains(&self) -> Vec<Vec<FullTipset>> {
        let mut chains = Vec::new();
        let mut remaining_tipsets = self.tipsets.clone();

        while let Some(heaviest) = remaining_tipsets
            .values()
            .max_by_key(|ts| ts.weight())
            .cloned()
        {
            // Build chain starting from heaviest
            let mut chain = Vec::new();
            let mut current = Some(heaviest);

            while let Some(tipset) = current.take() {
                remaining_tipsets.remove(tipset.key());

                // Find parent in tipsets map
                current = self.tipsets.get(tipset.parents()).cloned();

                chain.push(tipset);
            }
            chain.reverse();
            chains.push(chain);
        }

        chains
    }

    fn is_parent_validated(&self, tipset: &FullTipset) -> bool {
        let db = self.cs.blockstore();
        self.stateless_mode || db.has(tipset.parent_state()).unwrap_or(false)
    }

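    // A tipset is ready for validation once its parent is the current head, or once its
    // parent lies below the current head epoch and that parent's state root is already
    // in the store (see `is_parent_validated`).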
    fn is_ready_for_validation(&self, tipset: &FullTipset) -> bool {
        if self.stateless_mode || tipset.key() == self.cs.genesis_tipset().key() {
            // Skip validation in stateless mode and for genesis tipset
            true
        } else if let Ok(parent_ts) = load_full_tipset(&self.cs, tipset.parents()) {
            let head_ts = self.cs.heaviest_tipset();
            // Treat post-head-epoch tipsets as not validated to fix <https://github.com/ChainSafe/forest/issues/5677>.
            // The follow task should always start from the current head, which may have been manually reset
            // to an older tipset. If a post-head-epoch tipset were considered validated, it could confuse the
            // state machine in some edge cases, leaving the node stuck with an ever-empty sync task queue as
            // reported in <https://github.com/ChainSafe/forest/issues/5679>.
            if parent_ts.key() == head_ts.key() {
                true
            } else if parent_ts.epoch() >= head_ts.epoch() {
                false
            } else {
                self.is_parent_validated(tipset)
            }
        } else {
            false
        }
    }

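    // Adds a tipset to the candidate map after running the lightweight `TipsetValidator`
    // checks. Tipsets sharing the same epoch and parents are merged into a single entry,
    // so blocks that arrive one-by-one over gossipsub still combine into full tipsets.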
    fn add_full_tipset(&mut self, tipset: FullTipset) {
        if let Err(why) = TipsetValidator(&tipset).validate(
            &self.cs,
            self.bad_block_cache.as_ref().map(AsRef::as_ref),
            &self.cs.genesis_tipset(),
            self.cs.chain_config().block_delay_secs,
        ) {
            metrics::INVALID_TIPSET_TOTAL.inc();
            trace!("Skipping invalid tipset: {}", why);
            self.mark_bad_tipset(tipset);
            return;
        }

        // Check if tipset is outside the chain_finality window
        let heaviest = self.cs.heaviest_tipset();
        let epoch_diff = heaviest.epoch() - tipset.epoch();

        if epoch_diff > self.cs.chain_config().policy.chain_finality {
            self.mark_bad_tipset(tipset);
            return;
        }

        // Check if tipset already exists
        if self.tipsets.contains_key(tipset.key()) {
            return;
        }

        // Find any existing tipsets with same epoch and parents
        let mut to_remove = Vec::new();
        #[allow(clippy::mutable_key_type)]
        let mut merged_blocks: HashSet<_> = tipset.blocks().iter().cloned().collect();

        // Collect all parent references from existing tipsets
        let parent_refs: HashSet<_> = self
            .tipsets
            .values()
            .map(|ts| ts.parents().clone())
            .collect();

        for (key, existing_ts) in self.tipsets.iter() {
            if existing_ts.epoch() == tipset.epoch() && existing_ts.parents() == tipset.parents() {
                // Only mark for removal if not referenced as a parent
                if !parent_refs.contains(key) {
                    to_remove.push(key.clone());
                }
                // Add blocks from existing tipset - HashSet handles deduplication automatically
                merged_blocks.extend(existing_ts.blocks().iter().cloned());
            }
        }

        // Remove old tipsets that were merged and aren't referenced
        for key in to_remove {
            self.tipsets.remove(&key);
        }

        // Create and insert new merged tipset
        if let Ok(merged_tipset) = FullTipset::new(merged_blocks) {
            self.tipsets
                .insert(merged_tipset.key().clone(), merged_tipset);
        }
    }

    // Mark blocks in tipset as bad.
    // Mark all descendants of tipsets as bad.
    // Remove all bad tipsets from the tipset map.
    fn mark_bad_tipset(&mut self, tipset: FullTipset) {
        let mut stack = vec![tipset];
        while let Some(tipset) = stack.pop() {
            self.tipsets.remove(tipset.key());
            // Find all descendant tipsets (tipsets that have this tipset as a parent)
            let mut to_remove = Vec::new();
            let mut descendants = Vec::new();

            for (key, ts) in self.tipsets.iter() {
                if ts.parents() == tipset.key() {
                    to_remove.push(key.clone());
                    descendants.push(ts.clone());
                }
            }

            // Remove bad tipsets from the map
            for key in to_remove {
                self.tipsets.remove(&key);
            }

            // Mark descendants as bad
            stack.extend(descendants);
        }
    }

    fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) {
        if !self.is_parent_validated(&tipset) {
            tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), "Tipset must be validated");
            return;
        }

        self.tipsets.remove(tipset.key());
        let tipset = tipset.into_tipset();
        // cs.put_tipset requires state and doesn't work in stateless mode
        if self.stateless_mode {
            let epoch = tipset.epoch();
            let terse_key = tipset.key().terse();
            if self.cs.heaviest_tipset().weight() < tipset.weight() {
                if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
                    error!("Error setting heaviest tipset: {}", e);
                } else {
                    info!("Heaviest tipset: {} ({})", epoch, terse_key);
                }
            }
        } else if is_proposed_head {
            if let Err(e) = self.cs.put_tipset(&tipset) {
                error!("Error putting tipset: {e}");
            }
        } else if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
            error!("Error setting heaviest tipset: {e}");
        }
    }

    pub fn update(&mut self, event: SyncEvent) {
        tracing::trace!("update: {event}");
        match event {
            SyncEvent::NewFullTipsets(tipsets) => {
                for tipset in tipsets {
                    self.add_full_tipset(tipset);
                }
            }
            SyncEvent::BadTipset(tipset) => self.mark_bad_tipset(tipset),
            SyncEvent::ValidatedTipset {
                tipset,
                is_proposed_head,
            } => self.mark_validated_tipset(tipset, is_proposed_head),
        }
    }

    pub fn tasks(&self) -> (Vec<SyncTask>, Vec<ForkSyncInfo>) {
        // Get the node's current validated head epoch once, as it's the same for all forks.
        let current_validated_epoch = self.cs.heaviest_tipset().epoch();
        let now = Utc::now();

        let mut active_sync_info = Vec::new();
        let mut tasks = Vec::new();
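        // Only the first (oldest) tipset of each chain produces a task: either fetch its
        // missing parent or validate it. Later tipsets become ready on subsequent calls,
        // so validation proceeds parent-first (see `test_state_machine_validation_order`).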
        for chain in self.chains() {
            if let Some(first_ts) = chain.first() {
                let last_ts = chain.last().expect("Infallible");
                let stage: ForkSyncStage;
                let start_time = Some(now);

                if !self.is_ready_for_validation(first_ts) {
                    stage = ForkSyncStage::FetchingHeaders;
                    tasks.push(SyncTask::FetchTipset(
                        first_ts.parents().clone(),
                        first_ts.epoch(),
                    ));
                } else {
                    stage = ForkSyncStage::ValidatingTipsets;
                    tasks.push(SyncTask::ValidateTipset {
                        tipset: first_ts.clone(),
                        is_proposed_head: chain.len() == 1,
                    });
                }

                let fork_info = ForkSyncInfo {
                    target_tipset_key: last_ts.key().clone(),
                    target_epoch: last_ts.epoch(),
                    target_sync_epoch_start: first_ts.epoch(),
                    stage,
                    validated_chain_head_epoch: current_validated_epoch,
                    start_time,
                    last_updated: Some(now),
                };

                active_sync_info.push(fork_info);
            }
        }
        (tasks, active_sync_info)
    }
}

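/// A unit of work spawned by the follower: fetch a tipset (and, in the batch path, a
/// range of its ancestors) by key, or validate a tipset whose parent has already been
/// validated. Tasks are hashable so that in-flight work can be deduplicated.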
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
enum SyncTask {
    ValidateTipset {
        tipset: FullTipset,
        is_proposed_head: bool,
    },
    FetchTipset(TipsetKey, ChainEpoch),
}

impl std::fmt::Display for SyncTask {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SyncTask::ValidateTipset {
                tipset,
                is_proposed_head,
            } => write!(
                f,
                "ValidateTipset(epoch: {}, is_proposed_head: {is_proposed_head})",
                tipset.epoch()
            ),
            SyncTask::FetchTipset(key, epoch) => {
                let s = key.to_string();
                write!(
                    f,
                    "FetchTipset({}, epoch: {})",
                    &s[s.len().saturating_sub(8)..],
                    epoch
                )
            }
        }
    }
}

impl SyncTask {
    async fn execute<DB: Blockstore + Sync + Send + 'static>(
        self,
        network: SyncNetworkContext<DB>,
        state_manager: Arc<StateManager<DB>>,
        stateless_mode: bool,
        bad_block_cache: Option<Arc<BadBlockCache>>,
    ) -> Option<SyncEvent> {
        tracing::trace!("SyncTask::execute {self}");
        match self {
            SyncTask::ValidateTipset {
                tipset,
                is_proposed_head,
            } if stateless_mode => Some(SyncEvent::ValidatedTipset {
                tipset,
                is_proposed_head,
            }),
            SyncTask::ValidateTipset {
                tipset,
                is_proposed_head,
            } => match validate_tipset(&state_manager, tipset.clone(), bad_block_cache).await {
                Ok(()) => Some(SyncEvent::ValidatedTipset {
                    tipset,
                    is_proposed_head,
                }),
                Err(e) => {
                    warn!("Error validating tipset: {e}");
                    Some(SyncEvent::BadTipset(tipset))
                }
            },
            SyncTask::FetchTipset(key, epoch) => {
                match get_full_tipset_batch(&network, state_manager.chain_store(), None, &key).await
                {
                    Ok(parents) => Some(SyncEvent::NewFullTipsets(parents)),
                    Err(e) => {
                        tracing::warn!(%key, %epoch, "failed to fetch tipset: {e}");
                        None
                    }
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::blocks::{Chain4U, HeaderBuilder, chain4u};
    use crate::db::MemoryDB;
    use crate::utils::db::CborStoreExt as _;
    use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
    use num_bigint::BigInt;
    use num_traits::ToPrimitive;
    use std::sync::Arc;
    use tracing::level_filters::LevelFilter;
    use tracing_subscriber::EnvFilter;

    fn setup() -> (Arc<ChainStore<MemoryDB>>, Chain4U<Arc<MemoryDB>>) {
        // Initialize test logger
        let _ = tracing_subscriber::fmt()
            .without_time()
            .with_env_filter(
                EnvFilter::builder()
                    .with_default_directive(LevelFilter::DEBUG.into())
                    .from_env()
                    .unwrap(),
            )
            .try_init();

        let db = Arc::new(MemoryDB::default());

        // Populate DB with message roots used by chain4u
        {
            let empty_amt = Amt::<Cid, _>::new(&db).flush().unwrap();
            db.put_cbor_default(&crate::blocks::TxMeta {
                bls_message_root: empty_amt,
                secp_message_root: empty_amt,
            })
            .unwrap();
        }

        // Create the genesis tipset using Chain4U
        let c4u = Chain4U::with_blockstore(db.clone());
        chain4u! {
            in c4u;
            [genesis_header = dummy_node(&db, 0)]
        };

        let cs = Arc::new(
            ChainStore::new(
                db.clone(),
                db.clone(),
                db.clone(),
                Default::default(),
                genesis_header.clone().into(),
            )
            .unwrap(),
        );

        cs.set_heaviest_tipset(cs.genesis_tipset()).unwrap();

        (cs, c4u)
    }

    fn dummy_state(db: impl Blockstore, i: ChainEpoch) -> Cid {
        db.put_cbor_default(&i).unwrap()
    }

    fn dummy_node(db: impl Blockstore, i: ChainEpoch) -> HeaderBuilder {
        HeaderBuilder {
            state_root: dummy_state(db, i).into(),
            weight: BigInt::from(i).into(),
            epoch: i.into(),
            ..Default::default()
        }
    }

    #[test]
    fn test_state_machine_validation_order() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        chain4u! {
            from [genesis_header] in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)] -> [c = dummy_node(&db, 3)] -> [d = dummy_node(&db, 4)] -> [e = dummy_node(&db, 5)]
        };

        // Create the state machine
        let mut state_machine = SyncStateMachine::new(cs, Default::default(), true);

        // Insert tipsets in random order
        let tipsets = vec![e, b, d, c, a];

        // Convert each block into a FullTipset and add it to the state machine
        for block in tipsets {
            let full_tipset = FullTipset::new(vec![Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            }])
            .unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
        }

        // Record validation order by processing all validation tasks in each iteration
        let mut validation_tasks = Vec::new();
        loop {
            let (tasks, _) = state_machine.tasks();

            // Find all validation tasks
            let validation_tipsets: Vec<_> = tasks
                .into_iter()
                .filter_map(|task| {
                    if let SyncTask::ValidateTipset {
                        tipset,
                        is_proposed_head,
                    } = task
                    {
                        Some((tipset, is_proposed_head))
                    } else {
                        None
                    }
                })
                .collect();

            if validation_tipsets.is_empty() {
                break;
            }

            // Record and mark all tipsets as validated
            for (ts, is_proposed_head) in validation_tipsets {
                validation_tasks.push(ts.epoch());
                db.put_cbor_default(&ts.epoch()).unwrap();
                state_machine.mark_validated_tipset(ts, is_proposed_head);
            }
        }

        // We expect validation tasks for epochs 1 through 5 in order
        assert_eq!(validation_tasks, vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_sync_state_machine_chain_fragments() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        // Create a forked chain
        // genesis -> a -> b
        //            \--> c
        chain4u! {
            in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)]
        };
        chain4u! {
            from [a] in c4u;
            [c = dummy_node(&db, 3)]
        };

        // Create the state machine
        let mut state_machine = SyncStateMachine::new(cs, Default::default(), false);

        // Convert each block into a FullTipset and add it to the state machine
        for block in [a, b, c] {
            let full_tipset = FullTipset::new(vec![Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            }])
            .unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
        }

        let chains = state_machine
            .chains()
            .into_iter()
            .map(|v| {
                v.into_iter()
                    .map(|ts| ts.weight().to_i64().unwrap_or(0))
                    .collect_vec()
            })
            .collect_vec();

        // Both chains should start at the same tipset
        assert_eq!(chains, vec![vec![1, 3], vec![1, 2]]);
    }
}