1use super::network_context::SyncNetworkContext;
20use crate::{
21 blocks::{Block, FullTipset, Tipset, TipsetKey},
22 chain::ChainStore,
23 chain_sync::{
24 ForkSyncInfo, ForkSyncStage, SyncStatus, SyncStatusReport, TipsetValidator,
25 bad_block_cache::{BadBlockCache, SeenBlockCache},
26 metrics,
27 tipset_syncer::{TipsetSyncerError, validate_tipset},
28 validation::GossipBlockValidator,
29 },
30 libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest},
31 message_pool::MessagePool,
32 networks::calculate_expected_epoch,
33 shim::clock::ChainEpoch,
34 state_manager::StateManager,
35 utils::ShallowClone as _,
36};
37use ahash::{HashMap, HashSet};
38use chrono::Utc;
39use cid::Cid;
40use fvm_ipld_blockstore::Blockstore;
41use itertools::Itertools;
42use libp2p::PeerId;
43use parking_lot::{Mutex, RwLock};
44use std::{sync::Arc, time::Instant};
45use tokio::{sync::Notify, task::JoinSet};
46use tracing::{debug, error, info, trace, warn};
47
/// Owns the shared state, channels and handles needed to follow the chain.
///
/// The actual work is done by the background tasks started from
/// [`ChainFollower::run`].
pub struct ChainFollower<DB> {
    /// Sync tasks that have been spawned and have not finished yet. The
    /// scheduler inserts a task before spawning it and the task removes itself
    /// once done, so the same task is never spawned twice concurrently.
    tasks: Arc<Mutex<HashSet<SyncTask>>>,

    /// State machine tracking the tipsets that still need fetching or
    /// validation, and deriving the next sync tasks from them.
    state_machine: Arc<Mutex<SyncStateMachine<DB>>>,

    /// Latest sync status report; rewritten by the scheduler each time it wakes up.
    pub sync_status: SyncStatus,

    /// State manager; used here to reach the chain store and the chain config.
    pub state_manager: Arc<StateManager<DB>>,

    /// Network context used for hello / chain-exchange requests and for peer bookkeeping.
    pub network: SyncNetworkContext<DB>,

    /// Genesis tipset, passed to gossip block validation and to the hello handshake.
    genesis: Tipset,

    /// Cache of known-bad blocks, or `None` when disabled through
    /// `FOREST_DISABLE_BAD_BLOCK_CACHE`.
    pub bad_blocks: Option<Arc<BadBlockCache>>,

    /// Incoming network events (hello responses, gossip, peer connect/disconnect, ...).
    net_handler: flume::Receiver<NetworkEvent>,

    /// Sending half of a bounded channel (capacity 20) through which full
    /// tipsets can be injected into the state machine.
    pub tipset_sender: flume::Sender<FullTipset>,

    /// Receiving half of the channel behind `tipset_sender`.
    tipset_receiver: flume::Receiver<FullTipset>,

    /// When `true`, validation tasks report success without running
    /// `validate_tipset`, and parent state is always treated as validated.
    stateless_mode: bool,

