1use crate::{
20 sync::{
21 block_relay_protocol::{BlockDownloader, BlockResponseError},
22 blocks::BlockCollection,
23 justification_requests::ExtraRequests,
24 schema::v1::{StateRequest, StateResponse},
25 service::network::NetworkServiceHandle,
26 strategy::{
27 disconnected_peers::DisconnectedPeers,
28 state_sync::{ImportResult, StateSync, StateSyncProvider},
29 warp::{WarpSyncPhase, WarpSyncProgress},
30 StrategyKey, SyncingAction, SyncingStrategy,
31 },
32 types::{BadPeer, SyncState, SyncStatus},
33 },
34 LOG_TARGET,
35};
36
37use codec::Encode;
38use futures::{channel::oneshot, FutureExt};
39use log::{debug, error, info, trace, warn};
40use soil_prometheus::{register, Gauge, PrometheusError, Registry, U64};
41use prost::Message;
42use soil_client::blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
43use soil_client::client_api::{blockchain::BlockGap, BlockBackend, ProofProvider};
44use soil_client::consensus::{BlockOrigin, BlockStatus};
45use soil_client::import::{BlockImportError, BlockImportStatus, IncomingBlock};
46use soil_network::common::sync::message::{
47 BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock,
48};
49use soil_network::types::PeerId;
50use soil_network::{IfDisconnected, ProtocolName};
51use subsoil::arithmetic::traits::Saturating;
52use subsoil::runtime::{
53 traits::{
54 Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One, SaturatedConversion, Zero,
55 },
56 EncodedJustification, Justifications,
57};
58
59use std::{
60 any::Any,
61 collections::{HashMap, HashSet},
62 fmt,
63 ops::{AddAssign, Range},
64 sync::Arc,
65};
66
67#[cfg(test)]
68mod test;
69
/// Size limit (in blocks) related to the import queue.
///
/// NOTE(review): not referenced in the visible part of this file; presumably caps how many
/// blocks may be queued for import at once — confirm against the rest of the file.
const MAX_IMPORTING_BLOCKS: usize = 2048;

/// Download-ahead window, in blocks.
///
/// NOTE(review): presumably the maximum distance ahead of the best queued block that will be
/// downloaded — confirm against the request-scheduling code.
const MAX_DOWNLOAD_AHEAD: u32 = 2048;

/// Backwards look-up limit, in blocks; defined as half of [`MAX_DOWNLOAD_AHEAD`].
const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2;

/// Finality distance threshold for state sync.
///
/// NOTE(review): usage is outside the visible part of this file — confirm the exact meaning.
const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;

/// Block-count threshold used to detect a major sync.
///
/// `status` reports a downloading/importing state only when the median best block seen among
/// peers is more than this many blocks ahead of the client's best block, and `add_peer_inner`
/// skips the ancestor search when more than this many blocks are queued for import.
const MAJOR_SYNC_BLOCKS: u8 = 5;
89
90mod rep {
91 use soil_network::ReputationChange as Rep;
92 pub const BLOCKCHAIN_READ_ERROR: Rep = Rep::new(-(1 << 16), "DB Error");
95
96 pub const GENESIS_MISMATCH: Rep = Rep::new(i32::MIN, "Genesis mismatch");
99
100 pub const INCOMPLETE_HEADER: Rep = Rep::new(-(1 << 20), "Incomplete header");
102
103 pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
105
106 pub const BAD_BLOCK: Rep = Rep::new(-(1 << 29), "Bad block");
108
109 pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
111
112 pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
114
115 pub const BAD_JUSTIFICATION: Rep = Rep::new(-(1 << 16), "Bad justification");
117
118 pub const UNKNOWN_ANCESTOR: Rep = Rep::new(-(1 << 16), "DB Error");
120
121 pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
123
124 pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
126}
127
128struct Metrics {
129 queued_blocks: Gauge<U64>,
130 fork_targets: Gauge<U64>,
131}
132
133impl Metrics {
134 fn register(r: &Registry) -> Result<Self, PrometheusError> {
135 Ok(Self {
136 queued_blocks: {
137 let g =
138 Gauge::new("substrate_sync_queued_blocks", "Number of blocks in import queue")?;
139 register(g, r)?
140 },
141 fork_targets: {
142 let g = Gauge::new("substrate_sync_fork_targets", "Number of fork sync targets")?;
143 register(g, r)?
144 },
145 })
146 }
147}
148
149#[derive(Debug, Clone)]
150enum AllowedRequests {
151 Some(HashSet<PeerId>),
152 All,
153}
154
155impl AllowedRequests {
156 fn add(&mut self, id: &PeerId) {
157 if let Self::Some(ref mut set) = self {
158 set.insert(*id);
159 }
160 }
161
162 fn take(&mut self) -> Self {
163 std::mem::take(self)
164 }
165
166 fn set_all(&mut self) {
167 *self = Self::All;
168 }
169
170 fn contains(&self, id: &PeerId) -> bool {
171 match self {
172 Self::Some(set) => set.contains(id),
173 Self::All => true,
174 }
175 }
176
177 fn is_empty(&self) -> bool {
178 match self {
179 Self::Some(set) => set.is_empty(),
180 Self::All => false,
181 }
182 }
183
184 fn clear(&mut self) {
185 std::mem::take(self);
186 }
187}
188
189impl Default for AllowedRequests {
190 fn default() -> Self {
191 Self::Some(HashSet::default())
192 }
193}
194
/// Byte counters for data handled during gap sync, split by block component.
#[derive(Debug, Default, Clone)]
struct GapSyncStats {
    /// Encoded size of headers, in bytes.
    header_bytes: usize,
    /// Encoded size of bodies, in bytes.
    body_bytes: usize,
    /// Encoded size of justifications, in bytes.
    justification_bytes: usize,
}

impl GapSyncStats {
    /// Creates statistics with all counters set to zero.
    fn new() -> Self {
        Self { header_bytes: 0, body_bytes: 0, justification_bytes: 0 }
    }

    /// Sum of all three counters.
    fn total_bytes(&self) -> usize {
        let Self { header_bytes, body_bytes, justification_bytes } = self;
        header_bytes + body_bytes + justification_bytes
    }

    /// Converts a byte count to mebibytes.
    fn bytes_to_mib(bytes: usize) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;
        bytes as f64 / BYTES_PER_MIB
    }
}

impl fmt::Display for GapSyncStats {
    /// Prints every counter, and the total, both in bytes and in MiB (two decimals).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let total = self.total_bytes();
        let header_mib = Self::bytes_to_mib(self.header_bytes);
        let body_mib = Self::bytes_to_mib(self.body_bytes);
        let justification_mib = Self::bytes_to_mib(self.justification_bytes);
        let total_mib = Self::bytes_to_mib(total);

        write!(
            f,
            "hdr: {} B ({:.2} MiB), body: {} B ({:.2} MiB), just: {} B ({:.2} MiB) | total: {} B ({:.2} MiB)",
            self.header_bytes,
            header_mib,
            self.body_bytes,
            body_mib,
            self.justification_bytes,
            justification_mib,
            total,
            total_mib,
        )
    }
}

