1use super::network_context::SyncNetworkContext;
20use crate::{
21 blocks::{Block, FullTipset, Tipset, TipsetKey},
22 chain::ChainStore,
23 chain_sync::{
24 ForkSyncInfo, ForkSyncStage, SyncStatus, SyncStatusReport, TipsetValidator,
25 bad_block_cache::BadBlockCache,
26 metrics,
27 tipset_syncer::{TipsetSyncerError, validate_tipset},
28 },
29 libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest},
30 message_pool::{MessagePool, MpoolRpcProvider},
31 networks::calculate_expected_epoch,
32 shim::clock::ChainEpoch,
33 state_manager::StateManager,
34 utils::misc::env::is_env_truthy,
35};
36use ahash::{HashMap, HashSet};
37use chrono::Utc;
38use cid::Cid;
39use fvm_ipld_blockstore::Blockstore;
40use itertools::Itertools;
41use libp2p::PeerId;
42use parking_lot::{Mutex, RwLock};
43use std::{sync::Arc, time::Instant};
44use tokio::{sync::Notify, task::JoinSet};
45use tracing::{debug, error, info, trace, warn};
46
/// Long-running service that keeps the local chain in sync with the network.
///
/// Owns the channels feeding tipsets into the sync loop; consumed by
/// [`ChainFollower::run`], which drives [`chain_follower`].
pub struct ChainFollower<DB> {
    /// Shared sync-progress report, refreshed by the scheduler task in
    /// [`chain_follower`].
    pub sync_status: SyncStatus,

    /// Access to the chain store, state computation and chain configuration.
    state_manager: Arc<StateManager<DB>>,

    /// Handle for network requests (hello, chain-exchange) and peer management.
    pub network: SyncNetworkContext<DB>,

    /// Genesis tipset; its first block CID is sent in hello requests.
    genesis: Tipset,

    /// Cache of known-bad blocks; `None` when disabled via the
    /// `FOREST_DISABLE_BAD_BLOCK_CACHE` environment variable.
    pub bad_blocks: Option<Arc<BadBlockCache>>,

    /// Incoming libp2p network events.
    net_handler: flume::Receiver<NetworkEvent>,

    /// Producer side for injecting tipsets into the sync loop from outside
    /// (paired with `tipset_receiver`).
    pub tipset_sender: flume::Sender<FullTipset>,

    /// Consumer side of the tipset channel, drained by the sync loop.
    tipset_receiver: flume::Receiver<FullTipset>,

    /// When `true`, tipsets are accepted without state validation.
    stateless_mode: bool,