    /// Message pool that gossiped messages are added to.
    mem_pool: Arc<MessagePool<Arc<ChainStore<DB>>>>,
}
90
91impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
92 pub fn new(
93 state_manager: Arc<StateManager<DB>>,
94 network: SyncNetworkContext<DB>,
95 genesis: Tipset,
96 net_handler: flume::Receiver<NetworkEvent>,
97 stateless_mode: bool,
98 mem_pool: Arc<MessagePool<Arc<ChainStore<DB>>>>,
99 ) -> Self {
100 crate::def_is_env_truthy!(cache_disabled, "FOREST_DISABLE_BAD_BLOCK_CACHE");
101 let (tipset_sender, tipset_receiver) = flume::bounded(20);
102 let tasks: Arc<Mutex<HashSet<SyncTask>>> = Arc::new(Mutex::new(HashSet::default()));
103 let bad_blocks = if cache_disabled() {
104 tracing::warn!("bad block cache is disabled by `FOREST_DISABLE_BAD_BLOCK_CACHE`");
105 None
106 } else {
107 Some(Default::default())
108 };
109 let state_machine = Arc::new(Mutex::new(SyncStateMachine::new(
110 state_manager.chain_store().clone(),
111 bad_blocks.clone(),
112 stateless_mode,
113 )));
114 Self {
115 tasks,
116 state_machine,
117 sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
118 state_manager,
119 network,
120 genesis,
121 bad_blocks,
122 net_handler,
123 tipset_sender,
124 tipset_receiver,
125 stateless_mode,
126 mem_pool,
127 }
128 }
129
130 pub fn reset(&self) {
132 let start = Instant::now();
133 self.tasks.lock().clear();
134 self.state_manager
135 .chain_store()
136 .validated_blocks
137 .lock()
138 .clear();
139 self.state_machine.lock().tipsets.clear();
140 if let Some(bad_blocks) = &self.bad_blocks {
141 bad_blocks.clear();
142 }
143 tracing::info!(
144 "chain follower reset, took {}",
145 humantime::format_duration(start.elapsed())
146 );
147 }
148
149 pub async fn run(&self) -> anyhow::Result<()> {
150 chain_follower(
151 &self.tasks,
152 &self.state_machine,
153 &self.state_manager,
154 self.bad_blocks.clone(),
155 self.net_handler.clone(),
156 self.tipset_receiver.clone(),
157 &self.network,
158 &self.mem_pool,
159 &self.sync_status,
160 &self.genesis,
161 self.stateless_mode,
162 )
163 .await
164 }
165}
166
/// Spawns the four background tasks of the chain follower on a [`JoinSet`]
/// and waits for all of them:
///
/// 1. a network event handler that turns hello responses and gossiped blocks
///    into full tipsets and feeds gossiped messages to the message pool,
/// 2. a forwarder that feeds tipsets received on `tipset_receiver` into the
///    state machine,
/// 3. a scheduler that, whenever the state changes, refreshes the sync status
///    and spawns the tasks requested by the state machine,
/// 4. a periodic progress reporter.
///
/// Returns `Ok(())` once every spawned task has finished.
#[allow(clippy::too_many_arguments)]
async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
    tasks: &Arc<Mutex<HashSet<SyncTask>>>,
    state_machine: &Arc<Mutex<SyncStateMachine<DB>>>,
    state_manager: &Arc<StateManager<DB>>,
    bad_block_cache: Option<Arc<BadBlockCache>>,
    network_rx: flume::Receiver<NetworkEvent>,
    tipset_receiver: flume::Receiver<FullTipset>,
    network: &SyncNetworkContext<DB>,
    mem_pool: &Arc<MessagePool<Arc<ChainStore<DB>>>>,
    sync_status: &SyncStatus,
    genesis: &Tipset,
    stateless_mode: bool,
) -> anyhow::Result<()> {
    // Signalled every time the state machine has been updated.
    let state_changed = Arc::new(Notify::new());

    // Passed to gossip block pre-fetch validation.
    let seen_block_cache = SeenBlockCache::default();

    let mut set = JoinSet::new();

    // Task 1: handle network events.
    set.spawn({
        let state_manager = state_manager.shallow_clone();
        let state_changed = state_changed.shallow_clone();
        let state_machine = state_machine.shallow_clone();
        let network = network.shallow_clone();
        let mem_pool = mem_pool.shallow_clone();
        let genesis = genesis.shallow_clone();
        let bad_block_cache = bad_block_cache.shallow_clone();
        let seen_block_cache = seen_block_cache.shallow_clone();
        async move {
            // The loop ends when `recv_async` returns an error.
            while let Ok(event) = network_rx.recv_async().await {
                inc_gossipsub_event_metrics(&event);

                // Peer connect / disconnect bookkeeping; other events are ignored there.
                update_peer_info(
                    &event,
                    &network,
                    state_manager.chain_store().clone(),
                    &genesis,
                );

                // Only events that yield a full tipset fall through; everything
                // else (including fetch failures) continues with the next event.
                let Ok(tipset) = (match event {
                    NetworkEvent::HelloResponseOutbound { request, source } => {
                        // Get the heaviest tipset advertised in the hello
                        // request, asking `source` for it if it is not stored locally.
                        let tipset_keys = TipsetKey::from(request.heaviest_tip_set.clone());
                        get_full_tipset(
                            &network,
                            state_manager.chain_store(),
                            Some(source),
                            &tipset_keys,
                        )
                        .await
                        .inspect_err(|e| debug!("Querying full tipset failed: {}", e))
                    }
                    NetworkEvent::PubsubMessage { message } => match message {
                        PubsubMessage::Block(b) => {
                            let cs = state_manager.chain_store();
                            let cfg = cs.chain_config();
                            // Validate the gossiped block before fetching its
                            // full tipset; rejections are counted per reason.
                            if let Err(reason) = GossipBlockValidator::new(&b).validate_pre_fetch(
                                &genesis,
                                cfg.block_delay_secs,
                                cfg.policy.chain_finality,
                                cs.heaviest_tipset().epoch(),
                                bad_block_cache.as_deref(),
                                &seen_block_cache,
                            ) {
                                metrics::GOSSIP_BLOCK_REJECTED_TOTAL
                                    .get_or_create(&metrics::GossipRejectReasonLabel {
                                        reason: reason.label(),
                                    })
                                    .inc();
                                debug!("Rejected gossip block {}: {reason}", b.header.cid());
                                continue;
                            }
                            // Request the single-block tipset made of this block.
                            let key = TipsetKey::from(nunny::vec![*b.header.cid()]);
                            get_full_tipset(&network, cs, None, &key).await
                        }
                        PubsubMessage::Message(m) => {
                            // Gossiped messages go straight to the message pool.
                            if let Err(why) = mem_pool.add(m) {
                                debug!("Received invalid GossipSub message: {}", why);
                            }
                            continue;
                        }
                    },
                    _ => continue,
                }) else {
                    continue;
                };
                {
                    state_machine
                        .lock()
                        .update(SyncEvent::NewFullTipsets(vec![tipset]));
                    state_changed.notify_one();
                }
            }
        }
    });

    // Task 2: forward tipsets received on the tipset channel to the state machine.
    set.spawn({
        let state_changed = state_changed.clone();
        let state_machine = state_machine.clone();

        async move {
            while let Ok(tipset) = tipset_receiver.recv_async().await {
                state_machine
                    .lock()
                    .update(SyncEvent::NewFullTipsets(vec![tipset]));
                state_changed.notify_one();
            }
        }
    });

    // Task 3: scheduler. On every state change, refresh the status report and
    // spawn the tasks the state machine asks for that are not already running.
    set.spawn({
        let state_manager = state_manager.shallow_clone();
        let state_machine = state_machine.shallow_clone();
        let network = network.shallow_clone();
        let sync_status = sync_status.shallow_clone();
        let state_changed = state_changed.shallow_clone();
        let tasks = tasks.shallow_clone();
        let bad_block_cache = bad_block_cache.shallow_clone();
        async move {
            loop {
                state_changed.notified().await;

                // The in-flight set stays locked for the rest of this iteration.
                // NOTE(review): this includes the state machine's `tasks()` call
                // and the status update — confirm the hold time is acceptable.
                let mut tasks_set = tasks.lock();
                let (task_vec, current_active_forks) = state_machine.lock().tasks();

                {
                    // Build the new report from the old one, then store it.
                    let old_status_report = sync_status.read().clone();
                    let new_status_report = old_status_report.update(
                        &state_manager,
                        current_active_forks,
                        stateless_mode,
                    );

                    sync_status.write().clone_from(&new_status_report);
                }

                for task in task_vec {
                    // `insert` returns `true` only if the task was not already in flight.
                    let new = tasks_set.insert(task.clone());
                    if new {
                        let action = task.clone().execute(
                            network.shallow_clone(),
                            state_manager.shallow_clone(),
                            stateless_mode,
                            bad_block_cache.shallow_clone(),
                        );
                        tokio::spawn({
                            let tasks = tasks.clone();
                            let state_machine = state_machine.clone();
                            let state_changed = state_changed.clone();
                            async move {
                                if let Some(event) = action.await {
                                    state_machine.lock().update(event);
                                    state_changed.notify_one();
                                }
                                // NOTE(review): the task is removed from the
                                // in-flight set only after the notification, so
                                // the scheduler may wake up while it is still
                                // listed — confirm this ordering is intended.
                                tasks.lock().remove(&task);
                            }
                        });
                    }
                }
            }
        }
    });

    // Task 4: log sync progress every 10 seconds.
    set.spawn({
        let state_manager = state_manager.clone();
        let state_machine = state_machine.clone();
        async move {
            loop {
                tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
                let (tasks_set, _) = state_machine.lock().tasks();
                let heaviest_tipset = state_manager.chain_store().heaviest_tipset();
                let heaviest_epoch = heaviest_tipset.epoch();

                // Largest distance between a pending fetch task's epoch and the
                // current head; 0 when there is no fetch task.
                let to_download = tasks_set
                    .iter()
                    .filter_map(|task| match task {
                        SyncTask::FetchTipset(_, epoch) => Some(epoch - heaviest_epoch),
                        _ => None,
                    })
                    .max()
                    .unwrap_or(0);

                let expected_head = calculate_expected_epoch(
                    Utc::now().timestamp() as u64,
                    state_manager.chain_store().genesis_block_header().timestamp,
                    state_manager.chain_config().block_delay_secs,
                );

                // First flag: the head is more than 10 epochs behind the expected
                // epoch. Second flag: there is something left to download.
                match (expected_head - heaviest_epoch > 10, to_download > 0) {
                    (true, true) => info!(
                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}, downloading {to_download} tipsets",
                        heaviest_tipset.key()
                    ),
                    (true, false) => info!(
                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}",
                        heaviest_tipset.key()
                    ),
                    (false, true) => {
                        info!("Downloading {to_download} tipsets")
                    }
                    (false, false) => {}
                }
            }
        }
    });

    set.join_all().await;
    Ok(())
}
387
388fn inc_gossipsub_event_metrics(event: &NetworkEvent) {
390 let label = match event {
391 NetworkEvent::HelloRequestInbound => metrics::values::HELLO_REQUEST_INBOUND,
392 NetworkEvent::HelloResponseOutbound { .. } => metrics::values::HELLO_RESPONSE_OUTBOUND,
393 NetworkEvent::HelloRequestOutbound => metrics::values::HELLO_REQUEST_OUTBOUND,
394 NetworkEvent::HelloResponseInbound => metrics::values::HELLO_RESPONSE_INBOUND,
395 NetworkEvent::PeerConnected(_) => metrics::values::PEER_CONNECTED,
396 NetworkEvent::PeerDisconnected(_) => metrics::values::PEER_DISCONNECTED,
397 NetworkEvent::PubsubMessage { message } => match message {
398 PubsubMessage::Block(_) => metrics::values::PUBSUB_BLOCK,
399 PubsubMessage::Message(_) => metrics::values::PUBSUB_MESSAGE,
400 },
401 NetworkEvent::ChainExchangeRequestOutbound => {
402 metrics::values::CHAIN_EXCHANGE_REQUEST_OUTBOUND
403 }
404 NetworkEvent::ChainExchangeResponseInbound => {
405 metrics::values::CHAIN_EXCHANGE_RESPONSE_INBOUND
406 }
407 NetworkEvent::ChainExchangeRequestInbound => {
408 metrics::values::CHAIN_EXCHANGE_REQUEST_INBOUND
409 }
410 NetworkEvent::ChainExchangeResponseOutbound => {
411 metrics::values::CHAIN_EXCHANGE_RESPONSE_OUTBOUND
412 }
413 };
414
415 metrics::LIBP2P_MESSAGE_TOTAL.get_or_create(&label).inc();
416}
417
418fn update_peer_info<DB: Blockstore + Sync + Send + 'static>(
420 event: &NetworkEvent,
421 network: &SyncNetworkContext<DB>,
422 chain_store: Arc<ChainStore<DB>>,
423 genesis: &Tipset,
424) {
425 match event {
426 NetworkEvent::PeerConnected(peer_id) => {
427 let genesis_cid = *genesis.block_headers().first().cid();
428 tokio::task::spawn(handle_peer_connected_event(
430 network.shallow_clone(),
431 chain_store,
432 *peer_id,
433 genesis_cid,
434 ));
435 }
436 NetworkEvent::PeerDisconnected(peer_id) => {
437 handle_peer_disconnected_event(network, *peer_id);
438 }
439 _ => {}
440 }
441}
442
443async fn handle_peer_connected_event<DB: Blockstore + Sync + Send + 'static>(
444 network: SyncNetworkContext<DB>,
445 chain_store: Arc<ChainStore<DB>>,
446 peer_id: PeerId,
447 genesis_block_cid: Cid,
448) {
449 if network.peer_manager().is_peer_new(&peer_id) {
451 let heaviest = chain_store.heaviest_tipset();
454 let request = HelloRequest {
455 heaviest_tip_set: heaviest.cids(),
456 heaviest_tipset_height: heaviest.epoch(),
457 heaviest_tipset_weight: heaviest.weight().clone().into(),
458 genesis_cid: genesis_block_cid,
459 };
460 let (peer_id, moment_sent, response) = match network.hello_request(peer_id, request).await {
461 Ok(response) => response,
462 Err(e) => {
463 debug!("Hello request failed: {}", e);
464 return;
465 }
466 };
467 let dur = Instant::now().duration_since(moment_sent);
468
469 match response {
471 Some(_) => {
472 network.peer_manager().log_success(&peer_id, dur);
473 }
474 None => {
475 network.peer_manager().log_failure(&peer_id, dur);
476 }
477 }
478 }
479}
480
481fn handle_peer_disconnected_event<DB: Blockstore + Sync + Send + 'static>(
482 network: &SyncNetworkContext<DB>,
483 peer_id: PeerId,
484) {
485 network.peer_manager().remove_peer(&peer_id);
486 network.peer_manager().unmark_peer_bad(&peer_id);
487}
488
489pub async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
490 network: &SyncNetworkContext<DB>,
491 chain_store: &ChainStore<DB>,
492 peer_id: Option<PeerId>,
493 tipset_keys: &TipsetKey,
494) -> anyhow::Result<FullTipset> {
495 if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
497 return Ok(full_tipset);
498 }
499 let tipset = network
501 .chain_exchange_full_tipset(peer_id, tipset_keys)
502 .await
503 .map_err(|e| anyhow::anyhow!(e))?;
504 tipset.persist(chain_store.blockstore())?;
505
506 Ok(tipset)
507}
508
509async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
510 network: &SyncNetworkContext<DB>,
511 chain_store: &ChainStore<DB>,
512 peer_id: Option<PeerId>,
513 tipset_keys: &TipsetKey,
514) -> anyhow::Result<Vec<FullTipset>> {
515 if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
517 return Ok(vec![full_tipset]);
518 }
519 let tipsets = network
521 .chain_exchange_full_tipsets(peer_id, tipset_keys)
522 .await
523 .map_err(|e| anyhow::anyhow!(e))?;
524
525 for tipset in tipsets.iter() {
526 tipset.persist(chain_store.blockstore())?;
527 }
528
529 Ok(tipsets)
530}
531
532pub fn load_full_tipset<DB: Blockstore>(
533 chain_store: &ChainStore<DB>,
534 tipset_keys: &TipsetKey,
535) -> anyhow::Result<FullTipset> {
536 let ts = chain_store
538 .chain_index()
539 .load_required_tipset(tipset_keys)?;
540 let blocks: Vec<_> = ts
541 .block_headers()
542 .iter()
543 .map(|header| -> anyhow::Result<Block> {
544 let (bls_msgs, secp_msgs) =
545 crate::chain::block_messages(chain_store.blockstore(), header)?;
546 Ok(Block {
547 header: header.clone(),
548 bls_messages: bls_msgs,
549 secp_messages: secp_msgs,
550 })
551 })
552 .try_collect()?;
553 let fts = FullTipset::new(blocks)?;
555 Ok(fts)
556}
557
/// Events consumed by [`SyncStateMachine::update`].
enum SyncEvent {
    /// Full tipsets to add to the state machine.
    NewFullTipsets(Vec<FullTipset>),
    /// A tipset that failed validation; it is dropped from the state machine
    /// together with the tracked tipsets that descend from it.
    BadTipset(FullTipset),
    /// A tipset whose validation task succeeded.
    ValidatedTipset {
        tipset: FullTipset,
        /// Set by `SyncStateMachine::tasks` when the validated tipset was the
        /// only element of its chain.
        is_proposed_head: bool,
    },
}
566
567impl std::fmt::Display for SyncEvent {
568 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
569 fn tss_to_string(tss: &[FullTipset]) -> String {
570 format!(
571 "epoch: {}-{}",
572 tss.first().map(|ts| ts.epoch()).unwrap_or_default(),
573 tss.last().map(|ts| ts.epoch()).unwrap_or_default()
574 )
575 }
576
577 match self {
578 Self::NewFullTipsets(tss) => write!(f, "NewFullTipsets({})", tss_to_string(tss)),
579 Self::BadTipset(ts) => {
580 write!(f, "BadTipset(epoch: {}, key: {})", ts.epoch(), ts.key())
581 }
582 Self::ValidatedTipset {
583 tipset,
584 is_proposed_head,
585 } => write!(
586 f,
587 "ValidatedTipset(epoch: {}, key: {}, is_proposed_head: {is_proposed_head})",
588 tipset.epoch(),
589 tipset.key()
590 ),
591 }
592 }
593}
594
/// Tracks the tipsets that still have to be fetched or validated and derives
/// the next sync tasks from them.
struct SyncStateMachine<DB> {
    /// Chain store: source of the current head, genesis tipset, chain config and blockstore.
    cs: Arc<ChainStore<DB>>,
    /// Optional bad block cache handed to tipset validation.
    bad_block_cache: Option<Arc<BadBlockCache>>,
    /// Tipsets waiting to be validated, indexed by their tipset key.
    tipsets: HashMap<TipsetKey, FullTipset>,
    /// When `true`, parent state is always treated as validated and every
    /// tipset is immediately ready for validation.
    stateless_mode: bool,
}
602
603impl<DB: Blockstore> SyncStateMachine<DB> {
604 pub fn new(
605 cs: Arc<ChainStore<DB>>,
606 bad_block_cache: Option<Arc<BadBlockCache>>,
607 stateless_mode: bool,
608 ) -> Self {
609 Self {
610 cs,
611 bad_block_cache,
612 tipsets: HashMap::default(),
613 stateless_mode,
614 }
615 }
616
617 fn chains(&self) -> Vec<Vec<FullTipset>> {
619 let mut chains = Vec::new();
620 let mut remaining_tipsets = self.tipsets.clone();
621
622 while let Some(heaviest) = remaining_tipsets
623 .values()
624 .max_by_key(|ts| ts.weight())
625 .cloned()
626 {
627 let mut chain = Vec::new();
629 let mut current = Some(heaviest);
630
631 while let Some(tipset) = current.take() {
632 remaining_tipsets.remove(tipset.key());
633
634 current = self.tipsets.get(tipset.parents()).cloned();
636
637 chain.push(tipset);
638 }
639 chain.reverse();
640 chains.push(chain);
641 }
642
643 chains
644 }
645
646 fn is_parent_validated(&self, tipset: &FullTipset) -> bool {
647 let db = self.cs.blockstore();
648 self.stateless_mode || db.has(tipset.parent_state()).unwrap_or(false)
649 }
650
651 fn is_ready_for_validation(&self, tipset: &FullTipset) -> bool {
652 if self.stateless_mode || tipset.key() == self.cs.genesis_tipset().key() {
653 true
655 } else if let Ok(parent_ts) = load_full_tipset(&self.cs, tipset.parents()) {
656 let head_ts = self.cs.heaviest_tipset();
657 if parent_ts.key() == head_ts.key() {
663 true
664 } else if parent_ts.epoch() >= head_ts.epoch() {
665 false
666 } else {
667 self.is_parent_validated(tipset)
668 }
669 } else {
670 false
671 }
672 }
673
674 fn add_full_tipset(&mut self, tipset: FullTipset) {
675 if let Err(why) = TipsetValidator(&tipset).validate(
676 &self.cs,
677 self.bad_block_cache.as_ref().map(AsRef::as_ref),
678 &self.cs.genesis_tipset(),
679 self.cs.chain_config().block_delay_secs,
680 ) {
681 metrics::INVALID_TIPSET_TOTAL.inc();
682 trace!("Skipping invalid tipset: {}", why);
683 self.mark_bad_tipset(tipset);
684 return;
685 }
686
687 let heaviest = self.cs.heaviest_tipset();
689 let epoch_diff = heaviest.epoch() - tipset.epoch();
690
691 if epoch_diff > self.cs.chain_config().policy.chain_finality {
692 self.mark_bad_tipset(tipset);
693 return;
694 }
695
696 if self.tipsets.contains_key(tipset.key()) {
698 return;
699 }
700
701 let mut to_remove = Vec::new();
703 #[allow(clippy::mutable_key_type)]
704 let mut merged_blocks: HashSet<_> = tipset.blocks().iter().cloned().collect();
705
706 let parent_refs: HashSet<_> = self
708 .tipsets
709 .values()
710 .map(|ts| ts.parents().clone())
711 .collect();
712
713 for (key, existing_ts) in self.tipsets.iter() {
714 if existing_ts.epoch() == tipset.epoch() && existing_ts.parents() == tipset.parents() {
715 if !parent_refs.contains(key) {
717 to_remove.push(key.clone());
718 }
719 merged_blocks.extend(existing_ts.blocks().iter().cloned());
721 }
722 }
723
724 for key in to_remove {
726 self.tipsets.remove(&key);
727 }
728
729 if let Ok(merged_tipset) = FullTipset::new(merged_blocks) {
731 self.tipsets
732 .insert(merged_tipset.key().clone(), merged_tipset);
733 }
734 }
735
736 fn mark_bad_tipset(&mut self, tipset: FullTipset) {
740 let mut stack = vec![tipset];
741 while let Some(tipset) = stack.pop() {
742 self.tipsets.remove(tipset.key());
743 let mut to_remove = Vec::new();
745 let mut descendants = Vec::new();
746
747 for (key, ts) in self.tipsets.iter() {
748 if ts.parents() == tipset.key() {
749 to_remove.push(key.clone());
750 descendants.push(ts.clone());
751 }
752 }
753
754 for key in to_remove {
756 self.tipsets.remove(&key);
757 }
758
759 stack.extend(descendants);
761 }
762 }
763
764 fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) {
765 if !self.is_parent_validated(&tipset) {
766 tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), parent_state = %tipset.parent_state(), "Parent tipset must be validated");
767 return;
768 }
769
770 self.tipsets.remove(tipset.key());
771 let tipset = tipset.into_tipset();
772 if self.stateless_mode {
774 let epoch = tipset.epoch();
775 let terse_key = tipset.key().terse();
776 if self.cs.heaviest_tipset().weight() < tipset.weight() {
777 if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
778 error!("Error setting heaviest tipset: {}", e);
779 } else {
780 info!("Heaviest tipset: {} ({})", epoch, terse_key);
781 }
782 }
783 } else if is_proposed_head {
784 if let Err(e) = self.cs.put_tipset(&tipset) {
785 error!("Error putting tipset: {e}");
786 }
787 } else if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
788 error!("Error setting heaviest tipset: {e}");
789 }
790 }
791
792 pub fn update(&mut self, event: SyncEvent) {
793 tracing::trace!("update: {event}");
794 match event {
795 SyncEvent::NewFullTipsets(tipsets) => {
796 for tipset in tipsets {
797 self.add_full_tipset(tipset);
798 }
799 }
800 SyncEvent::BadTipset(tipset) => self.mark_bad_tipset(tipset),
801 SyncEvent::ValidatedTipset {
802 tipset,
803 is_proposed_head,
804 } => self.mark_validated_tipset(tipset, is_proposed_head),
805 }
806 }
807
808 pub fn tasks(&self) -> (Vec<SyncTask>, Vec<ForkSyncInfo>) {
809 let current_validated_epoch = self.cs.heaviest_tipset().epoch();
811 let now = Utc::now();
812
813 let mut active_sync_info = Vec::new();
814 let mut tasks = Vec::new();
815 for chain in self.chains() {
816 if let Some(first_ts) = chain.first() {
817 let last_ts = chain.last().expect("Infallible");
818 let stage: ForkSyncStage;
819 let start_time = Some(now);
820
821 if !self.is_ready_for_validation(first_ts) {
822 stage = ForkSyncStage::FetchingHeaders;
823 tasks.push(SyncTask::FetchTipset(
824 first_ts.parents().clone(),
825 first_ts.epoch(),
826 ));
827 } else {
828 stage = ForkSyncStage::ValidatingTipsets;
829 tasks.push(SyncTask::ValidateTipset {
830 tipset: first_ts.clone(),
831 is_proposed_head: chain.len() == 1,
832 });
833 }
834
835 let fork_info = ForkSyncInfo {
836 target_tipset_key: last_ts.key().clone(),
837 target_epoch: last_ts.epoch(),
838 target_sync_epoch_start: first_ts.epoch(),
839 stage,
840 validated_chain_head_epoch: current_validated_epoch,
841 start_time,
842 last_updated: Some(now),
843 };
844
845 active_sync_info.push(fork_info);
846 }
847 }
848 (tasks, active_sync_info)
849 }
850}
851
/// A unit of work produced by `SyncStateMachine::tasks` and run by
/// `SyncTask::execute`. Derives `Hash`/`Eq` so that in-flight tasks can be
/// de-duplicated in a set.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
enum SyncTask {
    /// Validate `tipset`.
    ValidateTipset {
        tipset: FullTipset,
        /// `true` when the tipset was the only element of its chain.
        is_proposed_head: bool,
    },
    /// Fetch the tipset with the given key. The epoch is the epoch of the
    /// tipset whose parents are being fetched; it is used for logging and
    /// progress reporting.
    FetchTipset(TipsetKey, ChainEpoch),
}
860
861impl std::fmt::Display for SyncTask {
862 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
863 match self {
864 SyncTask::ValidateTipset {
865 tipset,
866 is_proposed_head,
867 } => write!(
868 f,
869 "ValidateTipset(epoch: {}, is_proposed_head: {is_proposed_head})",
870 tipset.epoch()
871 ),
872 SyncTask::FetchTipset(key, epoch) => {
873 let s = key.to_string();
874 write!(
875 f,
876 "FetchTipset({}, epoch: {})",
877 &s[s.len().saturating_sub(8)..],
878 epoch
879 )
880 }
881 }
882 }
883}
884
885impl SyncTask {
886 async fn execute<DB: Blockstore + Sync + Send + 'static>(
887 self,
888 network: SyncNetworkContext<DB>,
889 state_manager: Arc<StateManager<DB>>,
890 stateless_mode: bool,
891 bad_block_cache: Option<Arc<BadBlockCache>>,
892 ) -> Option<SyncEvent> {
893 tracing::trace!("SyncTask::execute {self}");
894 match self {
895 SyncTask::ValidateTipset {
896 tipset,
897 is_proposed_head,
898 } if stateless_mode => Some(SyncEvent::ValidatedTipset {
899 tipset,
900 is_proposed_head,
901 }),
902 SyncTask::ValidateTipset {
903 tipset,
904 is_proposed_head,
905 } => match validate_tipset(&state_manager, tipset.clone(), bad_block_cache).await {
906 Ok(()) => Some(SyncEvent::ValidatedTipset {
907 tipset,
908 is_proposed_head,
909 }),
910 Err(e) if matches!(e, TipsetSyncerError::TimeTravellingBlock { .. }) => {
915 warn!("Time travelling block detected, skipping tipset for now: {e}");
916 None
917 }
918 Err(e) => {
919 warn!("Error validating tipset: {e}");
920 Some(SyncEvent::BadTipset(tipset))
921 }
922 },
923 SyncTask::FetchTipset(key, epoch) => {
924 match get_full_tipset_batch(&network, state_manager.chain_store(), None, &key).await
925 {
926 Ok(parents) => Some(SyncEvent::NewFullTipsets(parents)),
927 Err(e) => {
928 tracing::warn!(%key, %epoch, "failed to fetch tipset: {e:#}");
929 None
930 }
931 }
932 }
933 }
934 }
935}
936
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blocks::{Chain4U, HeaderBuilder, chain4u};
    use crate::db::MemoryDB;
    use crate::utils::db::CborStoreExt as _;
    use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
    use num_bigint::BigInt;
    use num_traits::ToPrimitive;
    use std::sync::Arc;
    use tracing::level_filters::LevelFilter;
    use tracing_subscriber::EnvFilter;

    /// Builds an in-memory chain store whose head is a dummy genesis block,
    /// plus the `Chain4U` helper used to build further test blocks.
    fn setup() -> (Arc<ChainStore<MemoryDB>>, Chain4U<Arc<MemoryDB>>) {
        // Initialise logging; the result is ignored because another test may
        // already have installed a subscriber.
        let _ = tracing_subscriber::fmt()
            .without_time()
            .with_env_filter(
                EnvFilter::builder()
                    .with_default_directive(LevelFilter::DEBUG.into())
                    .from_env()
                    .unwrap(),
            )
            .try_init();

        let db = Arc::new(MemoryDB::default());

        {
            // Store a `TxMeta` whose message roots are both the empty AMT.
            // NOTE(review): presumably the default message root of the test
            // headers resolves to this entry — confirm.
            let empty_amt = Amt::<Cid, _>::new(&db).flush().unwrap();
            db.put_cbor_default(&crate::blocks::TxMeta {
                bls_message_root: empty_amt,
                secp_message_root: empty_amt,
            })
            .unwrap();
        }

        // Genesis block at epoch 0.
        let c4u = Chain4U::with_blockstore(db.clone());
        chain4u! {
            in c4u;
            [genesis_header = dummy_node(&db, 0)]
        };

        let cs = Arc::new(
            ChainStore::new(
                db.clone(),
                db.clone(),
                db.clone(),
                Default::default(),
                genesis_header.clone().into(),
            )
            .unwrap(),
        );

        cs.set_heaviest_tipset(cs.genesis_tipset()).unwrap();

        (cs, c4u)
    }

    /// Stores the epoch number as a CBOR value and returns its CID, to be used
    /// as a stand-in state root.
    fn dummy_state(db: impl Blockstore, i: ChainEpoch) -> Cid {
        db.put_cbor_default(&i).unwrap()
    }

    /// Header builder for a block at epoch `i` whose weight is also `i` and
    /// whose state root is `dummy_state(db, i)`.
    fn dummy_node(db: impl Blockstore, i: ChainEpoch) -> HeaderBuilder {
        HeaderBuilder {
            state_root: dummy_state(db, i).into(),
            weight: BigInt::from(i).into(),
            epoch: i.into(),
            ..Default::default()
        }
    }

    /// Tipsets fed in arbitrary order must come out of the state machine as
    /// validation tasks in epoch order.
    #[test]
    fn test_state_machine_validation_order() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        // Linear chain genesis -> a -> b -> c -> d -> e at epochs 1..=5.
        chain4u! {
            from [genesis_header] in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)] -> [c = dummy_node(&db, 3)] -> [d = dummy_node(&db, 4)] -> [e = dummy_node(&db, 5)]
        };

        // Third argument `true`: stateless mode.
        let mut state_machine = SyncStateMachine::new(cs, Default::default(), true);

        // Deliberately out of order.
        let tipsets = vec![e, b, d, c, a];

        for block in tipsets {
            // Wrap each header into a single-block tipset without messages.
            let full_tipset = FullTipset::new(vec![Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            }])
            .unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
        }

        // Epochs in the order the state machine asked for their validation.
        let mut validation_tasks = Vec::new();
        loop {
            let (tasks, _) = state_machine.tasks();

            // Keep only the validation tasks.
            let validation_tipsets: Vec<_> = tasks
                .into_iter()
                .filter_map(|task| {
                    if let SyncTask::ValidateTipset {
                        tipset,
                        is_proposed_head,
                    } = task
                    {
                        Some((tipset, is_proposed_head))
                    } else {
                        None
                    }
                })
                .collect();

            if validation_tipsets.is_empty() {
                break;
            }

            for (ts, is_proposed_head) in validation_tipsets {
                // Pretend validation succeeded: store the dummy state for the
                // epoch and report the tipset as validated.
                validation_tasks.push(ts.epoch());
                db.put_cbor_default(&ts.epoch()).unwrap();
                state_machine.mark_validated_tipset(ts, is_proposed_head);
            }
        }

        assert_eq!(validation_tasks, vec![1, 2, 3, 4, 5]);
    }

    /// Two forks sharing an ancestor are reported as two chains, heaviest
    /// fork first, each starting at the shared ancestor.
    #[test]
    fn test_sync_state_machine_chain_fragments() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        // Fork 1: a -> b. Fork 2: a -> c.
        chain4u! {
            in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)]
        };
        chain4u! {
            from [a] in c4u;
            [c = dummy_node(&db, 3)]
        };

        // Third argument `false`: not stateless.
        let mut state_machine = SyncStateMachine::new(cs, Default::default(), false);

        for block in [a, b, c] {
            // Wrap each header into a single-block tipset without messages.
            let full_tipset = FullTipset::new(vec![Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            }])
            .unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
        }

        // Reduce every chain to the weights of its tipsets (weight == epoch
        // for blocks built with `dummy_node`).
        let chains = state_machine
            .chains()
            .into_iter()
            .map(|v| {
                v.into_iter()
                    .map(|ts| ts.weight().to_i64().unwrap_or(0))
                    .collect_vec()
            })
            .collect_vec();

        assert_eq!(chains, vec![vec![1, 3], vec![1, 2]]);
    }
}