1use super::network_context::SyncNetworkContext;
20use crate::{
21 blocks::{Block, FullTipset, Tipset, TipsetKey},
22 chain::{ChainStore, index::ResolveNullTipset},
23 chain_sync::{
24 ForkSyncInfo, ForkSyncStage, SyncStatus, SyncStatusReport, TipsetValidator,
25 bad_block_cache::{BadBlockCache, SeenBlockCache},
26 metrics,
27 tipset_syncer::{TipsetSyncerError, validate_tipset},
28 validation::GossipBlockValidator,
29 },
30 libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest},
31 message_pool::MessagePool,
32 networks::calculate_expected_epoch,
33 shim::clock::ChainEpoch,
34 state_manager::StateManager,
35 utils::ShallowClone as _,
36};
37use ahash::{HashMap, HashSet};
38use chrono::Utc;
39use cid::Cid;
40use fvm_ipld_blockstore::Blockstore;
41use itertools::Itertools;
42use libp2p::PeerId;
43use parking_lot::{Mutex, RwLock};
44use std::{sync::Arc, time::Instant};
45use tokio::{sync::Notify, task::JoinSet};
46use tracing::{debug, error, info, trace, warn};
47
/// Follows the chain head: consumes network events and locally produced
/// tipsets, feeds them into a [`SyncStateMachine`], and tracks the sync
/// tasks currently in flight.
pub struct ChainFollower<DB> {
    // Set of currently executing sync tasks; used to avoid spawning the
    // same task twice.
    tasks: Arc<Mutex<HashSet<SyncTask>>>,

    // Pending tipsets plus the logic deciding what to fetch/validate next.
    state_machine: Arc<Mutex<SyncStateMachine<DB>>>,

    // Shared, periodically refreshed report of sync progress.
    pub sync_status: SyncStatus,

    pub state_manager: Arc<StateManager<DB>>,

    pub network: SyncNetworkContext<DB>,

    // Genesis tipset, used for `Hello` handshakes and gossip-block validation.
    genesis: Tipset,

    // Cache of blocks that failed validation; `None` when disabled via the
    // `FOREST_DISABLE_BAD_BLOCK_CACHE` environment variable.
    pub bad_blocks: Option<Arc<BadBlockCache>>,

    // Incoming libp2p events (hello, gossipsub, chain-exchange, peer churn).
    net_handler: flume::Receiver<NetworkEvent>,

    // Lets other parts of the node inject full tipsets into the follower.
    pub tipset_sender: flume::Sender<FullTipset>,

    // Receiving end of `tipset_sender`.
    tipset_receiver: flume::Receiver<FullTipset>,

    // When `true`, tipsets are followed without executing/validating state.
    stateless_mode: bool,

