forest/chain_sync/chain_follower.rs

// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT
//! This module contains the logic for driving Forest forward in the Filecoin
//! blockchain.
//!
//! Forest keeps track of the current heaviest tipset, and receives information
//! about new blocks and tipsets from peers as well as connected miners. The
//! state machine has the following rules:
//! - A tipset is invalid if its parent is invalid.
//! - If a tipset's parent isn't in our database, request it from the network.
//! - If a tipset's parent has been validated, validate the tipset.
//! - If a tipset is 1 day older than the heaviest tipset, the tipset is
//!   invalid. This prevents Forest from following forks that will never be
//!   accepted.
//!
//! The state machine does not do any network requests or validation. Those are
//! handled by an external actor.

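// Overview of the flow implemented in this file:
// - Gossipsub blocks and hello responses from the p2p swarm, as well as tipsets
//   submitted by miners, are turned into `SyncEvent::NewFullTipsets` and fed into
//   the `SyncStateMachine`.
// - Whenever the state machine changes, `SyncStateMachine::tasks()` is polled and
//   every new `SyncTask` (fetching missing parents or validating a tipset) is
//   spawned.
// - Completed tasks produce further `SyncEvent`s (validated or bad tipsets), which
//   are fed back into the state machine.
//
// A minimal usage sketch (hypothetical setup; assumes the surrounding services such
// as `state_manager`, `network`, `net_rx` and `mem_pool` are constructed elsewhere):
//
//     let follower = ChainFollower::new(state_manager, network, genesis, net_rx,
//                                       /* stateless_mode */ false, mem_pool);
//     tokio::spawn(follower.run());
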
use super::network_context::SyncNetworkContext;
use crate::{
    blocks::{Block, FullTipset, Tipset, TipsetKey},
    chain::ChainStore,
    chain_sync::{
        ForkSyncInfo, ForkSyncStage, SyncStatus, SyncStatusReport, TipsetValidator,
        bad_block_cache::BadBlockCache, metrics, tipset_syncer::validate_tipset,
    },
    libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest},
    message_pool::{MessagePool, MpoolRpcProvider},
    networks::calculate_expected_epoch,
    shim::clock::ChainEpoch,
    state_manager::StateManager,
    utils::misc::env::is_env_truthy,
};
use ahash::{HashMap, HashSet};
use chrono::Utc;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
use libp2p::PeerId;
use parking_lot::{Mutex, RwLock};
use std::{ops::Deref as _, sync::Arc, time::Instant};
use tokio::{sync::Notify, task::JoinSet};
use tracing::{debug, error, info, trace, warn};

pub struct ChainFollower<DB> {
    /// Syncing status of the chain
    pub sync_status: SyncStatus,

    /// Manages retrieving and updating state objects
    state_manager: Arc<StateManager<DB>>,

    /// Context to be able to send requests to the P2P network
    pub network: SyncNetworkContext<DB>,

    /// Genesis tipset
    genesis: Arc<Tipset>,

    /// Bad block cache, updated based on invalid state transitions.
    /// Any invalid block and all of its children will be marked as bad in this
    /// bounded cache.
    pub bad_blocks: Option<Arc<BadBlockCache>>,

    /// Incoming network events to be handled by the synchronizer
    net_handler: flume::Receiver<NetworkEvent>,

    /// Tipset channel sender
    pub tipset_sender: flume::Sender<Arc<FullTipset>>,

    /// Tipset channel receiver
    tipset_receiver: flume::Receiver<Arc<FullTipset>>,

    /// When `stateless_mode` is true, Forest connects to the P2P network but
    /// does not execute any state transitions. This drastically reduces the
    /// memory and disk footprint of Forest but also means that Forest will not
    /// be able to validate the correctness of the chain.
    stateless_mode: bool,

    /// Message pool
    mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
}

impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
    pub fn new(
        state_manager: Arc<StateManager<DB>>,
        network: SyncNetworkContext<DB>,
        genesis: Arc<Tipset>,
        net_handler: flume::Receiver<NetworkEvent>,
        stateless_mode: bool,
        mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
    ) -> Self {
        let (tipset_sender, tipset_receiver) = flume::bounded(20);
        let disable_bad_block_cache = is_env_truthy("FOREST_DISABLE_BAD_BLOCK_CACHE");
        Self {
            sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
            state_manager,
            network,
            genesis,
            bad_blocks: if disable_bad_block_cache {
                tracing::warn!("bad block cache is disabled by `FOREST_DISABLE_BAD_BLOCK_CACHE`");
                None
            } else {
                Some(Default::default())
            },
            net_handler,
            tipset_sender,
            tipset_receiver,
            stateless_mode,
            mem_pool,
        }
    }

    pub async fn run(self) -> anyhow::Result<()> {
        chain_follower(
            self.state_manager,
            self.bad_blocks,
            self.net_handler,
            self.tipset_receiver,
            self.network,
            self.mem_pool,
            self.sync_status,
            self.genesis,
            self.stateless_mode,
        )
        .await
    }
}

#[allow(clippy::too_many_arguments)]
// We receive new full tipsets from the p2p swarm, and from miners that use Forest as their frontend.
pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
    state_manager: Arc<StateManager<DB>>,
    bad_block_cache: Option<Arc<BadBlockCache>>,
    network_rx: flume::Receiver<NetworkEvent>,
    tipset_receiver: flume::Receiver<Arc<FullTipset>>,
    network: SyncNetworkContext<DB>,
    mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
    sync_status: SyncStatus,
    genesis: Arc<Tipset>,
    stateless_mode: bool,
) -> anyhow::Result<()> {
    let state_changed = Arc::new(Notify::new());
    let state_machine = Arc::new(Mutex::new(SyncStateMachine::new(
        state_manager.chain_store().clone(),
        bad_block_cache.clone(),
        stateless_mode,
    )));
    let tasks: Arc<Mutex<HashSet<SyncTask>>> = Arc::new(Mutex::new(HashSet::default()));

    let mut set = JoinSet::new();

    // Increment metrics, update peer information, and forward tipsets to the state machine.
    set.spawn({
        let state_manager = state_manager.clone();
        let state_changed = state_changed.clone();
        let state_machine = state_machine.clone();
        let network = network.clone();
        async move {
            while let Ok(event) = network_rx.recv_async().await {
                inc_gossipsub_event_metrics(&event);

                update_peer_info(
                    &event,
                    network.clone(),
                    state_manager.chain_store().clone(),
                    &genesis,
                );

                let Ok(tipset) = (match event {
                    NetworkEvent::HelloResponseOutbound { request, source } => {
                        let tipset_keys = TipsetKey::from(request.heaviest_tip_set.clone());
                        get_full_tipset(
                            network.clone(),
                            state_manager.chain_store().clone(),
                            Some(source),
                            &tipset_keys,
                        )
                        .await
                        .inspect_err(|e| debug!("Querying full tipset failed: {}", e))
                    }
                    NetworkEvent::PubsubMessage { message } => match message {
                        PubsubMessage::Block(b) => {
                            let key = TipsetKey::from(nunny::vec![*b.header.cid()]);
                            get_full_tipset(
                                network.clone(),
                                state_manager.chain_store().clone(),
                                None,
                                &key,
                            )
                            .await
                        }
                        PubsubMessage::Message(m) => {
                            if let Err(why) = mem_pool.add(m) {
                                debug!("Received invalid GossipSub message: {}", why);
                            }
                            continue;
                        }
                    },
                    _ => continue,
                }) else {
                    continue;
                };
                {
                    state_machine
                        .lock()
                        .update(SyncEvent::NewFullTipsets(vec![Arc::new(tipset)]));
                    state_changed.notify_one();
                }
            }
        }
    });

    // Forward tipsets from miners into the state machine.
    set.spawn({
        let state_changed = state_changed.clone();
        let state_machine = state_machine.clone();

        async move {
            while let Ok(tipset) = tipset_receiver.recv_async().await {
                state_machine
                    .lock()
                    .update(SyncEvent::NewFullTipsets(vec![tipset]));
                state_changed.notify_one();
            }
        }
    });

    // When the state machine is updated, we need to update the sync status and spawn tasks
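    // Tasks are deduplicated via the shared `tasks` set: a task is only spawned if it
    // was not already in flight. When a task finishes, it removes itself from the set
    // and, if it produced a `SyncEvent`, feeds it back into the state machine and
    // re-notifies this loop.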
    set.spawn({
        let state_manager = state_manager.clone();
        let state_machine = state_machine.clone();
        let state_changed = state_changed.clone();
        let tasks = tasks.clone();
        let bad_block_cache = bad_block_cache.clone();
        async move {
            loop {
                state_changed.notified().await;

                let mut tasks_set = tasks.lock();
                let (task_vec, current_active_forks) = state_machine.lock().tasks();

                // Update the sync status report
                {
                    let mut status_report_guard = sync_status.write();
                    status_report_guard.update(
                        &state_manager,
                        current_active_forks,
                        stateless_mode,
                    );
                }

                for task in task_vec {
                    // Insert the task into the in-flight set. If it is already present, skip it; otherwise, spawn it.
                    let new = tasks_set.insert(task.clone());
                    if new {
                        let tasks_clone = tasks.clone();
                        let action = task.clone().execute(
                            network.clone(),
                            state_manager.clone(),
                            stateless_mode,
                            bad_block_cache.clone(),
                        );
                        tokio::spawn({
                            let state_machine = state_machine.clone();
                            let state_changed = state_changed.clone();
                            async move {
                                if let Some(event) = action.await {
                                    state_machine.lock().update(event);
                                    state_changed.notify_one();
                                }
                                tasks_clone.lock().remove(&task);
                            }
                        });
                    }
                }
            }
        }
    });

    // Periodically report progress if there are any tipsets left to be fetched.
    // Once we're in steady-state (i.e. caught up to HEAD) and there are no
    // active forks, this will not report anything.
    set.spawn({
        let state_manager = state_manager.clone();
        let state_machine = state_machine.clone();
        async move {
            loop {
                tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
                let (tasks_set, _) = state_machine.lock().tasks();
                let heaviest_tipset = state_manager.chain_store().heaviest_tipset();
                let heaviest_epoch = heaviest_tipset.epoch();

                let to_download = tasks_set
                    .iter()
                    .filter_map(|task| match task {
                        SyncTask::FetchTipset(_, epoch) => Some(epoch - heaviest_epoch),
                        _ => None,
                    })
                    .max()
                    .unwrap_or(0);

                let expected_head = calculate_expected_epoch(
                    Utc::now().timestamp() as u64,
                    state_manager.chain_store().genesis_block_header().timestamp,
                    state_manager.chain_config().block_delay_secs,
                );

                // Only print 'Catching up to HEAD' if we're more than 10 epochs
                // behind. Otherwise it can be too spammy.
                match (expected_head - heaviest_epoch > 10, to_download > 0) {
                    (true, true) => info!(
                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}, downloading {to_download} tipsets",
                        heaviest_tipset.key()
                    ),
                    (true, false) => info!(
                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}",
                        heaviest_tipset.key()
                    ),
                    (false, true) => {
                        info!("Downloading {to_download} tipsets")
                    }
                    (false, false) => {}
                }
            }
        }
    });

    set.join_all().await;
    Ok(())
}

// Increment the gossipsub event metrics.
fn inc_gossipsub_event_metrics(event: &NetworkEvent) {
    let label = match event {
        NetworkEvent::HelloRequestInbound => metrics::values::HELLO_REQUEST_INBOUND,
        NetworkEvent::HelloResponseOutbound { .. } => metrics::values::HELLO_RESPONSE_OUTBOUND,
        NetworkEvent::HelloRequestOutbound => metrics::values::HELLO_REQUEST_OUTBOUND,
        NetworkEvent::HelloResponseInbound => metrics::values::HELLO_RESPONSE_INBOUND,
        NetworkEvent::PeerConnected(_) => metrics::values::PEER_CONNECTED,
        NetworkEvent::PeerDisconnected(_) => metrics::values::PEER_DISCONNECTED,
        NetworkEvent::PubsubMessage { message } => match message {
            PubsubMessage::Block(_) => metrics::values::PUBSUB_BLOCK,
            PubsubMessage::Message(_) => metrics::values::PUBSUB_MESSAGE,
        },
        NetworkEvent::ChainExchangeRequestOutbound => {
            metrics::values::CHAIN_EXCHANGE_REQUEST_OUTBOUND
        }
        NetworkEvent::ChainExchangeResponseInbound => {
            metrics::values::CHAIN_EXCHANGE_RESPONSE_INBOUND
        }
        NetworkEvent::ChainExchangeRequestInbound => {
            metrics::values::CHAIN_EXCHANGE_REQUEST_INBOUND
        }
        NetworkEvent::ChainExchangeResponseOutbound => {
            metrics::values::CHAIN_EXCHANGE_RESPONSE_OUTBOUND
        }
    };

    metrics::LIBP2P_MESSAGE_TOTAL.get_or_create(&label).inc();
}

// Keep our peer manager up to date.
fn update_peer_info<DB: Blockstore + Sync + Send + 'static>(
    event: &NetworkEvent,
    network: SyncNetworkContext<DB>,
    chain_store: Arc<ChainStore<DB>>,
    genesis: &Tipset,
) {
    match event {
        NetworkEvent::PeerConnected(peer_id) => {
            let genesis_cid = *genesis.block_headers().first().cid();
            // Spawn and immediately move on to the next event
            tokio::task::spawn(handle_peer_connected_event(
                network,
                chain_store,
                *peer_id,
                genesis_cid,
            ));
        }
        NetworkEvent::PeerDisconnected(peer_id) => {
            handle_peer_disconnected_event(network, *peer_id);
        }
        _ => {}
    }
}

async fn handle_peer_connected_event<DB: Blockstore + Sync + Send + 'static>(
    network: SyncNetworkContext<DB>,
    chain_store: Arc<ChainStore<DB>>,
    peer_id: PeerId,
    genesis_block_cid: Cid,
) {
    if network.peer_manager().is_peer_new(&peer_id) {
        // Since the peer is new, send them a hello request.
        // Query the heaviest TipSet from the store
        let heaviest = chain_store.heaviest_tipset();
        let request = HelloRequest {
            heaviest_tip_set: heaviest.cids(),
            heaviest_tipset_height: heaviest.epoch(),
            heaviest_tipset_weight: heaviest.weight().clone().into(),
            genesis_cid: genesis_block_cid,
        };
        let (peer_id, moment_sent, response) = match network.hello_request(peer_id, request).await {
            Ok(response) => response,
            Err(e) => {
                debug!("Hello request failed: {}", e);
                return;
            }
        };
        let dur = Instant::now().duration_since(moment_sent);

        // Update the peer metadata based on the response
        match response {
            Some(_) => {
                network.peer_manager().log_success(&peer_id, dur);
            }
            None => {
                network.peer_manager().log_failure(&peer_id, dur);
            }
        }
    }
}

fn handle_peer_disconnected_event<DB: Blockstore + Sync + Send + 'static>(
    network: SyncNetworkContext<DB>,
    peer_id: PeerId,
) {
    network.peer_manager().remove_peer(&peer_id);
    network.peer_manager().unmark_peer_bad(&peer_id);
}

pub async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
    network: SyncNetworkContext<DB>,
    chain_store: Arc<ChainStore<DB>>,
    peer_id: Option<PeerId>,
    tipset_keys: &TipsetKey,
) -> anyhow::Result<FullTipset> {
    // Attempt to load from the store
    if let Ok(full_tipset) = load_full_tipset(&chain_store, tipset_keys) {
        return Ok(full_tipset);
    }
    // Load from the network
    let tipset = network
        .chain_exchange_full_tipset(peer_id, tipset_keys)
        .await
        .map_err(|e| anyhow::anyhow!(e))?;
    tipset.persist(chain_store.blockstore())?;

    Ok(tipset)
}

async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
    network: SyncNetworkContext<DB>,
    chain_store: Arc<ChainStore<DB>>,
    peer_id: Option<PeerId>,
    tipset_keys: &TipsetKey,
) -> anyhow::Result<Vec<FullTipset>> {
    // Attempt to load from the store
    if let Ok(full_tipset) = load_full_tipset(&chain_store, tipset_keys) {
        return Ok(vec![full_tipset]);
    }
    // Load from the network
    let tipsets = network
        .chain_exchange_full_tipsets(peer_id, tipset_keys)
        .await
        .map_err(|e| anyhow::anyhow!(e))?;

    for tipset in tipsets.iter() {
        tipset.persist(chain_store.blockstore())?;
    }

    Ok(tipsets)
}

pub fn load_full_tipset<DB: Blockstore>(
    chain_store: &ChainStore<DB>,
    tipset_keys: &TipsetKey,
) -> anyhow::Result<FullTipset> {
    // Retrieve tipset from store based on passed in TipsetKey
    let ts = chain_store
        .chain_index()
        .load_required_tipset(tipset_keys)?;
    let blocks: Vec<_> = ts
        .block_headers()
        .iter()
        .map(|header| -> anyhow::Result<Block> {
            let (bls_msgs, secp_msgs) =
                crate::chain::block_messages(chain_store.blockstore(), header)?;
            Ok(Block {
                header: header.clone(),
                bls_messages: bls_msgs,
                secp_messages: secp_msgs,
            })
        })
        .try_collect()?;
    // Construct FullTipset
    let fts = FullTipset::new(blocks)?;
    Ok(fts)
}

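// Events that drive the `SyncStateMachine`. They are produced by the p2p event loop,
// by the miner tipset channel, and by completed `SyncTask`s.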
enum SyncEvent {
    NewFullTipsets(Vec<Arc<FullTipset>>),
    BadTipset(Arc<FullTipset>),
    ValidatedTipset {
        tipset: Arc<FullTipset>,
        is_proposed_head: bool,
    },
}

impl std::fmt::Display for SyncEvent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        fn tss_to_string(tss: &[Arc<FullTipset>]) -> String {
            format!(
                "epoch: {}-{}",
                tss.first().map(|ts| ts.epoch()).unwrap_or_default(),
                tss.last().map(|ts| ts.epoch()).unwrap_or_default()
            )
        }

        match self {
            Self::NewFullTipsets(tss) => write!(f, "NewFullTipsets({})", tss_to_string(tss)),
            Self::BadTipset(ts) => {
                write!(f, "BadTipset(epoch: {}, key: {})", ts.epoch(), ts.key())
            }
            Self::ValidatedTipset {
                tipset,
                is_proposed_head,
            } => write!(
                f,
                "ValidatedTipset(epoch: {}, key: {}, is_proposed_head: {is_proposed_head})",
                tipset.epoch(),
                tipset.key()
            ),
        }
    }
}

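// Pure, in-memory bookkeeping for chain sync. The state machine tracks pending full
// tipsets keyed by their `TipsetKey` and performs no network requests or state
// validation itself; those side effects live in `SyncTask::execute`, which reports
// back through `SyncEvent`s.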
struct SyncStateMachine<DB> {
    cs: Arc<ChainStore<DB>>,
    bad_block_cache: Option<Arc<BadBlockCache>>,
    // Map from TipsetKey to FullTipset
    tipsets: HashMap<TipsetKey, Arc<FullTipset>>,
    stateless_mode: bool,
}

impl<DB: Blockstore> SyncStateMachine<DB> {
    pub fn new(
        cs: Arc<ChainStore<DB>>,
        bad_block_cache: Option<Arc<BadBlockCache>>,
        stateless_mode: bool,
    ) -> Self {
        Self {
            cs,
            bad_block_cache,
            tipsets: HashMap::default(),
            stateless_mode,
        }
    }

    // Compute the list of chains from the tipsets map
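    // Chains are built greedily: repeatedly take the heaviest tipset that has not yet
    // been assigned to a chain and walk back through its parents, then reverse so the
    // chain is ordered oldest-to-newest. Parent lookups use the full `tipsets` map, so
    // a shared ancestor may appear in more than one chain (see
    // `test_sync_state_machine_chain_fragments`).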
    fn chains(&self) -> Vec<Vec<Arc<FullTipset>>> {
        let mut chains = Vec::new();
        let mut remaining_tipsets = self.tipsets.clone();

        while let Some(heaviest) = remaining_tipsets
            .values()
            .max_by_key(|ts| ts.weight())
            .cloned()
        {
            // Build chain starting from heaviest
            let mut chain = Vec::new();
            let mut current = Some(heaviest);

            while let Some(tipset) = current.take() {
                remaining_tipsets.remove(tipset.key());

                // Find parent in tipsets map
                current = self.tipsets.get(tipset.parents()).cloned();

                chain.push(tipset);
            }
            chain.reverse();
            chains.push(chain);
        }

        chains
    }

    fn is_parent_validated(&self, tipset: &FullTipset) -> bool {
        let db = self.cs.blockstore();
        self.stateless_mode || db.has(tipset.parent_state()).unwrap_or(false)
    }

    fn is_ready_for_validation(&self, tipset: &FullTipset) -> bool {
        if self.stateless_mode || tipset.key() == self.cs.genesis_tipset().key() {
            // Skip validation in stateless mode and for genesis tipset
            true
        } else if let Ok(parent_ts) = load_full_tipset(&self.cs, tipset.parents()) {
            let head_ts = self.cs.heaviest_tipset();
            // Treat post-head-epoch tipsets as not validated to fix <https://github.com/ChainSafe/forest/issues/5677>.
            // Basically, the follow task should always start from the current head, which could be manually set
            // to an old one. When a post-head-epoch tipset is considered validated, it could mess up the state machine
            // in some edge cases and the node ends up being stuck with an ever-empty sync task queue as reported
            // in <https://github.com/ChainSafe/forest/issues/5679>.
            if parent_ts.key() == head_ts.key() {
                true
            } else if parent_ts.epoch() >= head_ts.epoch() {
                false
            } else {
                self.is_parent_validated(tipset)
            }
        } else {
            false
        }
    }

    fn add_full_tipset(&mut self, tipset: Arc<FullTipset>) {
        if let Err(why) = TipsetValidator(&tipset).validate(
            &self.cs,
            self.bad_block_cache.as_ref().map(AsRef::as_ref),
            &self.cs.genesis_tipset(),
            self.cs.chain_config().block_delay_secs,
        ) {
            metrics::INVALID_TIPSET_TOTAL.inc();
            trace!("Skipping invalid tipset: {}", why);
            self.mark_bad_tipset(tipset);
            return;
        }

        // Check if tipset is outside the chain_finality window
        let heaviest = self.cs.heaviest_tipset();
        let epoch_diff = heaviest.epoch() - tipset.epoch();

        if epoch_diff > self.cs.chain_config().policy.chain_finality {
            self.mark_bad_tipset(tipset);
            return;
        }

        // Check if tipset already exists
        if self.tipsets.contains_key(tipset.key()) {
            return;
        }

        // Find any existing tipsets with same epoch and parents
        let mut to_remove = Vec::new();
        #[allow(clippy::mutable_key_type)]
        let mut merged_blocks: HashSet<_> = tipset.blocks().iter().cloned().collect();

        // Collect all parent references from existing tipsets
        let parent_refs: HashSet<_> = self
            .tipsets
            .values()
            .map(|ts| ts.parents().clone())
            .collect();

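        // Tipsets that are referenced as a parent by another tracked tipset must be
        // kept even when merged, since removing them would orphan their descendants
        // in the map.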
        for (key, existing_ts) in self.tipsets.iter() {
            if existing_ts.epoch() == tipset.epoch() && existing_ts.parents() == tipset.parents() {
                // Only mark for removal if not referenced as a parent
                if !parent_refs.contains(key) {
                    to_remove.push(key.clone());
                }
                // Add blocks from existing tipset - HashSet handles deduplication automatically
                merged_blocks.extend(existing_ts.blocks().iter().cloned());
            }
        }

        // Remove old tipsets that were merged and aren't referenced
        for key in to_remove {
            self.tipsets.remove(&key);
        }

        // Create and insert new merged tipset
        if let Ok(merged_tipset) = FullTipset::new(merged_blocks) {
            self.tipsets
                .insert(merged_tipset.key().clone(), Arc::new(merged_tipset));
        }
    }

    // Mark blocks in tipset as bad.
    // Mark all descendants of tipsets as bad.
    // Remove all bad tipsets from the tipset map.
    fn mark_bad_tipset(&mut self, tipset: Arc<FullTipset>) {
        let mut stack = vec![tipset];
        while let Some(tipset) = stack.pop() {
            self.tipsets.remove(tipset.key());
            // Find all descendant tipsets (tipsets that have this tipset as a parent)
            let mut to_remove = Vec::new();
            let mut descendants = Vec::new();

            for (key, ts) in self.tipsets.iter() {
                if ts.parents() == tipset.key() {
                    to_remove.push(key.clone());
                    descendants.push(ts.clone());
                }
            }

            // Remove bad tipsets from the map
            for key in to_remove {
                self.tipsets.remove(&key);
            }

            // Mark descendants as bad
            stack.extend(descendants);
        }
    }

    fn mark_validated_tipset(&mut self, tipset: Arc<FullTipset>, is_proposed_head: bool) {
        if !self.is_parent_validated(&tipset) {
            tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), "Tipset must be validated");
            return;
        }

        self.tipsets.remove(tipset.key());
        let tipset = tipset.deref().clone().into_tipset();
        // cs.put_tipset requires state and doesn't work in stateless mode
        if self.stateless_mode {
            let epoch = tipset.epoch();
            let terse_key = tipset.key().terse();
            if self.cs.heaviest_tipset().weight() < tipset.weight() {
                if let Err(e) = self.cs.set_heaviest_tipset(Arc::new(tipset)) {
                    error!("Error setting heaviest tipset: {}", e);
                } else {
                    info!("Heaviest tipset: {} ({})", epoch, terse_key);
                }
            }
        } else if is_proposed_head {
            if let Err(e) = self.cs.put_tipset(&tipset) {
                error!("Error putting tipset: {e}");
            }
        } else if let Err(e) = self.cs.set_heaviest_tipset(tipset.into()) {
            error!("Error setting heaviest tipset: {e}");
        }
    }

    pub fn update(&mut self, event: SyncEvent) {
        tracing::trace!("update: {event}");
        match event {
            SyncEvent::NewFullTipsets(tipsets) => {
                for tipset in tipsets {
                    self.add_full_tipset(tipset);
                }
            }
            SyncEvent::BadTipset(tipset) => self.mark_bad_tipset(tipset),
            SyncEvent::ValidatedTipset {
                tipset,
                is_proposed_head,
            } => self.mark_validated_tipset(tipset, is_proposed_head),
        }
    }

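    // Derive the next set of tasks from the currently tracked tipsets. Each chain
    // contributes at most one task per call: either fetch the missing parents of its
    // oldest pending tipset, or validate that tipset once its parent is available. The
    // caller re-polls this method after every `SyncEvent`, so later tipsets in a chain
    // become eligible on subsequent passes. Also returns per-fork progress information
    // for the sync status report.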
    pub fn tasks(&self) -> (Vec<SyncTask>, Vec<ForkSyncInfo>) {
        // Get the node's current validated head epoch once, as it's the same for all forks.
        let current_validated_epoch = self.cs.heaviest_tipset().epoch();
        let now = Utc::now();

        let mut active_sync_info = Vec::new();
        let mut tasks = Vec::new();
        for chain in self.chains() {
            if let Some(first_ts) = chain.first() {
                let last_ts = chain.last().expect("Infallible");
                let stage: ForkSyncStage;
                let start_time = Some(now);

                if !self.is_ready_for_validation(first_ts) {
                    stage = ForkSyncStage::FetchingHeaders;
                    tasks.push(SyncTask::FetchTipset(
                        first_ts.parents().clone(),
                        first_ts.epoch(),
                    ));
                } else {
                    stage = ForkSyncStage::ValidatingTipsets;
                    tasks.push(SyncTask::ValidateTipset {
                        tipset: first_ts.clone(),
                        is_proposed_head: chain.len() == 1,
                    });
                }

                let fork_info = ForkSyncInfo {
                    target_tipset_key: last_ts.key().clone(),
                    target_epoch: last_ts.epoch(),
                    target_sync_epoch_start: first_ts.epoch(),
                    stage,
                    validated_chain_head_epoch: current_validated_epoch,
                    start_time,
                    last_updated: Some(now),
                };

                active_sync_info.push(fork_info);
            }
        }
        (tasks, active_sync_info)
    }
}

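// Side-effecting work derived from the state machine: fetch missing tipsets from the
// network or validate a tipset against its parent state. Executed outside the state
// machine and deduplicated by the in-flight task set in `chain_follower`.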
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
enum SyncTask {
    ValidateTipset {
        tipset: Arc<FullTipset>,
        is_proposed_head: bool,
    },
    FetchTipset(TipsetKey, ChainEpoch),
}

impl std::fmt::Display for SyncTask {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SyncTask::ValidateTipset {
                tipset,
                is_proposed_head,
            } => write!(
                f,
                "ValidateTipset(epoch: {}, is_proposed_head: {is_proposed_head})",
                tipset.epoch()
            ),
            SyncTask::FetchTipset(key, epoch) => {
                let s = key.to_string();
                write!(
                    f,
                    "FetchTipset({}, epoch: {})",
                    &s[s.len().saturating_sub(8)..],
                    epoch
                )
            }
        }
    }
}

impl SyncTask {
    async fn execute<DB: Blockstore + Sync + Send + 'static>(
        self,
        network: SyncNetworkContext<DB>,
        state_manager: Arc<StateManager<DB>>,
        stateless_mode: bool,
        bad_block_cache: Option<Arc<BadBlockCache>>,
    ) -> Option<SyncEvent> {
        tracing::trace!("SyncTask::execute {self}");
        let cs = state_manager.chain_store();
        match self {
            SyncTask::ValidateTipset {
                tipset,
                is_proposed_head,
            } if stateless_mode => Some(SyncEvent::ValidatedTipset {
                tipset,
                is_proposed_head,
            }),
            SyncTask::ValidateTipset {
                tipset,
                is_proposed_head,
            } => {
                let genesis = cs.genesis_tipset();
                match validate_tipset(
                    state_manager.clone(),
                    cs,
                    tipset.deref().clone(),
                    &genesis,
                    bad_block_cache,
                )
                .await
                {
                    Ok(()) => Some(SyncEvent::ValidatedTipset {
                        tipset,
                        is_proposed_head,
                    }),
                    Err(e) => {
                        warn!("Error validating tipset: {}", e);
                        Some(SyncEvent::BadTipset(tipset))
                    }
                }
            }
            SyncTask::FetchTipset(key, epoch) => {
                match get_full_tipset_batch(network.clone(), cs.clone(), None, &key).await {
                    Ok(parents) => Some(SyncEvent::NewFullTipsets(
                        parents.into_iter().map(Arc::new).collect(),
                    )),
                    Err(e) => {
                        tracing::warn!(%key, %epoch, "failed to fetch tipset: {e}");
                        None
                    }
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::blocks::{Chain4U, HeaderBuilder, chain4u};
    use crate::db::MemoryDB;
    use crate::utils::db::CborStoreExt as _;
    use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
    use num_bigint::BigInt;
    use num_traits::ToPrimitive;
    use std::sync::Arc;

    fn setup() -> (Arc<ChainStore<MemoryDB>>, Chain4U<Arc<MemoryDB>>) {
        // Initialize test logger
        let _ = tracing_subscriber::fmt()
            .with_env_filter(
                tracing_subscriber::EnvFilter::from_default_env()
                    .add_directive(tracing::Level::DEBUG.into()),
            )
            .try_init();

        let db = Arc::new(MemoryDB::default());

        // Populate DB with message roots used by chain4u
        {
            let empty_amt = Amt::<Cid, _>::new(&db).flush().unwrap();
            db.put_cbor_default(&crate::blocks::TxMeta {
                bls_message_root: empty_amt,
                secp_message_root: empty_amt,
            })
            .unwrap();
        }

        // Create a chain of 5 tipsets using Chain4U
        let c4u = Chain4U::with_blockstore(db.clone());
        chain4u! {
            in c4u;
            [genesis_header = dummy_node(&db, 0)]
        };

        let cs = Arc::new(
            ChainStore::new(
                db.clone(),
                db.clone(),
                db.clone(),
                db.clone(),
                Default::default(),
                genesis_header.clone().into(),
            )
            .unwrap(),
        );

        cs.set_heaviest_tipset(Arc::new(cs.genesis_tipset()))
            .unwrap();

        (cs, c4u)
    }

    fn dummy_state(db: impl Blockstore, i: ChainEpoch) -> Cid {
        db.put_cbor_default(&i).unwrap()
    }

    fn dummy_node(db: impl Blockstore, i: ChainEpoch) -> HeaderBuilder {
        HeaderBuilder {
            state_root: dummy_state(db, i).into(),
            weight: BigInt::from(i).into(),
            epoch: i.into(),
            ..Default::default()
        }
    }

    #[test]
    fn test_state_machine_validation_order() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        chain4u! {
            from [genesis_header] in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)] -> [c = dummy_node(&db, 3)] -> [d = dummy_node(&db, 4)] -> [e = dummy_node(&db, 5)]
        };

        // Create the state machine
        let mut state_machine = SyncStateMachine::new(cs, Default::default(), true);

        // Insert tipsets in random order
        let tipsets = vec![e, b, d, c, a];

        // Convert each block into a FullTipset and add it to the state machine
        for block in tipsets {
            let full_tipset = FullTipset::new(vec![Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            }])
            .unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![Arc::new(full_tipset)]));
        }

        // Record validation order by processing all validation tasks in each iteration
        let mut validation_tasks = Vec::new();
        loop {
            let (tasks, _) = state_machine.tasks();

            // Find all validation tasks
            let validation_tipsets: Vec<_> = tasks
                .into_iter()
                .filter_map(|task| {
                    if let SyncTask::ValidateTipset {
                        tipset,
                        is_proposed_head,
                    } = task
                    {
                        Some((tipset, is_proposed_head))
                    } else {
                        None
                    }
                })
                .collect();

            if validation_tipsets.is_empty() {
                break;
            }

            // Record and mark all tipsets as validated
            for (ts, is_proposed_head) in validation_tipsets {
                validation_tasks.push(ts.epoch());
                db.put_cbor_default(&ts.epoch()).unwrap();
                state_machine.mark_validated_tipset(ts, is_proposed_head);
            }
        }

        // We expect validation tasks for epochs 1 through 5 in order
        assert_eq!(validation_tasks, vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_sync_state_machine_chain_fragments() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        // Create a forked chain
        // genesis -> a -> b
        //            \--> c
        chain4u! {
            in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)]
        };
        chain4u! {
            from [a] in c4u;
            [c = dummy_node(&db, 3)]
        };

        // Create the state machine
        let mut state_machine = SyncStateMachine::new(cs, Default::default(), false);

        // Convert each block into a FullTipset and add it to the state machine
        for block in [a, b, c] {
            let full_tipset = FullTipset::new(vec![Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            }])
            .unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![Arc::new(full_tipset)]));
        }

        let chains = state_machine
            .chains()
            .into_iter()
            .map(|v| {
                v.into_iter()
                    .map(|ts| ts.weight().to_i64().unwrap_or(0))
                    .collect()
            })
            .collect::<Vec<Vec<_>>>();

        // Both chains should start at the same tipset
        assert_eq!(chains, vec![vec![1, 3], vec![1, 2]]);
    }
}