    /// Message pool fed with messages received over GossipSub.
    mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
}
83
84impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
85 pub fn new(
86 state_manager: Arc<StateManager<DB>>,
87 network: SyncNetworkContext<DB>,
88 genesis: Tipset,
89 net_handler: flume::Receiver<NetworkEvent>,
90 stateless_mode: bool,
91 mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
92 ) -> Self {
93 let (tipset_sender, tipset_receiver) = flume::bounded(20);
94 let disable_bad_block_cache = is_env_truthy("FOREST_DISABLE_BAD_BLOCK_CACHE");
95 Self {
96 sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
97 state_manager,
98 network,
99 genesis,
100 bad_blocks: if disable_bad_block_cache {
101 tracing::warn!("bad block cache is disabled by `FOREST_DISABLE_BAD_BLOCK_CACHE`");
102 None
103 } else {
104 Some(Default::default())
105 },
106 net_handler,
107 tipset_sender,
108 tipset_receiver,
109 stateless_mode,
110 mem_pool,
111 }
112 }
113
114 pub async fn run(self) -> anyhow::Result<()> {
115 chain_follower(
116 self.state_manager,
117 self.bad_blocks,
118 self.net_handler,
119 self.tipset_receiver,
120 self.network,
121 self.mem_pool,
122 self.sync_status,
123 self.genesis,
124 self.stateless_mode,
125 )
126 .await
127 }
128}
129
/// Main chain-following loop. Builds a shared [`SyncStateMachine`] and spawns
/// four cooperating tasks on a [`JoinSet`]:
///
/// 1. network-event ingestion (hello responses, gossiped blocks/messages),
/// 2. ingestion of tipsets submitted through `tipset_receiver`,
/// 3. a scheduler that turns state-machine output into spawned [`SyncTask`]s
///    and refreshes `sync_status`,
/// 4. a periodic progress logger.
///
/// Returns `Ok(())` once every spawned task has finished.
#[allow(clippy::too_many_arguments)]
pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
    state_manager: Arc<StateManager<DB>>,
    bad_block_cache: Option<Arc<BadBlockCache>>,
    network_rx: flume::Receiver<NetworkEvent>,
    tipset_receiver: flume::Receiver<FullTipset>,
    network: SyncNetworkContext<DB>,
    mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
    sync_status: SyncStatus,
    genesis: Tipset,
    stateless_mode: bool,
) -> anyhow::Result<()> {
    // Signalled whenever the state machine received new input, waking the
    // scheduler task below.
    let state_changed = Arc::new(Notify::new());
    let state_machine = Arc::new(Mutex::new(SyncStateMachine::new(
        state_manager.chain_store().clone(),
        bad_block_cache.clone(),
        stateless_mode,
    )));
    // In-flight tasks, used to avoid spawning duplicate work.
    let tasks: Arc<Mutex<HashSet<SyncTask>>> = Arc::new(Mutex::new(HashSet::default()));

    let mut set = JoinSet::new();

    // Task 1: consume libp2p events, resolve them to full tipsets, and feed
    // those into the state machine. Gossiped messages go to the mempool.
    set.spawn({
        let state_manager = state_manager.clone();
        let state_changed = state_changed.clone();
        let state_machine = state_machine.clone();
        let network = network.clone();
        async move {
            while let Ok(event) = network_rx.recv_async().await {
                inc_gossipsub_event_metrics(&event);

                update_peer_info(
                    &event,
                    &network,
                    state_manager.chain_store().clone(),
                    &genesis,
                );

                // Only hello responses and gossiped blocks yield a tipset;
                // everything else is handled above or skipped.
                let Ok(tipset) = (match event {
                    NetworkEvent::HelloResponseOutbound { request, source } => {
                        let tipset_keys = TipsetKey::from(request.heaviest_tip_set.clone());
                        get_full_tipset(
                            &network,
                            state_manager.chain_store(),
                            Some(source),
                            &tipset_keys,
                        )
                        .await
                        .inspect_err(|e| debug!("Querying full tipset failed: {}", e))
                    }
                    NetworkEvent::PubsubMessage { message } => match message {
                        PubsubMessage::Block(b) => {
                            let key = TipsetKey::from(nunny::vec![*b.header.cid()]);
                            get_full_tipset(&network, state_manager.chain_store(), None, &key).await
                        }
                        PubsubMessage::Message(m) => {
                            if let Err(why) = mem_pool.add(m) {
                                debug!("Received invalid GossipSub message: {}", why);
                            }
                            continue;
                        }
                    },
                    _ => continue,
                }) else {
                    continue;
                };
                {
                    state_machine
                        .lock()
                        .update(SyncEvent::NewFullTipsets(vec![tipset]));
                    state_changed.notify_one();
                }
            }
        }
    });

    // Task 2: forward tipsets submitted from outside (via `tipset_sender`)
    // into the state machine.
    set.spawn({
        let state_changed = state_changed.clone();
        let state_machine = state_machine.clone();

        async move {
            while let Ok(tipset) = tipset_receiver.recv_async().await {
                state_machine
                    .lock()
                    .update(SyncEvent::NewFullTipsets(vec![tipset]));
                state_changed.notify_one();
            }
        }
    });

    // Task 3: scheduler. On every state change, ask the state machine for
    // work, refresh the sync status report, and spawn each task not already
    // in flight.
    set.spawn({
        let state_manager = state_manager.clone();
        let state_machine = state_machine.clone();
        let state_changed = state_changed.clone();
        let tasks = tasks.clone();
        let bad_block_cache = bad_block_cache.clone();
        async move {
            loop {
                state_changed.notified().await;

                // NOTE: the `tasks` guard is held for the whole scheduling
                // pass so concurrent completions can't race the dedup check.
                let mut tasks_set = tasks.lock();
                let (task_vec, current_active_forks) = state_machine.lock().tasks();

                {
                    let old_status_report = sync_status.read().clone();
                    let new_status_report = old_status_report.update(
                        &state_manager,
                        current_active_forks,
                        stateless_mode,
                    );

                    sync_status.write().clone_from(&new_status_report);
                }

                for task in task_vec {
                    // `insert` returns false if the task is already running.
                    let new = tasks_set.insert(task.clone());
                    if new {
                        let action = task.clone().execute(
                            network.clone(),
                            state_manager.clone(),
                            stateless_mode,
                            bad_block_cache.clone(),
                        );
                        tokio::spawn({
                            let tasks = tasks.clone();
                            let state_machine = state_machine.clone();
                            let state_changed = state_changed.clone();
                            async move {
                                // Feed any follow-up event back into the state
                                // machine, then drop the dedup entry.
                                if let Some(event) = action.await {
                                    state_machine.lock().update(event);
                                    state_changed.notify_one();
                                }
                                tasks.lock().remove(&task);
                            }
                        });
                    }
                }
            }
        }
    });

    // Task 4: every 10s, log catch-up/download progress relative to the
    // wall-clock-expected chain head.
    set.spawn({
        let state_manager = state_manager.clone();
        let state_machine = state_machine.clone();
        async move {
            loop {
                tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
                let (tasks_set, _) = state_machine.lock().tasks();
                let heaviest_tipset = state_manager.chain_store().heaviest_tipset();
                let heaviest_epoch = heaviest_tipset.epoch();

                // Furthest fetch target ahead of our current head, if any.
                let to_download = tasks_set
                    .iter()
                    .filter_map(|task| match task {
                        SyncTask::FetchTipset(_, epoch) => Some(epoch - heaviest_epoch),
                        _ => None,
                    })
                    .max()
                    .unwrap_or(0);

                let expected_head = calculate_expected_epoch(
                    Utc::now().timestamp() as u64,
                    state_manager.chain_store().genesis_block_header().timestamp,
                    state_manager.chain_config().block_delay_secs,
                );

                // NOTE(review): the format string prints the epoch directly
                // followed by the tipset key with no separator — confirm this
                // is the intended log layout.
                match (expected_head - heaviest_epoch > 10, to_download > 0) {
                    (true, true) => info!(
                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}, downloading {to_download} tipsets"
                    , heaviest_tipset.key()
                    ),
                    (true, false) => info!(
                        "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}"
                    , heaviest_tipset.key()
                    ),
                    (false, true) => {
                        info!("Downloading {to_download} tipsets")
                    }
                    (false, false) => {}
                }
            }
        }
    });

    set.join_all().await;
    Ok(())
}
328
329fn inc_gossipsub_event_metrics(event: &NetworkEvent) {
331 let label = match event {
332 NetworkEvent::HelloRequestInbound => metrics::values::HELLO_REQUEST_INBOUND,
333 NetworkEvent::HelloResponseOutbound { .. } => metrics::values::HELLO_RESPONSE_OUTBOUND,
334 NetworkEvent::HelloRequestOutbound => metrics::values::HELLO_REQUEST_OUTBOUND,
335 NetworkEvent::HelloResponseInbound => metrics::values::HELLO_RESPONSE_INBOUND,
336 NetworkEvent::PeerConnected(_) => metrics::values::PEER_CONNECTED,
337 NetworkEvent::PeerDisconnected(_) => metrics::values::PEER_DISCONNECTED,
338 NetworkEvent::PubsubMessage { message } => match message {
339 PubsubMessage::Block(_) => metrics::values::PUBSUB_BLOCK,
340 PubsubMessage::Message(_) => metrics::values::PUBSUB_MESSAGE,
341 },
342 NetworkEvent::ChainExchangeRequestOutbound => {
343 metrics::values::CHAIN_EXCHANGE_REQUEST_OUTBOUND
344 }
345 NetworkEvent::ChainExchangeResponseInbound => {
346 metrics::values::CHAIN_EXCHANGE_RESPONSE_INBOUND
347 }
348 NetworkEvent::ChainExchangeRequestInbound => {
349 metrics::values::CHAIN_EXCHANGE_REQUEST_INBOUND
350 }
351 NetworkEvent::ChainExchangeResponseOutbound => {
352 metrics::values::CHAIN_EXCHANGE_RESPONSE_OUTBOUND
353 }
354 };
355
356 metrics::LIBP2P_MESSAGE_TOTAL.get_or_create(&label).inc();
357}
358
359fn update_peer_info<DB: Blockstore + Sync + Send + 'static>(
361 event: &NetworkEvent,
362 network: &SyncNetworkContext<DB>,
363 chain_store: Arc<ChainStore<DB>>,
364 genesis: &Tipset,
365) {
366 match event {
367 NetworkEvent::PeerConnected(peer_id) => {
368 let genesis_cid = *genesis.block_headers().first().cid();
369 tokio::task::spawn(handle_peer_connected_event(
371 network.clone(),
372 chain_store,
373 *peer_id,
374 genesis_cid,
375 ));
376 }
377 NetworkEvent::PeerDisconnected(peer_id) => {
378 handle_peer_disconnected_event(network, *peer_id);
379 }
380 _ => {}
381 }
382}
383
384async fn handle_peer_connected_event<DB: Blockstore + Sync + Send + 'static>(
385 network: SyncNetworkContext<DB>,
386 chain_store: Arc<ChainStore<DB>>,
387 peer_id: PeerId,
388 genesis_block_cid: Cid,
389) {
390 if network.peer_manager().is_peer_new(&peer_id) {
392 let heaviest = chain_store.heaviest_tipset();
395 let request = HelloRequest {
396 heaviest_tip_set: heaviest.cids(),
397 heaviest_tipset_height: heaviest.epoch(),
398 heaviest_tipset_weight: heaviest.weight().clone().into(),
399 genesis_cid: genesis_block_cid,
400 };
401 let (peer_id, moment_sent, response) = match network.hello_request(peer_id, request).await {
402 Ok(response) => response,
403 Err(e) => {
404 debug!("Hello request failed: {}", e);
405 return;
406 }
407 };
408 let dur = Instant::now().duration_since(moment_sent);
409
410 match response {
412 Some(_) => {
413 network.peer_manager().log_success(&peer_id, dur);
414 }
415 None => {
416 network.peer_manager().log_failure(&peer_id, dur);
417 }
418 }
419 }
420}
421
422fn handle_peer_disconnected_event<DB: Blockstore + Sync + Send + 'static>(
423 network: &SyncNetworkContext<DB>,
424 peer_id: PeerId,
425) {
426 network.peer_manager().remove_peer(&peer_id);
427 network.peer_manager().unmark_peer_bad(&peer_id);
428}
429
430pub async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
431 network: &SyncNetworkContext<DB>,
432 chain_store: &ChainStore<DB>,
433 peer_id: Option<PeerId>,
434 tipset_keys: &TipsetKey,
435) -> anyhow::Result<FullTipset> {
436 if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
438 return Ok(full_tipset);
439 }
440 let tipset = network
442 .chain_exchange_full_tipset(peer_id, tipset_keys)
443 .await
444 .map_err(|e| anyhow::anyhow!(e))?;
445 tipset.persist(chain_store.blockstore())?;
446
447 Ok(tipset)
448}
449
450async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
451 network: &SyncNetworkContext<DB>,
452 chain_store: &ChainStore<DB>,
453 peer_id: Option<PeerId>,
454 tipset_keys: &TipsetKey,
455) -> anyhow::Result<Vec<FullTipset>> {
456 if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
458 return Ok(vec![full_tipset]);
459 }
460 let tipsets = network
462 .chain_exchange_full_tipsets(peer_id, tipset_keys)
463 .await
464 .map_err(|e| anyhow::anyhow!(e))?;
465
466 for tipset in tipsets.iter() {
467 tipset.persist(chain_store.blockstore())?;
468 }
469
470 Ok(tipsets)
471}
472
473pub fn load_full_tipset<DB: Blockstore>(
474 chain_store: &ChainStore<DB>,
475 tipset_keys: &TipsetKey,
476) -> anyhow::Result<FullTipset> {
477 let ts = chain_store
479 .chain_index()
480 .load_required_tipset(tipset_keys)?;
481 let blocks: Vec<_> = ts
482 .block_headers()
483 .iter()
484 .map(|header| -> anyhow::Result<Block> {
485 let (bls_msgs, secp_msgs) =
486 crate::chain::block_messages(chain_store.blockstore(), header)?;
487 Ok(Block {
488 header: header.clone(),
489 bls_messages: bls_msgs,
490 secp_messages: secp_msgs,
491 })
492 })
493 .try_collect()?;
494 let fts = FullTipset::new(blocks)?;
496 Ok(fts)
497}
498
/// Input events consumed by [`SyncStateMachine::update`].
enum SyncEvent {
    /// New candidate tipsets to buffer (from gossip, hello responses, the
    /// tipset channel, or a completed fetch task).
    NewFullTipsets(Vec<FullTipset>),
    /// A tipset that failed validation; it and its buffered descendants are
    /// discarded.
    BadTipset(FullTipset),
    /// A tipset that passed validation and should be committed to the store.
    ValidatedTipset {
        tipset: FullTipset,
        // `true` when the tipset was the tip of its chain at scheduling time.
        is_proposed_head: bool,
    },
}
507
508impl std::fmt::Display for SyncEvent {
509 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
510 fn tss_to_string(tss: &[FullTipset]) -> String {
511 format!(
512 "epoch: {}-{}",
513 tss.first().map(|ts| ts.epoch()).unwrap_or_default(),
514 tss.last().map(|ts| ts.epoch()).unwrap_or_default()
515 )
516 }
517
518 match self {
519 Self::NewFullTipsets(tss) => write!(f, "NewFullTipsets({})", tss_to_string(tss)),
520 Self::BadTipset(ts) => {
521 write!(f, "BadTipset(epoch: {}, key: {})", ts.epoch(), ts.key())
522 }
523 Self::ValidatedTipset {
524 tipset,
525 is_proposed_head,
526 } => write!(
527 f,
528 "ValidatedTipset(epoch: {}, key: {}, is_proposed_head: {is_proposed_head})",
529 tipset.epoch(),
530 tipset.key()
531 ),
532 }
533 }
534}
535
/// Core bookkeeping for chain sync: buffers candidate tipsets, links them
/// into chains, and decides what to fetch or validate next (see `tasks`).
struct SyncStateMachine<DB> {
    /// Chain store used for the current head, genesis and the blockstore.
    cs: Arc<ChainStore<DB>>,
    /// Cache of known-bad blocks; `None` when disabled.
    bad_block_cache: Option<Arc<BadBlockCache>>,
    /// Candidate tipsets not yet validated, keyed by tipset key.
    tipsets: HashMap<TipsetKey, FullTipset>,
    /// When `true`, state validation is skipped entirely.
    stateless_mode: bool,
}
543
impl<DB: Blockstore> SyncStateMachine<DB> {
    /// Creates an empty state machine over the given chain store.
    pub fn new(
        cs: Arc<ChainStore<DB>>,
        bad_block_cache: Option<Arc<BadBlockCache>>,
        stateless_mode: bool,
    ) -> Self {
        Self {
            cs,
            bad_block_cache,
            tipsets: HashMap::default(),
            stateless_mode,
        }
    }