    // Message pool fed from gossipsub `Message` payloads.
    mem_pool: Arc<MessagePool<Arc<ChainStore<DB>>>>,
}
90
impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
    /// Creates a new chain follower.
    ///
    /// The bad-block cache is enabled unless the
    /// `FOREST_DISABLE_BAD_BLOCK_CACHE` environment variable is truthy.
    pub fn new(
        state_manager: Arc<StateManager<DB>>,
        network: SyncNetworkContext<DB>,
        genesis: Tipset,
        net_handler: flume::Receiver<NetworkEvent>,
        stateless_mode: bool,
        mem_pool: Arc<MessagePool<Arc<ChainStore<DB>>>>,
    ) -> Self {
        crate::def_is_env_truthy!(cache_disabled, "FOREST_DISABLE_BAD_BLOCK_CACHE");
        // Bounded channel: tipset producers are back-pressured at 20 pending items.
        let (tipset_sender, tipset_receiver) = flume::bounded(20);
        let tasks: Arc<Mutex<HashSet<SyncTask>>> = Arc::new(Mutex::new(HashSet::default()));
        let bad_blocks = if cache_disabled() {
            tracing::warn!("bad block cache is disabled by `FOREST_DISABLE_BAD_BLOCK_CACHE`");
            None
        } else {
            Some(Default::default())
        };
        let state_machine = Arc::new(Mutex::new(SyncStateMachine::new(
            state_manager.chain_store().clone(),
            bad_blocks.clone(),
            stateless_mode,
        )));
        Self {
            tasks,
            state_machine,
            sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
            state_manager,
            network,
            genesis,
            bad_blocks,
            net_handler,
            tipset_sender,
            tipset_receiver,
            stateless_mode,
            mem_pool,
        }
    }

    /// Clears all transient sync state: in-flight tasks, the validated-blocks
    /// set, pending tipsets, and (when enabled) the bad-block cache.
    pub fn reset(&self) {
        let start = Instant::now();
        self.tasks.lock().clear();
        self.state_manager
            .chain_store()
            .validated_blocks
            .lock()
            .clear();
        self.state_machine.lock().tipsets.clear();
        if let Some(bad_blocks) = &self.bad_blocks {
            bad_blocks.clear();
        }
        tracing::info!(
            "chain follower reset, took {}",
            humantime::format_duration(start.elapsed())
        );
    }

    /// Runs the follower's event loops until they terminate.
    /// See [`chain_follower`] for the actual task breakdown.
    pub async fn run(&self) -> anyhow::Result<()> {
        chain_follower(
            &self.tasks,
            &self.state_machine,
            &self.state_manager,
            self.bad_blocks.clone(),
            self.net_handler.clone(),
            self.tipset_receiver.clone(),
            &self.network,
            &self.mem_pool,
            &self.sync_status,
            &self.genesis,
            self.stateless_mode,
        )
        .await
    }
}
166
#[allow(clippy::too_many_arguments)]
/// Spawns and joins the follower's worker tasks:
/// 1. network-event loop: turns hello responses and gossip blocks into full
///    tipsets and feeds them to the state machine;
/// 2. tipset-channel loop: same, for tipsets injected via `tipset_sender`;
/// 3. scheduler loop: on every state change, derives sync tasks, updates the
///    status report, and spawns each new task;
/// 4. progress logger: every 10s reports catch-up/download progress.
async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
    tasks: &Arc<Mutex<HashSet<SyncTask>>>,
    state_machine: &Arc<Mutex<SyncStateMachine<DB>>>,
    state_manager: &Arc<StateManager<DB>>,
    bad_block_cache: Option<Arc<BadBlockCache>>,
    network_rx: flume::Receiver<NetworkEvent>,
    tipset_receiver: flume::Receiver<FullTipset>,
    network: &SyncNetworkContext<DB>,
    mem_pool: &Arc<MessagePool<Arc<ChainStore<DB>>>>,
    sync_status: &SyncStatus,
    genesis: &Tipset,
    stateless_mode: bool,
) -> anyhow::Result<()> {
    // Wakes the scheduler loop whenever the state machine changes.
    let state_changed = Arc::new(Notify::new());

    // De-duplicates gossip blocks already seen recently.
    let seen_block_cache = SeenBlockCache::default();

    let mut set = JoinSet::new();

    // Task 1: network-event loop.
    set.spawn({
        let state_manager = state_manager.shallow_clone();
        let state_changed = state_changed.shallow_clone();
        let state_machine = state_machine.shallow_clone();
        let network = network.shallow_clone();
        let mem_pool = mem_pool.shallow_clone();
        let genesis = genesis.shallow_clone();
        let bad_block_cache = bad_block_cache.shallow_clone();
        let seen_block_cache = seen_block_cache.shallow_clone();
        async move {
            while let Ok(event) = network_rx.recv_async().await {
                inc_gossipsub_event_metrics(&event);

                // Keep the peer manager in sync with connect/disconnect events.
                update_peer_info(
                    &event,
                    &network,
                    state_manager.chain_store().clone(),
                    &genesis,
                );

                // Extract a full tipset from the event; anything that doesn't
                // yield one (mempool messages, rejected blocks, other event
                // kinds, fetch failures) just continues the loop.
                let Ok(tipset) = (match event {
                    NetworkEvent::HelloResponseOutbound { request, source } => {
                        let tipset_keys = TipsetKey::from(request.heaviest_tip_set.clone());
                        get_full_tipset(
                            &network,
                            state_manager.chain_store(),
                            Some(source),
                            &tipset_keys,
                        )
                        .await
                        .inspect_err(|e| debug!("Querying full tipset failed: {}", e))
                    }
                    NetworkEvent::PubsubMessage { message } => match message {
                        PubsubMessage::Block(b) => {
                            let cs = state_manager.chain_store();
                            let cfg = cs.chain_config();
                            // Cheap pre-fetch validation; rejected blocks are
                            // counted per reason and dropped.
                            if let Err(reason) = GossipBlockValidator::new(&b).validate_pre_fetch(
                                &genesis,
                                cfg.block_delay_secs,
                                cfg.policy.chain_finality,
                                cs.heaviest_tipset().epoch(),
                                bad_block_cache.as_deref(),
                                &seen_block_cache,
                            ) {
                                metrics::GOSSIP_BLOCK_REJECTED_TOTAL
                                    .get_or_create(&metrics::GossipRejectReasonLabel {
                                        reason: reason.label(),
                                    })
                                    .inc();
                                debug!("Rejected gossip block {}: {reason}", b.header.cid());
                                continue;
                            }
                            // Treat the single gossiped block as a one-block tipset.
                            let key = TipsetKey::from(nunny::vec![*b.header.cid()]);
                            get_full_tipset(&network, cs, None, &key).await
                        }
                        PubsubMessage::Message(m) => {
                            if let Err(why) = mem_pool.add(m) {
                                debug!("Received invalid GossipSub message: {}", why);
                            }
                            continue;
                        }
                    },
                    _ => continue,
                }) else {
                    continue;
                };
                {
                    state_machine
                        .lock()
                        .update(SyncEvent::NewFullTipsets(vec![tipset]));
                    state_changed.notify_one();
                }
            }
        }
    });

    // Task 2: tipsets injected through the internal channel.
    set.spawn({
        let state_changed = state_changed.clone();
        let state_machine = state_machine.clone();

        async move {
            while let Ok(tipset) = tipset_receiver.recv_async().await {
                state_machine
                    .lock()
                    .update(SyncEvent::NewFullTipsets(vec![tipset]));
                state_changed.notify_one();
            }
        }
    });

    // Task 3: scheduler. Waits for state changes, refreshes the status
    // report, and spawns any sync task not already in flight.
    set.spawn({
        let state_manager = state_manager.shallow_clone();
        let state_machine = state_machine.shallow_clone();
        let network = network.shallow_clone();
        let sync_status = sync_status.shallow_clone();
        let state_changed = state_changed.shallow_clone();
        let tasks = tasks.shallow_clone();
        let bad_block_cache = bad_block_cache.shallow_clone();
        async move {
            loop {
                state_changed.notified().await;

                // NOTE: `tasks_set` (parking_lot guard) is held for the rest
                // of the iteration; no `.await` happens while it is held.
                let mut tasks_set = tasks.lock();
                let (task_vec, current_active_forks) = state_machine.lock().tasks();

                {
                    let old_status_report = sync_status.read().clone();
                    let new_status_report = old_status_report.update(
                        &state_manager,
                        current_active_forks,
                        stateless_mode,
                    );

                    sync_status.write().clone_from(&new_status_report);
                }

                for task in task_vec {
                    // `insert` returns false for tasks already running.
                    let new = tasks_set.insert(task.clone());
                    if new {
                        let action = task.clone().execute(
                            network.shallow_clone(),
                            state_manager.shallow_clone(),
                            stateless_mode,
                            bad_block_cache.shallow_clone(),
                        );
                        tokio::spawn({
                            let tasks = tasks.clone();
                            let state_machine = state_machine.clone();
                            let state_changed = state_changed.clone();
                            async move {
                                // Feed the task's outcome back into the state
                                // machine, then mark the task as finished.
                                if let Some(event) = action.await {
                                    state_machine.lock().update(event);
                                    state_changed.notify_one();
                                }
                                tasks.lock().remove(&task);
                            }
                        });
                    }
                }
            }
        }
    });

    // Task 4: periodic progress logging.
    set.spawn({
        let state_manager = state_manager.clone();
        let state_machine = state_machine.clone();
        async move {
            loop {
                tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
                let (tasks_set, _) = state_machine.lock().tasks();
                let heaviest_tipset = state_manager.chain_store().heaviest_tipset();
                let heaviest_epoch = heaviest_tipset.epoch();

                // Farthest fetch target ahead of our current head.
                let to_download = tasks_set
                    .iter()
                    .filter_map(|task| match task {
                        SyncTask::FetchTipset(_, epoch) => Some(epoch - heaviest_epoch),
                        _ => None,
                    })
                    .max()
                    .unwrap_or(0);

                // Epoch the network head should be at, based on wall-clock time.
                let expected_head = calculate_expected_epoch(
                    Utc::now().timestamp() as u64,
                    state_manager.chain_store().genesis_block_header().timestamp,
                    state_manager.chain_config().block_delay_secs,
                );

                match (expected_head - heaviest_epoch > 10, to_download > 0) {
                    (true, true) => info!(
                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}, downloading {to_download} tipsets"
                        , heaviest_tipset.key()
                    ),
                    (true, false) => info!(
                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}"
                        , heaviest_tipset.key()
                    ),
                    (false, true) => {
                        info!("Downloading {to_download} tipsets")
                    }
                    (false, false) => {}
                }
            }
        }
    });

    set.join_all().await;
    Ok(())
}
387
/// Increments the libp2p message counter with a label matching the event kind.
/// Despite the name, this covers all [`NetworkEvent`] variants, not only
/// gossipsub ones.
fn inc_gossipsub_event_metrics(event: &NetworkEvent) {
    let label = match event {
        NetworkEvent::HelloRequestInbound => metrics::values::HELLO_REQUEST_INBOUND,
        NetworkEvent::HelloResponseOutbound { .. } => metrics::values::HELLO_RESPONSE_OUTBOUND,
        NetworkEvent::HelloRequestOutbound => metrics::values::HELLO_REQUEST_OUTBOUND,
        NetworkEvent::HelloResponseInbound => metrics::values::HELLO_RESPONSE_INBOUND,
        NetworkEvent::PeerConnected(_) => metrics::values::PEER_CONNECTED,
        NetworkEvent::PeerDisconnected(_) => metrics::values::PEER_DISCONNECTED,
        NetworkEvent::PubsubMessage { message } => match message {
            PubsubMessage::Block(_) => metrics::values::PUBSUB_BLOCK,
            PubsubMessage::Message(_) => metrics::values::PUBSUB_MESSAGE,
        },
        NetworkEvent::ChainExchangeRequestOutbound => {
            metrics::values::CHAIN_EXCHANGE_REQUEST_OUTBOUND
        }
        NetworkEvent::ChainExchangeResponseInbound => {
            metrics::values::CHAIN_EXCHANGE_RESPONSE_INBOUND
        }
        NetworkEvent::ChainExchangeRequestInbound => {
            metrics::values::CHAIN_EXCHANGE_REQUEST_INBOUND
        }
        NetworkEvent::ChainExchangeResponseOutbound => {
            metrics::values::CHAIN_EXCHANGE_RESPONSE_OUTBOUND
        }
    };

    metrics::LIBP2P_MESSAGE_TOTAL.get_or_create(&label).inc();
}
417
418fn update_peer_info<DB: Blockstore + Sync + Send + 'static>(
420 event: &NetworkEvent,
421 network: &SyncNetworkContext<DB>,
422 chain_store: Arc<ChainStore<DB>>,
423 genesis: &Tipset,
424) {
425 match event {
426 NetworkEvent::PeerConnected(peer_id) => {
427 let genesis_cid = *genesis.block_headers().first().cid();
428 tokio::task::spawn(handle_peer_connected_event(
430 network.shallow_clone(),
431 chain_store,
432 *peer_id,
433 genesis_cid,
434 ));
435 }
436 NetworkEvent::PeerDisconnected(peer_id) => {
437 handle_peer_disconnected_event(network, *peer_id);
438 }
439 _ => {}
440 }
441}
442
443async fn handle_peer_connected_event<DB: Blockstore + Sync + Send + 'static>(
444 network: SyncNetworkContext<DB>,
445 chain_store: Arc<ChainStore<DB>>,
446 peer_id: PeerId,
447 genesis_block_cid: Cid,
448) {
449 if network.peer_manager().is_peer_new(&peer_id) {
451 let heaviest = chain_store.heaviest_tipset();
454 let request = HelloRequest {
455 heaviest_tip_set: heaviest.cids(),
456 heaviest_tipset_height: heaviest.epoch(),
457 heaviest_tipset_weight: heaviest.weight().clone().into(),
458 genesis_cid: genesis_block_cid,
459 };
460 let (peer_id, moment_sent, response) = match network.hello_request(peer_id, request).await {
461 Ok(response) => response,
462 Err(e) => {
463 debug!("Hello request failed: {}", e);
464 return;
465 }
466 };
467 let dur = Instant::now().duration_since(moment_sent);
468
469 match response {
471 Some(_) => {
472 network.peer_manager().log_success(&peer_id, dur);
473 }
474 None => {
475 network.peer_manager().log_failure(&peer_id, dur);
476 }
477 }
478 }
479}
480
/// Removes a disconnected peer from the peer manager and clears any
/// bad-peer marking, so a later reconnect starts with a clean slate.
fn handle_peer_disconnected_event<DB: Blockstore + Sync + Send + 'static>(
    network: &SyncNetworkContext<DB>,
    peer_id: PeerId,
) {
    network.peer_manager().remove_peer(&peer_id);
    network.peer_manager().unmark_peer_bad(&peer_id);
}
488
489pub async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
490 network: &SyncNetworkContext<DB>,
491 chain_store: &ChainStore<DB>,
492 peer_id: Option<PeerId>,
493 tipset_keys: &TipsetKey,
494) -> anyhow::Result<FullTipset> {
495 if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
497 return Ok(full_tipset);
498 }
499 let tipset = network
501 .chain_exchange_full_tipset(peer_id, tipset_keys)
502 .await
503 .map_err(|e| anyhow::anyhow!(e))?;
504 tipset.persist(chain_store.blockstore())?;
505
506 Ok(tipset)
507}
508
509async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
510 network: &SyncNetworkContext<DB>,
511 chain_store: &ChainStore<DB>,
512 peer_id: Option<PeerId>,
513 tipset_keys: &TipsetKey,
514) -> anyhow::Result<Vec<FullTipset>> {
515 if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
517 return Ok(vec![full_tipset]);
518 }
519 let tipsets = network
521 .chain_exchange_full_tipsets(peer_id, tipset_keys)
522 .await
523 .map_err(|e| anyhow::anyhow!(e))?;
524
525 for tipset in tipsets.iter() {
526 tipset.persist(chain_store.blockstore())?;
527 }
528
529 Ok(tipsets)
530}
531
532pub fn load_full_tipset<DB: Blockstore>(
533 chain_store: &ChainStore<DB>,
534 tipset_keys: &TipsetKey,
535) -> anyhow::Result<FullTipset> {
536 let ts = chain_store
538 .chain_index()
539 .load_required_tipset(tipset_keys)?;
540 let blocks: Vec<_> = ts
541 .block_headers()
542 .iter()
543 .map(|header| -> anyhow::Result<Block> {
544 let (bls_msgs, secp_msgs) =
545 crate::chain::block_messages(chain_store.blockstore(), header)?;
546 Ok(Block {
547 header: header.clone(),
548 bls_messages: bls_msgs,
549 secp_messages: secp_msgs,
550 })
551 })
552 .try_collect()?;
553 let fts = FullTipset::new(blocks)?;
555 Ok(fts)
556}
557
/// Events fed into [`SyncStateMachine::update`].
enum SyncEvent {
    /// New full tipsets arrived (from gossip, hello responses, the internal
    /// channel, or a completed fetch task).
    NewFullTipsets(Vec<FullTipset>),
    /// A tipset failed validation and should be discarded with its descendants.
    BadTipset(FullTipset),
    /// A tipset was validated successfully and can be applied to the store.
    ValidatedTipset {
        tipset: FullTipset,
        // `true` when this tipset was the head of its (single-tipset) chain.
        is_proposed_head: bool,
    },
}
566
567impl std::fmt::Display for SyncEvent {
568 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
569 fn tss_to_string(tss: &[FullTipset]) -> String {
570 format!(
571 "epoch: {}-{}",
572 tss.first().map(|ts| ts.epoch()).unwrap_or_default(),
573 tss.last().map(|ts| ts.epoch()).unwrap_or_default()
574 )
575 }
576
577 match self {
578 Self::NewFullTipsets(tss) => write!(f, "NewFullTipsets({})", tss_to_string(tss)),
579 Self::BadTipset(ts) => {
580 write!(f, "BadTipset(epoch: {}, key: {})", ts.epoch(), ts.key())
581 }
582 Self::ValidatedTipset {
583 tipset,
584 is_proposed_head,
585 } => write!(
586 f,
587 "ValidatedTipset(epoch: {}, key: {}, is_proposed_head: {is_proposed_head})",
588 tipset.epoch(),
589 tipset.key()
590 ),
591 }
592 }
593}
594
/// Holds tipsets that are not yet applied to the chain store and decides,
/// via [`SyncStateMachine::tasks`], what should be fetched or validated next.
struct SyncStateMachine<DB> {
    cs: Arc<ChainStore<DB>>,
    // `None` when the bad-block cache is disabled.
    bad_block_cache: Option<Arc<BadBlockCache>>,
    // Pending tipsets, keyed by their tipset key.
    tipsets: HashMap<TipsetKey, FullTipset>,
    // When `true`, skip state validation and only track chain heads.
    stateless_mode: bool,
}
602
impl<DB: Blockstore> SyncStateMachine<DB> {
    /// Creates an empty state machine over the given chain store.
    pub fn new(
        cs: Arc<ChainStore<DB>>,
        bad_block_cache: Option<Arc<BadBlockCache>>,
        stateless_mode: bool,
    ) -> Self {
        Self {
            cs,
            bad_block_cache,
            tipsets: HashMap::default(),
            stateless_mode,
        }
    }