impl AddAssign for GapSyncStats {
    /// Adds each counter of `other` to the matching counter of `self`.
    fn add_assign(&mut self, other: Self) {
        let Self { header_bytes, body_bytes, justification_bytes } = other;
        self.header_bytes += header_bytes;
        self.body_bytes += body_bytes;
        self.justification_bytes += justification_bytes;
    }
}
245
246struct GapSync<B: BlockT> {
247 blocks: BlockCollection<B>,
248 best_queued_number: NumberFor<B>,
249 target: NumberFor<B>,
250 stats: GapSyncStats,
251}
252
253#[derive(Copy, Clone, Debug, Eq, PartialEq)]
255pub enum ChainSyncMode {
256 Full,
258 LightState {
260 skip_proofs: bool,
262 storage_chain_mode: bool,
264 },
265}
266
267impl ChainSyncMode {
268 pub fn required_block_attributes(&self, is_gap: bool, is_archive: bool) -> BlockAttributes {
270 let attrs = match self {
271 ChainSyncMode::Full => {
272 BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY
273 },
274 ChainSyncMode::LightState { storage_chain_mode: false, .. } => {
275 BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY
276 },
277 ChainSyncMode::LightState { storage_chain_mode: true, .. } => {
278 BlockAttributes::HEADER
279 | BlockAttributes::JUSTIFICATION
280 | BlockAttributes::INDEXED_BODY
281 },
282 };
283 if is_gap && !is_archive {
286 attrs & !BlockAttributes::BODY
287 } else {
288 attrs
289 }
290 }
291}
292
293#[derive(Debug, Clone)]
295pub(crate) struct PeerSync<B: BlockT> {
296 pub peer_id: PeerId,
298 pub common_number: NumberFor<B>,
301 pub best_hash: B::Hash,
303 pub best_number: NumberFor<B>,
305 pub state: PeerSyncState<B>,
308}
309
310impl<B: BlockT> PeerSync<B> {
311 fn update_common_number(&mut self, new_common: NumberFor<B>) {
313 if self.common_number < new_common {
314 trace!(
315 target: LOG_TARGET,
316 "Updating peer {} common number from={} => to={}.",
317 self.peer_id,
318 self.common_number,
319 new_common,
320 );
321 self.common_number = new_common;
322 }
323 }
324}
325
326struct ForkTarget<B: BlockT> {
327 number: NumberFor<B>,
328 parent_hash: Option<B::Hash>,
329 peers: HashSet<PeerId>,
330}
331
332#[derive(Copy, Clone, Eq, PartialEq, Debug)]
337pub(crate) enum PeerSyncState<B: BlockT> {
338 Available,
340 AncestorSearch {
342 start: NumberFor<B>,
344 current: NumberFor<B>,
346 state: AncestorSearchState<B>,
348 },
349 DownloadingNew(NumberFor<B>),
351 DownloadingStale(B::Hash),
355 DownloadingJustification(B::Hash),
357 DownloadingState,
359 DownloadingGap(NumberFor<B>),
361}
362
363impl<B: BlockT> PeerSyncState<B> {
364 pub fn is_available(&self) -> bool {
365 matches!(self, Self::Available)
366 }
367}
368
369pub struct ChainSync<B: BlockT, Client> {
372 client: Arc<Client>,
374 peers: HashMap<PeerId, PeerSync<B>>,
376 disconnected_peers: DisconnectedPeers,
377 blocks: BlockCollection<B>,
379 best_queued_number: NumberFor<B>,
381 best_queued_hash: B::Hash,
383 mode: ChainSyncMode,
385 extra_justifications: ExtraRequests<B>,
387 queue_blocks: HashSet<B::Hash>,
390 pending_state_sync_attempt: Option<(B::Hash, NumberFor<B>, bool)>,
398 fork_targets: HashMap<B::Hash, ForkTarget<B>>,
400 allowed_requests: AllowedRequests,
402 max_parallel_downloads: u32,
404 max_blocks_per_request: u32,
406 state_request_protocol_name: ProtocolName,
408 downloaded_blocks: usize,
410 state_sync: Option<StateSync<B, Client>>,
412 import_existing: bool,
415 block_downloader: Arc<dyn BlockDownloader<B>>,
417 archive_blocks: bool,
420 gap_sync: Option<GapSync<B>>,
422 actions: Vec<SyncingAction<B>>,
424 metrics: Option<Metrics>,
426}
427
428impl<B, Client> SyncingStrategy<B> for ChainSync<B, Client>
429where
430 B: BlockT,
431 Client: HeaderBackend<B>
432 + BlockBackend<B>
433 + HeaderMetadata<B, Error = soil_client::blockchain::Error>
434 + ProofProvider<B>
435 + Send
436 + Sync
437 + 'static,
438{
439 fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
440 match self.add_peer_inner(peer_id, best_hash, best_number) {
441 Ok(Some(request)) => {
442 let action = self.create_block_request_action(peer_id, request);
443 self.actions.push(action);
444 },
445 Ok(None) => {},
446 Err(bad_peer) => self.actions.push(SyncingAction::DropPeer(bad_peer)),
447 }
448 }
449
450 fn remove_peer(&mut self, peer_id: &PeerId) {
451 self.blocks.clear_peer_download(peer_id);
452 if let Some(gap_sync) = &mut self.gap_sync {
453 gap_sync.blocks.clear_peer_download(peer_id)
454 }
455
456 if let Some(state) = self.peers.remove(peer_id) {
457 if !state.state.is_available() {
458 if let Some(bad_peer) =
459 self.disconnected_peers.on_disconnect_during_request(*peer_id)
460 {
461 self.actions.push(SyncingAction::DropPeer(bad_peer));
462 }
463 }
464 }
465
466 self.extra_justifications.peer_disconnected(peer_id);
467 self.allowed_requests.set_all();
468 self.fork_targets.retain(|_, target| {
469 target.peers.remove(peer_id);
470 !target.peers.is_empty()
471 });
472 if let Some(metrics) = &self.metrics {
473 metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX));
474 }
475
476 let blocks = self.ready_blocks();
477
478 if !blocks.is_empty() {
479 self.validate_and_queue_blocks(blocks, false);
480 }
481 }
482
483 fn on_validated_block_announce(
484 &mut self,
485 is_best: bool,
486 peer_id: PeerId,
487 announce: &BlockAnnounce<B::Header>,
488 ) -> Option<(B::Hash, NumberFor<B>)> {
489 let number = *announce.header.number();
490 let hash = announce.header.hash();
491 let parent_status =
492 self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
493 let known_parent = parent_status != BlockStatus::Unknown;
494 let ancient_parent = parent_status == BlockStatus::InChainPruned;
495
496 let known = self.is_known(&hash);
497 let is_major_syncing = self.is_major_syncing();
498 let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
499 peer
500 } else {
501 error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}");
502 return Some((hash, number));
503 };
504
505 if let PeerSyncState::AncestorSearch { .. } = peer.state {
506 trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id);
507 return None;
508 }
509
510 let continues_known_fork =
513 known || known_parent || announce.header.parent_hash() == &peer.best_hash;
514
515 let peer_info = is_best.then(|| {
516 peer.best_number = number;
518 peer.best_hash = hash;
519
520 (hash, number)
521 });
522
523 if is_best {
526 let best_queued_number = self.best_queued_number;
527
528 if known && best_queued_number >= number {
529 peer.update_common_number(number);
530 } else if announce.header.parent_hash() == &self.best_queued_hash
531 || known_parent && best_queued_number >= number
532 {
533 peer.update_common_number(number.saturating_sub(One::one()));
534 }
535
536 if !continues_known_fork && !is_major_syncing {
540 let current = number.min(best_queued_number);
541 peer.common_number = peer.common_number.min(self.client.info().finalized_number);
542 peer.state = PeerSyncState::AncestorSearch {
543 current,
544 start: best_queued_number,
545 state: AncestorSearchState::ExponentialBackoff(One::one()),
546 };
547
548 let request = ancestry_request::<B>(current);
549 let action = self.create_block_request_action(peer_id, request);
550 self.actions.push(action);
551
552 return peer_info;
553 }
554 }
555 self.allowed_requests.add(&peer_id);
556
557 if known || self.is_already_downloading(&hash) {
559 trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash);
560 if let Some(target) = self.fork_targets.get_mut(&hash) {
561 target.peers.insert(peer_id);
562 }
563 return peer_info;
564 }
565
566 if ancient_parent {
567 trace!(
568 target: LOG_TARGET,
569 "Ignored ancient block announced from {}: {} {:?}",
570 peer_id,
571 hash,
572 announce.header,
573 );
574 return peer_info;
575 }
576
577 if self.status().state == SyncState::Idle {
578 trace!(
579 target: LOG_TARGET,
580 "Added sync target for block announced from {}: {} {:?}",
581 peer_id,
582 hash,
583 announce.summary(),
584 );
585 self.fork_targets
586 .entry(hash)
587 .or_insert_with(|| {
588 if let Some(metrics) = &self.metrics {
589 metrics.fork_targets.inc();
590 }
591
592 ForkTarget {
593 number,
594 parent_hash: Some(*announce.header.parent_hash()),
595 peers: Default::default(),
596 }
597 })
598 .peers
599 .insert(peer_id);
600 }
601
602 peer_info
603 }
604
605 fn set_sync_fork_request(
607 &mut self,
608 mut peers: Vec<PeerId>,
609 hash: &B::Hash,
610 number: NumberFor<B>,
611 ) {
612 if peers.is_empty() {
613 peers = self
614 .peers
615 .iter()
616 .filter(|(_, peer)| peer.best_number >= number)
618 .map(|(id, _)| *id)
619 .collect();
620
621 debug!(
622 target: LOG_TARGET,
623 "Explicit sync request for block {hash:?} with no peers specified. \
624 Syncing from these peers {peers:?} instead.",
625 );
626 } else {
627 debug!(
628 target: LOG_TARGET,
629 "Explicit sync request for block {hash:?} with {peers:?}",
630 );
631 }
632
633 if self.is_known(hash) {
634 debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}");
635 return;
636 }
637
638 trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}");
639 for peer_id in &peers {
640 if let Some(peer) = self.peers.get_mut(peer_id) {
641 if let PeerSyncState::AncestorSearch { .. } = peer.state {
642 continue;
643 }
644
645 if number > peer.best_number {
646 peer.best_number = number;
647 peer.best_hash = *hash;
648 }
649 self.allowed_requests.add(peer_id);
650 }
651 }
652
653 self.fork_targets
654 .entry(*hash)
655 .or_insert_with(|| {
656 if let Some(metrics) = &self.metrics {
657 metrics.fork_targets.inc();
658 }
659
660 ForkTarget { number, peers: Default::default(), parent_hash: None }
661 })
662 .peers
663 .extend(peers);
664 }
665
666 fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
667 let client = &self.client;
668 self.extra_justifications
669 .schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block))
670 }
671
672 fn clear_justification_requests(&mut self) {
673 self.extra_justifications.reset();
674 }
675
676 fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
677 let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
678 self.extra_justifications
679 .try_finalize_root((hash, number), finalization_result, true);
680 self.allowed_requests.set_all();
681 }
682
683 fn on_generic_response(
684 &mut self,
685 peer_id: &PeerId,
686 key: StrategyKey,
687 protocol_name: ProtocolName,
688 response: Box<dyn Any + Send>,
689 ) {
690 if Self::STRATEGY_KEY != key {
691 warn!(
692 target: LOG_TARGET,
693 "Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
694 );
695 debug_assert!(false);
696 return;
697 }
698
699 if protocol_name == self.state_request_protocol_name {
700 let Ok(response) = response.downcast::<Vec<u8>>() else {
701 warn!(target: LOG_TARGET, "Failed to downcast state response");
702 debug_assert!(false);
703 return;
704 };
705
706 if let Err(bad_peer) = self.on_state_data(&peer_id, &response) {
707 self.actions.push(SyncingAction::DropPeer(bad_peer));
708 }
709 } else if &protocol_name == self.block_downloader.protocol_name() {
710 let Ok(response) = response
711 .downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
712 else {
713 warn!(target: LOG_TARGET, "Failed to downcast block response");
714 debug_assert!(false);
715 return;
716 };
717
718 let (request, response) = *response;
719 let blocks = match response {
720 Ok(blocks) => blocks,
721 Err(BlockResponseError::DecodeFailed(e)) => {
722 debug!(
723 target: LOG_TARGET,
724 "Failed to decode block response from peer {:?}: {:?}.",
725 peer_id,
726 e
727 );
728 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
729 return;
730 },
731 Err(BlockResponseError::ExtractionFailed(e)) => {
732 debug!(
733 target: LOG_TARGET,
734 "Failed to extract blocks from peer response {:?}: {:?}.",
735 peer_id,
736 e
737 );
738 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
739 return;
740 },
741 };
742
743 if let Err(bad_peer) = self.on_block_response(peer_id, key, request, blocks) {
744 self.actions.push(SyncingAction::DropPeer(bad_peer));
745 }
746 } else {
747 warn!(
748 target: LOG_TARGET,
749 "Unexpected generic response protocol {protocol_name}, strategy key \
750 {key:?}",
751 );
752 debug_assert!(false);
753 }
754 }
755
756 fn on_blocks_processed(
757 &mut self,
758 imported: usize,
759 count: usize,
760 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
761 ) {
762 trace!(target: LOG_TARGET, "Imported {imported} of {count}");
763
764 let mut has_error = false;
765 for (_, hash) in &results {
766 if self.queue_blocks.remove(hash) {
767 if let Some(metrics) = &self.metrics {
768 metrics.queued_blocks.dec();
769 }
770 }
771 self.blocks.clear_queued(hash);
772 if let Some(gap_sync) = &mut self.gap_sync {
773 gap_sync.blocks.clear_queued(hash);
774 }
775 }
776 for (result, hash) in results {
777 if has_error {
778 break;
779 }
780
781 has_error |= result.is_err();
782
783 match result {
784 Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => {
785 if let Some(peer) =
786 peer_id.as_ref().and_then(|peer| PeerId::try_from(peer).ok())
787 {
788 self.update_peer_common_number(&peer, number);
789 }
790 self.complete_gap_if_target(number);
791 },
792 Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => {
793 if aux.clear_justification_requests {
794 trace!(
795 target: LOG_TARGET,
796 "Block imported clears all pending justification requests {number}: {hash:?}",
797 );
798 self.clear_justification_requests();
799 }
800
801 if aux.needs_justification {
802 trace!(
803 target: LOG_TARGET,
804 "Block imported but requires justification {number}: {hash:?}",
805 );
806 self.request_justification(&hash, number);
807 }
808
809 if aux.bad_justification {
810 if let Some(peer) =
811 peer_id.as_ref().and_then(|peer| PeerId::try_from(peer).ok())
812 {
813 warn!("💔 Sent block with bad justification to import");
814 self.actions.push(SyncingAction::DropPeer(BadPeer(
815 peer,
816 rep::BAD_JUSTIFICATION,
817 )));
818 }
819 }
820
821 if let Some(peer) =
822 peer_id.as_ref().and_then(|peer| PeerId::try_from(peer).ok())
823 {
824 self.update_peer_common_number(&peer, number);
825 }
826 let state_sync_complete =
827 self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash);
828 if state_sync_complete {
829 info!(
830 target: LOG_TARGET,
831 "State sync is complete ({} MiB), restarting block sync.",
832 self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
833 );
834 self.state_sync = None;
835 self.mode = ChainSyncMode::Full;
836 self.restart();
837 }
838
839 self.complete_gap_if_target(number);
840 },
841 Err(BlockImportError::IncompleteHeader(peer_id)) => {
842 if let Some(peer) = peer_id.and_then(|peer| PeerId::try_from(peer).ok()) {
843 warn!(
844 target: LOG_TARGET,
845 "💔 Peer sent block with incomplete header to import",
846 );
847 self.actions
848 .push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER)));
849 self.restart();
850 }
851 },
852 Err(BlockImportError::VerificationFailed(peer_id, e)) => {
853 let extra_message = peer_id
854 .as_ref()
855 .map_or_else(|| "".into(), |peer| format!(" received from ({peer:?})"));
856
857 warn!(
858 target: LOG_TARGET,
859 "💔 Verification failed for block {hash:?}{extra_message}: {e:?}",
860 );
861
862 if let Some(peer) = peer_id.and_then(|peer| PeerId::try_from(peer).ok()) {
863 self.actions
864 .push(SyncingAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL)));
865 }
866
867 self.restart();
868 },
869 Err(BlockImportError::BadBlock(peer_id)) => {
870 if let Some(peer) = peer_id.and_then(|peer| PeerId::try_from(peer).ok()) {
871 warn!(
872 target: LOG_TARGET,
873 "💔 Block {hash:?} received from peer {peer:?} has been blacklisted",
874 );
875 self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK)));
876 }
877 },
878 Err(BlockImportError::MissingState) => {
879 trace!(target: LOG_TARGET, "Obsolete block {hash:?}");
883 },
884 e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
885 warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err());
886 self.state_sync = None;
887 self.restart();
888 },
889 Err(BlockImportError::Cancelled) => {},
890 };
891 }
892
893 self.allowed_requests.set_all();
894 }
895
896 fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
897 let client = &self.client;
898 let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| {
899 is_descendent_of(&**client, base, block)
900 });
901
902 if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode {
903 if self.state_sync.is_none() {
904 if !self.peers.is_empty() && self.queue_blocks.is_empty() {
905 self.attempt_state_sync(*hash, number, *skip_proofs);
906 } else {
907 self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs));
908 }
909 }
910 }
911
912 if let Err(err) = r {
913 warn!(
914 target: LOG_TARGET,
915 "💔 Error cleaning up pending extra justification data requests: {err}",
916 );
917 }
918 }
919
920 fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
921 self.on_block_queued(best_hash, best_number);
922 }
923
924 fn is_major_syncing(&self) -> bool {
925 self.status().state.is_major_syncing()
926 }
927
928 fn num_peers(&self) -> usize {
929 self.peers.len()
930 }
931
932 fn status(&self) -> SyncStatus<B> {
933 let median_seen = self.median_seen();
934 let best_seen_block =
935 median_seen.and_then(|median| (median > self.best_queued_number).then_some(median));
936 let sync_state = if let Some(target) = median_seen {
937 let best_block = self.client.info().best_number;
941 if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() {
942 if target > self.best_queued_number {
944 SyncState::Downloading { target }
945 } else {
946 SyncState::Importing { target }
947 }
948 } else {
949 SyncState::Idle
950 }
951 } else {
952 SyncState::Idle
953 };
954
955 let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress {
956 phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number),
957 total_bytes: 0,
958 status: None,
959 });
960
961 SyncStatus {
962 state: sync_state,
963 best_seen_block,
964 num_peers: self.peers.len() as u32,
965 queued_blocks: self.queue_blocks.len() as u32,
966 state_sync: self.state_sync.as_ref().map(|s| s.progress()),
967 warp_sync: warp_sync_progress,
968 }
969 }
970
971 fn num_downloaded_blocks(&self) -> usize {
972 self.downloaded_blocks
973 }
974
975 fn num_sync_requests(&self) -> usize {
976 self.fork_targets
977 .values()
978 .filter(|f| f.number <= self.best_queued_number)
979 .count()
980 }
981
982 fn actions(
983 &mut self,
984 network_service: &NetworkServiceHandle,
985 ) -> Result<Vec<SyncingAction<B>>, ClientError> {
986 if !self.peers.is_empty() && self.queue_blocks.is_empty() {
987 if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() {
988 self.attempt_state_sync(hash, number, skip_proofs);
989 }
990 }
991
992 let block_requests = self
993 .block_requests()
994 .into_iter()
995 .map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
996 .collect::<Vec<_>>();
997 self.actions.extend(block_requests);
998
999 let justification_requests = self
1000 .justification_requests()
1001 .into_iter()
1002 .map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
1003 .collect::<Vec<_>>();
1004 self.actions.extend(justification_requests);
1005
1006 let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
1007 trace!(
1008 target: LOG_TARGET,
1009 "Created `StateRequest` to {peer_id}.",
1010 );
1011
1012 let (tx, rx) = oneshot::channel();
1013
1014 network_service.start_request(
1015 peer_id,
1016 self.state_request_protocol_name.clone(),
1017 request.encode_to_vec(),
1018 tx,
1019 IfDisconnected::ImmediateError,
1020 );
1021
1022 SyncingAction::StartRequest {
1023 peer_id,
1024 key: Self::STRATEGY_KEY,
1025 request: async move {
1026 Ok(rx.await?.and_then(|(response, protocol_name)| {
1027 Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
1028 }))
1029 }
1030 .boxed(),
1031 remove_obsolete: false,
1032 }
1033 });
1034 self.actions.extend(state_request);
1035
1036 Ok(std::mem::take(&mut self.actions))
1037 }
1038}
1039
1040impl<B, Client> ChainSync<B, Client>
1041where
1042 B: BlockT,
1043 Client: HeaderBackend<B>
1044 + BlockBackend<B>
1045 + HeaderMetadata<B, Error = soil_client::blockchain::Error>
1046 + ProofProvider<B>
1047 + Send
1048 + Sync
1049 + 'static,
1050{
1051 pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("ChainSync");
1053
1054 pub fn new(
1056 mode: ChainSyncMode,
1057 client: Arc<Client>,
1058 max_parallel_downloads: u32,
1059 max_blocks_per_request: u32,
1060 state_request_protocol_name: ProtocolName,
1061 block_downloader: Arc<dyn BlockDownloader<B>>,
1062 archive_blocks: bool,
1063 metrics_registry: Option<&Registry>,
1064 initial_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
1065 ) -> Result<Self, ClientError> {
1066 let mut sync = Self {
1067 client,
1068 peers: HashMap::new(),
1069 disconnected_peers: DisconnectedPeers::new(),
1070 blocks: BlockCollection::new(),
1071 best_queued_hash: Default::default(),
1072 best_queued_number: Zero::zero(),
1073 extra_justifications: ExtraRequests::new("justification", metrics_registry),
1074 mode,
1075 queue_blocks: Default::default(),
1076 pending_state_sync_attempt: None,
1077 fork_targets: Default::default(),
1078 allowed_requests: Default::default(),
1079 max_parallel_downloads,
1080 max_blocks_per_request,
1081 state_request_protocol_name,
1082 downloaded_blocks: 0,
1083 state_sync: None,
1084 import_existing: false,
1085 block_downloader,
1086 archive_blocks,
1087 gap_sync: None,
1088 actions: Vec::new(),
1089 metrics: metrics_registry.and_then(|r| match Metrics::register(r) {
1090 Ok(metrics) => Some(metrics),
1091 Err(err) => {
1092 log::error!(
1093 target: LOG_TARGET,
1094 "Failed to register `ChainSync` metrics {err:?}",
1095 );
1096 None
1097 },
1098 }),
1099 };
1100
1101 sync.reset_sync_start_point()?;
1102 initial_peers.for_each(|(peer_id, best_hash, best_number)| {
1103 sync.add_peer(peer_id, best_hash, best_number);
1104 });
1105
1106 Ok(sync)
1107 }
1108
1109 fn complete_gap_if_target(&mut self, number: NumberFor<B>) {
1111 let Some(gap_sync) = &self.gap_sync else { return };
1112
1113 if gap_sync.target != number {
1114 return;
1115 }
1116
1117 info!(
1118 target: LOG_TARGET,
1119 "Block history download is complete.",
1120 );
1121 self.gap_sync = None;
1122 }
1123
1124 #[must_use]
1125 fn add_peer_inner(
1126 &mut self,
1127 peer_id: PeerId,
1128 best_hash: B::Hash,
1129 best_number: NumberFor<B>,
1130 ) -> Result<Option<BlockRequest<B>>, BadPeer> {
1131 match self.block_status(&best_hash) {
1133 Err(e) => {
1134 debug!(target: LOG_TARGET, "Error reading blockchain: {e}");
1135 Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR))
1136 },
1137 Ok(BlockStatus::KnownBad) => {
1138 info!(
1139 "💔 New peer {peer_id} with known bad best block {best_hash} ({best_number})."
1140 );
1141 Err(BadPeer(peer_id, rep::BAD_BLOCK))
1142 },
1143 Ok(BlockStatus::Unknown) => {
1144 if best_number.is_zero() {
1145 info!(
1146 "💔 New peer {} with unknown genesis hash {} ({}).",
1147 peer_id, best_hash, best_number,
1148 );
1149 return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH));
1150 }
1151
1152 if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS as usize {
1156 debug!(
1157 target: LOG_TARGET,
1158 "New peer {} with unknown best hash {} ({}), assuming common block.",
1159 peer_id,
1160 self.best_queued_hash,
1161 self.best_queued_number
1162 );
1163 self.peers.insert(
1164 peer_id,
1165 PeerSync {
1166 peer_id,
1167 common_number: self.best_queued_number,
1168 best_hash,
1169 best_number,
1170 state: PeerSyncState::Available,
1171 },
1172 );
1173 return Ok(None);
1174 }
1175
1176 let (state, req) = if self.best_queued_number.is_zero() {
1178 debug!(
1179 target: LOG_TARGET,
1180 "New peer {peer_id} with best hash {best_hash} ({best_number}).",
1181 );
1182
1183 (PeerSyncState::Available, None)
1184 } else {
1185 let common_best = std::cmp::min(self.best_queued_number, best_number);
1186
1187 debug!(
1188 target: LOG_TARGET,
1189 "New peer {} with unknown best hash {} ({}), searching for common ancestor.",
1190 peer_id,
1191 best_hash,
1192 best_number
1193 );
1194
1195 (
1196 PeerSyncState::AncestorSearch {
1197 current: common_best,
1198 start: self.best_queued_number,
1199 state: AncestorSearchState::ExponentialBackoff(One::one()),
1200 },
1201 Some(ancestry_request::<B>(common_best)),
1202 )
1203 };
1204
1205 self.allowed_requests.add(&peer_id);
1206 self.peers.insert(
1207 peer_id,
1208 PeerSync {
1209 peer_id,
1210 common_number: Zero::zero(),
1211 best_hash,
1212 best_number,
1213 state,
1214 },
1215 );
1216
1217 Ok(req)
1218 },
1219 Ok(BlockStatus::Queued)
1220 | Ok(BlockStatus::InChainWithState)
1221 | Ok(BlockStatus::InChainPruned) => {
1222 debug!(
1223 target: LOG_TARGET,
1224 "New peer {peer_id} with known best hash {best_hash} ({best_number}).",
1225 );
1226 self.peers.insert(
1227 peer_id,
1228 PeerSync {
1229 peer_id,
1230 common_number: std::cmp::min(self.best_queued_number, best_number),
1231 best_hash,
1232 best_number,
1233 state: PeerSyncState::Available,
1234 },
1235 );
1236 self.allowed_requests.add(&peer_id);
1237 Ok(None)
1238 },
1239 }
1240 }
1241
1242 fn create_block_request_action(
1243 &mut self,
1244 peer_id: PeerId,
1245 request: BlockRequest<B>,
1246 ) -> SyncingAction<B> {
1247 let downloader = self.block_downloader.clone();
1248
1249 SyncingAction::StartRequest {
1250 peer_id,
1251 key: Self::STRATEGY_KEY,
1252 request: async move {
1253 Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
1254 |(response, protocol_name)| {
1255 let decoded_response =
1256 downloader.block_response_into_blocks(&request, response);
1257 let result = Box::new((request, decoded_response)) as Box<dyn Any + Send>;
1258 Ok((result, protocol_name))
1259 },
1260 ))
1261 }
1262 .boxed(),
1263 remove_obsolete: true,
1266 }
1267 }
1268
	/// Process a block response received from `peer_id`.
	///
	/// `request` is the request the response answers, if any. How the blocks are handled
	/// depends on the state the peer is currently in (new blocks, gap blocks, a stale fork
	/// or an ancestor search). Blocks that are ready for import are handed to
	/// `validate_and_queue_blocks`.
	///
	/// Returns `Err(BadPeer)` when the peer is unknown, when `validate_blocks` rejects the
	/// response, or when the response is unusable for the peer's current state.
	#[must_use]
	fn on_block_data(
		&mut self,
		peer_id: &PeerId,
		request: Option<BlockRequest<B>>,
		response: BlockResponse<B>,
	) -> Result<(), BadPeer> {
		// Counted before any validation takes place.
		self.downloaded_blocks += response.blocks.len();
		// Set once the blocks are drained from the gap sync collection; forwarded to
		// `validate_and_queue_blocks`.
		let mut gap = false;
		let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(peer_id) {
			let mut blocks = response.blocks;
			// Responses to descending requests are reversed so that `blocks` is in the
			// opposite order from here on.
			if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) {
				trace!(target: LOG_TARGET, "Reversing incoming block list");
				blocks.reverse()
			}
			self.allowed_requests.add(peer_id);
			if let Some(request) = request {
				match &mut peer.state {
					PeerSyncState::DownloadingNew(_) => {
						// The peer becomes available again before validation, so a
						// rejected response does not leave it stuck in this state.
						self.blocks.clear_peer_download(peer_id);
						peer.state = PeerSyncState::Available;
						if let Some(start_block) =
							validate_blocks::<B>(&blocks, peer_id, Some(request))?
						{
							self.blocks.insert(start_block, blocks, *peer_id);
						}
						self.ready_blocks()
					},
					PeerSyncState::DownloadingGap(_) => {
						peer.state = PeerSyncState::Available;
						if let Some(gap_sync) = &mut self.gap_sync {
							gap_sync.blocks.clear_peer_download(peer_id);
							if let Some(start_block) =
								validate_blocks::<B>(&blocks, peer_id, Some(request))?
							{
								gap_sync.blocks.insert(start_block, blocks, *peer_id);
							}
							gap = true;
							// Byte counters for the blocks drained in this call only.
							let mut batch_gap_sync_stats = GapSyncStats::new();
							let blocks: Vec<_> = gap_sync
								.blocks
								.ready_blocks(gap_sync.best_queued_number + One::one())
								.into_iter()
								.map(|block_data| {
									// Prefer `justifications`; fall back to mapping the
									// single legacy `justification` field.
									let justifications =
										block_data.block.justifications.or_else(|| {
											legacy_justification_mapping(
												block_data.block.justification,
											)
										});
									let gap_sync_stats = GapSyncStats {
										header_bytes: block_data
											.block
											.header
											.as_ref()
											.map(|h| h.encoded_size())
											.unwrap_or(0),
										body_bytes: block_data
											.block
											.body
											.as_ref()
											.map(|b| b.encoded_size())
											.unwrap_or(0),
										justification_bytes: justifications
											.as_ref()
											.map(|j| j.encoded_size())
											.unwrap_or(0),
									};
									batch_gap_sync_stats += gap_sync_stats;

									// Gap blocks are always imported with `import_existing`
									// and `skip_execution` set.
									IncomingBlock {
										hash: block_data.block.hash,
										header: block_data.block.header,
										body: block_data.block.body,
										indexed_body: block_data.block.indexed_body,
										justifications,
										origin: block_data.origin.map(Into::into),
										allow_missing_state: true,
										import_existing: true,
										skip_execution: true,
										state: None,
									}
								})
								.collect();

							debug!(
								target: LOG_TARGET,
								"Drained {} gap blocks from {}",
								blocks.len(),
								gap_sync.best_queued_number,
							);

							// Fold the per-batch counters into the cumulative gap sync stats.
							gap_sync.stats += batch_gap_sync_stats;

							if blocks.len() > 0 {
								trace!(
									target: LOG_TARGET,
									"Gap sync cumulative stats: {}",
									gap_sync.stats
								);
							}
							blocks
						} else {
							// The peer was downloading gap blocks but no gap sync is active.
							debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}");
							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
						}
					},
					PeerSyncState::DownloadingStale(_) => {
						peer.state = PeerSyncState::Available;
						if blocks.is_empty() {
							debug!(target: LOG_TARGET, "Empty block response from {peer_id}");
							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
						}
						validate_blocks::<B>(&blocks, peer_id, Some(request))?;
						// Stale blocks bypass the block collection and are converted for
						// import directly.
						blocks
							.into_iter()
							.map(|b| {
								let justifications = b
									.justifications
									.or_else(|| legacy_justification_mapping(b.justification));
								IncomingBlock {
									hash: b.hash,
									header: b.header,
									body: b.body,
									indexed_body: None,
									justifications,
									origin: Some((*peer_id).into()),
									allow_missing_state: true,
									import_existing: self.import_existing,
									skip_execution: self.skip_execution(),
									state: None,
								}
							})
							.collect()
					},
					PeerSyncState::AncestorSearch { current, start, state } => {
						// `Some(hash)` when our block at number `current` has the same hash
						// as the first block of the response, `None` otherwise.
						let matching_hash = match (blocks.get(0), self.client.hash(*current)) {
							(Some(block), Ok(maybe_our_block_hash)) => {
								trace!(
									target: LOG_TARGET,
									"Got ancestry block #{} ({}) from peer {}",
									current,
									block.hash,
									peer_id,
								);
								maybe_our_block_hash.filter(|x| x == &block.hash)
							},
							(None, _) => {
								debug!(
									target: LOG_TARGET,
									"Invalid response when searching for ancestor from {peer_id}",
								);
								return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR));
							},
							(_, Err(e)) => {
								info!(
									target: LOG_TARGET,
									"❌ Error answering legitimate blockchain query: {e}",
								);
								return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR));
							},
						};
						if matching_hash.is_some() {
							// Our best queued number moved past the number the search started
							// at and does not exceed the peer's best: take it as the common
							// number directly.
							if *start < self.best_queued_number
								&& self.best_queued_number <= peer.best_number
							{
								trace!(
									target: LOG_TARGET,
									"Ancestry search: opportunistically updating peer {} common number from={} => to={}.",
									*peer_id,
									peer.common_number,
									self.best_queued_number,
								);
								peer.common_number = self.best_queued_number;
							} else if peer.common_number < *current {
								trace!(
									target: LOG_TARGET,
									"Ancestry search: updating peer {} common number from={} => to={}.",
									*peer_id,
									peer.common_number,
									*current,
								);
								peer.common_number = *current;
							}
						}
						// A mismatch at block number zero is reported as a genesis mismatch.
						if matching_hash.is_none() && current.is_zero() {
							trace!(
								target: LOG_TARGET,
								"Ancestry search: genesis mismatch for peer {peer_id}",
							);
							return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH));
						}
						if let Some((next_state, next_num)) =
							handle_ancestor_search_state(state, *current, matching_hash.is_some())
						{
							// The search continues: record the next step and queue the
							// follow-up request as an action.
							peer.state = PeerSyncState::AncestorSearch {
								current: next_num,
								start: *start,
								state: next_state,
							};
							let request = ancestry_request::<B>(next_num);
							let action = self.create_block_request_action(*peer_id, request);
							self.actions.push(action);
							return Ok(());
						} else {
							trace!(
								target: LOG_TARGET,
								"Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
								self.best_queued_hash,
								self.best_queued_number,
								peer.best_hash,
								peer.best_number,
								matching_hash,
								peer.common_number,
							);
							// The peer's best block is above the common number but below our
							// best queued number: register it as a fork target.
							if peer.common_number < peer.best_number
								&& peer.best_number < self.best_queued_number
							{
								trace!(
									target: LOG_TARGET,
									"Added fork target {} for {}",
									peer.best_hash,
									peer_id,
								);
								self.fork_targets
									.entry(peer.best_hash)
									.or_insert_with(|| {
										// The metric is only bumped when a new entry is created.
										if let Some(metrics) = &self.metrics {
											metrics.fork_targets.inc();
										}

										ForkTarget {
											number: peer.best_number,
											parent_hash: None,
											peers: Default::default(),
										}
									})
									.peers
									.insert(*peer_id);
							}
							peer.state = PeerSyncState::Available;
							return Ok(());
						}
					},
					// No block data is expected in these states; nothing is imported.
					PeerSyncState::Available
					| PeerSyncState::DownloadingJustification(..)
					| PeerSyncState::DownloadingState => Vec::new(),
				}
			} else {
				// No request accompanies the response: validate without one and convert
				// the blocks for import with execution skipped.
				validate_blocks::<B>(&blocks, peer_id, None)?;
				blocks
					.into_iter()
					.map(|b| {
						let justifications = b
							.justifications
							.or_else(|| legacy_justification_mapping(b.justification));
						IncomingBlock {
							hash: b.hash,
							header: b.header,
							body: b.body,
							indexed_body: None,
							justifications,
							origin: Some((*peer_id).into()),
							allow_missing_state: true,
							import_existing: false,
							skip_execution: true,
							state: None,
						}
					})
					.collect()
			}
		} else {
			// The peer is not tracked by this strategy.
			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
		};

		self.validate_and_queue_blocks(new_blocks, gap);

		Ok(())
	}
