1use super::network_context::SyncNetworkContext;
20use crate::{
21 blocks::{Block, FullTipset, Tipset, TipsetKey},
22 chain::{ChainStore, index::ResolveNullTipset},
23 chain_sync::{
24 ForkSyncInfo, ForkSyncStage, SyncStatus, SyncStatusReport, TipsetValidator,
25 bad_block_cache::{BadBlockCache, SeenBlockCache},
26 metrics,
27 tipset_syncer::{TipsetSyncerError, validate_tipset},
28 validation::GossipBlockValidator,
29 },
30 db::EthMappingsStore,
31 libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest},
32 message_pool::MessagePool,
33 networks::calculate_expected_epoch,
34 shim::clock::ChainEpoch,
35 state_manager::StateManager,
36 utils::ShallowClone as _,
37};
38use ahash::{HashMap, HashSet};
39use chrono::Utc;
40use cid::Cid;
41use fvm_ipld_blockstore::Blockstore;
42use itertools::Itertools;
43use libp2p::PeerId;
44use parking_lot::{Mutex, RwLock};
45use std::{sync::Arc, time::Instant};
46use tokio::{sync::Notify, task::JoinSet};
47use tracing::{debug, error, info, trace, warn};
48
/// Chain synchronisation service: owns the shared state and the channel
/// endpoints that [`ChainFollower::run`] hands to the `chain_follower` workers.
pub struct ChainFollower<DB> {
    /// Sync tasks that have been spawned and have not finished yet; the
    /// scheduler uses this set to avoid spawning a task that is already in flight.
    tasks: Arc<Mutex<HashSet<SyncTask>>>,

    /// State machine holding the tipsets that are still being synced and
    /// deriving the next sync tasks from them.
    state_machine: Arc<Mutex<SyncStateMachine<DB>>>,

    /// Shared sync status report; rewritten by the scheduler on every pass.
    pub sync_status: SyncStatus,

    /// Gives access to the chain store and the chain configuration.
    pub state_manager: Arc<StateManager<DB>>,

    /// Network context used for hello / chain-exchange requests and for peer
    /// bookkeeping.
    pub network: SyncNetworkContext<DB>,

    /// Genesis tipset, used for gossip block pre-validation and for the
    /// genesis CID sent in hello requests.
    genesis: Tipset,

    /// Cache of known-bad blocks; `None` when it was disabled through the
    /// `FOREST_DISABLE_BAD_BLOCK_CACHE` environment variable.
    pub bad_blocks: Option<Arc<BadBlockCache>>,

    /// Receiving end of the network event stream.
    net_handler: flume::Receiver<NetworkEvent>,

    /// Sending half of the bounded tipset channel; full tipsets sent here are
    /// fed into the state machine by `chain_follower`.
    pub tipset_sender: flume::Sender<FullTipset>,

    /// Receiving half of the channel behind `tipset_sender`.
    tipset_receiver: flume::Receiver<FullTipset>,

    /// When `true`, validation tasks are acknowledged without running
    /// `validate_tipset`, and parent state is not required to be present.
    stateless_mode: bool,

