1use super::network_context::SyncNetworkContext;
20use crate::{
21 blocks::{Block, FullTipset, Tipset, TipsetKey},
22 chain::ChainStore,
23 chain_sync::{
24 ForkSyncInfo, ForkSyncStage, SyncStatus, SyncStatusReport, TipsetValidator,
25 bad_block_cache::BadBlockCache, metrics, tipset_syncer::validate_tipset,
26 },
27 libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest},
28 message_pool::{MessagePool, MpoolRpcProvider},
29 networks::calculate_expected_epoch,
30 shim::clock::ChainEpoch,
31 state_manager::StateManager,
32 utils::misc::env::is_env_truthy,
33};
34use ahash::{HashMap, HashSet};
35use chrono::Utc;
36use cid::Cid;
37use fvm_ipld_blockstore::Blockstore;
38use itertools::Itertools;
39use libp2p::PeerId;
40use parking_lot::{Mutex, RwLock};
41use std::{ops::Deref as _, sync::Arc, time::Instant};
42use tokio::{sync::Notify, task::JoinSet};
43use tracing::{debug, error, info, trace, warn};
44
45pub struct ChainFollower<DB> {
46 pub sync_status: SyncStatus,
48
49 state_manager: Arc<StateManager<DB>>,
51
52 pub network: SyncNetworkContext<DB>,
54
55 genesis: Arc<Tipset>,
57
58 pub bad_blocks: Option<Arc<BadBlockCache>>,
62
63 net_handler: flume::Receiver<NetworkEvent>,
65
66 pub tipset_sender: flume::Sender<Arc<FullTipset>>,
68
69 tipset_receiver: flume::Receiver<Arc<FullTipset>>,
71
72 stateless_mode: bool,
77
78 mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
80}
81
82impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
83 pub fn new(
84 state_manager: Arc<StateManager<DB>>,
85 network: SyncNetworkContext<DB>,
86 genesis: Arc<Tipset>,
87 net_handler: flume::Receiver<NetworkEvent>,
88 stateless_mode: bool,
89 mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
90 ) -> Self {
91 let (tipset_sender, tipset_receiver) = flume::bounded(20);
92 let disable_bad_block_cache = is_env_truthy("FOREST_DISABLE_BAD_BLOCK_CACHE");
93 Self {
94 sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
95 state_manager,
96 network,
97 genesis,
98 bad_blocks: if disable_bad_block_cache {
99 tracing::warn!("bad block cache is disabled by `FOREST_DISABLE_BAD_BLOCK_CACHE`");
100 None
101 } else {
102 Some(Default::default())
103 },
104 net_handler,
105 tipset_sender,
106 tipset_receiver,
107 stateless_mode,
108 mem_pool,
109 }
110 }
111
112 pub async fn run(self) -> anyhow::Result<()> {
113 chain_follower(
114 self.state_manager,
115 self.bad_blocks,
116 self.net_handler,
117 self.tipset_receiver,
118 self.network,
119 self.mem_pool,
120 self.sync_status,
121 self.genesis,
122 self.stateless_mode,
123 )
124 .await
125 }
126}
127
128#[allow(clippy::too_many_arguments)]
129pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
131 state_manager: Arc<StateManager<DB>>,
132 bad_block_cache: Option<Arc<BadBlockCache>>,
133 network_rx: flume::Receiver<NetworkEvent>,
134 tipset_receiver: flume::Receiver<Arc<FullTipset>>,
135 network: SyncNetworkContext<DB>,
136 mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
137 sync_status: SyncStatus,
138 genesis: Arc<Tipset>,
139 stateless_mode: bool,
140) -> anyhow::Result<()> {
141 let state_changed = Arc::new(Notify::new());
142 let state_machine = Arc::new(Mutex::new(SyncStateMachine::new(
143 state_manager.chain_store().clone(),
144 bad_block_cache.clone(),
145 stateless_mode,
146 )));
147 let tasks: Arc<Mutex<HashSet<SyncTask>>> = Arc::new(Mutex::new(HashSet::default()));
148
149 let mut set = JoinSet::new();
150
151 set.spawn({
153 let state_manager = state_manager.clone();
154 let state_changed = state_changed.clone();
155 let state_machine = state_machine.clone();
156 let network = network.clone();
157 async move {
158 while let Ok(event) = network_rx.recv_async().await {
159 inc_gossipsub_event_metrics(&event);
160
161 update_peer_info(
162 &event,
163 network.clone(),
164 state_manager.chain_store().clone(),
165 &genesis,
166 );
167
168 let Ok(tipset) = (match event {
169 NetworkEvent::HelloResponseOutbound { request, source } => {
170 let tipset_keys = TipsetKey::from(request.heaviest_tip_set.clone());
171 get_full_tipset(
172 network.clone(),
173 state_manager.chain_store().clone(),
174 Some(source),
175 &tipset_keys,
176 )
177 .await
178 .inspect_err(|e| debug!("Querying full tipset failed: {}", e))
179 }
180 NetworkEvent::PubsubMessage { message } => match message {
181 PubsubMessage::Block(b) => {
182 let key = TipsetKey::from(nunny::vec![*b.header.cid()]);
183 get_full_tipset(
184 network.clone(),
185 state_manager.chain_store().clone(),
186 None,
187 &key,
188 )
189 .await
190 }
191 PubsubMessage::Message(m) => {
192 if let Err(why) = mem_pool.add(m) {
193 debug!("Received invalid GossipSub message: {}", why);
194 }
195 continue;
196 }
197 },
198 _ => continue,
199 }) else {
200 continue;
201 };
202 {
203 state_machine
204 .lock()
205 .update(SyncEvent::NewFullTipsets(vec![Arc::new(tipset)]));
206 state_changed.notify_one();
207 }
208 }
209 }
210 });
211
212 set.spawn({
214 let state_changed = state_changed.clone();
215 let state_machine = state_machine.clone();
216
217 async move {
218 while let Ok(tipset) = tipset_receiver.recv_async().await {
219 state_machine
220 .lock()
221 .update(SyncEvent::NewFullTipsets(vec![tipset]));
222 state_changed.notify_one();
223 }
224 }
225 });
226
227 set.spawn({
229 let state_manager = state_manager.clone();
230 let state_machine = state_machine.clone();
231 let state_changed = state_changed.clone();
232 let tasks = tasks.clone();
233 let bad_block_cache = bad_block_cache.clone();
234 async move {
235 loop {
236 state_changed.notified().await;
237
238 let mut tasks_set = tasks.lock();
239 let (task_vec, current_active_forks) = state_machine.lock().tasks();
240
241 {
243 let mut status_report_guard = sync_status.write();
244 status_report_guard.update(
245 &state_manager,
246 current_active_forks,
247 stateless_mode,
248 );
249 }
250
251 for task in task_vec {
252 let new = tasks_set.insert(task.clone());
254 if new {
255 let tasks_clone = tasks.clone();
256 let action = task.clone().execute(
257 network.clone(),
258 state_manager.clone(),
259 stateless_mode,
260 bad_block_cache.clone(),
261 );
262 tokio::spawn({
263 let state_machine = state_machine.clone();
264 let state_changed = state_changed.clone();
265 async move {
266 if let Some(event) = action.await {
267 state_machine.lock().update(event);
268 state_changed.notify_one();
269 }
270 tasks_clone.lock().remove(&task);
271 }
272 });
273 }
274 }
275 }
276 }
277 });
278
279 set.spawn({
283 let state_manager = state_manager.clone();
284 let state_machine = state_machine.clone();
285 async move {
286 loop {
287 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
288 let (tasks_set, _) = state_machine.lock().tasks();
289 let heaviest_tipset = state_manager.chain_store().heaviest_tipset();
290 let heaviest_epoch = heaviest_tipset.epoch();
291
292 let to_download = tasks_set
293 .iter()
294 .filter_map(|task| match task {
295 SyncTask::FetchTipset(_, epoch) => Some(epoch - heaviest_epoch),
296 _ => None,
297 })
298 .max()
299 .unwrap_or(0);
300
301 let expected_head = calculate_expected_epoch(
302 Utc::now().timestamp() as u64,
303 state_manager.chain_store().genesis_block_header().timestamp,
304 state_manager.chain_config().block_delay_secs,
305 );
306
307 match (expected_head - heaviest_epoch > 10, to_download > 0) {
310 (true, true) => info!(
311 "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}, downloading {to_download} tipsets"
312 , heaviest_tipset.key()
313 ),
314 (true, false) => info!(
315 "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}"
316 , heaviest_tipset.key()
317 ),
318 (false, true) => {
319 info!("Downloading {to_download} tipsets")
320 }
321 (false, false) => {}
322 }
323 }
324 }
325 });
326
327 set.join_all().await;
328 Ok(())
329}
330
331fn inc_gossipsub_event_metrics(event: &NetworkEvent) {
333 let label = match event {
334 NetworkEvent::HelloRequestInbound => metrics::values::HELLO_REQUEST_INBOUND,
335 NetworkEvent::HelloResponseOutbound { .. } => metrics::values::HELLO_RESPONSE_OUTBOUND,
336 NetworkEvent::HelloRequestOutbound => metrics::values::HELLO_REQUEST_OUTBOUND,
337 NetworkEvent::HelloResponseInbound => metrics::values::HELLO_RESPONSE_INBOUND,
338 NetworkEvent::PeerConnected(_) => metrics::values::PEER_CONNECTED,
339 NetworkEvent::PeerDisconnected(_) => metrics::values::PEER_DISCONNECTED,
340 NetworkEvent::PubsubMessage { message } => match message {
341 PubsubMessage::Block(_) => metrics::values::PUBSUB_BLOCK,
342 PubsubMessage::Message(_) => metrics::values::PUBSUB_MESSAGE,
343 },
344 NetworkEvent::ChainExchangeRequestOutbound => {
345 metrics::values::CHAIN_EXCHANGE_REQUEST_OUTBOUND
346 }
347 NetworkEvent::ChainExchangeResponseInbound => {
348 metrics::values::CHAIN_EXCHANGE_RESPONSE_INBOUND
349 }
350 NetworkEvent::ChainExchangeRequestInbound => {
351 metrics::values::CHAIN_EXCHANGE_REQUEST_INBOUND
352 }
353 NetworkEvent::ChainExchangeResponseOutbound => {
354 metrics::values::CHAIN_EXCHANGE_RESPONSE_OUTBOUND
355 }
356 };
357
358 metrics::LIBP2P_MESSAGE_TOTAL.get_or_create(&label).inc();
359}
360
361fn update_peer_info<DB: Blockstore + Sync + Send + 'static>(
363 event: &NetworkEvent,
364 network: SyncNetworkContext<DB>,
365 chain_store: Arc<ChainStore<DB>>,
366 genesis: &Tipset,
367) {
368 match event {
369 NetworkEvent::PeerConnected(peer_id) => {
370 let genesis_cid = *genesis.block_headers().first().cid();
371 tokio::task::spawn(handle_peer_connected_event(
373 network,
374 chain_store,
375 *peer_id,
376 genesis_cid,
377 ));
378 }
379 NetworkEvent::PeerDisconnected(peer_id) => {
380 handle_peer_disconnected_event(network, *peer_id);
381 }
382 _ => {}
383 }
384}
385
386async fn handle_peer_connected_event<DB: Blockstore + Sync + Send + 'static>(
387 network: SyncNetworkContext<DB>,
388 chain_store: Arc<ChainStore<DB>>,
389 peer_id: PeerId,
390 genesis_block_cid: Cid,
391) {
392 if network.peer_manager().is_peer_new(&peer_id) {
394 let heaviest = chain_store.heaviest_tipset();
397 let request = HelloRequest {
398 heaviest_tip_set: heaviest.cids(),
399 heaviest_tipset_height: heaviest.epoch(),
400 heaviest_tipset_weight: heaviest.weight().clone().into(),
401 genesis_cid: genesis_block_cid,
402 };
403 let (peer_id, moment_sent, response) = match network.hello_request(peer_id, request).await {
404 Ok(response) => response,
405 Err(e) => {
406 debug!("Hello request failed: {}", e);
407 return;
408 }
409 };
410 let dur = Instant::now().duration_since(moment_sent);
411
412 match response {
414 Some(_) => {
415 network.peer_manager().log_success(&peer_id, dur);
416 }
417 None => {
418 network.peer_manager().log_failure(&peer_id, dur);
419 }
420 }
421 }
422}
423
424fn handle_peer_disconnected_event<DB: Blockstore + Sync + Send + 'static>(
425 network: SyncNetworkContext<DB>,
426 peer_id: PeerId,
427) {
428 network.peer_manager().remove_peer(&peer_id);
429 network.peer_manager().unmark_peer_bad(&peer_id);
430}
431
432pub async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
433 network: SyncNetworkContext<DB>,
434 chain_store: Arc<ChainStore<DB>>,
435 peer_id: Option<PeerId>,
436 tipset_keys: &TipsetKey,
437) -> anyhow::Result<FullTipset> {
438 if let Ok(full_tipset) = load_full_tipset(&chain_store, tipset_keys) {
440 return Ok(full_tipset);
441 }
442 let tipset = network
444 .chain_exchange_full_tipset(peer_id, tipset_keys)
445 .await
446 .map_err(|e| anyhow::anyhow!(e))?;
447 tipset.persist(chain_store.blockstore())?;
448
449 Ok(tipset)
450}
451
452async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
453 network: SyncNetworkContext<DB>,
454 chain_store: Arc<ChainStore<DB>>,
455 peer_id: Option<PeerId>,
456 tipset_keys: &TipsetKey,
457) -> anyhow::Result<Vec<FullTipset>> {
458 if let Ok(full_tipset) = load_full_tipset(&chain_store, tipset_keys) {
460 return Ok(vec![full_tipset]);
461 }
462 let tipsets = network
464 .chain_exchange_full_tipsets(peer_id, tipset_keys)
465 .await
466 .map_err(|e| anyhow::anyhow!(e))?;
467
468 for tipset in tipsets.iter() {
469 tipset.persist(chain_store.blockstore())?;
470 }
471
472 Ok(tipsets)
473}
474
475pub fn load_full_tipset<DB: Blockstore>(
476 chain_store: &ChainStore<DB>,
477 tipset_keys: &TipsetKey,
478) -> anyhow::Result<FullTipset> {
479 let ts = chain_store
481 .chain_index()
482 .load_required_tipset(tipset_keys)?;
483 let blocks: Vec<_> = ts
484 .block_headers()
485 .iter()
486 .map(|header| -> anyhow::Result<Block> {
487 let (bls_msgs, secp_msgs) =
488 crate::chain::block_messages(chain_store.blockstore(), header)?;
489 Ok(Block {
490 header: header.clone(),
491 bls_messages: bls_msgs,
492 secp_messages: secp_msgs,
493 })
494 })
495 .try_collect()?;
496 let fts = FullTipset::new(blocks)?;
498 Ok(fts)
499}
500
501enum SyncEvent {
502 NewFullTipsets(Vec<Arc<FullTipset>>),
503 BadTipset(Arc<FullTipset>),
504 ValidatedTipset {
505 tipset: Arc<FullTipset>,
506 is_proposed_head: bool,
507 },
508}
509
510impl std::fmt::Display for SyncEvent {
511 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
512 fn tss_to_string(tss: &[Arc<FullTipset>]) -> String {
513 format!(
514 "epoch: {}-{}",
515 tss.first().map(|ts| ts.epoch()).unwrap_or_default(),
516 tss.last().map(|ts| ts.epoch()).unwrap_or_default()
517 )
518 }
519
520 match self {
521 Self::NewFullTipsets(tss) => write!(f, "NewFullTipsets({})", tss_to_string(tss)),
522 Self::BadTipset(ts) => {
523 write!(f, "BadTipset(epoch: {}, key: {})", ts.epoch(), ts.key())
524 }
525 Self::ValidatedTipset {
526 tipset,
527 is_proposed_head,
528 } => write!(
529 f,
530 "ValidatedTipset(epoch: {}, key: {}, is_proposed_head: {is_proposed_head})",
531 tipset.epoch(),
532 tipset.key()
533 ),
534 }
535 }
536}
537
538struct SyncStateMachine<DB> {
539 cs: Arc<ChainStore<DB>>,
540 bad_block_cache: Option<Arc<BadBlockCache>>,
541 tipsets: HashMap<TipsetKey, Arc<FullTipset>>,
543 stateless_mode: bool,
544}
545
546impl<DB: Blockstore> SyncStateMachine<DB> {
547 pub fn new(
548 cs: Arc<ChainStore<DB>>,
549 bad_block_cache: Option<Arc<BadBlockCache>>,
550 stateless_mode: bool,
551 ) -> Self {
552 Self {
553 cs,
554 bad_block_cache,
555 tipsets: HashMap::default(),
556 stateless_mode,
557 }
558 }
559
560 fn chains(&self) -> Vec<Vec<Arc<FullTipset>>> {
562 let mut chains = Vec::new();
563 let mut remaining_tipsets = self.tipsets.clone();
564
565 while let Some(heaviest) = remaining_tipsets
566 .values()
567 .max_by_key(|ts| ts.weight())
568 .cloned()
569 {
570 let mut chain = Vec::new();
572 let mut current = Some(heaviest);
573
574 while let Some(tipset) = current.take() {
575 remaining_tipsets.remove(tipset.key());
576
577 current = self.tipsets.get(tipset.parents()).cloned();
579
580 chain.push(tipset);
581 }
582 chain.reverse();
583 chains.push(chain);
584 }
585
586 chains
587 }
588
589 fn is_parent_validated(&self, tipset: &FullTipset) -> bool {
590 let db = self.cs.blockstore();
591 self.stateless_mode || db.has(tipset.parent_state()).unwrap_or(false)
592 }
593
594 fn is_ready_for_validation(&self, tipset: &FullTipset) -> bool {
595 if self.stateless_mode || tipset.key() == self.cs.genesis_tipset().key() {
596 true
598 } else if let Ok(parent_ts) = load_full_tipset(&self.cs, tipset.parents()) {
599 let head_ts = self.cs.heaviest_tipset();
600 if parent_ts.key() == head_ts.key() {
606 true
607 } else if parent_ts.epoch() >= head_ts.epoch() {
608 false
609 } else {
610 self.is_parent_validated(tipset)
611 }
612 } else {
613 false
614 }
615 }
616
617 fn add_full_tipset(&mut self, tipset: Arc<FullTipset>) {
618 if let Err(why) = TipsetValidator(&tipset).validate(
619 &self.cs,
620 self.bad_block_cache.as_ref().map(AsRef::as_ref),
621 &self.cs.genesis_tipset(),
622 self.cs.chain_config().block_delay_secs,
623 ) {
624 metrics::INVALID_TIPSET_TOTAL.inc();
625 trace!("Skipping invalid tipset: {}", why);
626 self.mark_bad_tipset(tipset);
627 return;
628 }
629
630 let heaviest = self.cs.heaviest_tipset();
632 let epoch_diff = heaviest.epoch() - tipset.epoch();
633
634 if epoch_diff > self.cs.chain_config().policy.chain_finality {
635 self.mark_bad_tipset(tipset);
636 return;
637 }
638
639 if self.tipsets.contains_key(tipset.key()) {
641 return;
642 }
643
644 let mut to_remove = Vec::new();
646 #[allow(clippy::mutable_key_type)]
647 let mut merged_blocks: HashSet<_> = tipset.blocks().iter().cloned().collect();
648
649 let parent_refs: HashSet<_> = self
651 .tipsets
652 .values()
653 .map(|ts| ts.parents().clone())
654 .collect();
655
656 for (key, existing_ts) in self.tipsets.iter() {
657 if existing_ts.epoch() == tipset.epoch() && existing_ts.parents() == tipset.parents() {
658 if !parent_refs.contains(key) {
660 to_remove.push(key.clone());
661 }
662 merged_blocks.extend(existing_ts.blocks().iter().cloned());
664 }
665 }
666
667 for key in to_remove {
669 self.tipsets.remove(&key);
670 }
671
672 if let Ok(merged_tipset) = FullTipset::new(merged_blocks) {
674 self.tipsets
675 .insert(merged_tipset.key().clone(), Arc::new(merged_tipset));
676 }
677 }
678
679 fn mark_bad_tipset(&mut self, tipset: Arc<FullTipset>) {
683 let mut stack = vec![tipset];
684 while let Some(tipset) = stack.pop() {
685 self.tipsets.remove(tipset.key());
686 let mut to_remove = Vec::new();
688 let mut descendants = Vec::new();
689
690 for (key, ts) in self.tipsets.iter() {
691 if ts.parents() == tipset.key() {
692 to_remove.push(key.clone());
693 descendants.push(ts.clone());
694 }
695 }
696
697 for key in to_remove {
699 self.tipsets.remove(&key);
700 }
701
702 stack.extend(descendants);
704 }
705 }
706
707 fn mark_validated_tipset(&mut self, tipset: Arc<FullTipset>, is_proposed_head: bool) {
708 if !self.is_parent_validated(&tipset) {
709 tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), "Tipset must be validated");
710 return;
711 }
712
713 self.tipsets.remove(tipset.key());
714 let tipset = tipset.deref().clone().into_tipset();
715 if self.stateless_mode {
717 let epoch = tipset.epoch();
718 let terse_key = tipset.key().terse();
719 if self.cs.heaviest_tipset().weight() < tipset.weight() {
720 if let Err(e) = self.cs.set_heaviest_tipset(Arc::new(tipset)) {
721 error!("Error setting heaviest tipset: {}", e);
722 } else {
723 info!("Heaviest tipset: {} ({})", epoch, terse_key);
724 }
725 }
726 } else if is_proposed_head {
727 if let Err(e) = self.cs.put_tipset(&tipset) {
728 error!("Error putting tipset: {e}");
729 }
730 } else if let Err(e) = self.cs.set_heaviest_tipset(tipset.into()) {
731 error!("Error setting heaviest tipset: {e}");
732 }
733 }
734
735 pub fn update(&mut self, event: SyncEvent) {
736 tracing::trace!("update: {event}");
737 match event {
738 SyncEvent::NewFullTipsets(tipsets) => {
739 for tipset in tipsets {
740 self.add_full_tipset(tipset);
741 }
742 }
743 SyncEvent::BadTipset(tipset) => self.mark_bad_tipset(tipset),
744 SyncEvent::ValidatedTipset {
745 tipset,
746 is_proposed_head,
747 } => self.mark_validated_tipset(tipset, is_proposed_head),
748 }
749 }
750
751 pub fn tasks(&self) -> (Vec<SyncTask>, Vec<ForkSyncInfo>) {
752 let current_validated_epoch = self.cs.heaviest_tipset().epoch();
754 let now = Utc::now();
755
756 let mut active_sync_info = Vec::new();
757 let mut tasks = Vec::new();
758 for chain in self.chains() {
759 if let Some(first_ts) = chain.first() {
760 let last_ts = chain.last().expect("Infallible");
761 let stage: ForkSyncStage;
762 let start_time = Some(now);
763
764 if !self.is_ready_for_validation(first_ts) {
765 stage = ForkSyncStage::FetchingHeaders;
766 tasks.push(SyncTask::FetchTipset(
767 first_ts.parents().clone(),
768 first_ts.epoch(),
769 ));
770 } else {
771 stage = ForkSyncStage::ValidatingTipsets;
772 tasks.push(SyncTask::ValidateTipset {
773 tipset: first_ts.clone(),
774 is_proposed_head: chain.len() == 1,
775 });
776 }
777
778 let fork_info = ForkSyncInfo {
779 target_tipset_key: last_ts.key().clone(),
780 target_epoch: last_ts.epoch(),
781 target_sync_epoch_start: first_ts.epoch(),
782 stage,
783 validated_chain_head_epoch: current_validated_epoch,
784 start_time,
785 last_updated: Some(now),
786 };
787
788 active_sync_info.push(fork_info);
789 }
790 }
791 (tasks, active_sync_info)
792 }
793}
794
795#[derive(PartialEq, Eq, Hash, Clone, Debug)]
796enum SyncTask {
797 ValidateTipset {
798 tipset: Arc<FullTipset>,
799 is_proposed_head: bool,
800 },
801 FetchTipset(TipsetKey, ChainEpoch),
802}
803
804impl std::fmt::Display for SyncTask {
805 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
806 match self {
807 SyncTask::ValidateTipset {
808 tipset,
809 is_proposed_head,
810 } => write!(
811 f,
812 "ValidateTipset(epoch: {}, is_proposed_head: {is_proposed_head})",
813 tipset.epoch()
814 ),
815 SyncTask::FetchTipset(key, epoch) => {
816 let s = key.to_string();
817 write!(
818 f,
819 "FetchTipset({}, epoch: {})",
820 &s[s.len().saturating_sub(8)..],
821 epoch
822 )
823 }
824 }
825 }
826}
827
828impl SyncTask {
829 async fn execute<DB: Blockstore + Sync + Send + 'static>(
830 self,
831 network: SyncNetworkContext<DB>,
832 state_manager: Arc<StateManager<DB>>,
833 stateless_mode: bool,
834 bad_block_cache: Option<Arc<BadBlockCache>>,
835 ) -> Option<SyncEvent> {
836 tracing::trace!("SyncTask::execute {self}");
837 let cs = state_manager.chain_store();
838 match self {
839 SyncTask::ValidateTipset {
840 tipset,
841 is_proposed_head,
842 } if stateless_mode => Some(SyncEvent::ValidatedTipset {
843 tipset,
844 is_proposed_head,
845 }),
846 SyncTask::ValidateTipset {
847 tipset,
848 is_proposed_head,
849 } => {
850 let genesis = cs.genesis_tipset();
851 match validate_tipset(
852 state_manager.clone(),
853 cs,
854 tipset.deref().clone(),
855 &genesis,
856 bad_block_cache,
857 )
858 .await
859 {
860 Ok(()) => Some(SyncEvent::ValidatedTipset {
861 tipset,
862 is_proposed_head,
863 }),
864 Err(e) => {
865 warn!("Error validating tipset: {}", e);
866 Some(SyncEvent::BadTipset(tipset))
867 }
868 }
869 }
870 SyncTask::FetchTipset(key, epoch) => {
871 match get_full_tipset_batch(network.clone(), cs.clone(), None, &key).await {
872 Ok(parents) => Some(SyncEvent::NewFullTipsets(
873 parents.into_iter().map(Arc::new).collect(),
874 )),
875 Err(e) => {
876 tracing::warn!(%key, %epoch, "failed to fetch tipset: {e}");
877 None
878 }
879 }
880 }
881 }
882 }
883}
884
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blocks::{Chain4U, HeaderBuilder, chain4u};
    use crate::db::MemoryDB;
    use crate::utils::db::CborStoreExt as _;
    use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
    use num_bigint::BigInt;
    use num_traits::ToPrimitive;
    use std::sync::Arc;

    /// Builds an in-memory chain store whose head is a freshly generated genesis tipset,
    /// together with the `Chain4U` generator that produced it.
    fn setup() -> (Arc<ChainStore<MemoryDB>>, Chain4U<Arc<MemoryDB>>) {
        // Best effort: a subscriber may already be installed by another test.
        let env_filter = tracing_subscriber::EnvFilter::from_default_env()
            .add_directive(tracing::Level::DEBUG.into());
        let _ = tracing_subscriber::fmt()
            .with_env_filter(env_filter)
            .try_init();

        let db = Arc::new(MemoryDB::default());

        // Store a `TxMeta` whose message roots are both an empty AMT.
        let empty_amt = Amt::<Cid, _>::new(&db).flush().unwrap();
        db.put_cbor_default(&crate::blocks::TxMeta {
            bls_message_root: empty_amt,
            secp_message_root: empty_amt,
        })
        .unwrap();

        let c4u = Chain4U::with_blockstore(db.clone());
        chain4u! {
            in c4u;
            [genesis_header = dummy_node(&db, 0)]
        };

        let chain_store = ChainStore::new(
            db.clone(),
            db.clone(),
            db.clone(),
            db.clone(),
            Default::default(),
            genesis_header.clone().into(),
        )
        .unwrap();
        let cs = Arc::new(chain_store);

        cs.set_heaviest_tipset(Arc::new(cs.genesis_tipset()))
            .unwrap();

        (cs, c4u)
    }

    /// Stores `i` as CBOR and returns the resulting CID, used as a stand-in state root.
    fn dummy_state(db: impl Blockstore, i: ChainEpoch) -> Cid {
        db.put_cbor_default(&i).unwrap()
    }

    /// Header template whose epoch and weight both equal `i`, with a dummy state root.
    fn dummy_node(db: impl Blockstore, i: ChainEpoch) -> HeaderBuilder {
        HeaderBuilder {
            state_root: dummy_state(db, i).into(),
            weight: BigInt::from(i).into(),
            epoch: i.into(),
            ..Default::default()
        }
    }

    /// Tipsets delivered out of order must still be validated in epoch order.
    #[test]
    fn test_state_machine_validation_order() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        chain4u! {
            from [genesis_header] in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)] -> [c = dummy_node(&db, 3)] -> [d = dummy_node(&db, 4)] -> [e = dummy_node(&db, 5)]
        };

        let mut state_machine = SyncStateMachine::new(cs, Default::default(), true);

        // Feed the tipsets in a deliberately shuffled order.
        let tipsets = vec![e, b, d, c, a];

        for block in tipsets {
            let only_block = Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            };
            let full_tipset = FullTipset::new(vec![only_block]).unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![Arc::new(full_tipset)]));
        }

        // Keep validating whatever the state machine offers until nothing is left.
        let mut validated_epochs = Vec::new();
        loop {
            let (tasks, _) = state_machine.tasks();

            let ready: Vec<_> = tasks
                .into_iter()
                .filter_map(|task| match task {
                    SyncTask::ValidateTipset {
                        tipset,
                        is_proposed_head,
                    } => Some((tipset, is_proposed_head)),
                    SyncTask::FetchTipset(..) => None,
                })
                .collect();

            if ready.is_empty() {
                break;
            }

            for (ts, is_proposed_head) in ready {
                validated_epochs.push(ts.epoch());
                db.put_cbor_default(&ts.epoch()).unwrap();
                state_machine.mark_validated_tipset(ts, is_proposed_head);
            }
        }

        assert_eq!(validated_epochs, vec![1, 2, 3, 4, 5]);
    }

    /// Two forks sharing an ancestor are reported as two chains, heaviest first, each
    /// including the shared ancestor.
    #[test]
    fn test_sync_state_machine_chain_fragments() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        chain4u! {
            in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)]
        };
        chain4u! {
            from [a] in c4u;
            [c = dummy_node(&db, 3)]
        };

        let mut state_machine = SyncStateMachine::new(cs, Default::default(), false);

        for block in [a, b, c] {
            let only_block = Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            };
            let full_tipset = FullTipset::new(vec![only_block]).unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![Arc::new(full_tipset)]));
        }

        // Represent every chain by the weights of its tipsets.
        let chains = state_machine
            .chains()
            .into_iter()
            .map(|chain| {
                chain
                    .into_iter()
                    .map(|ts| ts.weight().to_i64().unwrap_or(0))
                    .collect()
            })
            .collect::<Vec<Vec<_>>>();

        assert_eq!(chains, vec![vec![1, 3], vec![1, 2]]);
    }
}