1558
1559 fn on_block_response(
1560 &mut self,
1561 peer_id: &PeerId,
1562 key: StrategyKey,
1563 request: BlockRequest<B>,
1564 blocks: Vec<BlockData<B>>,
1565 ) -> Result<(), BadPeer> {
1566 if key != Self::STRATEGY_KEY {
1567 error!(
1568 target: LOG_TARGET,
1569 "`on_block_response()` called with unexpected key {key:?} for chain sync",
1570 );
1571 debug_assert!(false);
1572 }
1573 let block_response = BlockResponse::<B> { id: request.id, blocks };
1574
1575 let blocks_range = || match (
1576 block_response
1577 .blocks
1578 .first()
1579 .and_then(|b| b.header.as_ref().map(|h| h.number())),
1580 block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
1581 ) {
1582 (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
1583 (Some(first), Some(_)) => format!(" ({})", first),
1584 _ => Default::default(),
1585 };
1586
1587 trace!(
1588 target: LOG_TARGET,
1589 "BlockResponse {} from {} with {} blocks {}",
1590 block_response.id,
1591 peer_id,
1592 block_response.blocks.len(),
1593 blocks_range(),
1594 );
1595
1596 if request.fields == BlockAttributes::JUSTIFICATION {
1597 self.on_block_justification(*peer_id, block_response)
1598 } else {
1599 self.on_block_data(peer_id, Some(request), block_response)
1600 }
1601 }
1602
1603 #[must_use]
1605 fn on_block_justification(
1606 &mut self,
1607 peer_id: PeerId,
1608 response: BlockResponse<B>,
1609 ) -> Result<(), BadPeer> {
1610 let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
1611 peer
1612 } else {
1613 error!(
1614 target: LOG_TARGET,
1615 "💔 Called on_block_justification with a peer ID of an unknown peer",
1616 );
1617 return Ok(());
1618 };
1619
1620 self.allowed_requests.add(&peer_id);
1621 if let PeerSyncState::DownloadingJustification(hash) = peer.state {
1622 peer.state = PeerSyncState::Available;
1623
1624 let justification = if let Some(block) = response.blocks.into_iter().next() {
1626 if hash != block.hash {
1627 warn!(
1628 target: LOG_TARGET,
1629 "💔 Invalid block justification provided by {}: requested: {:?} got: {:?}",
1630 peer_id,
1631 hash,
1632 block.hash,
1633 );
1634 return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION));
1635 }
1636
1637 block
1638 .justifications
1639 .or_else(|| legacy_justification_mapping(block.justification))
1640 } else {
1641 trace!(
1644 target: LOG_TARGET,
1645 "Peer {peer_id:?} provided empty response for justification request {hash:?}",
1646 );
1647
1648 None
1649 };
1650
1651 if let Some((peer_id, hash, number, justifications)) =
1652 self.extra_justifications.on_response(peer_id, justification)
1653 {
1654 self.actions.push(SyncingAction::ImportJustifications {
1655 peer_id,
1656 hash,
1657 number,
1658 justifications,
1659 });
1660 return Ok(());
1661 }
1662 }
1663
1664 Ok(())
1665 }
1666
1667 fn median_seen(&self) -> Option<NumberFor<B>> {
1669 let mut best_seens = self.peers.values().map(|p| p.best_number).collect::<Vec<_>>();
1670
1671 if best_seens.is_empty() {
1672 None
1673 } else {
1674 let middle = best_seens.len() / 2;
1675
1676 Some(*best_seens.select_nth_unstable(middle).1)
1678 }
1679 }
1680
1681 fn skip_execution(&self) -> bool {
1682 match self.mode {
1683 ChainSyncMode::Full => false,
1684 ChainSyncMode::LightState { .. } => true,
1685 }
1686 }
1687
1688 fn validate_and_queue_blocks(&mut self, mut new_blocks: Vec<IncomingBlock<B>>, gap: bool) {
1689 let orig_len = new_blocks.len();
1690 new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
1691 if new_blocks.len() != orig_len {
1692 debug!(
1693 target: LOG_TARGET,
1694 "Ignoring {} blocks that are already queued",
1695 orig_len - new_blocks.len(),
1696 );
1697 }
1698
1699 let origin = if gap {
1700 BlockOrigin::GapSync
1702 } else if !self.status().state.is_major_syncing() {
1703 BlockOrigin::NetworkBroadcast
1705 } else {
1706 BlockOrigin::NetworkInitialSync
1708 };
1709
1710 if let Some((h, n)) = new_blocks
1711 .last()
1712 .and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number())))
1713 {
1714 trace!(
1715 target: LOG_TARGET,
1716 "Accepted {} blocks ({:?}) with origin {:?}",
1717 new_blocks.len(),
1718 h,
1719 origin,
1720 );
1721 self.on_block_queued(h, n)
1722 }
1723 self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
1724 if let Some(metrics) = &self.metrics {
1725 metrics
1726 .queued_blocks
1727 .set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX));
1728 }
1729
1730 self.actions.push(SyncingAction::ImportBlocks { origin, blocks: new_blocks })
1731 }
1732
1733 fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor<B>) {
1734 if let Some(peer) = self.peers.get_mut(peer_id) {
1735 peer.update_common_number(new_common);
1736 }
1737 }
1738
1739 fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
1744 if self.fork_targets.remove(hash).is_some() {
1745 if let Some(metrics) = &self.metrics {
1746 metrics.fork_targets.dec();
1747 }
1748 trace!(target: LOG_TARGET, "Completed fork sync {hash:?}");
1749 }
1750 if let Some(gap_sync) = &mut self.gap_sync {
1751 if number > gap_sync.best_queued_number && number <= gap_sync.target {
1752 gap_sync.best_queued_number = number;
1753 }
1754 }
1755 if number > self.best_queued_number {
1756 self.best_queued_number = number;
1757 self.best_queued_hash = *hash;
1758 for (n, peer) in self.peers.iter_mut() {
1760 if let PeerSyncState::AncestorSearch { .. } = peer.state {
1761 continue;
1763 }
1764 let new_common_number =
1765 if peer.best_number >= number { number } else { peer.best_number };
1766 trace!(
1767 target: LOG_TARGET,
1768 "Updating peer {} info, ours={}, common={}->{}, their best={}",
1769 n,
1770 number,
1771 peer.common_number,
1772 new_common_number,
1773 peer.best_number,
1774 );
1775 peer.common_number = new_common_number;
1776 }
1777 }
1778 self.allowed_requests.set_all();
1779 }
1780
1781 fn restart(&mut self) {
1785 self.blocks.clear();
1786 if let Err(e) = self.reset_sync_start_point() {
1787 warn!(target: LOG_TARGET, "💔 Unable to restart sync: {e}");
1788 }
1789 self.allowed_requests.set_all();
1790 debug!(
1791 target: LOG_TARGET,
1792 "Restarted with {} ({})",
1793 self.best_queued_number,
1794 self.best_queued_hash,
1795 );
1796 let old_peers = std::mem::take(&mut self.peers);
1797
1798 old_peers.into_iter().for_each(|(peer_id, mut peer_sync)| {
1799 match peer_sync.state {
1800 PeerSyncState::Available => {
1801 self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1802 },
1803 PeerSyncState::AncestorSearch { .. }
1804 | PeerSyncState::DownloadingNew(_)
1805 | PeerSyncState::DownloadingStale(_)
1806 | PeerSyncState::DownloadingGap(_)
1807 | PeerSyncState::DownloadingState => {
1808 self.actions
1810 .push(SyncingAction::CancelRequest { peer_id, key: Self::STRATEGY_KEY });
1811 self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1812 },
1813 PeerSyncState::DownloadingJustification(_) => {
1814 trace!(
1818 target: LOG_TARGET,
1819 "Keeping peer {} after restart, updating common number from={} => to={} (our best).",
1820 peer_id,
1821 peer_sync.common_number,
1822 self.best_queued_number,
1823 );
1824 peer_sync.common_number = self.best_queued_number;
1825 self.peers.insert(peer_id, peer_sync);
1826 },
1827 }
1828 });
1829 }
1830
1831 fn reset_sync_start_point(&mut self) -> Result<(), ClientError> {
1834 let info = self.client.info();
1835 debug!(target: LOG_TARGET, "Restarting sync with client info {info:?}");
1836
1837 if matches!(self.mode, ChainSyncMode::LightState { .. }) && info.finalized_state.is_some() {
1838 warn!(
1839 target: LOG_TARGET,
1840 "Can't use fast sync mode with a partially synced database. Reverting to full sync mode."
1841 );
1842 self.mode = ChainSyncMode::Full;
1843 }
1844
1845 self.import_existing = false;
1846 self.best_queued_hash = info.best_hash;
1847 self.best_queued_number = info.best_number;
1848
1849 if self.mode == ChainSyncMode::Full
1850 && self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState
1851 {
1852 self.import_existing = true;
1853 if let Some((hash, number)) = info.finalized_state {
1855 debug!(target: LOG_TARGET, "Starting from finalized state #{number}");
1856 self.best_queued_hash = hash;
1857 self.best_queued_number = number;
1858 } else {
1859 debug!(target: LOG_TARGET, "Restarting from genesis");
1860 self.best_queued_hash = Default::default();
1861 self.best_queued_number = Zero::zero();
1862 }
1863 }
1864
1865 if let Some(BlockGap { start, end, .. }) = info.block_gap {
1866 let old_gap = self.gap_sync.take().map(|g| (g.best_queued_number, g.target));
1867 debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end} (old gap best and target: {old_gap:?})");
1868 self.gap_sync = Some(GapSync {
1869 best_queued_number: start - One::one(),
1870 target: end,
1871 blocks: BlockCollection::new(),
1872 stats: GapSyncStats::new(),
1873 });
1874 }
1875 trace!(
1876 target: LOG_TARGET,
1877 "Restarted sync at #{} ({:?})",
1878 self.best_queued_number,
1879 self.best_queued_hash,
1880 );
1881 Ok(())
1882 }
1883
1884 fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
1886 if self.queue_blocks.contains(hash) {
1887 return Ok(BlockStatus::Queued);
1888 }
1889 self.client.block_status(*hash)
1890 }
1891
1892 fn is_known(&self, hash: &B::Hash) -> bool {
1894 self.block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
1895 }
1896
1897 fn is_already_downloading(&self, hash: &B::Hash) -> bool {
1899 self.peers
1900 .iter()
1901 .any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
1902 }
1903
1904 fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
1906 self.blocks
1907 .ready_blocks(self.best_queued_number + One::one())
1908 .into_iter()
1909 .map(|block_data| {
1910 let justifications = block_data
1911 .block
1912 .justifications
1913 .or_else(|| legacy_justification_mapping(block_data.block.justification));
1914 IncomingBlock {
1915 hash: block_data.block.hash,
1916 header: block_data.block.header,
1917 body: block_data.block.body,
1918 indexed_body: block_data.block.indexed_body,
1919 justifications,
1920 origin: block_data.origin.map(Into::into),
1921 allow_missing_state: true,
1922 import_existing: self.import_existing,
1923 skip_execution: self.skip_execution(),
1924 state: None,
1925 }
1926 })
1927 .collect()
1928 }
1929
1930 fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1932 let peers = &mut self.peers;
1933 let mut matcher = self.extra_justifications.matcher();
1934 std::iter::from_fn(move || {
1935 if let Some((peer, request)) = matcher.next(peers) {
1936 peers
1937 .get_mut(&peer)
1938 .expect(
1939 "`Matcher::next` guarantees the `PeerId` comes from the given peers; qed",
1940 )
1941 .state = PeerSyncState::DownloadingJustification(request.0);
1942 let req = BlockRequest::<B> {
1943 id: 0,
1944 fields: BlockAttributes::JUSTIFICATION,
1945 from: FromBlock::Hash(request.0),
1946 direction: Direction::Ascending,
1947 max: Some(1),
1948 };
1949 Some((peer, req))
1950 } else {
1951 None
1952 }
1953 })
1954 .collect()
1955 }
1956
	/// Collect the block requests that can be sent to peers right now.
	///
	/// Returns nothing while no peer is allowed to receive a request, while a state sync
	/// is in progress, or while more than `MAX_IMPORTING_BLOCKS` blocks are queued.
	/// Otherwise each eligible peer gets at most one request, chosen in this order: an
	/// ancestor search, a request for new blocks, a fork (stale) request, a gap request.
	/// The peer's state is updated to match the request handed out.
	fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
		if self.allowed_requests.is_empty() || self.state_sync.is_some() {
			return Vec::new();
		}

		if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
			trace!(target: LOG_TARGET, "Too many blocks in the queue.");
			return Vec::new();
		}
		// Everything the closure below needs is pulled out of `self` field by field, so the
		// closure does not have to borrow `self` as a whole while `self.peers` is iterated
		// mutably.
		let is_major_syncing = self.status().state.is_major_syncing();
		let mode = self.mode;
		let is_archive = self.archive_blocks;
		let blocks = &mut self.blocks;
		let fork_targets = &mut self.fork_targets;
		// Never treat a block above our best queued number as the finalized one.
		let last_finalized =
			std::cmp::min(self.best_queued_number, self.client.info().finalized_number);
		let best_queued = self.best_queued_number;
		let client = &self.client;
		let queue_blocks = &self.queue_blocks;
		let allowed_requests = self.allowed_requests.clone();
		// A single parallel download per range is used during a major sync.
		let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
		let max_blocks_per_request = self.max_blocks_per_request;
		let gap_sync = &mut self.gap_sync;
		let disconnected_peers = &mut self.disconnected_peers;
		let metrics = self.metrics.as_ref();
		let requests = self
			.peers
			.iter_mut()
			.filter_map(move |(&id, peer)| {
				// Skip peers that are busy, not allowed to receive a request, or reported
				// as unavailable by `disconnected_peers`.
				if !peer.state.is_available()
					|| !allowed_requests.contains(&id)
					|| !disconnected_peers.is_peer_available(&id)
				{
					return None;
				}

				// Start an ancestor search when the common number is more than
				// `MAX_BLOCKS_TO_LOOK_BACKWARDS` behind our best, the peer is ahead of us,
				// the common number is below the last finalized number and at most
				// `MAJOR_SYNC_BLOCKS` blocks are queued.
				if best_queued.saturating_sub(peer.common_number)
					> MAX_BLOCKS_TO_LOOK_BACKWARDS.into()
					&& best_queued < peer.best_number
					&& peer.common_number < last_finalized
					&& queue_blocks.len() <= MAJOR_SYNC_BLOCKS as usize
				{
					trace!(
						target: LOG_TARGET,
						"Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.",
						id,
						peer.common_number,
						best_queued,
					);
					let current = std::cmp::min(peer.best_number, best_queued);
					peer.state = PeerSyncState::AncestorSearch {
						current,
						start: best_queued,
						state: AncestorSearchState::ExponentialBackoff(One::one()),
					};
					Some((id, ancestry_request::<B>(current)))
				} else if let Some((range, req)) = peer_block_request(
					&id,
					peer,
					blocks,
					mode.required_block_attributes(false, is_archive),
					max_parallel,
					max_blocks_per_request,
					last_finalized,
					best_queued,
				) {
					// Regular download of new blocks from this peer.
					peer.state = PeerSyncState::DownloadingNew(range.start);
					trace!(
						target: LOG_TARGET,
						"New block request for {}, (best:{}, common:{}) {:?}",
						id,
						peer.best_number,
						peer.common_number,
						req,
					);
					Some((id, req))
				} else if let Some((hash, req)) = fork_sync_request(
					&id,
					fork_targets,
					best_queued,
					last_finalized,
					mode.required_block_attributes(false, is_archive),
					// Block status lookup: queued blocks first, then the client; a failed
					// client lookup is treated as `Unknown`.
					|hash| {
						if queue_blocks.contains(hash) {
							BlockStatus::Queued
						} else {
							client.block_status(*hash).unwrap_or(BlockStatus::Unknown)
						}
					},
					max_blocks_per_request,
					metrics,
				) {
					trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}");
					peer.state = PeerSyncState::DownloadingStale(hash);
					Some((id, req))
				} else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
					peer_gap_block_request(
						&id,
						peer,
						&mut sync.blocks,
						mode.required_block_attributes(true, is_archive),
						sync.target,
						sync.best_queued_number,
						max_blocks_per_request,
					)
				}) {
					// Gap download; only considered when a gap sync is active.
					peer.state = PeerSyncState::DownloadingGap(range.start);
					trace!(
						target: LOG_TARGET,
						"New gap block request for {}, (best:{}, common:{}) {:?}",
						id,
						peer.best_number,
						peer.common_number,
						req,
					);
					Some((id, req))
				} else {
					None
				}
			})
			.collect::<Vec<_>>();

		// NOTE(review): `take()` presumably resets the set of peers allowed to receive
		// requests once some were handed out — confirm against `AllowedRequests`.
		if !requests.is_empty() {
			self.allowed_requests.take();
		}

		requests
	}