    /// Groups the buffered tipsets into parent-linked chains, each ordered
    /// oldest-first. Chains are emitted heaviest-tip-first. Note that parents
    /// are looked up in the full buffer (not the remaining set), so a shared
    /// ancestor can appear in more than one chain.
    fn chains(&self) -> Vec<Vec<FullTipset>> {
        let mut chains = Vec::new();
        let mut remaining_tipsets = self.tipsets.clone();

        // Repeatedly take the heaviest unconsumed tipset and walk its parent
        // links back through the buffer.
        while let Some(heaviest) = remaining_tipsets
            .values()
            .max_by_key(|ts| ts.weight())
            .cloned()
        {
            let mut chain = Vec::new();
            let mut current = Some(heaviest);

            while let Some(tipset) = current.take() {
                remaining_tipsets.remove(tipset.key());

                current = self.tipsets.get(tipset.parents()).cloned();

                chain.push(tipset);
            }
            // Walked tip -> ancestor; flip to oldest-first.
            chain.reverse();
            chains.push(chain);
        }

        chains
    }

    /// `true` when the tipset's parent state root is present in the
    /// blockstore (i.e. the parent has been computed), or in stateless mode.
    fn is_parent_validated(&self, tipset: &FullTipset) -> bool {
        let db = self.cs.blockstore();
        self.stateless_mode || db.has(tipset.parent_state()).unwrap_or(false)
    }