    /// Groups the pending tipsets into parent-linked chains, each ordered
    /// oldest-first. Chains are peeled off starting from the heaviest
    /// remaining tipset, walking parent links while the parent is also
    /// pending. A tipset can appear in more than one chain if it is a shared
    /// ancestor (only the walk start set is de-duplicated).
    fn chains(&self) -> Vec<Vec<FullTipset>> {
        let mut chains = Vec::new();
        let mut remaining_tipsets = self.tipsets.clone();

        while let Some(heaviest) = remaining_tipsets
            .values()
            .max_by_key(|ts| ts.weight())
            .cloned()
        {
            let mut chain = Vec::new();
            let mut current = Some(heaviest);

            while let Some(tipset) = current.take() {
                remaining_tipsets.remove(tipset.key());

                // Follow the parent link while the parent is also pending.
                current = self.tipsets.get(tipset.parents()).cloned();

                chain.push(tipset);
            }
            // Walked child -> parent; reverse to get oldest-first order.
            chain.reverse();
            chains.push(chain);
        }

        chains
    }

    /// In stateless mode parents are always considered validated; otherwise
    /// the parent state root must exist in the blockstore.
    fn is_parent_validated(&self, tipset: &FullTipset) -> bool {
        let db = self.cs.blockstore();
        self.stateless_mode || db.has(tipset.parent_state()).unwrap_or(false)
    }