2093
2094 fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
2096 if self.allowed_requests.is_empty() {
2097 return None;
2098 }
2099 if self.state_sync.is_some()
2100 && self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
2101 {
2102 return None;
2104 }
2105 if let Some(sync) = &self.state_sync {
2106 if sync.is_complete() {
2107 return None;
2108 }
2109
2110 for (id, peer) in self.peers.iter_mut() {
2111 if peer.state.is_available()
2112 && peer.common_number >= sync.target_number()
2113 && self.disconnected_peers.is_peer_available(&id)
2114 {
2115 peer.state = PeerSyncState::DownloadingState;
2116 let request = sync.next_request();
2117 trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request);
2118 self.allowed_requests.clear();
2119 return Some((*id, request));
2120 }
2121 }
2122 }
2123 None
2124 }
2125
2126 #[must_use]
2127 fn on_state_data(&mut self, peer_id: &PeerId, response: &[u8]) -> Result<(), BadPeer> {
2128 let response = match StateResponse::decode(response) {
2129 Ok(response) => response,
2130 Err(error) => {
2131 debug!(
2132 target: LOG_TARGET,
2133 "Failed to decode state response from peer {peer_id:?}: {error:?}.",
2134 );
2135
2136 return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2137 },
2138 };
2139
2140 if let Some(peer) = self.peers.get_mut(peer_id) {
2141 if let PeerSyncState::DownloadingState = peer.state {
2142 peer.state = PeerSyncState::Available;
2143 self.allowed_requests.set_all();
2144 }
2145 }
2146 let import_result = if let Some(sync) = &mut self.state_sync {
2147 debug!(
2148 target: LOG_TARGET,
2149 "Importing state data from {} with {} keys, {} proof nodes.",
2150 peer_id,
2151 response.entries.len(),
2152 response.proof.len(),
2153 );
2154 sync.import(response)
2155 } else {
2156 debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}");
2157 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2158 };
2159
2160 match import_result {
2161 ImportResult::Import(hash, header, state, body, justifications) => {
2162 let origin = BlockOrigin::NetworkInitialSync;
2163 let block = IncomingBlock {
2164 hash,
2165 header: Some(header),
2166 body,
2167 indexed_body: None,
2168 justifications,
2169 origin: None,
2170 allow_missing_state: true,
2171 import_existing: true,
2172 skip_execution: self.skip_execution(),
2173 state: Some(state),
2174 };
2175 debug!(target: LOG_TARGET, "State download is complete. Import is queued");
2176 self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
2177 Ok(())
2178 },
2179 ImportResult::Continue => Ok(()),
2180 ImportResult::BadResponse => {
2181 debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
2182 Err(BadPeer(*peer_id, rep::BAD_BLOCK))
2183 },
2184 }
2185 }
2186
2187 fn attempt_state_sync(
2188 &mut self,
2189 finalized_hash: B::Hash,
2190 finalized_number: NumberFor<B>,
2191 skip_proofs: bool,
2192 ) {
2193 let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect();
2194 heads.sort();
2195 let median = heads[heads.len() / 2];
2196 if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median {
2197 if let Ok(Some(header)) = self.client.header(finalized_hash) {
2198 log::debug!(
2199 target: LOG_TARGET,
2200 "Starting state sync for #{finalized_number} ({finalized_hash})",
2201 );
2202 self.state_sync =
2203 Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs));
2204 self.allowed_requests.set_all();
2205 } else {
2206 log::error!(
2207 target: LOG_TARGET,
2208 "Failed to start state sync: header for finalized block \
2209 #{finalized_number} ({finalized_hash}) is not available",
2210 );
2211 debug_assert!(false);
2212 }
2213 }
2214 }
2215
2216 #[cfg(test)]
2218 #[must_use]
2219 fn take_actions(&mut self) -> impl Iterator<Item = SyncingAction<B>> {
2220 std::mem::take(&mut self.actions).into_iter()
2221 }
2222}
2223
2224fn legacy_justification_mapping(
2229 justification: Option<EncodedJustification>,
2230) -> Option<Justifications> {
2231 justification.map(|just| (*b"FRNK", just).into())
2232}
2233
2234fn ancestry_request<B: BlockT>(block: NumberFor<B>) -> BlockRequest<B> {
2237 BlockRequest::<B> {
2238 id: 0,
2239 fields: BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
2240 from: FromBlock::Number(block),
2241 direction: Direction::Ascending,
2242 max: Some(1),
2243 }
2244}
2245
/// Phase of an ancestor search with a peer, as advanced by
/// `handle_ancestor_search_state`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub(crate) enum AncestorSearchState<B: BlockT> {
	/// Stepping backwards from the current block. The payload is the distance to step
	/// back on the next mismatch; it is doubled after every mismatch.
	ExponentialBackoff(NumberFor<B>),
	/// Bisecting between two block numbers. The payload is the `(left, right)` pair of
	/// bounds the search is currently narrowed to.
	BinarySearch(NumberFor<B>, NumberFor<B>),
}
2257
2258fn handle_ancestor_search_state<B: BlockT>(
2266 state: &AncestorSearchState<B>,
2267 curr_block_num: NumberFor<B>,
2268 block_hash_match: bool,
2269) -> Option<(AncestorSearchState<B>, NumberFor<B>)> {
2270 let two = <NumberFor<B>>::one() + <NumberFor<B>>::one();
2271 match state {
2272 AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => {
2273 let next_distance_to_tip = *next_distance_to_tip;
2274 if block_hash_match && next_distance_to_tip == One::one() {
2275 return None;
2278 }
2279 if block_hash_match {
2280 let left = curr_block_num;
2281 let right = left + next_distance_to_tip / two;
2282 let middle = left + (right - left) / two;
2283 Some((AncestorSearchState::BinarySearch(left, right), middle))
2284 } else {
2285 let next_block_num =
2286 curr_block_num.checked_sub(&next_distance_to_tip).unwrap_or_else(Zero::zero);
2287 let next_distance_to_tip = next_distance_to_tip * two;
2288 Some((
2289 AncestorSearchState::ExponentialBackoff(next_distance_to_tip),
2290 next_block_num,
2291 ))
2292 }
2293 },
2294 AncestorSearchState::BinarySearch(mut left, mut right) => {
2295 if left >= curr_block_num {
2296 return None;
2297 }
2298 if block_hash_match {
2299 left = curr_block_num;
2300 } else {
2301 right = curr_block_num;
2302 }
2303 assert!(right >= left);
2304 let middle = left + (right - left) / two;
2305 if middle == curr_block_num {
2306 None
2307 } else {
2308 Some((AncestorSearchState::BinarySearch(left, right), middle))
2309 }
2310 },
2311 }
2312}
2313
2314fn peer_block_request<B: BlockT>(
2316 id: &PeerId,
2317 peer: &PeerSync<B>,
2318 blocks: &mut BlockCollection<B>,
2319 attrs: BlockAttributes,
2320 max_parallel_downloads: u32,
2321 max_blocks_per_request: u32,
2322 finalized: NumberFor<B>,
2323 best_num: NumberFor<B>,
2324) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2325 if best_num >= peer.best_number {
2326 return None;
2328 } else if peer.common_number < finalized {
2329 trace!(
2330 target: LOG_TARGET,
2331 "Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}",
2332 id, peer.common_number, finalized, peer.best_number, best_num,
2333 );
2334 }
2335 let range = blocks.needed_blocks(
2336 *id,
2337 max_blocks_per_request,
2338 peer.best_number,
2339 peer.common_number,
2340 max_parallel_downloads,
2341 MAX_DOWNLOAD_AHEAD,
2342 )?;
2343
2344 let last = range.end.saturating_sub(One::one());
2346
2347 let from = if peer.best_number == last {
2348 FromBlock::Hash(peer.best_hash)
2349 } else {
2350 FromBlock::Number(last)
2351 };
2352
2353 let request = BlockRequest::<B> {
2354 id: 0,
2355 fields: attrs,
2356 from,
2357 direction: Direction::Descending,
2358 max: Some((range.end - range.start).saturated_into::<u32>()),
2359 };
2360
2361 Some((range, request))
2362}
2363
2364fn peer_gap_block_request<B: BlockT>(
2366 id: &PeerId,
2367 peer: &PeerSync<B>,
2368 blocks: &mut BlockCollection<B>,
2369 attrs: BlockAttributes,
2370 target: NumberFor<B>,
2371 common_number: NumberFor<B>,
2372 max_blocks_per_request: u32,
2373) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2374 let range = blocks.needed_blocks(
2375 *id,
2376 max_blocks_per_request,
2377 std::cmp::min(peer.best_number, target),
2378 common_number,
2379 1,
2380 MAX_DOWNLOAD_AHEAD,
2381 )?;
2382
2383 let last = range.end.saturating_sub(One::one());
2385 let from = FromBlock::Number(last);
2386
2387 let request = BlockRequest::<B> {
2388 id: 0,
2389 fields: attrs,
2390 from,
2391 direction: Direction::Descending,
2392 max: Some((range.end - range.start).saturated_into::<u32>()),
2393 };
2394 Some((range, request))
2395}
2396
2397fn fork_sync_request<B: BlockT>(
2399 id: &PeerId,
2400 fork_targets: &mut HashMap<B::Hash, ForkTarget<B>>,
2401 best_num: NumberFor<B>,
2402 finalized: NumberFor<B>,
2403 attributes: BlockAttributes,
2404 check_block: impl Fn(&B::Hash) -> BlockStatus,
2405 max_blocks_per_request: u32,
2406 metrics: Option<&Metrics>,
2407) -> Option<(B::Hash, BlockRequest<B>)> {
2408 fork_targets.retain(|hash, r| {
2409 if r.number <= finalized {
2410 trace!(
2411 target: LOG_TARGET,
2412 "Removed expired fork sync request {:?} (#{})",
2413 hash,
2414 r.number,
2415 );
2416 return false;
2417 }
2418 if check_block(hash) != BlockStatus::Unknown {
2419 trace!(
2420 target: LOG_TARGET,
2421 "Removed obsolete fork sync request {:?} (#{})",
2422 hash,
2423 r.number,
2424 );
2425 return false;
2426 }
2427 true
2428 });
2429 if let Some(metrics) = metrics {
2430 metrics.fork_targets.set(fork_targets.len().try_into().unwrap_or(u64::MAX));
2431 }
2432 for (hash, r) in fork_targets {
2433 if !r.peers.contains(&id) {
2434 continue;
2435 }
2436 if r.number <= best_num
2439 || (r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
2440 {
2441 let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
2442 let count = if parent_status == BlockStatus::Unknown {
2443 (r.number - finalized).saturated_into::<u32>() } else {
2445 1
2447 };
2448 trace!(
2449 target: LOG_TARGET,
2450 "Downloading requested fork {hash:?} from {id}, {count} blocks",
2451 );
2452 return Some((
2453 *hash,
2454 BlockRequest::<B> {
2455 id: 0,
2456 fields: attributes,
2457 from: FromBlock::Hash(*hash),
2458 direction: Direction::Descending,
2459 max: Some(count),
2460 },
2461 ));
2462 } else {
2463 trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number);
2464 }
2465 }
2466 None
2467}
2468
2469fn is_descendent_of<Block, T>(
2471 client: &T,
2472 base: &Block::Hash,
2473 block: &Block::Hash,
2474) -> soil_client::blockchain::Result<bool>
2475where
2476 Block: BlockT,
2477 T: HeaderMetadata<Block, Error = soil_client::blockchain::Error> + ?Sized,
2478{
2479 if base == block {
2480 return Ok(false);
2481 }
2482
2483 let ancestor = soil_client::blockchain::lowest_common_ancestor(client, *block, *base)?;
2484
2485 Ok(ancestor.hash == *base)
2486}
2487
2488pub fn validate_blocks<Block: BlockT>(
2493 blocks: &Vec<BlockData<Block>>,
2494 peer_id: &PeerId,
2495 request: Option<BlockRequest<Block>>,
2496) -> Result<Option<NumberFor<Block>>, BadPeer> {
2497 if let Some(request) = request {
2498 if Some(blocks.len() as _) > request.max {
2499 debug!(
2500 target: LOG_TARGET,
2501 "Received more blocks than requested from {}. Expected in maximum {:?}, got {}.",
2502 peer_id,
2503 request.max,
2504 blocks.len(),
2505 );
2506
2507 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2508 }
2509
2510 let block_header =
2511 if request.direction == Direction::Descending { blocks.last() } else { blocks.first() }
2512 .and_then(|b| b.header.as_ref());
2513
2514 let expected_block = block_header.as_ref().map_or(false, |h| match request.from {
2515 FromBlock::Hash(hash) => h.hash() == hash,
2516 FromBlock::Number(n) => h.number() == &n,
2517 });
2518
2519 if !expected_block {
2520 debug!(
2521 target: LOG_TARGET,
2522 "Received block that was not requested. Requested {:?}, got {:?}.",
2523 request.from,
2524 block_header,
2525 );
2526
2527 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2528 }
2529
2530 if request.fields.contains(BlockAttributes::HEADER)
2531 && blocks.iter().any(|b| b.header.is_none())
2532 {
2533 trace!(
2534 target: LOG_TARGET,
2535 "Missing requested header for a block in response from {peer_id}.",
2536 );
2537
2538 return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2539 }
2540
2541 if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none())
2542 {
2543 trace!(
2544 target: LOG_TARGET,
2545 "Missing requested body for a block in response from {peer_id}.",
2546 );
2547
2548 return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2549 }
2550 }
2551
2552 for b in blocks {
2553 if let Some(header) = &b.header {
2554 let hash = header.hash();
2555 if hash != b.hash {
2556 debug!(
2557 target: LOG_TARGET,
2558 "Bad header received from {}. Expected hash {:?}, got {:?}",
2559 peer_id,
2560 b.hash,
2561 hash,
2562 );
2563 return Err(BadPeer(*peer_id, rep::BAD_BLOCK));
2564 }
2565 }
2566 }
2567
2568 Ok(blocks.first().and_then(|b| b.header.as_ref()).map(|h| *h.number()))
2569}