    /// Decides whether `tipset` can be validated now, i.e. its parent is
    /// locally available and already validated (or it is genesis / we are in
    /// stateless mode).
    fn is_ready_for_validation(&self, tipset: &FullTipset) -> bool {
        if self.stateless_mode || tipset.key() == self.cs.genesis_tipset().key() {
            true
        } else if let Ok(parent_ts) = load_full_tipset(&self.cs, tipset.parents()) {
            let head_ts = self.cs.heaviest_tipset();
            if parent_ts.key() == head_ts.key() {
                // Direct child of the current head: always ready.
                true
            } else if parent_ts.epoch() >= head_ts.epoch() {
                // Parent is at/above head but isn't the head itself — wait
                // until the parent has been processed first.
                false
            } else {
                self.is_parent_validated(tipset)
            }
        } else {
            // Parent headers/messages not available locally yet.
            false
        }
    }

    /// Buffers a new candidate tipset: rejects invalid or too-old tipsets,
    /// and merges blocks with any buffered tipset sharing the same epoch and
    /// parents (gossiped blocks of the same tipset arrive separately).
    fn add_full_tipset(&mut self, tipset: FullTipset) {
        // Cheap structural/consensus checks before buffering.
        if let Err(why) = TipsetValidator(&tipset).validate(
            &self.cs,
            self.bad_block_cache.as_ref().map(AsRef::as_ref),
            &self.cs.genesis_tipset(),
            self.cs.chain_config().block_delay_secs,
        ) {
            metrics::INVALID_TIPSET_TOTAL.inc();
            trace!("Skipping invalid tipset: {}", why);
            self.mark_bad_tipset(tipset);
            return;
        }

        let heaviest = self.cs.heaviest_tipset();
        // Positive when the tipset is older than our head; tipsets beyond
        // finality behind the head are discarded.
        let epoch_diff = heaviest.epoch() - tipset.epoch();

        if epoch_diff > self.cs.chain_config().policy.chain_finality {
            self.mark_bad_tipset(tipset);
            return;
        }

        // Exact duplicate: nothing to do.
        if self.tipsets.contains_key(tipset.key()) {
            return;
        }

        let mut to_remove = Vec::new();
        #[allow(clippy::mutable_key_type)]
        let mut merged_blocks: HashSet<_> = tipset.blocks().iter().cloned().collect();

        // Keys that some buffered tipset points at as its parent; those must
        // be kept so existing parent links stay resolvable.
        let parent_refs: HashSet<_> = self
            .tipsets
            .values()
            .map(|ts| ts.parents().clone())
            .collect();

        for (key, existing_ts) in self.tipsets.iter() {
            if existing_ts.epoch() == tipset.epoch() && existing_ts.parents() == tipset.parents() {
                // Same slot in the chain: merge its blocks into ours and drop
                // the old entry unless something references it as a parent.
                if !parent_refs.contains(key) {
                    to_remove.push(key.clone());
                }
                merged_blocks.extend(existing_ts.blocks().iter().cloned());
            }
        }

        for key in to_remove {
            self.tipsets.remove(&key);
        }

        if let Ok(merged_tipset) = FullTipset::new(merged_blocks) {
            self.tipsets
                .insert(merged_tipset.key().clone(), merged_tipset);
        }
    }

