1use super::network_context::SyncNetworkContext;
20use crate::{
21 blocks::{Block, FullTipset, Tipset, TipsetKey},
22 chain::ChainStore,
23 chain_sync::{
24 ForkSyncInfo, ForkSyncStage, SyncStatus, SyncStatusReport, TipsetValidator,
25 bad_block_cache::BadBlockCache, metrics, tipset_syncer::validate_tipset,
26 },
27 libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest},
28 message_pool::{MessagePool, MpoolRpcProvider},
29 networks::calculate_expected_epoch,
30 shim::clock::ChainEpoch,
31 state_manager::StateManager,
32 utils::misc::env::is_env_truthy,
33};
34use ahash::{HashMap, HashSet};
35use chrono::Utc;
36use cid::Cid;
37use fvm_ipld_blockstore::Blockstore;
38use itertools::Itertools;
39use libp2p::PeerId;
40use parking_lot::{Mutex, RwLock};
41use std::{sync::Arc, time::Instant};
42use tokio::{sync::Notify, task::JoinSet};
43use tracing::{debug, error, info, trace, warn};
44
45pub struct ChainFollower<DB> {
46 pub sync_status: SyncStatus,
48
49 state_manager: Arc<StateManager<DB>>,
51
52 pub network: SyncNetworkContext<DB>,
54
55 genesis: Tipset,
57
58 pub bad_blocks: Option<Arc<BadBlockCache>>,
62
63 net_handler: flume::Receiver<NetworkEvent>,
65
66 pub tipset_sender: flume::Sender<FullTipset>,
68
69 tipset_receiver: flume::Receiver<FullTipset>,
71
72 stateless_mode: bool,
77
78 mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
80}
81
82impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
83 pub fn new(
84 state_manager: Arc<StateManager<DB>>,
85 network: SyncNetworkContext<DB>,
86 genesis: Tipset,
87 net_handler: flume::Receiver<NetworkEvent>,
88 stateless_mode: bool,
89 mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
90 ) -> Self {
91 let (tipset_sender, tipset_receiver) = flume::bounded(20);
92 let disable_bad_block_cache = is_env_truthy("FOREST_DISABLE_BAD_BLOCK_CACHE");
93 Self {
94 sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
95 state_manager,
96 network,
97 genesis,
98 bad_blocks: if disable_bad_block_cache {
99 tracing::warn!("bad block cache is disabled by `FOREST_DISABLE_BAD_BLOCK_CACHE`");
100 None
101 } else {
102 Some(Default::default())
103 },
104 net_handler,
105 tipset_sender,
106 tipset_receiver,
107 stateless_mode,
108 mem_pool,
109 }
110 }
111
112 pub async fn run(self) -> anyhow::Result<()> {
113 chain_follower(
114 self.state_manager,
115 self.bad_blocks,
116 self.net_handler,
117 self.tipset_receiver,
118 self.network,
119 self.mem_pool,
120 self.sync_status,
121 self.genesis,
122 self.stateless_mode,
123 )
124 .await
125 }
126}
127
128#[allow(clippy::too_many_arguments)]
129pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
131 state_manager: Arc<StateManager<DB>>,
132 bad_block_cache: Option<Arc<BadBlockCache>>,
133 network_rx: flume::Receiver<NetworkEvent>,
134 tipset_receiver: flume::Receiver<FullTipset>,
135 network: SyncNetworkContext<DB>,
136 mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
137 sync_status: SyncStatus,
138 genesis: Tipset,
139 stateless_mode: bool,
140) -> anyhow::Result<()> {
141 let state_changed = Arc::new(Notify::new());
142 let state_machine = Arc::new(Mutex::new(SyncStateMachine::new(
143 state_manager.chain_store().clone(),
144 bad_block_cache.clone(),
145 stateless_mode,
146 )));
147 let tasks: Arc<Mutex<HashSet<SyncTask>>> = Arc::new(Mutex::new(HashSet::default()));
148
149 let mut set = JoinSet::new();
150
151 set.spawn({
153 let state_manager = state_manager.clone();
154 let state_changed = state_changed.clone();
155 let state_machine = state_machine.clone();
156 let network = network.clone();
157 async move {
158 while let Ok(event) = network_rx.recv_async().await {
159 inc_gossipsub_event_metrics(&event);
160
161 update_peer_info(
162 &event,
163 &network,
164 state_manager.chain_store().clone(),
165 &genesis,
166 );
167
168 let Ok(tipset) = (match event {
169 NetworkEvent::HelloResponseOutbound { request, source } => {
170 let tipset_keys = TipsetKey::from(request.heaviest_tip_set.clone());
171 get_full_tipset(
172 &network,
173 state_manager.chain_store(),
174 Some(source),
175 &tipset_keys,
176 )
177 .await
178 .inspect_err(|e| debug!("Querying full tipset failed: {}", e))
179 }
180 NetworkEvent::PubsubMessage { message } => match message {
181 PubsubMessage::Block(b) => {
182 let key = TipsetKey::from(nunny::vec![*b.header.cid()]);
183 get_full_tipset(&network, state_manager.chain_store(), None, &key).await
184 }
185 PubsubMessage::Message(m) => {
186 if let Err(why) = mem_pool.add(m) {
187 debug!("Received invalid GossipSub message: {}", why);
188 }
189 continue;
190 }
191 },
192 _ => continue,
193 }) else {
194 continue;
195 };
196 {
197 state_machine
198 .lock()
199 .update(SyncEvent::NewFullTipsets(vec![tipset]));
200 state_changed.notify_one();
201 }
202 }
203 }
204 });
205
206 set.spawn({
208 let state_changed = state_changed.clone();
209 let state_machine = state_machine.clone();
210
211 async move {
212 while let Ok(tipset) = tipset_receiver.recv_async().await {
213 state_machine
214 .lock()
215 .update(SyncEvent::NewFullTipsets(vec![tipset]));
216 state_changed.notify_one();
217 }
218 }
219 });
220
221 set.spawn({
223 let state_manager = state_manager.clone();
224 let state_machine = state_machine.clone();
225 let state_changed = state_changed.clone();
226 let tasks = tasks.clone();
227 let bad_block_cache = bad_block_cache.clone();
228 async move {
229 loop {
230 state_changed.notified().await;
231
232 let mut tasks_set = tasks.lock();
233 let (task_vec, current_active_forks) = state_machine.lock().tasks();
234
235 {
237 let old_status_report = sync_status.read().clone();
238 let new_status_report = old_status_report.update(
239 &state_manager,
240 current_active_forks,
241 stateless_mode,
242 );
243
244 sync_status.write().clone_from(&new_status_report);
245 }
246
247 for task in task_vec {
248 let new = tasks_set.insert(task.clone());
250 if new {
251 let action = task.clone().execute(
252 network.clone(),
253 state_manager.clone(),
254 stateless_mode,
255 bad_block_cache.clone(),
256 );
257 tokio::spawn({
258 let tasks = tasks.clone();
259 let state_machine = state_machine.clone();
260 let state_changed = state_changed.clone();
261 async move {
262 if let Some(event) = action.await {
263 state_machine.lock().update(event);
264 state_changed.notify_one();
265 }
266 tasks.lock().remove(&task);
267 }
268 });
269 }
270 }
271 }
272 }
273 });
274
275 set.spawn({
279 let state_manager = state_manager.clone();
280 let state_machine = state_machine.clone();
281 async move {
282 loop {
283 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
284 let (tasks_set, _) = state_machine.lock().tasks();
285 let heaviest_tipset = state_manager.chain_store().heaviest_tipset();
286 let heaviest_epoch = heaviest_tipset.epoch();
287
288 let to_download = tasks_set
289 .iter()
290 .filter_map(|task| match task {
291 SyncTask::FetchTipset(_, epoch) => Some(epoch - heaviest_epoch),
292 _ => None,
293 })
294 .max()
295 .unwrap_or(0);
296
297 let expected_head = calculate_expected_epoch(
298 Utc::now().timestamp() as u64,
299 state_manager.chain_store().genesis_block_header().timestamp,
300 state_manager.chain_config().block_delay_secs,
301 );
302
303 match (expected_head - heaviest_epoch > 10, to_download > 0) {
306 (true, true) => info!(
307 "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}, downloading {to_download} tipsets"
308 , heaviest_tipset.key()
309 ),
310 (true, false) => info!(
311 "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}"
312 , heaviest_tipset.key()
313 ),
314 (false, true) => {
315 info!("Downloading {to_download} tipsets")
316 }
317 (false, false) => {}
318 }
319 }
320 }
321 });
322
323 set.join_all().await;
324 Ok(())
325}
326
327fn inc_gossipsub_event_metrics(event: &NetworkEvent) {
329 let label = match event {
330 NetworkEvent::HelloRequestInbound => metrics::values::HELLO_REQUEST_INBOUND,
331 NetworkEvent::HelloResponseOutbound { .. } => metrics::values::HELLO_RESPONSE_OUTBOUND,
332 NetworkEvent::HelloRequestOutbound => metrics::values::HELLO_REQUEST_OUTBOUND,
333 NetworkEvent::HelloResponseInbound => metrics::values::HELLO_RESPONSE_INBOUND,
334 NetworkEvent::PeerConnected(_) => metrics::values::PEER_CONNECTED,
335 NetworkEvent::PeerDisconnected(_) => metrics::values::PEER_DISCONNECTED,
336 NetworkEvent::PubsubMessage { message } => match message {
337 PubsubMessage::Block(_) => metrics::values::PUBSUB_BLOCK,
338 PubsubMessage::Message(_) => metrics::values::PUBSUB_MESSAGE,
339 },
340 NetworkEvent::ChainExchangeRequestOutbound => {
341 metrics::values::CHAIN_EXCHANGE_REQUEST_OUTBOUND
342 }
343 NetworkEvent::ChainExchangeResponseInbound => {
344 metrics::values::CHAIN_EXCHANGE_RESPONSE_INBOUND
345 }
346 NetworkEvent::ChainExchangeRequestInbound => {
347 metrics::values::CHAIN_EXCHANGE_REQUEST_INBOUND
348 }
349 NetworkEvent::ChainExchangeResponseOutbound => {
350 metrics::values::CHAIN_EXCHANGE_RESPONSE_OUTBOUND
351 }
352 };
353
354 metrics::LIBP2P_MESSAGE_TOTAL.get_or_create(&label).inc();
355}
356
357fn update_peer_info<DB: Blockstore + Sync + Send + 'static>(
359 event: &NetworkEvent,
360 network: &SyncNetworkContext<DB>,
361 chain_store: Arc<ChainStore<DB>>,
362 genesis: &Tipset,
363) {
364 match event {
365 NetworkEvent::PeerConnected(peer_id) => {
366 let genesis_cid = *genesis.block_headers().first().cid();
367 tokio::task::spawn(handle_peer_connected_event(
369 network.clone(),
370 chain_store,
371 *peer_id,
372 genesis_cid,
373 ));
374 }
375 NetworkEvent::PeerDisconnected(peer_id) => {
376 handle_peer_disconnected_event(network, *peer_id);
377 }
378 _ => {}
379 }
380}
381
382async fn handle_peer_connected_event<DB: Blockstore + Sync + Send + 'static>(
383 network: SyncNetworkContext<DB>,
384 chain_store: Arc<ChainStore<DB>>,
385 peer_id: PeerId,
386 genesis_block_cid: Cid,
387) {
388 if network.peer_manager().is_peer_new(&peer_id) {
390 let heaviest = chain_store.heaviest_tipset();
393 let request = HelloRequest {
394 heaviest_tip_set: heaviest.cids(),
395 heaviest_tipset_height: heaviest.epoch(),
396 heaviest_tipset_weight: heaviest.weight().clone().into(),
397 genesis_cid: genesis_block_cid,
398 };
399 let (peer_id, moment_sent, response) = match network.hello_request(peer_id, request).await {
400 Ok(response) => response,
401 Err(e) => {
402 debug!("Hello request failed: {}", e);
403 return;
404 }
405 };
406 let dur = Instant::now().duration_since(moment_sent);
407
408 match response {
410 Some(_) => {
411 network.peer_manager().log_success(&peer_id, dur);
412 }
413 None => {
414 network.peer_manager().log_failure(&peer_id, dur);
415 }
416 }
417 }
418}
419
420fn handle_peer_disconnected_event<DB: Blockstore + Sync + Send + 'static>(
421 network: &SyncNetworkContext<DB>,
422 peer_id: PeerId,
423) {
424 network.peer_manager().remove_peer(&peer_id);
425 network.peer_manager().unmark_peer_bad(&peer_id);
426}
427
428pub async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
429 network: &SyncNetworkContext<DB>,
430 chain_store: &ChainStore<DB>,
431 peer_id: Option<PeerId>,
432 tipset_keys: &TipsetKey,
433) -> anyhow::Result<FullTipset> {
434 if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
436 return Ok(full_tipset);
437 }
438 let tipset = network
440 .chain_exchange_full_tipset(peer_id, tipset_keys)
441 .await
442 .map_err(|e| anyhow::anyhow!(e))?;
443 tipset.persist(chain_store.blockstore())?;
444
445 Ok(tipset)
446}
447
448async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
449 network: &SyncNetworkContext<DB>,
450 chain_store: &ChainStore<DB>,
451 peer_id: Option<PeerId>,
452 tipset_keys: &TipsetKey,
453) -> anyhow::Result<Vec<FullTipset>> {
454 if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
456 return Ok(vec![full_tipset]);
457 }
458 let tipsets = network
460 .chain_exchange_full_tipsets(peer_id, tipset_keys)
461 .await
462 .map_err(|e| anyhow::anyhow!(e))?;
463
464 for tipset in tipsets.iter() {
465 tipset.persist(chain_store.blockstore())?;
466 }
467
468 Ok(tipsets)
469}
470
471pub fn load_full_tipset<DB: Blockstore>(
472 chain_store: &ChainStore<DB>,
473 tipset_keys: &TipsetKey,
474) -> anyhow::Result<FullTipset> {
475 let ts = chain_store
477 .chain_index()
478 .load_required_tipset(tipset_keys)?;
479 let blocks: Vec<_> = ts
480 .block_headers()
481 .iter()
482 .map(|header| -> anyhow::Result<Block> {
483 let (bls_msgs, secp_msgs) =
484 crate::chain::block_messages(chain_store.blockstore(), header)?;
485 Ok(Block {
486 header: header.clone(),
487 bls_messages: bls_msgs,
488 secp_messages: secp_msgs,
489 })
490 })
491 .try_collect()?;
492 let fts = FullTipset::new(blocks)?;
494 Ok(fts)
495}
496
497enum SyncEvent {
498 NewFullTipsets(Vec<FullTipset>),
499 BadTipset(FullTipset),
500 ValidatedTipset {
501 tipset: FullTipset,
502 is_proposed_head: bool,
503 },
504}
505
506impl std::fmt::Display for SyncEvent {
507 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
508 fn tss_to_string(tss: &[FullTipset]) -> String {
509 format!(
510 "epoch: {}-{}",
511 tss.first().map(|ts| ts.epoch()).unwrap_or_default(),
512 tss.last().map(|ts| ts.epoch()).unwrap_or_default()
513 )
514 }
515
516 match self {
517 Self::NewFullTipsets(tss) => write!(f, "NewFullTipsets({})", tss_to_string(tss)),
518 Self::BadTipset(ts) => {
519 write!(f, "BadTipset(epoch: {}, key: {})", ts.epoch(), ts.key())
520 }
521 Self::ValidatedTipset {
522 tipset,
523 is_proposed_head,
524 } => write!(
525 f,
526 "ValidatedTipset(epoch: {}, key: {}, is_proposed_head: {is_proposed_head})",
527 tipset.epoch(),
528 tipset.key()
529 ),
530 }
531 }
532}
533
534struct SyncStateMachine<DB> {
535 cs: Arc<ChainStore<DB>>,
536 bad_block_cache: Option<Arc<BadBlockCache>>,
537 tipsets: HashMap<TipsetKey, FullTipset>,
539 stateless_mode: bool,
540}
541
542impl<DB: Blockstore> SyncStateMachine<DB> {
543 pub fn new(
544 cs: Arc<ChainStore<DB>>,
545 bad_block_cache: Option<Arc<BadBlockCache>>,
546 stateless_mode: bool,
547 ) -> Self {
548 Self {
549 cs,
550 bad_block_cache,
551 tipsets: HashMap::default(),
552 stateless_mode,
553 }
554 }
555
556 fn chains(&self) -> Vec<Vec<FullTipset>> {
558 let mut chains = Vec::new();
559 let mut remaining_tipsets = self.tipsets.clone();
560
561 while let Some(heaviest) = remaining_tipsets
562 .values()
563 .max_by_key(|ts| ts.weight())
564 .cloned()
565 {
566 let mut chain = Vec::new();
568 let mut current = Some(heaviest);
569
570 while let Some(tipset) = current.take() {
571 remaining_tipsets.remove(tipset.key());
572
573 current = self.tipsets.get(tipset.parents()).cloned();
575
576 chain.push(tipset);
577 }
578 chain.reverse();
579 chains.push(chain);
580 }
581
582 chains
583 }
584
585 fn is_parent_validated(&self, tipset: &FullTipset) -> bool {
586 let db = self.cs.blockstore();
587 self.stateless_mode || db.has(tipset.parent_state()).unwrap_or(false)
588 }
589
590 fn is_ready_for_validation(&self, tipset: &FullTipset) -> bool {
591 if self.stateless_mode || tipset.key() == self.cs.genesis_tipset().key() {
592 true
594 } else if let Ok(parent_ts) = load_full_tipset(&self.cs, tipset.parents()) {
595 let head_ts = self.cs.heaviest_tipset();
596 if parent_ts.key() == head_ts.key() {
602 true
603 } else if parent_ts.epoch() >= head_ts.epoch() {
604 false
605 } else {
606 self.is_parent_validated(tipset)
607 }
608 } else {
609 false
610 }
611 }
612
613 fn add_full_tipset(&mut self, tipset: FullTipset) {
614 if let Err(why) = TipsetValidator(&tipset).validate(
615 &self.cs,
616 self.bad_block_cache.as_ref().map(AsRef::as_ref),
617 &self.cs.genesis_tipset(),
618 self.cs.chain_config().block_delay_secs,
619 ) {
620 metrics::INVALID_TIPSET_TOTAL.inc();
621 trace!("Skipping invalid tipset: {}", why);
622 self.mark_bad_tipset(tipset);
623 return;
624 }
625
626 let heaviest = self.cs.heaviest_tipset();
628 let epoch_diff = heaviest.epoch() - tipset.epoch();
629
630 if epoch_diff > self.cs.chain_config().policy.chain_finality {
631 self.mark_bad_tipset(tipset);
632 return;
633 }
634
635 if self.tipsets.contains_key(tipset.key()) {
637 return;
638 }
639
640 let mut to_remove = Vec::new();
642 #[allow(clippy::mutable_key_type)]
643 let mut merged_blocks: HashSet<_> = tipset.blocks().iter().cloned().collect();
644
645 let parent_refs: HashSet<_> = self
647 .tipsets
648 .values()
649 .map(|ts| ts.parents().clone())
650 .collect();
651
652 for (key, existing_ts) in self.tipsets.iter() {
653 if existing_ts.epoch() == tipset.epoch() && existing_ts.parents() == tipset.parents() {
654 if !parent_refs.contains(key) {
656 to_remove.push(key.clone());
657 }
658 merged_blocks.extend(existing_ts.blocks().iter().cloned());
660 }
661 }
662
663 for key in to_remove {
665 self.tipsets.remove(&key);
666 }
667
668 if let Ok(merged_tipset) = FullTipset::new(merged_blocks) {
670 self.tipsets
671 .insert(merged_tipset.key().clone(), merged_tipset);
672 }
673 }
674
675 fn mark_bad_tipset(&mut self, tipset: FullTipset) {
679 let mut stack = vec![tipset];
680 while let Some(tipset) = stack.pop() {
681 self.tipsets.remove(tipset.key());
682 let mut to_remove = Vec::new();
684 let mut descendants = Vec::new();
685
686 for (key, ts) in self.tipsets.iter() {
687 if ts.parents() == tipset.key() {
688 to_remove.push(key.clone());
689 descendants.push(ts.clone());
690 }
691 }
692
693 for key in to_remove {
695 self.tipsets.remove(&key);
696 }
697
698 stack.extend(descendants);
700 }
701 }
702
703 fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) {
704 if !self.is_parent_validated(&tipset) {
705 tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), "Tipset must be validated");
706 return;
707 }
708
709 self.tipsets.remove(tipset.key());
710 let tipset = tipset.into_tipset();
711 if self.stateless_mode {
713 let epoch = tipset.epoch();
714 let terse_key = tipset.key().terse();
715 if self.cs.heaviest_tipset().weight() < tipset.weight() {
716 if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
717 error!("Error setting heaviest tipset: {}", e);
718 } else {
719 info!("Heaviest tipset: {} ({})", epoch, terse_key);
720 }
721 }
722 } else if is_proposed_head {
723 if let Err(e) = self.cs.put_tipset(&tipset) {
724 error!("Error putting tipset: {e}");
725 }
726 } else if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
727 error!("Error setting heaviest tipset: {e}");
728 }
729 }
730
731 pub fn update(&mut self, event: SyncEvent) {
732 tracing::trace!("update: {event}");
733 match event {
734 SyncEvent::NewFullTipsets(tipsets) => {
735 for tipset in tipsets {
736 self.add_full_tipset(tipset);
737 }
738 }
739 SyncEvent::BadTipset(tipset) => self.mark_bad_tipset(tipset),
740 SyncEvent::ValidatedTipset {
741 tipset,
742 is_proposed_head,
743 } => self.mark_validated_tipset(tipset, is_proposed_head),
744 }
745 }
746
747 pub fn tasks(&self) -> (Vec<SyncTask>, Vec<ForkSyncInfo>) {
748 let current_validated_epoch = self.cs.heaviest_tipset().epoch();
750 let now = Utc::now();
751
752 let mut active_sync_info = Vec::new();
753 let mut tasks = Vec::new();
754 for chain in self.chains() {
755 if let Some(first_ts) = chain.first() {
756 let last_ts = chain.last().expect("Infallible");
757 let stage: ForkSyncStage;
758 let start_time = Some(now);
759
760 if !self.is_ready_for_validation(first_ts) {
761 stage = ForkSyncStage::FetchingHeaders;
762 tasks.push(SyncTask::FetchTipset(
763 first_ts.parents().clone(),
764 first_ts.epoch(),
765 ));
766 } else {
767 stage = ForkSyncStage::ValidatingTipsets;
768 tasks.push(SyncTask::ValidateTipset {
769 tipset: first_ts.clone(),
770 is_proposed_head: chain.len() == 1,
771 });
772 }
773
774 let fork_info = ForkSyncInfo {
775 target_tipset_key: last_ts.key().clone(),
776 target_epoch: last_ts.epoch(),
777 target_sync_epoch_start: first_ts.epoch(),
778 stage,
779 validated_chain_head_epoch: current_validated_epoch,
780 start_time,
781 last_updated: Some(now),
782 };
783
784 active_sync_info.push(fork_info);
785 }
786 }
787 (tasks, active_sync_info)
788 }
789}
790
791#[derive(PartialEq, Eq, Hash, Clone, Debug)]
792enum SyncTask {
793 ValidateTipset {
794 tipset: FullTipset,
795 is_proposed_head: bool,
796 },
797 FetchTipset(TipsetKey, ChainEpoch),
798}
799
800impl std::fmt::Display for SyncTask {
801 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
802 match self {
803 SyncTask::ValidateTipset {
804 tipset,
805 is_proposed_head,
806 } => write!(
807 f,
808 "ValidateTipset(epoch: {}, is_proposed_head: {is_proposed_head})",
809 tipset.epoch()
810 ),
811 SyncTask::FetchTipset(key, epoch) => {
812 let s = key.to_string();
813 write!(
814 f,
815 "FetchTipset({}, epoch: {})",
816 &s[s.len().saturating_sub(8)..],
817 epoch
818 )
819 }
820 }
821 }
822}
823
824impl SyncTask {
825 async fn execute<DB: Blockstore + Sync + Send + 'static>(
826 self,
827 network: SyncNetworkContext<DB>,
828 state_manager: Arc<StateManager<DB>>,
829 stateless_mode: bool,
830 bad_block_cache: Option<Arc<BadBlockCache>>,
831 ) -> Option<SyncEvent> {
832 tracing::trace!("SyncTask::execute {self}");
833 match self {
834 SyncTask::ValidateTipset {
835 tipset,
836 is_proposed_head,
837 } if stateless_mode => Some(SyncEvent::ValidatedTipset {
838 tipset,
839 is_proposed_head,
840 }),
841 SyncTask::ValidateTipset {
842 tipset,
843 is_proposed_head,
844 } => match validate_tipset(&state_manager, tipset.clone(), bad_block_cache).await {
845 Ok(()) => Some(SyncEvent::ValidatedTipset {
846 tipset,
847 is_proposed_head,
848 }),
849 Err(e) => {
850 warn!("Error validating tipset: {e}");
851 Some(SyncEvent::BadTipset(tipset))
852 }
853 },
854 SyncTask::FetchTipset(key, epoch) => {
855 match get_full_tipset_batch(&network, state_manager.chain_store(), None, &key).await
856 {
857 Ok(parents) => Some(SyncEvent::NewFullTipsets(parents)),
858 Err(e) => {
859 tracing::warn!(%key, %epoch, "failed to fetch tipset: {e}");
860 None
861 }
862 }
863 }
864 }
865 }
866}
867
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blocks::{Chain4U, HeaderBuilder, chain4u};
    use crate::db::MemoryDB;
    use crate::utils::db::CborStoreExt as _;
    use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
    use num_bigint::BigInt;
    use num_traits::ToPrimitive;
    use std::sync::Arc;
    use tracing::level_filters::LevelFilter;
    use tracing_subscriber::EnvFilter;

    /// Builds an in-memory chain store whose head is the genesis tipset, together with the
    /// `Chain4U` instance used to grow test chains on the same blockstore.
    fn setup() -> (Arc<ChainStore<MemoryDB>>, Chain4U<Arc<MemoryDB>>) {
        // The result of `try_init` is deliberately discarded.
        let _ = tracing_subscriber::fmt()
            .without_time()
            .with_env_filter(
                EnvFilter::builder()
                    .with_default_directive(LevelFilter::DEBUG.into())
                    .from_env()
                    .unwrap(),
            )
            .try_init();

        let db = Arc::new(MemoryDB::default());

        {
            // Store a `TxMeta` whose two message roots both point at an empty AMT.
            let empty_amt = Amt::<Cid, _>::new(&db).flush().unwrap();
            db.put_cbor_default(&crate::blocks::TxMeta {
                bls_message_root: empty_amt,
                secp_message_root: empty_amt,
            })
            .unwrap();
        }

        let c4u = Chain4U::with_blockstore(db.clone());
        chain4u! {
            in c4u;
            [genesis_header = dummy_node(&db, 0)]
        };

        let chain_store = ChainStore::new(
            db.clone(),
            db.clone(),
            db.clone(),
            Default::default(),
            genesis_header.clone().into(),
        )
        .unwrap();
        let cs = Arc::new(chain_store);

        cs.set_heaviest_tipset(cs.genesis_tipset()).unwrap();

        (cs, c4u)
    }

    /// Stores the epoch `i` as CBOR and returns the CID, which serves as a stand-in state root.
    fn dummy_state(db: impl Blockstore, i: ChainEpoch) -> Cid {
        db.put_cbor_default(&i).unwrap()
    }

    /// Header builder whose epoch and weight both equal `i`.
    fn dummy_node(db: impl Blockstore, i: ChainEpoch) -> HeaderBuilder {
        HeaderBuilder {
            state_root: dummy_state(db, i).into(),
            weight: BigInt::from(i).into(),
            epoch: i.into(),
            ..Default::default()
        }
    }

    /// Tipsets delivered out of order must still be proposed for validation oldest-first.
    #[test]
    fn test_state_machine_validation_order() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        chain4u! {
            from [genesis_header] in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)] -> [c = dummy_node(&db, 3)] -> [d = dummy_node(&db, 4)] -> [e = dummy_node(&db, 5)]
        };

        // Stateless mode: no real state validation is needed.
        let mut state_machine = SyncStateMachine::new(cs, Default::default(), true);

        // Feed the tipsets in a shuffled order.
        for block in [e, b, d, c, a] {
            let full_tipset = FullTipset::new(vec![Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            }])
            .unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
        }

        // Keep validating whatever the state machine proposes until nothing is left.
        let mut validated_epochs = Vec::new();
        loop {
            let (tasks, _) = state_machine.tasks();

            let ready: Vec<_> = tasks
                .into_iter()
                .filter_map(|task| match task {
                    SyncTask::ValidateTipset {
                        tipset,
                        is_proposed_head,
                    } => Some((tipset, is_proposed_head)),
                    SyncTask::FetchTipset(..) => None,
                })
                .collect();

            if ready.is_empty() {
                break;
            }

            for (ts, is_proposed_head) in ready {
                validated_epochs.push(ts.epoch());
                db.put_cbor_default(&ts.epoch()).unwrap();
                state_machine.mark_validated_tipset(ts, is_proposed_head);
            }
        }

        assert_eq!(validated_epochs, vec![1, 2, 3, 4, 5]);
    }

    /// Two forks sharing an ancestor yield two chains, heaviest first, both containing the
    /// shared ancestor.
    #[test]
    fn test_sync_state_machine_chain_fragments() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        chain4u! {
            in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)]
        };
        chain4u! {
            from [a] in c4u;
            [c = dummy_node(&db, 3)]
        };

        let mut state_machine = SyncStateMachine::new(cs, Default::default(), false);

        for block in [a, b, c] {
            let full_tipset = FullTipset::new(vec![Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            }])
            .unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
        }

        // Reduce every chain to the weights of its tipsets for an easy comparison.
        let chain_weights = state_machine
            .chains()
            .into_iter()
            .map(|chain| {
                chain
                    .into_iter()
                    .map(|ts| ts.weight().to_i64().unwrap_or(0))
                    .collect_vec()
            })
            .collect_vec();

        assert_eq!(chain_weights, vec![vec![1, 3], vec![1, 2]]);
    }
}