    /// Whether `tipset` can be validated right now: its parent must be loadable
    /// locally and either be the current head or an already-validated ancestor
    /// (a parent at or beyond the head's epoch that isn't the head indicates a
    /// fork we can't validate yet).
    fn is_ready_for_validation(&self, tipset: &FullTipset) -> bool {
        if self.stateless_mode || tipset.key() == self.cs.genesis_tipset().key() {
            true
        } else if let Ok(parent_ts) = load_full_tipset(&self.cs, tipset.parents()) {
            let head_ts = self.cs.heaviest_tipset();
            if parent_ts.key() == head_ts.key() {
                true
            } else if parent_ts.epoch() >= head_ts.epoch() {
                false
            } else {
                self.is_parent_validated(tipset)
            }
        } else {
            false
        }
    }

    /// Inserts a tipset into the pending set, rejecting invalid or
    /// beyond-finality tipsets and merging blocks of sibling tipsets that
    /// share epoch and parents.
    fn add_full_tipset(&mut self, tipset: FullTipset) {
        // Structural/sanity validation (not state execution).
        if let Err(why) = TipsetValidator(&tipset).validate(
            &self.cs,
            self.bad_block_cache.as_ref().map(AsRef::as_ref),
            &self.cs.genesis_tipset(),
            self.cs.chain_config().block_delay_secs,
        ) {
            metrics::INVALID_TIPSET_TOTAL.inc();
            trace!("Skipping invalid tipset: {}", why);
            self.mark_bad_tipset(tipset);
            return;
        }

        // Reject tipsets older than chain finality relative to our head.
        let heaviest = self.cs.heaviest_tipset();
        let epoch_diff = heaviest.epoch() - tipset.epoch();

        if epoch_diff > self.cs.chain_config().policy.chain_finality {
            self.mark_bad_tipset(tipset);
            return;
        }

        // Already pending — nothing to do.
        if self.tipsets.contains_key(tipset.key()) {
            return;
        }

        // Already part of our canonical chain — nothing to do.
        if let Ok(Some(ts)) = self.cs.chain_index().tipset_by_height(
            tipset.epoch(),
            self.cs.heaviest_tipset(),
            ResolveNullTipset::TakeOlder,
        ) && ts.key() == tipset.key()
        {
            return;
        }

        // Merge with pending siblings (same epoch and parents): collect the
        // union of blocks and drop siblings that nothing else references.
        let mut to_remove = Vec::new();
        #[allow(clippy::mutable_key_type)]
        let mut merged_blocks: HashSet<_> = tipset.blocks().iter().cloned().collect();

        // Keys that some pending tipset uses as its parent; those siblings
        // must be kept to preserve the parent links.
        let parent_refs: HashSet<_> = self
            .tipsets
            .values()
            .map(|ts| ts.parents().clone())
            .collect();

        for (key, existing_ts) in self.tipsets.iter() {
            if existing_ts.epoch() == tipset.epoch() && existing_ts.parents() == tipset.parents() {
                if !parent_refs.contains(key) {
                    to_remove.push(key.clone());
                }
                merged_blocks.extend(existing_ts.blocks().iter().cloned());
            }
        }

        for key in to_remove {
            self.tipsets.remove(&key);
        }

        if let Ok(merged_tipset) = FullTipset::new(merged_blocks) {
            self.tipsets
                .insert(merged_tipset.key().clone(), merged_tipset);
        }
    }