    /// Removes a bad tipset and, transitively, every buffered descendant of
    /// it (children become unreachable once an ancestor is bad).
    fn mark_bad_tipset(&mut self, tipset: FullTipset) {
        let mut stack = vec![tipset];
        while let Some(tipset) = stack.pop() {
            self.tipsets.remove(tipset.key());
            let mut to_remove = Vec::new();
            let mut descendants = Vec::new();

            // Collect direct children of the removed tipset.
            for (key, ts) in self.tipsets.iter() {
                if ts.parents() == tipset.key() {
                    to_remove.push(key.clone());
                    descendants.push(ts.clone());
                }
            }

            for key in to_remove {
                self.tipsets.remove(&key);
            }

            // Their children are handled on later stack iterations.
            stack.extend(descendants);
        }
    }

    /// Commits a validated tipset: removes it from the buffer and updates the
    /// chain store (head selection in stateless mode, otherwise
    /// `put_tipset`/`set_heaviest_tipset` depending on `is_proposed_head`).
    fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) {
        // Defensive invariant check: callers must only pass tipsets whose
        // parent has been validated.
        if !self.is_parent_validated(&tipset) {
            tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), "Tipset must be validated");
            return;
        }

        self.tipsets.remove(tipset.key());
        let tipset = tipset.into_tipset();
        if self.stateless_mode {
            // No state computation: just track the heaviest header chain.
            let epoch = tipset.epoch();
            let terse_key = tipset.key().terse();
            if self.cs.heaviest_tipset().weight() < tipset.weight() {
                if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
                    error!("Error setting heaviest tipset: {}", e);
                } else {
                    info!("Heaviest tipset: {} ({})", epoch, terse_key);
                }
            }
        } else if is_proposed_head {
            if let Err(e) = self.cs.put_tipset(&tipset) {
                error!("Error putting tipset: {e}");
            }
        } else if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
            error!("Error setting heaviest tipset: {e}");
        }
    }

    /// Applies one event to the machine's buffered state.
    pub fn update(&mut self, event: SyncEvent) {
        tracing::trace!("update: {event}");
        match event {
            SyncEvent::NewFullTipsets(tipsets) => {
                for tipset in tipsets {
                    self.add_full_tipset(tipset);
                }
            }
            SyncEvent::BadTipset(tipset) => self.mark_bad_tipset(tipset),
            SyncEvent::ValidatedTipset {
                tipset,
                is_proposed_head,
            } => self.mark_validated_tipset(tipset, is_proposed_head),
        }
    }

    /// Produces the next batch of work: for each buffered chain, either fetch
    /// the missing parent of its oldest tipset or validate that tipset.
    /// Also returns per-fork progress info for the status report.
    pub fn tasks(&self) -> (Vec<SyncTask>, Vec<ForkSyncInfo>) {
        let current_validated_epoch = self.cs.heaviest_tipset().epoch();
        let now = Utc::now();

        let mut active_sync_info = Vec::new();
        let mut tasks = Vec::new();
        for chain in self.chains() {
            if let Some(first_ts) = chain.first() {
                let last_ts = chain.last().expect("Infallible");
                let stage: ForkSyncStage;
                let start_time = Some(now);

                if !self.is_ready_for_validation(first_ts) {
                    // Missing ancestry: fetch the parent tipset first.
                    stage = ForkSyncStage::FetchingHeaders;
                    tasks.push(SyncTask::FetchTipset(
                        first_ts.parents().clone(),
                        first_ts.epoch(),
                    ));
                } else {
                    stage = ForkSyncStage::ValidatingTipsets;
                    tasks.push(SyncTask::ValidateTipset {
                        tipset: first_ts.clone(),
                        // A single-element chain means this tipset is the
                        // proposed head of its fork.
                        is_proposed_head: chain.len() == 1,
                    });
                }

                let fork_info = ForkSyncInfo {
                    target_tipset_key: last_ts.key().clone(),
                    target_epoch: last_ts.epoch(),
                    target_sync_epoch_start: first_ts.epoch(),
                    stage,
                    validated_chain_head_epoch: current_validated_epoch,
                    start_time,
                    last_updated: Some(now),
                };

                active_sync_info.push(fork_info);
            }
        }
        (tasks, active_sync_info)
    }
}
792
/// A unit of work scheduled by [`SyncStateMachine::tasks`] and executed by
/// [`SyncTask::execute`]. `Hash`/`Eq` are used to de-duplicate in-flight
/// tasks in the scheduler.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
enum SyncTask {
    /// Validate this tipset (its parent is ready).
    ValidateTipset {
        tipset: FullTipset,
        // Whether the tipset was the tip of its chain when scheduled.
        is_proposed_head: bool,
    },
    /// Fetch the tipset with this key; the epoch is the child's epoch, kept
    /// for logging/progress reporting.
    FetchTipset(TipsetKey, ChainEpoch),
}
801
802impl std::fmt::Display for SyncTask {
803 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
804 match self {
805 SyncTask::ValidateTipset {
806 tipset,
807 is_proposed_head,
808 } => write!(
809 f,
810 "ValidateTipset(epoch: {}, is_proposed_head: {is_proposed_head})",
811 tipset.epoch()
812 ),
813 SyncTask::FetchTipset(key, epoch) => {
814 let s = key.to_string();
815 write!(
816 f,
817 "FetchTipset({}, epoch: {})",
818 &s[s.len().saturating_sub(8)..],
819 epoch
820 )
821 }
822 }
823 }
824}
825
826impl SyncTask {
827 async fn execute<DB: Blockstore + Sync + Send + 'static>(
828 self,
829 network: SyncNetworkContext<DB>,
830 state_manager: Arc<StateManager<DB>>,
831 stateless_mode: bool,
832 bad_block_cache: Option<Arc<BadBlockCache>>,
833 ) -> Option<SyncEvent> {
834 tracing::trace!("SyncTask::execute {self}");
835 match self {
836 SyncTask::ValidateTipset {
837 tipset,
838 is_proposed_head,
839 } if stateless_mode => Some(SyncEvent::ValidatedTipset {
840 tipset,
841 is_proposed_head,
842 }),
843 SyncTask::ValidateTipset {
844 tipset,
845 is_proposed_head,
846 } => match validate_tipset(&state_manager, tipset.clone(), bad_block_cache).await {
847 Ok(()) => Some(SyncEvent::ValidatedTipset {
848 tipset,
849 is_proposed_head,
850 }),
851 Err(e) if matches!(e, TipsetSyncerError::TimeTravellingBlock { .. }) => {
856 warn!("Time travelling block detected, skipping tipset for now: {e}");
857 None
858 }
859 Err(e) => {
860 warn!("Error validating tipset: {e}");
861 Some(SyncEvent::BadTipset(tipset))
862 }
863 },
864 SyncTask::FetchTipset(key, epoch) => {
865 match get_full_tipset_batch(&network, state_manager.chain_store(), None, &key).await
866 {
867 Ok(parents) => Some(SyncEvent::NewFullTipsets(parents)),
868 Err(e) => {
869 tracing::warn!(%key, %epoch, "failed to fetch tipset: {e}");
870 None
871 }
872 }
873 }
874 }
875 }
876}
877
878#[cfg(test)]
879mod tests {
880 use super::*;
881 use crate::blocks::{Chain4U, HeaderBuilder, chain4u};
882 use crate::db::MemoryDB;
883 use crate::utils::db::CborStoreExt as _;
884 use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
885 use num_bigint::BigInt;
886 use num_traits::ToPrimitive;
887 use std::sync::Arc;
888 use tracing::level_filters::LevelFilter;
889 use tracing_subscriber::EnvFilter;
890
    /// Builds an in-memory chain store with a genesis tipset at epoch 0,
    /// plus the `Chain4U` helper used to grow test chains on top of it.
    fn setup() -> (Arc<ChainStore<MemoryDB>>, Chain4U<Arc<MemoryDB>>) {
        // Best-effort tracing init; `try_init` errors are ignored because a
        // subscriber may already be installed by another test.
        let _ = tracing_subscriber::fmt()
            .without_time()
            .with_env_filter(
                EnvFilter::builder()
                    .with_default_directive(LevelFilter::DEBUG.into())
                    .from_env()
                    .unwrap(),
            )
            .try_init();

        let db = Arc::new(MemoryDB::default());

        // Store a TxMeta with empty message AMTs — presumably so message
        // lookups for the dummy headers succeed (see `load_full_tipset`).
        {
            let empty_amt = Amt::<Cid, _>::new(&db).flush().unwrap();
            db.put_cbor_default(&crate::blocks::TxMeta {
                bls_message_root: empty_amt,
                secp_message_root: empty_amt,
            })
            .unwrap();
        }

        let c4u = Chain4U::with_blockstore(db.clone());
        // Genesis header at epoch 0.
        chain4u! {
            in c4u;
            [genesis_header = dummy_node(&db, 0)]
        };

        let cs = Arc::new(
            ChainStore::new(
                db.clone(),
                db.clone(),
                db.clone(),
                Default::default(),
                genesis_header.clone().into(),
            )
            .unwrap(),
        );

        // Start with genesis as the head so weight comparisons have a base.
        cs.set_heaviest_tipset(cs.genesis_tipset()).unwrap();

        (cs, c4u)
    }
937
938 fn dummy_state(db: impl Blockstore, i: ChainEpoch) -> Cid {
939 db.put_cbor_default(&i).unwrap()
940 }
941
942 fn dummy_node(db: impl Blockstore, i: ChainEpoch) -> HeaderBuilder {
943 HeaderBuilder {
944 state_root: dummy_state(db, i).into(),
945 weight: BigInt::from(i).into(),
946 epoch: i.into(),
947 ..Default::default()
948 }
949 }
950
    /// Feeds a 5-tipset chain into the state machine out of order and checks
    /// that validation tasks are still produced strictly parent-first.
    #[test]
    fn test_state_machine_validation_order() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        // Linear chain genesis -> a(1) -> b(2) -> c(3) -> d(4) -> e(5).
        chain4u! {
            from [genesis_header] in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)] -> [c = dummy_node(&db, 3)] -> [d = dummy_node(&db, 4)] -> [e = dummy_node(&db, 5)]
        };

        // Stateless mode, so tipsets are "ready" without computed state.
        let mut state_machine = SyncStateMachine::new(cs, Default::default(), true);

        // Deliberately shuffled arrival order.
        let tipsets = vec![e, b, d, c, a];

        for block in tipsets {
            let full_tipset = FullTipset::new(vec![Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            }])
            .unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
        }

        // Drain validation tasks, recording the epoch order they come out in.
        let mut validation_tasks = Vec::new();
        loop {
            let (tasks, _) = state_machine.tasks();

            let validation_tipsets: Vec<_> = tasks
                .into_iter()
                .filter_map(|task| {
                    if let SyncTask::ValidateTipset {
                        tipset,
                        is_proposed_head,
                    } = task
                    {
                        Some((tipset, is_proposed_head))
                    } else {
                        None
                    }
                })
                .collect();

            if validation_tipsets.is_empty() {
                break;
            }

            for (ts, is_proposed_head) in validation_tipsets {
                validation_tasks.push(ts.epoch());
                // Persist the epoch to mimic the parent state becoming
                // available after validation.
                db.put_cbor_default(&ts.epoch()).unwrap();
                state_machine.mark_validated_tipset(ts, is_proposed_head);
            }
        }

        // Epochs must come out parent-first despite shuffled input.
        assert_eq!(validation_tasks, vec![1, 2, 3, 4, 5]);
    }
1014
    /// Builds a fork (a -> b and a -> c) and checks that `chains()` returns
    /// one chain per fork, heaviest tip first, with the shared ancestor `a`
    /// appearing in both chains.
    #[test]
    fn test_sync_state_machine_chain_fragments() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        // a(1) -> b(2) on one branch...
        chain4u! {
            in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)]
        };
        // ...and a(1) -> c(3) on a competing, heavier branch.
        chain4u! {
            from [a] in c4u;
            [c = dummy_node(&db, 3)]
        };

        let mut state_machine = SyncStateMachine::new(cs, Default::default(), false);

        for block in [a, b, c] {
            let full_tipset = FullTipset::new(vec![Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            }])
            .unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
        }

        // Compare chains by weight (== epoch for dummy headers).
        let chains = state_machine
            .chains()
            .into_iter()
            .map(|v| {
                v.into_iter()
                    .map(|ts| ts.weight().to_i64().unwrap_or(0))
                    .collect_vec()
            })
            .collect_vec();

        // Heaviest fork [a, c] first, then [a, b]; `a` is shared by both.
        assert_eq!(chains, vec![vec![1, 3], vec![1, 2]]);
    }
1059}