    /// Message pool that receives messages gossiped over pubsub.
    mem_pool: Arc<MessagePool<Arc<ChainStore<DB>>>>,
}
91
92impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
93 pub fn new(
94 state_manager: Arc<StateManager<DB>>,
95 network: SyncNetworkContext<DB>,
96 genesis: Tipset,
97 net_handler: flume::Receiver<NetworkEvent>,
98 stateless_mode: bool,
99 mem_pool: Arc<MessagePool<Arc<ChainStore<DB>>>>,
100 ) -> Self {
101 crate::def_is_env_truthy!(cache_disabled, "FOREST_DISABLE_BAD_BLOCK_CACHE");
102 let (tipset_sender, tipset_receiver) = flume::bounded(20);
103 let tasks: Arc<Mutex<HashSet<SyncTask>>> = Arc::new(Mutex::new(HashSet::default()));
104 let bad_blocks = if cache_disabled() {
105 tracing::warn!("bad block cache is disabled by `FOREST_DISABLE_BAD_BLOCK_CACHE`");
106 None
107 } else {
108 Some(Default::default())
109 };
110 let state_machine = Arc::new(Mutex::new(SyncStateMachine::new(
111 state_manager.chain_store().clone(),
112 bad_blocks.clone(),
113 stateless_mode,
114 )));
115 Self {
116 tasks,
117 state_machine,
118 sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
119 state_manager,
120 network,
121 genesis,
122 bad_blocks,
123 net_handler,
124 tipset_sender,
125 tipset_receiver,
126 stateless_mode,
127 mem_pool,
128 }
129 }
130
131 pub fn reset(&self) {
133 let start = Instant::now();
134 self.tasks.lock().clear();
135 self.state_manager
136 .chain_store()
137 .validated_blocks
138 .lock()
139 .clear();
140 self.state_machine.lock().tipsets.clear();
141 if let Some(bad_blocks) = &self.bad_blocks {
142 bad_blocks.clear();
143 }
144 tracing::info!(
145 "chain follower reset, took {}",
146 humantime::format_duration(start.elapsed())
147 );
148 }
149
150 pub async fn run(&self) -> anyhow::Result<()>
151 where
152 DB: EthMappingsStore,
153 {
154 chain_follower(
155 &self.tasks,
156 &self.state_machine,
157 &self.state_manager,
158 self.bad_blocks.clone(),
159 self.net_handler.clone(),
160 self.tipset_receiver.clone(),
161 &self.network,
162 &self.mem_pool,
163 &self.sync_status,
164 &self.genesis,
165 self.stateless_mode,
166 )
167 .await
168 }
169}
170
171#[allow(clippy::too_many_arguments)]
172async fn chain_follower<DB: Blockstore + EthMappingsStore + Sync + Send + 'static>(
174 tasks: &Arc<Mutex<HashSet<SyncTask>>>,
175 state_machine: &Arc<Mutex<SyncStateMachine<DB>>>,
176 state_manager: &Arc<StateManager<DB>>,
177 bad_block_cache: Option<Arc<BadBlockCache>>,
178 network_rx: flume::Receiver<NetworkEvent>,
179 tipset_receiver: flume::Receiver<FullTipset>,
180 network: &SyncNetworkContext<DB>,
181 mem_pool: &Arc<MessagePool<Arc<ChainStore<DB>>>>,
182 sync_status: &SyncStatus,
183 genesis: &Tipset,
184 stateless_mode: bool,
185) -> anyhow::Result<()> {
186 let state_changed = Arc::new(Notify::new());
187
188 let seen_block_cache = SeenBlockCache::default();
189
190 let mut set = JoinSet::new();
191
192 set.spawn({
194 let state_manager = state_manager.shallow_clone();
195 let state_changed = state_changed.shallow_clone();
196 let state_machine = state_machine.shallow_clone();
197 let network = network.shallow_clone();
198 let mem_pool = mem_pool.shallow_clone();
199 let genesis = genesis.shallow_clone();
200 let bad_block_cache = bad_block_cache.shallow_clone();
201 let seen_block_cache = seen_block_cache.shallow_clone();
202 async move {
203 while let Ok(event) = network_rx.recv_async().await {
204 inc_gossipsub_event_metrics(&event);
205
206 update_peer_info(
207 &event,
208 &network,
209 state_manager.chain_store().clone(),
210 &genesis,
211 );
212
213 let Ok(tipset) = (match event {
214 NetworkEvent::HelloResponseOutbound { request, source } => {
215 let tipset_keys = TipsetKey::from(request.heaviest_tip_set.clone());
216 get_full_tipset(
217 &network,
218 state_manager.chain_store(),
219 Some(source),
220 &tipset_keys,
221 )
222 .await
223 .inspect_err(|e| debug!("Querying full tipset failed: {}", e))
224 }
225 NetworkEvent::PubsubMessage { message } => match message {
226 PubsubMessage::Block(b) => {
227 let cs = state_manager.chain_store();
228 let cfg = cs.chain_config();
229 if let Err(reason) = GossipBlockValidator::new(&b).validate_pre_fetch(
230 &genesis,
231 cfg.block_delay_secs,
232 cfg.policy.chain_finality,
233 cs.heaviest_tipset().epoch(),
234 bad_block_cache.as_deref(),
235 &seen_block_cache,
236 ) {
237 metrics::GOSSIP_BLOCK_REJECTED_TOTAL
238 .get_or_create(&metrics::GossipRejectReasonLabel {
239 reason: reason.label(),
240 })
241 .inc();
242 debug!("Rejected gossip block {}: {reason}", b.header.cid());
243 continue;
244 }
245 let key = TipsetKey::from(nunny::vec![*b.header.cid()]);
246 get_full_tipset(&network, cs, None, &key).await
247 }
248 PubsubMessage::Message(m) => {
249 if let Err(why) = mem_pool.add(m) {
250 debug!("Received invalid GossipSub message: {}", why);
251 }
252 continue;
253 }
254 },
255 _ => continue,
256 }) else {
257 continue;
258 };
259 {
260 state_machine
261 .lock()
262 .update(SyncEvent::NewFullTipsets(vec![tipset]));
263 state_changed.notify_one();
264 }
265 }
266 }
267 });
268
269 set.spawn({
271 let state_changed = state_changed.clone();
272 let state_machine = state_machine.clone();
273
274 async move {
275 while let Ok(tipset) = tipset_receiver.recv_async().await {
276 state_machine
277 .lock()
278 .update(SyncEvent::NewFullTipsets(vec![tipset]));
279 state_changed.notify_one();
280 }
281 }
282 });
283
284 set.spawn({
286 let state_manager = state_manager.shallow_clone();
287 let state_machine = state_machine.shallow_clone();
288 let network = network.shallow_clone();
289 let sync_status = sync_status.shallow_clone();
290 let state_changed = state_changed.shallow_clone();
291 let tasks = tasks.shallow_clone();
292 let bad_block_cache = bad_block_cache.shallow_clone();
293 async move {
294 loop {
295 state_changed.notified().await;
296
297 let mut tasks_set = tasks.lock();
298 let (task_vec, current_active_forks) = state_machine.lock().tasks();
299
300 {
302 let old_status_report = sync_status.read().clone();
303 let new_status_report = old_status_report.update(
304 &state_manager,
305 current_active_forks,
306 stateless_mode,
307 );
308
309 sync_status.write().clone_from(&new_status_report);
310 }
311
312 for task in task_vec {
313 let new = tasks_set.insert(task.clone());
315 if new {
316 let action = task.clone().execute(
317 network.shallow_clone(),
318 state_manager.shallow_clone(),
319 stateless_mode,
320 bad_block_cache.shallow_clone(),
321 );
322 tokio::spawn({
323 let tasks = tasks.clone();
324 let state_machine = state_machine.clone();
325 let state_changed = state_changed.clone();
326 async move {
327 if let Some(event) = action.await {
328 state_machine.lock().update(event);
329 state_changed.notify_one();
330 }
331 tasks.lock().remove(&task);
332 }
333 });
334 }
335 }
336 }
337 }
338 });
339
340 set.spawn({
344 let state_manager = state_manager.clone();
345 let state_machine = state_machine.clone();
346 async move {
347 loop {
348 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
349 let (tasks_set, _) = state_machine.lock().tasks();
350 let heaviest_tipset = state_manager.chain_store().heaviest_tipset();
351 let heaviest_epoch = heaviest_tipset.epoch();
352
353 let to_download = tasks_set
354 .iter()
355 .filter_map(|task| match task {
356 SyncTask::FetchTipset(_, epoch) => Some(epoch - heaviest_epoch),
357 _ => None,
358 })
359 .max()
360 .unwrap_or(0);
361
362 let expected_head = calculate_expected_epoch(
363 Utc::now().timestamp() as u64,
364 state_manager.chain_store().genesis_block_header().timestamp,
365 state_manager.chain_config().block_delay_secs,
366 );
367
368 match (expected_head - heaviest_epoch > 10, to_download > 0) {
371 (true, true) => info!(
372 "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}, downloading {to_download} tipsets"
373 , heaviest_tipset.key()
374 ),
375 (true, false) => info!(
376 "Catching up to HEAD: {heaviest_epoch}{} -> {expected_head}"
377 , heaviest_tipset.key()
378 ),
379 (false, true) => {
380 info!("Downloading {to_download} tipsets")
381 }
382 (false, false) => {}
383 }
384 }
385 }
386 });
387
388 set.join_all().await;
389 Ok(())
390}
391
392fn inc_gossipsub_event_metrics(event: &NetworkEvent) {
394 let label = match event {
395 NetworkEvent::HelloRequestInbound => metrics::values::HELLO_REQUEST_INBOUND,
396 NetworkEvent::HelloResponseOutbound { .. } => metrics::values::HELLO_RESPONSE_OUTBOUND,
397 NetworkEvent::HelloRequestOutbound => metrics::values::HELLO_REQUEST_OUTBOUND,
398 NetworkEvent::HelloResponseInbound => metrics::values::HELLO_RESPONSE_INBOUND,
399 NetworkEvent::PeerConnected(_) => metrics::values::PEER_CONNECTED,
400 NetworkEvent::PeerDisconnected(_) => metrics::values::PEER_DISCONNECTED,
401 NetworkEvent::PubsubMessage { message } => match message {
402 PubsubMessage::Block(_) => metrics::values::PUBSUB_BLOCK,
403 PubsubMessage::Message(_) => metrics::values::PUBSUB_MESSAGE,
404 },
405 NetworkEvent::ChainExchangeRequestOutbound => {
406 metrics::values::CHAIN_EXCHANGE_REQUEST_OUTBOUND
407 }
408 NetworkEvent::ChainExchangeResponseInbound => {
409 metrics::values::CHAIN_EXCHANGE_RESPONSE_INBOUND
410 }
411 NetworkEvent::ChainExchangeRequestInbound => {
412 metrics::values::CHAIN_EXCHANGE_REQUEST_INBOUND
413 }
414 NetworkEvent::ChainExchangeResponseOutbound => {
415 metrics::values::CHAIN_EXCHANGE_RESPONSE_OUTBOUND
416 }
417 };
418
419 metrics::LIBP2P_MESSAGE_TOTAL.get_or_create(&label).inc();
420}
421
422fn update_peer_info<DB: Blockstore + Sync + Send + 'static>(
424 event: &NetworkEvent,
425 network: &SyncNetworkContext<DB>,
426 chain_store: Arc<ChainStore<DB>>,
427 genesis: &Tipset,
428) {
429 match event {
430 NetworkEvent::PeerConnected(peer_id) => {
431 let genesis_cid = *genesis.block_headers().first().cid();
432 tokio::task::spawn(handle_peer_connected_event(
434 network.shallow_clone(),
435 chain_store,
436 *peer_id,
437 genesis_cid,
438 ));
439 }
440 NetworkEvent::PeerDisconnected(peer_id) => {
441 handle_peer_disconnected_event(network, *peer_id);
442 }
443 _ => {}
444 }
445}
446
447async fn handle_peer_connected_event<DB: Blockstore + Sync + Send + 'static>(
448 network: SyncNetworkContext<DB>,
449 chain_store: Arc<ChainStore<DB>>,
450 peer_id: PeerId,
451 genesis_block_cid: Cid,
452) {
453 if network.peer_manager().is_peer_new(&peer_id) {
455 let heaviest = chain_store.heaviest_tipset();
458 let request = HelloRequest {
459 heaviest_tip_set: heaviest.cids(),
460 heaviest_tipset_height: heaviest.epoch(),
461 heaviest_tipset_weight: heaviest.weight().clone().into(),
462 genesis_cid: genesis_block_cid,
463 };
464 let (peer_id, moment_sent, response) = match network.hello_request(peer_id, request).await {
465 Ok(response) => response,
466 Err(e) => {
467 debug!("Hello request failed: {}", e);
468 return;
469 }
470 };
471 let dur = Instant::now().duration_since(moment_sent);
472
473 match response {
475 Some(_) => {
476 network.peer_manager().log_success(&peer_id, dur);
477 }
478 None => {
479 network.peer_manager().log_failure(&peer_id, dur);
480 }
481 }
482 }
483}
484
485fn handle_peer_disconnected_event<DB: Blockstore + Sync + Send + 'static>(
486 network: &SyncNetworkContext<DB>,
487 peer_id: PeerId,
488) {
489 network.peer_manager().remove_peer(&peer_id);
490 network.peer_manager().unmark_peer_bad(&peer_id);
491}
492
493pub async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
494 network: &SyncNetworkContext<DB>,
495 chain_store: &ChainStore<DB>,
496 peer_id: Option<PeerId>,
497 tipset_keys: &TipsetKey,
498) -> anyhow::Result<FullTipset> {
499 if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
501 return Ok(full_tipset);
502 }
503 let tipset = network
505 .chain_exchange_full_tipset(peer_id, tipset_keys)
506 .await
507 .map_err(|e| anyhow::anyhow!(e))?;
508 tipset.persist(chain_store.blockstore())?;
509
510 Ok(tipset)
511}
512
513async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
514 network: &SyncNetworkContext<DB>,
515 chain_store: &ChainStore<DB>,
516 peer_id: Option<PeerId>,
517 tipset_keys: &TipsetKey,
518) -> anyhow::Result<Vec<FullTipset>> {
519 if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
521 return Ok(vec![full_tipset]);
522 }
523 let tipsets = network
525 .chain_exchange_full_tipsets(peer_id, tipset_keys)
526 .await
527 .map_err(|e| anyhow::anyhow!(e))?;
528
529 for tipset in tipsets.iter() {
530 tipset.persist(chain_store.blockstore())?;
531 }
532
533 Ok(tipsets)
534}
535
536pub fn load_full_tipset<DB: Blockstore>(
537 chain_store: &ChainStore<DB>,
538 tipset_keys: &TipsetKey,
539) -> anyhow::Result<FullTipset> {
540 let ts = chain_store
542 .chain_index()
543 .load_required_tipset(tipset_keys)?;
544 let blocks: Vec<_> = ts
545 .block_headers()
546 .iter()
547 .map(|header| -> anyhow::Result<Block> {
548 let (bls_msgs, secp_msgs) =
549 crate::chain::block_messages(chain_store.blockstore(), header)?;
550 Ok(Block {
551 header: header.clone(),
552 bls_messages: bls_msgs,
553 secp_messages: secp_msgs,
554 })
555 })
556 .try_collect()?;
557 let fts = FullTipset::new(blocks)?;
559 Ok(fts)
560}
561
/// Input to [`SyncStateMachine::update`].
enum SyncEvent {
    /// Full tipsets obtained from the network or the local store; each one is
    /// offered to the state machine.
    NewFullTipsets(Vec<FullTipset>),
    /// A tipset that failed validation; it and its tracked descendants are
    /// dropped from the state machine.
    BadTipset(FullTipset),
    /// A tipset that passed validation (or was accepted unvalidated in
    /// stateless mode).
    ValidatedTipset {
        tipset: FullTipset,
        /// Copied from the `SyncTask::ValidateTipset` that produced this
        /// event; selects how the tipset is committed to the chain store.
        is_proposed_head: bool,
    },
}
570
571impl std::fmt::Display for SyncEvent {
572 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
573 fn tss_to_string(tss: &[FullTipset]) -> String {
574 format!(
575 "epoch: {}-{}",
576 tss.first().map(|ts| ts.epoch()).unwrap_or_default(),
577 tss.last().map(|ts| ts.epoch()).unwrap_or_default()
578 )
579 }
580
581 match self {
582 Self::NewFullTipsets(tss) => write!(f, "NewFullTipsets({})", tss_to_string(tss)),
583 Self::BadTipset(ts) => {
584 write!(f, "BadTipset(epoch: {}, key: {})", ts.epoch(), ts.key())
585 }
586 Self::ValidatedTipset {
587 tipset,
588 is_proposed_head,
589 } => write!(
590 f,
591 "ValidatedTipset(epoch: {}, key: {}, is_proposed_head: {is_proposed_head})",
592 tipset.epoch(),
593 tipset.key()
594 ),
595 }
596 }
597}
598
/// Tracks the tipsets that are still being synced and derives the next
/// [`SyncTask`]s from them.
struct SyncStateMachine<DB> {
    /// Chain store used to read the current head, the genesis tipset and the
    /// chain configuration, and to commit validated tipsets.
    cs: Arc<ChainStore<DB>>,
    /// Optional bad block cache handed to the tipset validator.
    bad_block_cache: Option<Arc<BadBlockCache>>,
    /// Tipsets currently being tracked, keyed by their tipset key.
    tipsets: HashMap<TipsetKey, FullTipset>,
    /// When `true`, parent state is not required and every tipset counts as
    /// ready for validation.
    stateless_mode: bool,
}
606
607impl<DB: Blockstore> SyncStateMachine<DB> {
608 pub fn new(
609 cs: Arc<ChainStore<DB>>,
610 bad_block_cache: Option<Arc<BadBlockCache>>,
611 stateless_mode: bool,
612 ) -> Self {
613 Self {
614 cs,
615 bad_block_cache,
616 tipsets: HashMap::default(),
617 stateless_mode,
618 }
619 }
620
621 fn chains(&self) -> Vec<Vec<FullTipset>> {
623 let mut chains = Vec::new();
624 let mut remaining_tipsets = self.tipsets.clone();
625
626 while let Some(heaviest) = remaining_tipsets
627 .values()
628 .max_by_key(|ts| ts.weight())
629 .cloned()
630 {
631 let mut chain = Vec::new();
633 let mut current = Some(heaviest);
634
635 while let Some(tipset) = current.take() {
636 remaining_tipsets.remove(tipset.key());
637
638 current = self.tipsets.get(tipset.parents()).cloned();
640
641 chain.push(tipset);
642 }
643 chain.reverse();
644 chains.push(chain);
645 }
646
647 chains
648 }
649
650 fn is_parent_validated(&self, tipset: &FullTipset) -> bool {
651 let db = self.cs.blockstore();
652 self.stateless_mode || db.has(tipset.parent_state()).unwrap_or(false)
653 }
654
655 fn is_ready_for_validation(&self, tipset: &FullTipset) -> bool {
656 if self.stateless_mode || tipset.key() == self.cs.genesis_tipset().key() {
657 true
659 } else if let Ok(parent_ts) = load_full_tipset(&self.cs, tipset.parents()) {
660 let head_ts = self.cs.heaviest_tipset();
661 if parent_ts.key() == head_ts.key() {
667 true
668 } else if parent_ts.epoch() >= head_ts.epoch() {
669 false
670 } else {
671 self.is_parent_validated(tipset)
672 }
673 } else {
674 false
675 }
676 }
677
678 fn add_full_tipset(&mut self, tipset: FullTipset)
679 where
680 DB: EthMappingsStore,
681 {
682 if let Err(why) = TipsetValidator(&tipset).validate(
683 &self.cs,
684 self.bad_block_cache.as_ref().map(AsRef::as_ref),
685 &self.cs.genesis_tipset(),
686 self.cs.chain_config().block_delay_secs,
687 ) {
688 metrics::INVALID_TIPSET_TOTAL.inc();
689 trace!("Skipping invalid tipset: {}", why);
690 self.mark_bad_tipset(tipset);
691 return;
692 }
693
694 let heaviest = self.cs.heaviest_tipset();
696 let epoch_diff = heaviest.epoch() - tipset.epoch();
697
698 if epoch_diff > self.cs.chain_config().policy.chain_finality {
699 self.mark_bad_tipset(tipset);
700 return;
701 }
702
703 if self.tipsets.contains_key(tipset.key()) {
705 return;
706 }
707
708 if let Ok(Some(ts)) = self.cs.chain_index().tipset_by_height(
710 tipset.epoch(),
711 self.cs.heaviest_tipset(),
712 ResolveNullTipset::TakeOlder,
713 ) && ts.key() == tipset.key()
714 {
715 return;
716 }
717
718 let mut to_remove = Vec::new();
720 #[allow(clippy::mutable_key_type)]
721 let mut merged_blocks: HashSet<_> = tipset.blocks().iter().cloned().collect();
722
723 let parent_refs: HashSet<_> = self
725 .tipsets
726 .values()
727 .map(|ts| ts.parents().clone())
728 .collect();
729
730 for (key, existing_ts) in self.tipsets.iter() {
731 if existing_ts.epoch() == tipset.epoch() && existing_ts.parents() == tipset.parents() {
732 if !parent_refs.contains(key) {
734 to_remove.push(key.clone());
735 }
736 merged_blocks.extend(existing_ts.blocks().iter().cloned());
738 }
739 }
740
741 for key in to_remove {
743 self.tipsets.remove(&key);
744 }
745
746 if let Ok(merged_tipset) = FullTipset::new(merged_blocks) {
748 self.tipsets
749 .insert(merged_tipset.key().clone(), merged_tipset);
750 }
751 }
752
753 fn mark_bad_tipset(&mut self, tipset: FullTipset) {
757 let mut stack = vec![tipset];
758 while let Some(tipset) = stack.pop() {
759 self.tipsets.remove(tipset.key());
760 let mut to_remove = Vec::new();
762 let mut descendants = Vec::new();
763
764 for (key, ts) in self.tipsets.iter() {
765 if ts.parents() == tipset.key() {
766 to_remove.push(key.clone());
767 descendants.push(ts.clone());
768 }
769 }
770
771 for key in to_remove {
773 self.tipsets.remove(&key);
774 }
775
776 stack.extend(descendants);
778 }
779 }
780
781 fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) {
782 if !self.is_parent_validated(&tipset) {
783 tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), parent_state = %tipset.parent_state(), "Parent tipset must be validated");
784 return;
785 }
786
787 self.tipsets.remove(tipset.key());
788 let tipset = tipset.into_tipset();
789 if self.stateless_mode {
791 let epoch = tipset.epoch();
792 let terse_key = tipset.key().terse();
793 if self.cs.heaviest_tipset().weight() < tipset.weight() {
794 if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
795 error!("Error setting heaviest tipset: {}", e);
796 } else {
797 info!("Heaviest tipset: {} ({})", epoch, terse_key);
798 }
799 }
800 } else if is_proposed_head {
801 if let Err(e) = self.cs.put_tipset(&tipset) {
802 error!("Error putting tipset: {e}");
803 }
804 } else if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
805 error!("Error setting heaviest tipset: {e}");
806 }
807 }
808
809 pub fn update(&mut self, event: SyncEvent)
810 where
811 DB: EthMappingsStore,
812 {
813 tracing::trace!("update: {event}");
814 match event {
815 SyncEvent::NewFullTipsets(tipsets) => {
816 for tipset in tipsets {
817 self.add_full_tipset(tipset);
818 }
819 }
820 SyncEvent::BadTipset(tipset) => self.mark_bad_tipset(tipset),
821 SyncEvent::ValidatedTipset {
822 tipset,
823 is_proposed_head,
824 } => self.mark_validated_tipset(tipset, is_proposed_head),
825 }
826 }
827
828 pub fn tasks(&self) -> (Vec<SyncTask>, Vec<ForkSyncInfo>) {
829 let current_validated_epoch = self.cs.heaviest_tipset().epoch();
831 let now = Utc::now();
832
833 let mut active_sync_info = Vec::new();
834 let mut tasks = Vec::new();
835 for chain in self.chains() {
836 if let Some(first_ts) = chain.first() {
837 let last_ts = chain.last().expect("Infallible");
838 let stage: ForkSyncStage;
839 let start_time = Some(now);
840
841 if !self.is_ready_for_validation(first_ts) {
842 stage = ForkSyncStage::FetchingHeaders;
843 tasks.push(SyncTask::FetchTipset(
844 first_ts.parents().clone(),
845 first_ts.epoch(),
846 ));
847 } else {
848 stage = ForkSyncStage::ValidatingTipsets;
849 tasks.push(SyncTask::ValidateTipset {
850 tipset: first_ts.clone(),
851 is_proposed_head: chain.len() == 1,
852 });
853 }
854
855 let fork_info = ForkSyncInfo {
856 target_tipset_key: last_ts.key().clone(),
857 target_epoch: last_ts.epoch(),
858 target_sync_epoch_start: first_ts.epoch(),
859 stage,
860 validated_chain_head_epoch: current_validated_epoch,
861 start_time,
862 last_updated: Some(now),
863 };
864
865 active_sync_info.push(fork_info);
866 }
867 }
868 (tasks, active_sync_info)
869 }
870}
871
/// Unit of work derived by [`SyncStateMachine::tasks`] and run through
/// [`SyncTask::execute`]. Hashable so in-flight tasks can be kept in a set.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
enum SyncTask {
    /// Validate the given tipset.
    ValidateTipset {
        tipset: FullTipset,
        /// Set by the state machine when the tipset is the only one in its
        /// chain; forwarded unchanged into the resulting event.
        is_proposed_head: bool,
    },
    /// Fetch the tipset(s) identified by the key; the epoch is the epoch of
    /// the tipset whose parents are being fetched.
    FetchTipset(TipsetKey, ChainEpoch),
}
880
881impl std::fmt::Display for SyncTask {
882 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
883 match self {
884 SyncTask::ValidateTipset {
885 tipset,
886 is_proposed_head,
887 } => write!(
888 f,
889 "ValidateTipset(epoch: {}, is_proposed_head: {is_proposed_head})",
890 tipset.epoch()
891 ),
892 SyncTask::FetchTipset(key, epoch) => {
893 let s = key.to_string();
894 write!(
895 f,
896 "FetchTipset({}, epoch: {})",
897 &s[s.len().saturating_sub(8)..],
898 epoch
899 )
900 }
901 }
902 }
903}
904
905impl SyncTask {
906 async fn execute<DB: Blockstore + EthMappingsStore + Sync + Send + 'static>(
907 self,
908 network: SyncNetworkContext<DB>,
909 state_manager: Arc<StateManager<DB>>,
910 stateless_mode: bool,
911 bad_block_cache: Option<Arc<BadBlockCache>>,
912 ) -> Option<SyncEvent> {
913 tracing::trace!("SyncTask::execute {self}");
914 match self {
915 SyncTask::ValidateTipset {
916 tipset,
917 is_proposed_head,
918 } if stateless_mode => Some(SyncEvent::ValidatedTipset {
919 tipset,
920 is_proposed_head,
921 }),
922 SyncTask::ValidateTipset {
923 tipset,
924 is_proposed_head,
925 } => match validate_tipset(&state_manager, tipset.clone(), bad_block_cache).await {
926 Ok(()) => Some(SyncEvent::ValidatedTipset {
927 tipset,
928 is_proposed_head,
929 }),
930 Err(e) if matches!(e, TipsetSyncerError::TimeTravellingBlock { .. }) => {
935 warn!("Time travelling block detected, skipping tipset for now: {e}");
936 None
937 }
938 Err(e) => {
939 warn!("Error validating tipset: {e}");
940 Some(SyncEvent::BadTipset(tipset))
941 }
942 },
943 SyncTask::FetchTipset(key, epoch) => {
944 match get_full_tipset_batch(&network, state_manager.chain_store(), None, &key).await
945 {
946 Ok(parents) => Some(SyncEvent::NewFullTipsets(parents)),
947 Err(e) => {
948 tracing::warn!(%key, %epoch, "failed to fetch tipset: {e:#}");
949 None
950 }
951 }
952 }
953 }
954 }
955}
956
/// Unit tests for [`SyncStateMachine`] on top of an in-memory database.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blocks::{Chain4U, HeaderBuilder, chain4u};
    use crate::db::MemoryDB;
    use crate::utils::db::CborStoreExt as _;
    use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
    use num_bigint::BigInt;
    use num_traits::ToPrimitive;
    use std::sync::Arc;
    use tracing::level_filters::LevelFilter;
    use tracing_subscriber::EnvFilter;

    /// Builds a chain store over a fresh in-memory database, with a dummy
    /// genesis block set as heaviest tipset, and returns it together with the
    /// `Chain4U` builder that shares the same database.
    fn setup() -> (Arc<ChainStore<MemoryDB>>, Chain4U<Arc<MemoryDB>>) {
        // Logger setup is best effort: the result of `try_init` is ignored.
        let _ = tracing_subscriber::fmt()
            .without_time()
            .with_env_filter(
                EnvFilter::builder()
                    .with_default_directive(LevelFilter::DEBUG.into())
                    .from_env()
                    .unwrap(),
            )
            .try_init();

        let db = Arc::new(MemoryDB::default());

        {
            // Store a `TxMeta` whose message roots are both an empty AMT.
            // presumably the default message root of test blocks — TODO confirm
            let empty_amt = Amt::<Cid, _>::new(&db).flush().unwrap();
            db.put_cbor_default(&crate::blocks::TxMeta {
                bls_message_root: empty_amt,
                secp_message_root: empty_amt,
            })
            .unwrap();
        }

        let c4u = Chain4U::with_blockstore(db.clone());
        // Create the genesis header at epoch 0.
        chain4u! {
            in c4u;
            [genesis_header = dummy_node(&db, 0)]
        };

        let cs = Arc::new(
            ChainStore::new(
                db.clone(),
                db.clone(),
                db.clone(),
                Default::default(),
                genesis_header.clone().into(),
            )
            .unwrap(),
        );

        cs.set_heaviest_tipset(cs.genesis_tipset()).unwrap();

        (cs, c4u)
    }

    /// Stores the epoch `i` as a CBOR object and returns its CID, to be used
    /// as a stand-in state root.
    fn dummy_state(db: impl Blockstore, i: ChainEpoch) -> Cid {
        db.put_cbor_default(&i).unwrap()
    }

    /// Header template whose epoch and weight both equal `i` and whose state
    /// root is `dummy_state(db, i)`.
    fn dummy_node(db: impl Blockstore, i: ChainEpoch) -> HeaderBuilder {
        HeaderBuilder {
            state_root: dummy_state(db, i).into(),
            weight: BigInt::from(i).into(),
            epoch: i.into(),
            ..Default::default()
        }
    }

    /// Tipsets fed in arbitrary order must be validated in epoch order.
    #[test]
    fn test_state_machine_validation_order() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        // A linear chain of five blocks on top of genesis.
        chain4u! {
            from [genesis_header] in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)] -> [c = dummy_node(&db, 3)] -> [d = dummy_node(&db, 4)] -> [e = dummy_node(&db, 5)]
        };

        // Third argument `true`: stateless mode.
        let mut state_machine = SyncStateMachine::new(cs, Default::default(), true);

        // Deliberately out of order.
        let tipsets = vec![e, b, d, c, a];

        for block in tipsets {
            // Wrap each header into a single-block tipset without messages.
            let full_tipset = FullTipset::new(vec![Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            }])
            .unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
        }

        let mut validation_tasks = Vec::new();
        // Keep "validating" until the state machine proposes no more validation tasks.
        loop {
            let (tasks, _) = state_machine.tasks();

            // Only validation tasks are of interest here.
            let validation_tipsets: Vec<_> = tasks
                .into_iter()
                .filter_map(|task| {
                    if let SyncTask::ValidateTipset {
                        tipset,
                        is_proposed_head,
                    } = task
                    {
                        Some((tipset, is_proposed_head))
                    } else {
                        None
                    }
                })
                .collect();

            if validation_tipsets.is_empty() {
                break;
            }

            for (ts, is_proposed_head) in validation_tipsets {
                // Record the order, store the epoch object (same encoding as
                // `dummy_state`), and report the tipset as validated.
                validation_tasks.push(ts.epoch());
                db.put_cbor_default(&ts.epoch()).unwrap();
                state_machine.mark_validated_tipset(ts, is_proposed_head);
            }
        }

        // Validation happened in ascending epoch order.
        assert_eq!(validation_tasks, vec![1, 2, 3, 4, 5]);
    }

    /// Forks sharing an ancestor are reported as separate chains, heaviest
    /// first, each containing the shared ancestor.
    #[test]
    fn test_sync_state_machine_chain_fragments() {
        let (cs, c4u) = setup();
        let db = cs.blockstore().clone();

        // Two fragments: a -> b, and c built on top of a.
        // NOTE(review): the first fragment has no `from [...]` clause; how it
        // relates to genesis depends on `chain4u!` — confirm.
        chain4u! {
            in c4u;
            [a = dummy_node(&db, 1)] -> [b = dummy_node(&db, 2)]
        };
        chain4u! {
            from [a] in c4u;
            [c = dummy_node(&db, 3)]
        };

        // Third argument `false`: not stateless.
        let mut state_machine = SyncStateMachine::new(cs, Default::default(), false);

        for block in [a, b, c] {
            // Wrap each header into a single-block tipset without messages.
            let full_tipset = FullTipset::new(vec![Block {
                header: block.clone().into(),
                bls_messages: vec![],
                secp_messages: vec![],
            }])
            .unwrap();
            state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset]));
        }

        // Represent every chain by the weights of its tipsets.
        let chains = state_machine
            .chains()
            .into_iter()
            .map(|v| {
                v.into_iter()
                    .map(|ts| ts.weight().to_i64().unwrap_or(0))
                    .collect_vec()
            })
            .collect_vec();

        // The chain ending in the heaviest tipset (weight 3) comes first;
        // both chains start at the shared ancestor (weight 1).
        assert_eq!(chains, vec![vec![1, 3], vec![1, 2]]);
    }
}