    /// Removes a tipset and, transitively, all of its pending descendants.
    fn mark_bad_tipset(&mut self, tipset: FullTipset) {
        let mut stack = vec![tipset];
        while let Some(tipset) = stack.pop() {
            self.tipsets.remove(tipset.key());
            let mut to_remove = Vec::new();
            let mut descendants = Vec::new();

            // Any pending tipset whose parent is the bad tipset is also bad.
            for (key, ts) in self.tipsets.iter() {
                if ts.parents() == tipset.key() {
                    to_remove.push(key.clone());
                    descendants.push(ts.clone());
                }
            }

            for key in to_remove {
                self.tipsets.remove(&key);
            }

            stack.extend(descendants);
        }
    }

    /// Applies a successfully validated tipset: removes it from the pending
    /// set and advances the store head (stateless: only if heavier;
    /// proposed head: `put_tipset`; otherwise: `set_heaviest_tipset`).
    fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) {
        if !self.is_parent_validated(&tipset) {
            tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), parent_state = %tipset.parent_state(), "Parent tipset must be validated");
            return;
        }

        self.tipsets.remove(tipset.key());
        let tipset = tipset.into_tipset();
        if self.stateless_mode {
            let epoch = tipset.epoch();
            let terse_key = tipset.key().terse();
            // In stateless mode, only track the heaviest head we've seen.
            if self.cs.heaviest_tipset().weight() < tipset.weight() {
                if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
                    error!("Error setting heaviest tipset: {}", e);
                } else {
                    info!("Heaviest tipset: {} ({})", epoch, terse_key);
                }
            }
        } else if is_proposed_head {
            if let Err(e) = self.cs.put_tipset(&tipset) {
                error!("Error putting tipset: {e}");
            }
        } else if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
            error!("Error setting heaviest tipset: {e}");
        }
    }

    /// Applies a [`SyncEvent`] to the pending set.
    pub fn update(&mut self, event: SyncEvent) {
        tracing::trace!("update: {event}");
        match event {
            SyncEvent::NewFullTipsets(tipsets) => {
                for tipset in tipsets {
                    self.add_full_tipset(tipset);
                }
            }
            SyncEvent::BadTipset(tipset) => self.mark_bad_tipset(tipset),
            SyncEvent::ValidatedTipset {
                tipset,
                is_proposed_head,
            } => self.mark_validated_tipset(tipset, is_proposed_head),
        }
    }

    /// Derives one task per pending chain — fetch the missing parent of its
    /// oldest tipset, or validate that tipset if it's ready — together with a
    /// [`ForkSyncInfo`] progress entry per chain.
    pub fn tasks(&self) -> (Vec<SyncTask>, Vec<ForkSyncInfo>) {
        let current_validated_epoch = self.cs.heaviest_tipset().epoch();
        let now = Utc::now();

        let mut active_sync_info = Vec::new();
        let mut tasks = Vec::new();
        for chain in self.chains() {
            if let Some(first_ts) = chain.first() {
                let last_ts = chain.last().expect("Infallible");
                let stage: ForkSyncStage;
                let start_time = Some(now);

                if !self.is_ready_for_validation(first_ts) {
                    // Parent is missing locally — fetch it first.
                    stage = ForkSyncStage::FetchingHeaders;
                    tasks.push(SyncTask::FetchTipset(
                        first_ts.parents().clone(),
                        first_ts.epoch(),
                    ));
                } else {
                    // A single-tipset chain means this tipset is a proposed head.
                    stage = ForkSyncStage::ValidatingTipsets;
                    tasks.push(SyncTask::ValidateTipset {
                        tipset: first_ts.clone(),
                        is_proposed_head: chain.len() == 1,
                    });
                }

                let fork_info = ForkSyncInfo {
                    target_tipset_key: last_ts.key().clone(),
                    target_epoch: last_ts.epoch(),
                    target_sync_epoch_start: first_ts.epoch(),
                    stage,
                    validated_chain_head_epoch: current_validated_epoch,
                    start_time,
                    last_updated: Some(now),
                };

                active_sync_info.push(fork_info);
            }
        }
        (tasks, active_sync_info)
    }
}
861
/// A unit of sync work derived from the state machine; hashed and stored in
/// the follower's task set to prevent duplicate execution.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
enum SyncTask {
    /// Validate a tipset whose parent is available locally.
    ValidateTipset {
        tipset: FullTipset,
        is_proposed_head: bool,
    },
    /// Fetch the tipset with the given key; the epoch is that of the child
    /// requesting it (used for logging/progress).
    FetchTipset(TipsetKey, ChainEpoch),
}
870
871impl std::fmt::Display for SyncTask {
872 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
873 match self {
874 SyncTask::ValidateTipset {
875 tipset,
876 is_proposed_head,
877 } => write!(
878 f,
879 "ValidateTipset(epoch: {}, is_proposed_head: {is_proposed_head})",
880 tipset.epoch()
881 ),
882 SyncTask::FetchTipset(key, epoch) => {
883 let s = key.to_string();
884 write!(
885 f,
886 "FetchTipset({}, epoch: {})",
887 &s[s.len().saturating_sub(8)..],
888 epoch
889 )
890 }
891 }
892 }
893}
894
895impl SyncTask {
896 async fn execute<DB: Blockstore + Sync + Send + 'static>(
897 self,
898 network: SyncNetworkContext<DB>,
899 state_manager: Arc<StateManager<DB>>,
900 stateless_mode: bool,
901 bad_block_cache: Option<Arc<BadBlockCache>>,
902 ) -> Option<SyncEvent> {
903 tracing::trace!("SyncTask::execute {self}");
904 match self {
905 SyncTask::ValidateTipset {
906 tipset,
907 is_proposed_head,
908 } if stateless_mode => Some(SyncEvent::ValidatedTipset {
909 tipset,
910 is_proposed_head,
911 }),
912 SyncTask::ValidateTipset {
913 tipset,
914 is_proposed_head,
915 } => match validate_tipset(&state_manager, tipset.clone(), bad_block_cache).await {
916 Ok(()) => Some(SyncEvent::ValidatedTipset {
917 tipset,
918 is_proposed_head,
919 }),
920 Err(e) if matches!(e, TipsetSyncerError::TimeTravellingBlock { .. }) => {
925 warn!("Time travelling block detected, skipping tipset for now: {e}");
926 None
927 }
928 Err(e) => {
929 warn!("Error validating tipset: {e}");
930 Some(SyncEvent::BadTipset(tipset))
931 }
932 },
933 SyncTask::FetchTipset(key, epoch) => {
934 match get_full_tipset_batch(&network, state_manager.chain_store(), None, &key).await
935 {
936 Ok(parents) => Some(SyncEvent::NewFullTipsets(parents)),
937 Err(e) => {
938 tracing::warn!(%key, %epoch, "failed to fetch tipset: {e:#}");
939 None
940 }
941 }
942 }
943 }
944 }
945}
946
947#[cfg(test)]
948mod tests {
949 use super::*;
950 use crate::blocks::{Chain4U, HeaderBuilder, chain4u};
951 use crate::db::MemoryDB;
952 use crate::utils::db::CborStoreExt as _;
953 use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
954 use num_bigint::BigInt;
955 use num_traits::ToPrimitive;
956 use std::sync::Arc;
957 use tracing::level_filters::LevelFilter;
958 use tracing_subscriber::EnvFilter;
959
    /// Builds a test fixture: an in-memory chain store with a dummy genesis
    /// header (epoch 0) set as the heaviest tipset, plus the `Chain4U` builder
    /// used to grow test chains on top of it.
    fn setup() -> (Arc<ChainStore<MemoryDB>>, Chain4U<Arc<MemoryDB>>) {
        // Best-effort tracing init; ignore the error if a subscriber is
        // already installed by another test.
        let _ = tracing_subscriber::fmt()
            .without_time()
            .with_env_filter(
                EnvFilter::builder()
                    .with_default_directive(LevelFilter::DEBUG.into())
                    .from_env()
                    .unwrap(),
            )
            .try_init();

        let db = Arc::new(MemoryDB::default());

        {
            // Store an empty message meta object so headers with no messages
            // can be resolved by `block_messages`.
            let empty_amt = Amt::<Cid, _>::new(&db).flush().unwrap();
            db.put_cbor_default(&crate::blocks::TxMeta {
                bls_message_root: empty_amt,
                secp_message_root: empty_amt,
            })
            .unwrap();
        }

        let c4u = Chain4U::with_blockstore(db.clone());
        chain4u! {
            in c4u;
            [genesis_header = dummy_node(&db, 0)]
        };

        let cs = Arc::new(
            ChainStore::new(
                db.clone(),
                db.clone(),
                db.clone(),
                Default::default(),
                genesis_header.clone().into(),
            )
            .unwrap(),
        );

        cs.set_heaviest_tipset(cs.genesis_tipset()).unwrap();

        (cs, c4u)
    }
1006
    /// Stores the epoch number as a CBOR object and returns its CID, serving
    /// as a unique fake state root per epoch.
    fn dummy_state(db: impl Blockstore, i: ChainEpoch) -> Cid {
        db.put_cbor_default(&i).unwrap()
    }
1010
    /// Builds a header template for epoch `i` whose weight equals the epoch,
    /// so later epochs are always heavier.
    fn dummy_node(db: impl Blockstore, i: ChainEpoch) -> HeaderBuilder {
        HeaderBuilder {
            state_root: dummy_state(db, i).into(),
            weight: BigInt::from(i).into(),
            epoch: i.into(),
            ..Default::default()
        }
    }
1019
    /// Feeds a 5-tipset chain into the state machine out of order and checks
    /// that validation tasks are produced strictly parent-first (epochs 1..=5).
    #[test]
    fn test_state_machine_validation_order() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        chain4u! {
            from [genesis_header] in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)] -> [c = dummy_node(&db, 3)] -> [d = dummy_node(&db, 4)] -> [e = dummy_node(&db, 5)]
        };

        // Stateless mode so parent-state checks don't require real execution.
        let mut state_machine = SyncStateMachine::new(cs, Default::default(), true);

        // Deliberately shuffled insertion order.
        let tipsets = vec![e, b, d, c, a];

        for block in tipsets {
            let full_tipset = FullTipset::new(vec![Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            }])
            .unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
        }

        // Drain validation tasks until none remain, recording the order.
        let mut validation_tasks = Vec::new();
        loop {
            let (tasks, _) = state_machine.tasks();

            let validation_tipsets: Vec<_> = tasks
                .into_iter()
                .filter_map(|task| {
                    if let SyncTask::ValidateTipset {
                        tipset,
                        is_proposed_head,
                    } = task
                    {
                        Some((tipset, is_proposed_head))
                    } else {
                        None
                    }
                })
                .collect();

            if validation_tipsets.is_empty() {
                break;
            }

            for (ts, is_proposed_head) in validation_tipsets {
                validation_tasks.push(ts.epoch());
                // Simulate validation by storing the (dummy) state root.
                db.put_cbor_default(&ts.epoch()).unwrap();
                state_machine.mark_validated_tipset(ts, is_proposed_head);
            }
        }

        assert_eq!(validation_tasks, vec![1, 2, 3, 4, 5]);
    }
1083
    /// Builds a small fork (genesis -> a -> b and genesis -> a -> c) and checks
    /// that `chains()` yields one chain per branch, heaviest branch first,
    /// with the shared ancestor `a` included in both.
    #[test]
    fn test_sync_state_machine_chain_fragments() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        chain4u! {
            in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)]
        };
        // Second branch forking off `a`.
        chain4u! {
            from [a] in c4u;
            [c = dummy_node(&db, 3)]
        };

        let mut state_machine = SyncStateMachine::new(cs, Default::default(), false);

        for block in [a, b, c] {
            let full_tipset = FullTipset::new(vec![Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            }])
            .unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
        }

        // Compare by weight (== epoch in this fixture) for readability.
        let chains = state_machine
            .chains()
            .into_iter()
            .map(|v| {
                v.into_iter()
                    .map(|ts| ts.weight().to_i64().unwrap_or(0))
                    .collect_vec()
            })
            .collect_vec();

        assert_eq!(chains, vec![vec![1, 3], vec![1, 2]]);
    }
1128}