1use crate::{
32 block_relay_protocol::{BlockDownloader, BlockResponseError},
33 blocks::BlockCollection,
34 justification_requests::ExtraRequests,
35 schema::v1::{StateRequest, StateResponse},
36 service::network::NetworkServiceHandle,
37 strategy::{
38 disconnected_peers::DisconnectedPeers,
39 state_sync::{ImportResult, StateSync, StateSyncProvider},
40 warp::{WarpSyncPhase, WarpSyncProgress},
41 StrategyKey, SyncingAction, SyncingStrategy,
42 },
43 types::{BadPeer, SyncState, SyncStatus},
44 LOG_TARGET,
45};
46
47use codec::Encode;
48use futures::{channel::oneshot, FutureExt};
49use log::{debug, error, info, trace, warn};
50use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
51use prost::Message;
52use sc_client_api::{blockchain::BlockGap, BlockBackend, ProofProvider};
53use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
54use sc_network::{IfDisconnected, ProtocolName};
55use sc_network_common::sync::message::{
56 BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock,
57};
58use sc_network_types::PeerId;
59use sp_arithmetic::traits::Saturating;
60use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
61use sp_consensus::{BlockOrigin, BlockStatus};
62use sp_runtime::{
63 traits::{
64 Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One, SaturatedConversion, Zero,
65 },
66 EncodedJustification, Justifications,
67};
68
69use std::{
70 any::Any,
71 collections::{HashMap, HashSet},
72 fmt,
73 ops::{AddAssign, Range},
74 sync::Arc,
75};
76
77#[cfg(test)]
78mod test;
79
/// Maximum number of blocks to hold in the import queue.
/// NOTE(review): not referenced in this chunk — confirm usage elsewhere in the file.
const MAX_IMPORTING_BLOCKS: usize = 2048;

/// Maximum number of blocks to download ahead of the current sync position.
/// NOTE(review): only referenced here as the base for
/// `MAX_BLOCKS_TO_LOOK_BACKWARDS` — confirm usage elsewhere in the file.
const MAX_DOWNLOAD_AHEAD: u32 = 2048;

/// Maximum number of blocks to look backwards (half the download-ahead window).
const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2;

/// Threshold used by the state-sync triggering logic.
/// NOTE(review): not referenced in this chunk — confirm exact semantics at the
/// usage site.
const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;

/// Sync is considered "major" when the sync target is more than this many
/// blocks ahead of our best imported block (see `status()`); also used as the
/// queued-blocks threshold when deciding whether to skip ancestor search for a
/// new peer (see `add_peer_inner`).
const MAJOR_SYNC_BLOCKS: u8 = 5;
99
/// Peer reputation changes applied by `ChainSync`.
mod rep {
	use sc_network::ReputationChange as Rep;

	/// Reputation change when the backend could not be read while handling a peer.
	pub const BLOCKCHAIN_READ_ERROR: Rep = Rep::new(-(1 << 16), "DB Error");

	/// Reputation change (maximum penalty) when a peer reports a different genesis.
	pub const GENESIS_MISMATCH: Rep = Rep::new(i32::MIN, "Genesis mismatch");

	/// Reputation change for peers which send us a block with an incomplete header.
	pub const INCOMPLETE_HEADER: Rep = Rep::new(-(1 << 20), "Incomplete header");

	/// Reputation change for peers which send us a block that fails verification.
	pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");

	/// Reputation change for peers which send us a known-bad block.
	pub const BAD_BLOCK: Rep = Rep::new(-(1 << 29), "Bad block");

	/// Reputation change for peers which omit requested block data.
	pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");

	/// Reputation change for peers which send us block data we did not request.
	pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");

	/// Reputation change for peers which send us a bad justification.
	pub const BAD_JUSTIFICATION: Rep = Rep::new(-(1 << 16), "Bad justification");

	/// Reputation change when an ancestor lookup fails against the database.
	pub const UNKNOWN_ANCESTOR: Rep = Rep::new(-(1 << 16), "DB Error");

	/// Reputation change for peers which send an incomplete response.
	pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");

	/// Reputation change for peers which send a message we fail to decode.
	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}
137
/// Prometheus gauges maintained by `ChainSync`.
struct Metrics {
	/// Number of blocks currently queued for import.
	queued_blocks: Gauge<U64>,
	/// Number of active fork sync targets.
	fork_targets: Gauge<U64>,
}
142
143impl Metrics {
144 fn register(r: &Registry) -> Result<Self, PrometheusError> {
145 Ok(Self {
146 queued_blocks: {
147 let g =
148 Gauge::new("substrate_sync_queued_blocks", "Number of blocks in import queue")?;
149 register(g, r)?
150 },
151 fork_targets: {
152 let g = Gauge::new("substrate_sync_fork_targets", "Number of fork sync targets")?;
153 register(g, r)?
154 },
155 })
156 }
157}
158
/// The set of peers we are currently allowed to send new requests to.
#[derive(Debug, Clone)]
enum AllowedRequests {
	/// Only the listed peers may receive new requests.
	Some(HashSet<PeerId>),
	/// Every peer may receive new requests.
	All,
}
164
165impl AllowedRequests {
166 fn add(&mut self, id: &PeerId) {
167 if let Self::Some(ref mut set) = self {
168 set.insert(*id);
169 }
170 }
171
172 fn take(&mut self) -> Self {
173 std::mem::take(self)
174 }
175
176 fn set_all(&mut self) {
177 *self = Self::All;
178 }
179
180 fn contains(&self, id: &PeerId) -> bool {
181 match self {
182 Self::Some(set) => set.contains(id),
183 Self::All => true,
184 }
185 }
186
187 fn is_empty(&self) -> bool {
188 match self {
189 Self::Some(set) => set.is_empty(),
190 Self::All => false,
191 }
192 }
193
194 fn clear(&mut self) {
195 std::mem::take(self);
196 }
197}
198
199impl Default for AllowedRequests {
200 fn default() -> Self {
201 Self::Some(HashSet::default())
202 }
203}
204
/// Byte counters for data downloaded during gap (block-history) sync.
#[derive(Debug, Default, Clone)]
struct GapSyncStats {
	// Encoded size of downloaded headers.
	header_bytes: usize,
	// Encoded size of downloaded block bodies.
	body_bytes: usize,
	// Encoded size of downloaded justifications.
	justification_bytes: usize,
}
215
216impl GapSyncStats {
217 fn new() -> Self {
218 Self::default()
219 }
220
221 fn total_bytes(&self) -> usize {
222 self.header_bytes + self.body_bytes + self.justification_bytes
223 }
224
225 fn bytes_to_mib(bytes: usize) -> f64 {
226 bytes as f64 / (1024.0 * 1024.0)
227 }
228}
229
230impl fmt::Display for GapSyncStats {
231 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
232 let total = self.total_bytes();
233 write!(
234 f,
235 "hdr: {} B ({:.2} MiB), body: {} B ({:.2} MiB), just: {} B ({:.2} MiB) | total: {} B ({:.2} MiB)",
236 self.header_bytes,
237 Self::bytes_to_mib(self.header_bytes),
238 self.body_bytes,
239 Self::bytes_to_mib(self.body_bytes),
240 self.justification_bytes,
241 Self::bytes_to_mib(self.justification_bytes),
242 total,
243 Self::bytes_to_mib(total),
244 )
245 }
246}
247
248impl AddAssign for GapSyncStats {
249 fn add_assign(&mut self, other: Self) {
250 self.header_bytes += other.header_bytes;
251 self.body_bytes += other.body_bytes;
252 self.justification_bytes += other.justification_bytes;
253 }
254}
255
/// State of an ongoing block-history (gap) download.
struct GapSync<B: BlockT> {
	// Blocks downloaded for the gap, awaiting import.
	blocks: BlockCollection<B>,
	// Best gap block number queued for import so far.
	best_queued_number: NumberFor<B>,
	// Block number at which the gap ends; reaching it completes gap sync.
	target: NumberFor<B>,
	// Cumulative download statistics for this gap.
	stats: GapSyncStats,
}
262
/// Sync operation mode.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ChainSyncMode {
	/// Full block download and import.
	Full,
	/// Download blocks and the latest state (state sync is attempted on
	/// finality, see `on_block_finalized`).
	LightState {
		/// Skip state proofs during state sync.
		/// NOTE(review): inferred from the name; only passed through to
		/// `attempt_state_sync` in this chunk — confirm semantics there.
		skip_proofs: bool,
		/// Request indexed bodies instead of full bodies (see
		/// `required_block_attributes`).
		storage_chain_mode: bool,
	},
}
276
277impl ChainSyncMode {
278 pub fn required_block_attributes(&self, is_gap: bool, is_archive: bool) -> BlockAttributes {
280 let attrs = match self {
281 ChainSyncMode::Full => {
282 BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY
283 },
284 ChainSyncMode::LightState { storage_chain_mode: false, .. } => {
285 BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY
286 },
287 ChainSyncMode::LightState { storage_chain_mode: true, .. } => {
288 BlockAttributes::HEADER |
289 BlockAttributes::JUSTIFICATION |
290 BlockAttributes::INDEXED_BODY
291 },
292 };
293 if is_gap && !is_archive {
296 attrs & !BlockAttributes::BODY
297 } else {
298 attrs
299 }
300 }
301}
302
/// All the data we track about a peer we are syncing with.
#[derive(Debug, Clone)]
pub(crate) struct PeerSync<B: BlockT> {
	/// Peer id of this peer.
	pub peer_id: PeerId,
	/// Highest block number known to be common between us and this peer.
	pub common_number: NumberFor<B>,
	/// Hash of the best block the peer has announced.
	pub best_hash: B::Hash,
	/// Number of the best block the peer has announced.
	pub best_number: NumberFor<B>,
	/// Current request state of this peer.
	pub state: PeerSyncState<B>,
}
319
320impl<B: BlockT> PeerSync<B> {
321 fn update_common_number(&mut self, new_common: NumberFor<B>) {
323 if self.common_number < new_common {
324 trace!(
325 target: LOG_TARGET,
326 "Updating peer {} common number from={} => to={}.",
327 self.peer_id,
328 self.common_number,
329 new_common,
330 );
331 self.common_number = new_common;
332 }
333 }
334}
335
/// A fork head we want to download, together with the peers that have it.
struct ForkTarget<B: BlockT> {
	// Block number of the fork head.
	number: NumberFor<B>,
	// Parent hash of the fork head, when known from the announcement.
	parent_hash: Option<B::Hash>,
	// Peers known to have this fork.
	peers: HashSet<PeerId>,
}
341
/// The state of syncing between a peer and ourselves.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub(crate) enum PeerSyncState<B: BlockT> {
	/// Available for sync requests.
	Available,
	/// Searching for the common ancestor with the peer.
	AncestorSearch {
		/// Block number the search started from.
		start: NumberFor<B>,
		/// Block number currently being probed.
		current: NumberFor<B>,
		/// Search strategy state (e.g. exponential backoff step).
		state: AncestorSearchState<B>,
	},
	/// Downloading new blocks, starting at the given number.
	DownloadingNew(NumberFor<B>),
	/// Downloading blocks for a stale fork with the given head hash.
	DownloadingStale(B::Hash),
	/// Downloading a justification for the given block hash.
	DownloadingJustification(B::Hash),
	/// Downloading the target state (state sync).
	DownloadingState,
	/// Downloading gap (block-history) blocks, starting at the given number.
	DownloadingGap(NumberFor<B>),
}
372
impl<B: BlockT> PeerSyncState<B> {
	/// `true` when the peer has no request in flight.
	pub fn is_available(&self) -> bool {
		matches!(self, Self::Available)
	}
}
378
/// The main block-sync state machine: tracks peers, schedules block,
/// justification and state downloads, and accumulates [`SyncingAction`]s for
/// the syncing engine.
pub struct ChainSync<B: BlockT, Client> {
	// Chain client providing header/block/state access.
	client: Arc<Client>,
	// The active peers we are syncing with and their status.
	peers: HashMap<PeerId, PeerSync<B>>,
	// Tracks peers that disconnected mid-request, for possible penalties.
	disconnected_peers: DisconnectedPeers,
	// Blocks currently being downloaded from peers.
	blocks: BlockCollection<B>,
	// Best block number successfully queued for import so far.
	best_queued_number: NumberFor<B>,
	// Hash of the best block successfully queued for import so far.
	best_queued_hash: B::Hash,
	// Current sync mode (full or light-state).
	mode: ChainSyncMode,
	// Pending justification requests.
	extra_justifications: ExtraRequests<B>,
	// Hashes of blocks currently sitting in the import queue.
	queue_blocks: HashSet<B::Hash>,
	// A deferred state-sync attempt `(hash, number, skip_proofs)`, taken once
	// peers are connected and the import queue is empty (see `actions`).
	pending_state_sync_attempt: Option<(B::Hash, NumberFor<B>, bool)>,
	// Fork heads we want to download, keyed by head hash.
	fork_targets: HashMap<B::Hash, ForkTarget<B>>,
	// Peers currently allowed to receive new requests.
	allowed_requests: AllowedRequests,
	// Maximum number of parallel downloads.
	// NOTE(review): inferred from the name; used outside this chunk — confirm.
	max_parallel_downloads: u32,
	// Maximum number of blocks per block request.
	max_blocks_per_request: u32,
	// Protocol name used for state requests.
	state_request_protocol_name: ProtocolName,
	// Total number of blocks downloaded so far.
	downloaded_blocks: usize,
	// In-flight state sync, if any.
	state_sync: Option<StateSync<B, Client>>,
	// Whether existing blocks should be re-imported.
	// NOTE(review): not read in this chunk — confirm semantics elsewhere.
	import_existing: bool,
	// Downloader used to fetch and decode block responses.
	block_downloader: Arc<dyn BlockDownloader<B>>,
	// Whether the node keeps all block bodies (archive); gap-sync requests
	// drop bodies when `false` (see `ChainSyncMode::required_block_attributes`).
	archive_blocks: bool,
	// Ongoing block-history (gap) sync, if any.
	gap_sync: Option<GapSync<B>>,
	// Actions accumulated for the syncing engine, drained by `actions()`.
	actions: Vec<SyncingAction<B>>,
	// Prometheus metrics, when a registry was supplied.
	metrics: Option<Metrics>,
}
437
438impl<B, Client> SyncingStrategy<B> for ChainSync<B, Client>
439where
440 B: BlockT,
441 Client: HeaderBackend<B>
442 + BlockBackend<B>
443 + HeaderMetadata<B, Error = sp_blockchain::Error>
444 + ProofProvider<B>
445 + Send
446 + Sync
447 + 'static,
448{
449 fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
450 match self.add_peer_inner(peer_id, best_hash, best_number) {
451 Ok(Some(request)) => {
452 let action = self.create_block_request_action(peer_id, request);
453 self.actions.push(action);
454 },
455 Ok(None) => {},
456 Err(bad_peer) => self.actions.push(SyncingAction::DropPeer(bad_peer)),
457 }
458 }
459
	/// Forget everything about a disconnected peer and reassign its work.
	fn remove_peer(&mut self, peer_id: &PeerId) {
		// Release any block ranges this peer was downloading.
		self.blocks.clear_peer_download(peer_id);
		if let Some(gap_sync) = &mut self.gap_sync {
			gap_sync.blocks.clear_peer_download(peer_id)
		}

		// A peer disconnecting while a request is in flight may be penalized.
		if let Some(state) = self.peers.remove(peer_id) {
			if !state.state.is_available() {
				if let Some(bad_peer) =
					self.disconnected_peers.on_disconnect_during_request(*peer_id)
				{
					self.actions.push(SyncingAction::DropPeer(bad_peer));
				}
			}
		}

		self.extra_justifications.peer_disconnected(peer_id);
		// Remaining peers may now pick up the freed work.
		self.allowed_requests.set_all();
		// Drop fork targets that no remaining peer can serve.
		self.fork_targets.retain(|_, target| {
			target.peers.remove(peer_id);
			!target.peers.is_empty()
		});
		if let Some(metrics) = &self.metrics {
			metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX));
		}

		// Queue any blocks that became ready after clearing this peer.
		let blocks = self.ready_blocks();

		if !blocks.is_empty() {
			self.validate_and_queue_blocks(blocks, false);
		}
	}
492
	/// Handle a block announcement that already passed validation.
	///
	/// Returns `Some((hash, number))` when the announcement updates the
	/// peer's reported best block (`is_best`), so the caller can propagate
	/// the new peer info; `None` otherwise.
	fn on_validated_block_announce(
		&mut self,
		is_best: bool,
		peer_id: PeerId,
		announce: &BlockAnnounce<B::Header>,
	) -> Option<(B::Hash, NumberFor<B>)> {
		let number = *announce.header.number();
		let hash = announce.header.hash();
		let parent_status =
			self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
		let known_parent = parent_status != BlockStatus::Unknown;
		let ancient_parent = parent_status == BlockStatus::InChainPruned;

		let known = self.is_known(&hash);
		let is_major_syncing = self.is_major_syncing();
		let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
			peer
		} else {
			// Announcements from unregistered peers should not happen; still
			// return the info so the caller can update peer state.
			error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}");
			return Some((hash, number));
		};

		// Don't disturb an in-flight ancestor search with announcements.
		if let PeerSyncState::AncestorSearch { .. } = peer.state {
			trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id);
			return None;
		}

		// The announced block continues a chain we can relate to: the block
		// or its parent is known, or it builds on the peer's previous best.
		let continues_known_fork =
			known || known_parent || announce.header.parent_hash() == &peer.best_hash;

		// Only a best-block announcement updates the peer's reported best.
		let peer_info = is_best.then(|| {
			peer.best_number = number;
			peer.best_hash = hash;

			(hash, number)
		});

		if is_best {
			let best_queued_number = self.best_queued_number;

			// Opportunistically advance the common-ancestor estimate.
			if known && best_queued_number >= number {
				peer.update_common_number(number);
			} else if announce.header.parent_hash() == &self.best_queued_hash ||
				known_parent && best_queued_number >= number
			{
				peer.update_common_number(number.saturating_sub(One::one()));
			}

			// Unknown best on an unrelated fork: start an ancestor search
			// (skipped while major-syncing).
			if !continues_known_fork && !is_major_syncing {
				let current = number.min(best_queued_number);
				peer.common_number = peer.common_number.min(self.client.info().finalized_number);
				peer.state = PeerSyncState::AncestorSearch {
					current,
					start: best_queued_number,
					state: AncestorSearchState::ExponentialBackoff(One::one()),
				};

				let request = ancestry_request::<B>(current);
				let action = self.create_block_request_action(peer_id, request);
				self.actions.push(action);

				return peer_info;
			}
		}
		self.allowed_requests.add(&peer_id);

		// Known or already-downloading block: just record this peer as
		// another source for an existing fork target.
		if known || self.is_already_downloading(&hash) {
			trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash);
			if let Some(target) = self.fork_targets.get_mut(&hash) {
				target.peers.insert(peer_id);
			}
			return peer_info;
		}

		// Parent is pruned: too old to be worth downloading.
		if ancient_parent {
			trace!(
				target: LOG_TARGET,
				"Ignored ancient block announced from {}: {} {:?}",
				peer_id,
				hash,
				announce.header,
			);
			return peer_info;
		}

		// When idle, track the announced block as a fork target so it gets
		// picked up by fork-sync requests.
		if self.status().state == SyncState::Idle {
			trace!(
				target: LOG_TARGET,
				"Added sync target for block announced from {}: {} {:?}",
				peer_id,
				hash,
				announce.summary(),
			);
			self.fork_targets
				.entry(hash)
				.or_insert_with(|| {
					if let Some(metrics) = &self.metrics {
						metrics.fork_targets.inc();
					}

					ForkTarget {
						number,
						parent_hash: Some(*announce.header.parent_hash()),
						peers: Default::default(),
					}
				})
				.peers
				.insert(peer_id);
		}

		peer_info
	}
614
	/// Explicitly request syncing of a fork head from the given peers.
	///
	/// When `peers` is empty, every connected peer whose best block is at
	/// least `number` is used instead.
	fn set_sync_fork_request(
		&mut self,
		mut peers: Vec<PeerId>,
		hash: &B::Hash,
		number: NumberFor<B>,
	) {
		if peers.is_empty() {
			peers = self
				.peers
				.iter()
				// Select only peers that could actually have this block.
				.filter(|(_, peer)| peer.best_number >= number)
				.map(|(id, _)| *id)
				.collect();

			debug!(
				target: LOG_TARGET,
				"Explicit sync request for block {hash:?} with no peers specified. \
				Syncing from these peers {peers:?} instead.",
			);
		} else {
			debug!(
				target: LOG_TARGET,
				"Explicit sync request for block {hash:?} with {peers:?}",
			);
		}

		if self.is_known(hash) {
			debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}");
			return;
		}

		trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}");
		for peer_id in &peers {
			if let Some(peer) = self.peers.get_mut(peer_id) {
				// Don't interfere with an in-flight ancestor search.
				if let PeerSyncState::AncestorSearch { .. } = peer.state {
					continue;
				}

				// Trust the caller: bump the peer's best if it was below the
				// requested block.
				if number > peer.best_number {
					peer.best_number = number;
					peer.best_hash = *hash;
				}
				self.allowed_requests.add(peer_id);
			}
		}

		// Record (or extend) the fork target with the selected peers.
		self.fork_targets
			.entry(*hash)
			.or_insert_with(|| {
				if let Some(metrics) = &self.metrics {
					metrics.fork_targets.inc();
				}

				ForkTarget { number, peers: Default::default(), parent_hash: None }
			})
			.peers
			.extend(peers);
	}
675
	/// Schedule a justification request for the given block.
	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
		let client = &self.client;
		self.extra_justifications
			.schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block))
	}
681
	/// Drop all pending justification requests.
	fn clear_justification_requests(&mut self) {
		self.extra_justifications.reset();
	}
685
686 fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
687 let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
688 self.extra_justifications
689 .try_finalize_root((hash, number), finalization_result, true);
690 self.allowed_requests.set_all();
691 }
692
	/// Dispatch a response for a request this strategy started.
	///
	/// Routes by protocol: state responses are raw `Vec<u8>` payloads, block
	/// responses carry the original request plus the decode result.
	fn on_generic_response(
		&mut self,
		peer_id: &PeerId,
		key: StrategyKey,
		protocol_name: ProtocolName,
		response: Box<dyn Any + Send>,
	) {
		// Responses for other strategies indicate an engine bug.
		if Self::STRATEGY_KEY != key {
			warn!(
				target: LOG_TARGET,
				"Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
			);
			debug_assert!(false);
			return;
		}

		if protocol_name == self.state_request_protocol_name {
			let Ok(response) = response.downcast::<Vec<u8>>() else {
				warn!(target: LOG_TARGET, "Failed to downcast state response");
				debug_assert!(false);
				return;
			};

			if let Err(bad_peer) = self.on_state_data(&peer_id, &response) {
				self.actions.push(SyncingAction::DropPeer(bad_peer));
			}
		} else if &protocol_name == self.block_downloader.protocol_name() {
			let Ok(response) = response
				.downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
			else {
				warn!(target: LOG_TARGET, "Failed to downcast block response");
				debug_assert!(false);
				return;
			};

			let (request, response) = *response;
			// A response that failed to decode/extract costs the peer
			// reputation and is not processed further.
			let blocks = match response {
				Ok(blocks) => blocks,
				Err(BlockResponseError::DecodeFailed(e)) => {
					debug!(
						target: LOG_TARGET,
						"Failed to decode block response from peer {:?}: {:?}.",
						peer_id,
						e
					);
					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
					return;
				},
				Err(BlockResponseError::ExtractionFailed(e)) => {
					debug!(
						target: LOG_TARGET,
						"Failed to extract blocks from peer response {:?}: {:?}.",
						peer_id,
						e
					);
					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
					return;
				},
			};

			if let Err(bad_peer) = self.on_block_response(peer_id, key, request, blocks) {
				self.actions.push(SyncingAction::DropPeer(bad_peer));
			}
		} else {
			warn!(
				target: LOG_TARGET,
				"Unexpected generic response protocol {protocol_name}, strategy key \
				{key:?}",
			);
			debug_assert!(false);
		}
	}
765
	/// Process results coming back from the import queue.
	///
	/// Clears every processed block from the local queues, then reacts to
	/// each result (reputation changes, restarts, gap/state-sync completion)
	/// and finally re-allows requests to all peers.
	fn on_blocks_processed(
		&mut self,
		imported: usize,
		count: usize,
		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
	) {
		trace!(target: LOG_TARGET, "Imported {imported} of {count}");

		let mut has_error = false;
		// First pass: remove every processed hash from the pending queues,
		// regardless of its import outcome.
		for (_, hash) in &results {
			if self.queue_blocks.remove(hash) {
				if let Some(metrics) = &self.metrics {
					metrics.queued_blocks.dec();
				}
			}
			self.blocks.clear_queued(hash);
			if let Some(gap_sync) = &mut self.gap_sync {
				gap_sync.blocks.clear_queued(hash);
			}
		}
		// Second pass: react to each result. The first failing result is
		// still handled (the flag is checked at the top of the *next*
		// iteration), then processing stops.
		for (result, hash) in results {
			if has_error {
				break;
			}

			has_error |= result.is_err();

			match result {
				Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => {
					if let Some(peer) = peer_id {
						self.update_peer_common_number(&peer, number);
					}
					self.complete_gap_if_target(number);
				},
				Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => {
					if aux.clear_justification_requests {
						trace!(
							target: LOG_TARGET,
							"Block imported clears all pending justification requests {number}: {hash:?}",
						);
						self.clear_justification_requests();
					}

					if aux.needs_justification {
						trace!(
							target: LOG_TARGET,
							"Block imported but requires justification {number}: {hash:?}",
						);
						self.request_justification(&hash, number);
					}

					if aux.bad_justification {
						if let Some(ref peer) = peer_id {
							warn!("💔 Sent block with bad justification to import");
							self.actions.push(SyncingAction::DropPeer(BadPeer(
								*peer,
								rep::BAD_JUSTIFICATION,
							)));
						}
					}

					if let Some(peer) = peer_id {
						self.update_peer_common_number(&peer, number);
					}
					// Importing the state-sync target block completes state
					// sync: switch back to full mode and restart block sync.
					let state_sync_complete =
						self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash);
					if state_sync_complete {
						info!(
							target: LOG_TARGET,
							"State sync is complete ({} MiB), restarting block sync.",
							self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
						);
						self.state_sync = None;
						self.mode = ChainSyncMode::Full;
						self.restart();
					}

					self.complete_gap_if_target(number);
				},
				Err(BlockImportError::IncompleteHeader(peer_id)) => {
					if let Some(peer) = peer_id {
						warn!(
							target: LOG_TARGET,
							"💔 Peer sent block with incomplete header to import",
						);
						self.actions
							.push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER)));
						self.restart();
					}
				},
				Err(BlockImportError::VerificationFailed(peer_id, e)) => {
					let extra_message = peer_id
						.map_or_else(|| "".into(), |peer| format!(" received from ({peer})"));

					warn!(
						target: LOG_TARGET,
						"💔 Verification failed for block {hash:?}{extra_message}: {e:?}",
					);

					if let Some(peer) = peer_id {
						self.actions
							.push(SyncingAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL)));
					}

					self.restart();
				},
				Err(BlockImportError::BadBlock(peer_id)) => {
					if let Some(peer) = peer_id {
						warn!(
							target: LOG_TARGET,
							"💔 Block {hash:?} received from peer {peer} has been blacklisted",
						);
						self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK)));
					}
				},
				Err(BlockImportError::MissingState) => {
					// Missing state is treated as an obsolete block, not a
					// peer fault.
					trace!(target: LOG_TARGET, "Obsolete block {hash:?}");
				},
				e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
					warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err());
					self.state_sync = None;
					self.restart();
				},
				Err(BlockImportError::Cancelled) => {},
			};
		}

		self.allowed_requests.set_all();
	}
898
	/// Handle a finality notification: clean up justification requests and,
	/// in `LightState` mode, kick off (or defer) a state-sync attempt.
	fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
		let client = &self.client;
		let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| {
			is_descendent_of(&**client, base, block)
		});

		if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode {
			if self.state_sync.is_none() {
				// Start immediately when possible, otherwise defer the
				// attempt until peers connect and the import queue drains
				// (picked up again in `actions`).
				if !self.peers.is_empty() && self.queue_blocks.is_empty() {
					self.attempt_state_sync(*hash, number, *skip_proofs);
				} else {
					self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs));
				}
			}
		}

		if let Err(err) = r {
			warn!(
				target: LOG_TARGET,
				"💔 Error cleaning up pending extra justification data requests: {err}",
			);
		}
	}
922
	/// Inform sync about a new best imported block (delegates to
	/// `on_block_queued`).
	fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
		self.on_block_queued(best_hash, best_number);
	}
926
	/// Whether we are in the middle of a major sync (see `status`).
	fn is_major_syncing(&self) -> bool {
		self.status().state.is_major_syncing()
	}
930
	/// Number of peers currently tracked by this strategy.
	fn num_peers(&self) -> usize {
		self.peers.len()
	}
934
	/// Current sync status, derived from the median best block reported by
	/// connected peers.
	fn status(&self) -> SyncStatus<B> {
		let median_seen = self.median_seen();
		// Only report a "best seen" block that is ahead of what we queued.
		let best_seen_block =
			median_seen.and_then(|median| (median > self.best_queued_number).then_some(median));
		let sync_state = if let Some(target) = median_seen {
			let best_block = self.client.info().best_number;
			// Major sync: the median peer best is more than
			// `MAJOR_SYNC_BLOCKS` ahead of our best imported block.
			if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() {
				// Distinguish "still downloading" from "only importing what
				// was already downloaded".
				if target > self.best_queued_number {
					SyncState::Downloading { target }
				} else {
					SyncState::Importing { target }
				}
			} else {
				SyncState::Idle
			}
		} else {
			SyncState::Idle
		};

		// Gap-sync progress is surfaced via the warp-sync progress field.
		let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress {
			phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number),
			total_bytes: 0,
			status: None,
		});

		SyncStatus {
			state: sync_state,
			best_seen_block,
			num_peers: self.peers.len() as u32,
			queued_blocks: self.queue_blocks.len() as u32,
			state_sync: self.state_sync.as_ref().map(|s| s.progress()),
			warp_sync: warp_sync_progress,
		}
	}
973
	/// Total number of blocks downloaded so far (across all responses).
	fn num_downloaded_blocks(&self) -> usize {
		self.downloaded_blocks
	}
977
	/// Number of fork targets at or below our best queued block, i.e. the
	/// ones that are currently actionable.
	fn num_sync_requests(&self) -> usize {
		self.fork_targets
			.values()
			.filter(|f| f.number <= self.best_queued_number)
			.count()
	}
984
	/// Drain all pending actions: block and justification requests, state
	/// requests, plus anything queued by earlier event handlers.
	fn actions(
		&mut self,
		network_service: &NetworkServiceHandle,
	) -> Result<Vec<SyncingAction<B>>, ClientError> {
		// A state-sync attempt deferred in `on_block_finalized` can run once
		// peers are connected and the import queue has drained.
		if !self.peers.is_empty() && self.queue_blocks.is_empty() {
			if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() {
				self.attempt_state_sync(hash, number, skip_proofs);
			}
		}

		let block_requests = self
			.block_requests()
			.into_iter()
			.map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
			.collect::<Vec<_>>();
		self.actions.extend(block_requests);

		// Justification requests are sent over the block protocol too.
		let justification_requests = self
			.justification_requests()
			.into_iter()
			.map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
			.collect::<Vec<_>>();
		self.actions.extend(justification_requests);

		// State requests are started on the network directly; the oneshot
		// receiver is wrapped into a `StartRequest` future for the engine.
		let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
			trace!(
				target: LOG_TARGET,
				"Created `StateRequest` to {peer_id}.",
			);

			let (tx, rx) = oneshot::channel();

			network_service.start_request(
				peer_id,
				self.state_request_protocol_name.clone(),
				request.encode_to_vec(),
				tx,
				IfDisconnected::ImmediateError,
			);

			SyncingAction::StartRequest {
				peer_id,
				key: Self::STRATEGY_KEY,
				request: async move {
					Ok(rx.await?.and_then(|(response, protocol_name)| {
						Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
					}))
				}
				.boxed(),
				remove_obsolete: false,
			}
		});
		self.actions.extend(state_request);

		Ok(std::mem::take(&mut self.actions))
	}
1041}
1042
1043impl<B, Client> ChainSync<B, Client>
1044where
1045 B: BlockT,
1046 Client: HeaderBackend<B>
1047 + BlockBackend<B>
1048 + HeaderMetadata<B, Error = sp_blockchain::Error>
1049 + ProofProvider<B>
1050 + Send
1051 + Sync
1052 + 'static,
1053{
	/// Strategy key used to identify requests started by `ChainSync`.
	pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("ChainSync");
1056
	/// Create a new `ChainSync` instance.
	///
	/// Resets the sync start point from the client's state, then registers
	/// all `initial_peers`. Returns an error when the start point cannot be
	/// determined.
	pub fn new(
		mode: ChainSyncMode,
		client: Arc<Client>,
		max_parallel_downloads: u32,
		max_blocks_per_request: u32,
		state_request_protocol_name: ProtocolName,
		block_downloader: Arc<dyn BlockDownloader<B>>,
		archive_blocks: bool,
		metrics_registry: Option<&Registry>,
		initial_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
	) -> Result<Self, ClientError> {
		let mut sync = Self {
			client,
			peers: HashMap::new(),
			disconnected_peers: DisconnectedPeers::new(),
			blocks: BlockCollection::new(),
			best_queued_hash: Default::default(),
			best_queued_number: Zero::zero(),
			extra_justifications: ExtraRequests::new("justification", metrics_registry),
			mode,
			queue_blocks: Default::default(),
			pending_state_sync_attempt: None,
			fork_targets: Default::default(),
			allowed_requests: Default::default(),
			max_parallel_downloads,
			max_blocks_per_request,
			state_request_protocol_name,
			downloaded_blocks: 0,
			state_sync: None,
			import_existing: false,
			block_downloader,
			archive_blocks,
			gap_sync: None,
			actions: Vec::new(),
			// Metrics registration failure is logged but not fatal.
			metrics: metrics_registry.and_then(|r| match Metrics::register(r) {
				Ok(metrics) => Some(metrics),
				Err(err) => {
					log::error!(
						target: LOG_TARGET,
						"Failed to register `ChainSync` metrics {err:?}",
					);
					None
				},
			}),
		};

		sync.reset_sync_start_point()?;
		initial_peers.for_each(|(peer_id, best_hash, best_number)| {
			sync.add_peer(peer_id, best_hash, best_number);
		});

		Ok(sync)
	}
1111
1112 fn complete_gap_if_target(&mut self, number: NumberFor<B>) {
1114 let Some(gap_sync) = &self.gap_sync else { return };
1115
1116 if gap_sync.target != number {
1117 return;
1118 }
1119
1120 info!(
1121 target: LOG_TARGET,
1122 "Block history download is complete.",
1123 );
1124 self.gap_sync = None;
1125 }
1126
	/// Validate and register a new peer.
	///
	/// Returns an initial ancestor-search block request when one should be
	/// sent, `Ok(None)` when the peer can be used as-is, or `Err(BadPeer)`
	/// when the peer must be dropped (known-bad best block, unknown genesis,
	/// or a backend read error).
	#[must_use]
	fn add_peer_inner(
		&mut self,
		peer_id: PeerId,
		best_hash: B::Hash,
		best_number: NumberFor<B>,
	) -> Result<Option<BlockRequest<B>>, BadPeer> {
		match self.block_status(&best_hash) {
			Err(e) => {
				debug!(target: LOG_TARGET, "Error reading blockchain: {e}");
				Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR))
			},
			Ok(BlockStatus::KnownBad) => {
				info!(
					"💔 New peer {peer_id} with known bad best block {best_hash} ({best_number})."
				);
				Err(BadPeer(peer_id, rep::BAD_BLOCK))
			},
			Ok(BlockStatus::Unknown) => {
				// A peer whose best block is an unknown genesis is on a
				// different chain entirely.
				if best_number.is_zero() {
					info!(
						"💔 New peer {} with unknown genesis hash {} ({}).",
						peer_id, best_hash, best_number,
					);
					return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH));
				}

				// With many blocks queued we are likely in a major sync:
				// assume our best queued block is common with the peer
				// instead of running an ancestor search.
				if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS as usize {
					debug!(
						target: LOG_TARGET,
						"New peer {} with unknown best hash {} ({}), assuming common block.",
						peer_id,
						self.best_queued_hash,
						self.best_queued_number
					);
					self.peers.insert(
						peer_id,
						PeerSync {
							peer_id,
							common_number: self.best_queued_number,
							best_hash,
							best_number,
							state: PeerSyncState::Available,
						},
					);
					return Ok(None);
				}

				// Otherwise: nothing to search at genesis, else start an
				// ancestor search from the lower of the two best numbers.
				let (state, req) = if self.best_queued_number.is_zero() {
					debug!(
						target: LOG_TARGET,
						"New peer {peer_id} with best hash {best_hash} ({best_number}).",
					);

					(PeerSyncState::Available, None)
				} else {
					let common_best = std::cmp::min(self.best_queued_number, best_number);

					debug!(
						target: LOG_TARGET,
						"New peer {} with unknown best hash {} ({}), searching for common ancestor.",
						peer_id,
						best_hash,
						best_number
					);

					(
						PeerSyncState::AncestorSearch {
							current: common_best,
							start: self.best_queued_number,
							state: AncestorSearchState::ExponentialBackoff(One::one()),
						},
						Some(ancestry_request::<B>(common_best)),
					)
				};

				self.allowed_requests.add(&peer_id);
				self.peers.insert(
					peer_id,
					PeerSync {
						peer_id,
						common_number: Zero::zero(),
						best_hash,
						best_number,
						state,
					},
				);

				Ok(req)
			},
			Ok(BlockStatus::Queued) |
			Ok(BlockStatus::InChainWithState) |
			Ok(BlockStatus::InChainPruned) => {
				// Known best block: the common number is simply the smaller
				// of the two bests.
				debug!(
					target: LOG_TARGET,
					"New peer {peer_id} with known best hash {best_hash} ({best_number}).",
				);
				self.peers.insert(
					peer_id,
					PeerSync {
						peer_id,
						common_number: std::cmp::min(self.best_queued_number, best_number),
						best_hash,
						best_number,
						state: PeerSyncState::Available,
					},
				);
				self.allowed_requests.add(&peer_id);
				Ok(None)
			},
		}
	}
1244
	/// Wrap a block request into a `StartRequest` action that downloads and
	/// decodes the response via the configured block downloader.
	fn create_block_request_action(
		&mut self,
		peer_id: PeerId,
		request: BlockRequest<B>,
	) -> SyncingAction<B> {
		let downloader = self.block_downloader.clone();

		SyncingAction::StartRequest {
			peer_id,
			key: Self::STRATEGY_KEY,
			request: async move {
				Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
					|(response, protocol_name)| {
						// Decode inside the request future so the engine only
						// sees the already-parsed `(request, blocks)` pair.
						let decoded_response =
							downloader.block_response_into_blocks(&request, response);
						let result = Box::new((request, decoded_response)) as Box<dyn Any + Send>;
						Ok((result, protocol_name))
					},
				))
			}
			.boxed(),
			// NOTE(review): block requests are marked obsolete-removable,
			// unlike state requests (`remove_obsolete: false` in `actions`) —
			// confirm exact engine semantics of this flag.
			remove_obsolete: true,
		}
	}
1271
	/// Process a block response from `peer_id`, optionally matched against the `request`
	/// that triggered it.
	///
	/// Depending on the peer's current [`PeerSyncState`], the blocks feed the main download
	/// collection, the gap-sync collection, a fork (stale) download, or advance an ancestor
	/// search. Validated blocks are finally queued for import via
	/// `validate_and_queue_blocks`.
	#[must_use]
	fn on_block_data(
		&mut self,
		peer_id: &PeerId,
		request: Option<BlockRequest<B>>,
		response: BlockResponse<B>,
	) -> Result<(), BadPeer> {
		self.downloaded_blocks += response.blocks.len();
		// Set when the response belongs to gap sync, to tag the import origin accordingly.
		let mut gap = false;
		let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(peer_id) {
			let mut blocks = response.blocks;
			// Responses to descending requests are normalized to ascending order.
			if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) {
				trace!(target: LOG_TARGET, "Reversing incoming block list");
				blocks.reverse()
			}
			self.allowed_requests.add(peer_id);
			if let Some(request) = request {
				match &mut peer.state {
					PeerSyncState::DownloadingNew(_) => {
						self.blocks.clear_peer_download(peer_id);
						peer.state = PeerSyncState::Available;
						if let Some(start_block) =
							validate_blocks::<B>(&blocks, peer_id, Some(request))?
						{
							self.blocks.insert(start_block, blocks, *peer_id);
						}
						self.ready_blocks()
					},
					PeerSyncState::DownloadingGap(_) => {
						peer.state = PeerSyncState::Available;
						if let Some(gap_sync) = &mut self.gap_sync {
							gap_sync.blocks.clear_peer_download(peer_id);
							if let Some(start_block) =
								validate_blocks::<B>(&blocks, peer_id, Some(request))?
							{
								gap_sync.blocks.insert(start_block, blocks, *peer_id);
							}
							gap = true;
							let mut batch_gap_sync_stats = GapSyncStats::new();
							// Drain all blocks that are now contiguous with the gap cursor.
							let blocks: Vec<_> = gap_sync
								.blocks
								.ready_blocks(gap_sync.best_queued_number + One::one())
								.into_iter()
								.map(|block_data| {
									let justifications =
										block_data.block.justifications.or_else(|| {
											legacy_justification_mapping(
												block_data.block.justification,
											)
										});
									// Accumulate per-block encoded sizes for gap-sync telemetry.
									let gap_sync_stats = GapSyncStats {
										header_bytes: block_data
											.block
											.header
											.as_ref()
											.map(|h| h.encoded_size())
											.unwrap_or(0),
										body_bytes: block_data
											.block
											.body
											.as_ref()
											.map(|b| b.encoded_size())
											.unwrap_or(0),
										justification_bytes: justifications
											.as_ref()
											.map(|j| j.encoded_size())
											.unwrap_or(0),
									};
									batch_gap_sync_stats += gap_sync_stats;

									IncomingBlock {
										hash: block_data.block.hash,
										header: block_data.block.header,
										body: block_data.block.body,
										indexed_body: block_data.block.indexed_body,
										justifications,
										origin: block_data.origin,
										allow_missing_state: true,
										// Gap blocks may already exist in the database; re-import
										// without execution.
										import_existing: true,
										skip_execution: true,
										state: None,
									}
								})
								.collect();

							debug!(
								target: LOG_TARGET,
								"Drained {} gap blocks from {}",
								blocks.len(),
								gap_sync.best_queued_number,
							);

							gap_sync.stats += batch_gap_sync_stats;

							if blocks.len() > 0 {
								trace!(
									target: LOG_TARGET,
									"Gap sync cumulative stats: {}",
									gap_sync.stats
								);
							}
							blocks
						} else {
							debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}");
							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
						}
					},
					PeerSyncState::DownloadingStale(_) => {
						peer.state = PeerSyncState::Available;
						if blocks.is_empty() {
							debug!(target: LOG_TARGET, "Empty block response from {peer_id}");
							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
						}
						validate_blocks::<B>(&blocks, peer_id, Some(request))?;
						blocks
							.into_iter()
							.map(|b| {
								let justifications = b
									.justifications
									.or_else(|| legacy_justification_mapping(b.justification));
								IncomingBlock {
									hash: b.hash,
									header: b.header,
									body: b.body,
									indexed_body: None,
									justifications,
									origin: Some(*peer_id),
									allow_missing_state: true,
									import_existing: self.import_existing,
									skip_execution: self.skip_execution(),
									state: None,
								}
							})
							.collect()
					},
					PeerSyncState::AncestorSearch { current, start, state } => {
						// Compare the returned header's hash against our own block at the
						// same height.
						let matching_hash = match (blocks.get(0), self.client.hash(*current)) {
							(Some(block), Ok(maybe_our_block_hash)) => {
								trace!(
									target: LOG_TARGET,
									"Got ancestry block #{} ({}) from peer {}",
									current,
									block.hash,
									peer_id,
								);
								maybe_our_block_hash.filter(|x| x == &block.hash)
							},
							(None, _) => {
								debug!(
									target: LOG_TARGET,
									"Invalid response when searching for ancestor from {peer_id}",
								);
								return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR));
							},
							(_, Err(e)) => {
								info!(
									target: LOG_TARGET,
									"❌ Error answering legitimate blockchain query: {e}",
								);
								return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR));
							},
						};
						if matching_hash.is_some() {
							if *start < self.best_queued_number &&
								self.best_queued_number <= peer.best_number
							{
								// The peer's chain covers our best queued block, so the
								// common number can jump straight to it.
								trace!(
									target: LOG_TARGET,
									"Ancestry search: opportunistically updating peer {} common number from={} => to={}.",
									*peer_id,
									peer.common_number,
									self.best_queued_number,
								);
								peer.common_number = self.best_queued_number;
							} else if peer.common_number < *current {
								trace!(
									target: LOG_TARGET,
									"Ancestry search: updating peer {} common number from={} => to={}.",
									*peer_id,
									peer.common_number,
									*current,
								);
								peer.common_number = *current;
							}
						}
						// No match even at block zero means the peer is on a different chain.
						if matching_hash.is_none() && current.is_zero() {
							trace!(
								target: LOG_TARGET,
								"Ancestry search: genesis mismatch for peer {peer_id}",
							);
							return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH));
						}
						if let Some((next_state, next_num)) =
							handle_ancestor_search_state(state, *current, matching_hash.is_some())
						{
							// Search continues: issue the next ancestry probe.
							peer.state = PeerSyncState::AncestorSearch {
								current: next_num,
								start: *start,
								state: next_state,
							};
							let request = ancestry_request::<B>(next_num);
							let action = self.create_block_request_action(*peer_id, request);
							self.actions.push(action);
							return Ok(());
						} else {
							trace!(
								target: LOG_TARGET,
								"Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
								self.best_queued_hash,
								self.best_queued_number,
								peer.best_hash,
								peer.best_number,
								matching_hash,
								peer.common_number,
							);
							if peer.common_number < peer.best_number &&
								peer.best_number < self.best_queued_number
							{
								// The peer's best is below ours yet unconnected: track it as
								// a fork target so its blocks can be fetched later.
								trace!(
									target: LOG_TARGET,
									"Added fork target {} for {}",
									peer.best_hash,
									peer_id,
								);
								self.fork_targets
									.entry(peer.best_hash)
									.or_insert_with(|| {
										if let Some(metrics) = &self.metrics {
											metrics.fork_targets.inc();
										}

										ForkTarget {
											number: peer.best_number,
											parent_hash: None,
											peers: Default::default(),
										}
									})
									.peers
									.insert(*peer_id);
							}
							peer.state = PeerSyncState::Available;
							return Ok(());
						}
					},
					// States that never produce block data via this path.
					PeerSyncState::Available |
					PeerSyncState::DownloadingJustification(..) |
					PeerSyncState::DownloadingState => Vec::new(),
				}
			} else {
				// Unsolicited response (no matching request): validate loosely and import
				// without execution.
				validate_blocks::<B>(&blocks, peer_id, None)?;
				blocks
					.into_iter()
					.map(|b| {
						let justifications = b
							.justifications
							.or_else(|| legacy_justification_mapping(b.justification));
						IncomingBlock {
							hash: b.hash,
							header: b.header,
							body: b.body,
							indexed_body: None,
							justifications,
							origin: Some(*peer_id),
							allow_missing_state: true,
							import_existing: false,
							skip_execution: true,
							state: None,
						}
					})
					.collect()
			}
		} else {
			// Response from a peer we do not track — we never asked it for anything.
			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
		};

		self.validate_and_queue_blocks(new_blocks, gap);

		Ok(())
	}
1561
1562 fn on_block_response(
1563 &mut self,
1564 peer_id: &PeerId,
1565 key: StrategyKey,
1566 request: BlockRequest<B>,
1567 blocks: Vec<BlockData<B>>,
1568 ) -> Result<(), BadPeer> {
1569 if key != Self::STRATEGY_KEY {
1570 error!(
1571 target: LOG_TARGET,
1572 "`on_block_response()` called with unexpected key {key:?} for chain sync",
1573 );
1574 debug_assert!(false);
1575 }
1576 let block_response = BlockResponse::<B> { id: request.id, blocks };
1577
1578 let blocks_range = || match (
1579 block_response
1580 .blocks
1581 .first()
1582 .and_then(|b| b.header.as_ref().map(|h| h.number())),
1583 block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
1584 ) {
1585 (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
1586 (Some(first), Some(_)) => format!(" ({})", first),
1587 _ => Default::default(),
1588 };
1589
1590 trace!(
1591 target: LOG_TARGET,
1592 "BlockResponse {} from {} with {} blocks {}",
1593 block_response.id,
1594 peer_id,
1595 block_response.blocks.len(),
1596 blocks_range(),
1597 );
1598
1599 if request.fields == BlockAttributes::JUSTIFICATION {
1600 self.on_block_justification(*peer_id, block_response)
1601 } else {
1602 self.on_block_data(peer_id, Some(request), block_response)
1603 }
1604 }
1605
1606 #[must_use]
1608 fn on_block_justification(
1609 &mut self,
1610 peer_id: PeerId,
1611 response: BlockResponse<B>,
1612 ) -> Result<(), BadPeer> {
1613 let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
1614 peer
1615 } else {
1616 error!(
1617 target: LOG_TARGET,
1618 "💔 Called on_block_justification with a peer ID of an unknown peer",
1619 );
1620 return Ok(());
1621 };
1622
1623 self.allowed_requests.add(&peer_id);
1624 if let PeerSyncState::DownloadingJustification(hash) = peer.state {
1625 peer.state = PeerSyncState::Available;
1626
1627 let justification = if let Some(block) = response.blocks.into_iter().next() {
1629 if hash != block.hash {
1630 warn!(
1631 target: LOG_TARGET,
1632 "💔 Invalid block justification provided by {}: requested: {:?} got: {:?}",
1633 peer_id,
1634 hash,
1635 block.hash,
1636 );
1637 return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION));
1638 }
1639
1640 block
1641 .justifications
1642 .or_else(|| legacy_justification_mapping(block.justification))
1643 } else {
1644 trace!(
1647 target: LOG_TARGET,
1648 "Peer {peer_id:?} provided empty response for justification request {hash:?}",
1649 );
1650
1651 None
1652 };
1653
1654 if let Some((peer_id, hash, number, justifications)) =
1655 self.extra_justifications.on_response(peer_id, justification)
1656 {
1657 self.actions.push(SyncingAction::ImportJustifications {
1658 peer_id,
1659 hash,
1660 number,
1661 justifications,
1662 });
1663 return Ok(());
1664 }
1665 }
1666
1667 Ok(())
1668 }
1669
1670 fn median_seen(&self) -> Option<NumberFor<B>> {
1672 let mut best_seens = self.peers.values().map(|p| p.best_number).collect::<Vec<_>>();
1673
1674 if best_seens.is_empty() {
1675 None
1676 } else {
1677 let middle = best_seens.len() / 2;
1678
1679 Some(*best_seens.select_nth_unstable(middle).1)
1681 }
1682 }
1683
1684 fn skip_execution(&self) -> bool {
1685 match self.mode {
1686 ChainSyncMode::Full => false,
1687 ChainSyncMode::LightState { .. } => true,
1688 }
1689 }
1690
1691 fn validate_and_queue_blocks(&mut self, mut new_blocks: Vec<IncomingBlock<B>>, gap: bool) {
1692 let orig_len = new_blocks.len();
1693 new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
1694 if new_blocks.len() != orig_len {
1695 debug!(
1696 target: LOG_TARGET,
1697 "Ignoring {} blocks that are already queued",
1698 orig_len - new_blocks.len(),
1699 );
1700 }
1701
1702 let origin = if gap {
1703 BlockOrigin::GapSync
1705 } else if !self.status().state.is_major_syncing() {
1706 BlockOrigin::NetworkBroadcast
1708 } else {
1709 BlockOrigin::NetworkInitialSync
1711 };
1712
1713 if let Some((h, n)) = new_blocks
1714 .last()
1715 .and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number())))
1716 {
1717 trace!(
1718 target: LOG_TARGET,
1719 "Accepted {} blocks ({:?}) with origin {:?}",
1720 new_blocks.len(),
1721 h,
1722 origin,
1723 );
1724 self.on_block_queued(h, n)
1725 }
1726 self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
1727 if let Some(metrics) = &self.metrics {
1728 metrics
1729 .queued_blocks
1730 .set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX));
1731 }
1732
1733 self.actions.push(SyncingAction::ImportBlocks { origin, blocks: new_blocks })
1734 }
1735
1736 fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor<B>) {
1737 if let Some(peer) = self.peers.get_mut(peer_id) {
1738 peer.update_common_number(new_common);
1739 }
1740 }
1741
1742 fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
1747 if self.fork_targets.remove(hash).is_some() {
1748 if let Some(metrics) = &self.metrics {
1749 metrics.fork_targets.dec();
1750 }
1751 trace!(target: LOG_TARGET, "Completed fork sync {hash:?}");
1752 }
1753 if let Some(gap_sync) = &mut self.gap_sync {
1754 if number > gap_sync.best_queued_number && number <= gap_sync.target {
1755 gap_sync.best_queued_number = number;
1756 }
1757 }
1758 if number > self.best_queued_number {
1759 self.best_queued_number = number;
1760 self.best_queued_hash = *hash;
1761 for (n, peer) in self.peers.iter_mut() {
1763 if let PeerSyncState::AncestorSearch { .. } = peer.state {
1764 continue;
1766 }
1767 let new_common_number =
1768 if peer.best_number >= number { number } else { peer.best_number };
1769 trace!(
1770 target: LOG_TARGET,
1771 "Updating peer {} info, ours={}, common={}->{}, their best={}",
1772 n,
1773 number,
1774 peer.common_number,
1775 new_common_number,
1776 peer.best_number,
1777 );
1778 peer.common_number = new_common_number;
1779 }
1780 }
1781 self.allowed_requests.set_all();
1782 }
1783
	/// Restart syncing from scratch.
	///
	/// Clears the download collection, re-derives the sync start point, and re-adds all
	/// previously known peers. Peers with in-flight block/state downloads get their
	/// requests cancelled first; peers downloading justifications are kept as-is with a
	/// refreshed common number.
	fn restart(&mut self) {
		self.blocks.clear();
		if let Err(e) = self.reset_sync_start_point() {
			warn!(target: LOG_TARGET, "💔 Unable to restart sync: {e}");
		}
		self.allowed_requests.set_all();
		debug!(
			target: LOG_TARGET,
			"Restarted with {} ({})",
			self.best_queued_number,
			self.best_queued_hash,
		);
		// Take ownership of the peer map so each entry can be re-registered from scratch.
		let old_peers = std::mem::take(&mut self.peers);

		old_peers.into_iter().for_each(|(peer_id, mut peer_sync)| {
			match peer_sync.state {
				PeerSyncState::Available => {
					self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
				},
				PeerSyncState::AncestorSearch { .. } |
				PeerSyncState::DownloadingNew(_) |
				PeerSyncState::DownloadingStale(_) |
				PeerSyncState::DownloadingGap(_) |
				PeerSyncState::DownloadingState => {
					// Cancel the now-obsolete in-flight request before re-adding the peer.
					self.actions
						.push(SyncingAction::CancelRequest { peer_id, key: Self::STRATEGY_KEY });
					self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
				},
				PeerSyncState::DownloadingJustification(_) => {
					// Keep the justification download running; only refresh the common
					// number to our (possibly reset) best.
					trace!(
						target: LOG_TARGET,
						"Keeping peer {} after restart, updating common number from={} => to={} (our best).",
						peer_id,
						peer_sync.common_number,
						self.best_queued_number,
					);
					peer_sync.common_number = self.best_queued_number;
					self.peers.insert(peer_id, peer_sync);
				},
			}
		});
	}
1833
	/// Re-derive the sync start point (best queued block) from the client's current info.
	///
	/// Falls back from light-state sync to full sync when the database already holds a
	/// (partially) synced state, and (re)initializes gap sync if the client reports a
	/// block gap.
	fn reset_sync_start_point(&mut self) -> Result<(), ClientError> {
		let info = self.client.info();
		debug!(target: LOG_TARGET, "Restarting sync with client info {info:?}");

		if matches!(self.mode, ChainSyncMode::LightState { .. }) && info.finalized_state.is_some() {
			warn!(
				target: LOG_TARGET,
				"Can't use fast sync mode with a partially synced database. Reverting to full sync mode."
			);
			self.mode = ChainSyncMode::Full;
		}

		self.import_existing = false;
		self.best_queued_hash = info.best_hash;
		self.best_queued_number = info.best_number;

		// In full sync, a best block without state forces re-import from the last point
		// that does have state (finalized state, or genesis as a last resort).
		if self.mode == ChainSyncMode::Full &&
			self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState
		{
			self.import_existing = true;
			if let Some((hash, number)) = info.finalized_state {
				debug!(target: LOG_TARGET, "Starting from finalized state #{number}");
				self.best_queued_hash = hash;
				self.best_queued_number = number;
			} else {
				debug!(target: LOG_TARGET, "Restarting from genesis");
				self.best_queued_hash = Default::default();
				self.best_queued_number = Zero::zero();
			}
		}

		if let Some(BlockGap { start, end, .. }) = info.block_gap {
			// Replace any previous gap sync; the cursor starts just below the gap start.
			let old_gap = self.gap_sync.take().map(|g| (g.best_queued_number, g.target));
			debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end} (old gap best and target: {old_gap:?})");
			self.gap_sync = Some(GapSync {
				best_queued_number: start - One::one(),
				target: end,
				blocks: BlockCollection::new(),
				stats: GapSyncStats::new(),
			});
		}
		trace!(
			target: LOG_TARGET,
			"Restarted sync at #{} ({:?})",
			self.best_queued_number,
			self.best_queued_hash,
		);
		Ok(())
	}
1886
1887 fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
1889 if self.queue_blocks.contains(hash) {
1890 return Ok(BlockStatus::Queued);
1891 }
1892 self.client.block_status(*hash)
1893 }
1894
1895 fn is_known(&self, hash: &B::Hash) -> bool {
1897 self.block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
1898 }
1899
1900 fn is_already_downloading(&self, hash: &B::Hash) -> bool {
1902 self.peers
1903 .iter()
1904 .any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
1905 }
1906
1907 fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
1909 self.blocks
1910 .ready_blocks(self.best_queued_number + One::one())
1911 .into_iter()
1912 .map(|block_data| {
1913 let justifications = block_data
1914 .block
1915 .justifications
1916 .or_else(|| legacy_justification_mapping(block_data.block.justification));
1917 IncomingBlock {
1918 hash: block_data.block.hash,
1919 header: block_data.block.header,
1920 body: block_data.block.body,
1921 indexed_body: block_data.block.indexed_body,
1922 justifications,
1923 origin: block_data.origin,
1924 allow_missing_state: true,
1925 import_existing: self.import_existing,
1926 skip_execution: self.skip_execution(),
1927 state: None,
1928 }
1929 })
1930 .collect()
1931 }
1932
1933 fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1935 let peers = &mut self.peers;
1936 let mut matcher = self.extra_justifications.matcher();
1937 std::iter::from_fn(move || {
1938 if let Some((peer, request)) = matcher.next(peers) {
1939 peers
1940 .get_mut(&peer)
1941 .expect(
1942 "`Matcher::next` guarantees the `PeerId` comes from the given peers; qed",
1943 )
1944 .state = PeerSyncState::DownloadingJustification(request.0);
1945 let req = BlockRequest::<B> {
1946 id: 0,
1947 fields: BlockAttributes::JUSTIFICATION,
1948 from: FromBlock::Hash(request.0),
1949 direction: Direction::Ascending,
1950 max: Some(1),
1951 };
1952 Some((peer, req))
1953 } else {
1954 None
1955 }
1956 })
1957 .collect()
1958 }
1959
	/// Compute the next batch of block requests, at most one per eligible peer.
	///
	/// Per peer, in order of preference: start an ancestor search when the common number
	/// lags too far behind, request new blocks on the best chain, download a fork target,
	/// or serve the gap-sync range. Returns an empty list while state sync is active or
	/// the import queue is saturated.
	fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
		if self.allowed_requests.is_empty() || self.state_sync.is_some() {
			return Vec::new();
		}

		if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
			trace!(target: LOG_TARGET, "Too many blocks in the queue.");
			return Vec::new();
		}
		// Pre-borrow the individual fields so the closure below can capture them
		// disjointly from `self.peers`.
		let is_major_syncing = self.status().state.is_major_syncing();
		let mode = self.mode;
		let is_archive = self.archive_blocks;
		let blocks = &mut self.blocks;
		let fork_targets = &mut self.fork_targets;
		let last_finalized =
			std::cmp::min(self.best_queued_number, self.client.info().finalized_number);
		let best_queued = self.best_queued_number;
		let client = &self.client;
		let queue_blocks = &self.queue_blocks;
		let allowed_requests = self.allowed_requests.clone();
		// During major sync each peer is directed at a single range for throughput.
		let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
		let max_blocks_per_request = self.max_blocks_per_request;
		let gap_sync = &mut self.gap_sync;
		let disconnected_peers = &mut self.disconnected_peers;
		let metrics = self.metrics.as_ref();
		let requests = self
			.peers
			.iter_mut()
			.filter_map(move |(&id, peer)| {
				if !peer.state.is_available() ||
					!allowed_requests.contains(&id) ||
					!disconnected_peers.is_peer_available(&id)
				{
					return None;
				}

				// Common number fell too far behind our best: run an ancestor search to
				// re-establish it (only while the import queue is near idle).
				if best_queued.saturating_sub(peer.common_number) >
					MAX_BLOCKS_TO_LOOK_BACKWARDS.into() &&
					best_queued < peer.best_number &&
					peer.common_number < last_finalized &&
					queue_blocks.len() <= MAJOR_SYNC_BLOCKS as usize
				{
					trace!(
						target: LOG_TARGET,
						"Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.",
						id,
						peer.common_number,
						best_queued,
					);
					let current = std::cmp::min(peer.best_number, best_queued);
					peer.state = PeerSyncState::AncestorSearch {
						current,
						start: best_queued,
						state: AncestorSearchState::ExponentialBackoff(One::one()),
					};
					Some((id, ancestry_request::<B>(current)))
				} else if let Some((range, req)) = peer_block_request(
					&id,
					peer,
					blocks,
					mode.required_block_attributes(false, is_archive),
					max_parallel,
					max_blocks_per_request,
					last_finalized,
					best_queued,
				) {
					peer.state = PeerSyncState::DownloadingNew(range.start);
					trace!(
						target: LOG_TARGET,
						"New block request for {}, (best:{}, common:{}) {:?}",
						id,
						peer.best_number,
						peer.common_number,
						req,
					);
					Some((id, req))
				} else if let Some((hash, req)) = fork_sync_request(
					&id,
					fork_targets,
					best_queued,
					last_finalized,
					mode.required_block_attributes(false, is_archive),
					// Treat queued blocks as known without hitting the database.
					|hash| {
						if queue_blocks.contains(hash) {
							BlockStatus::Queued
						} else {
							client.block_status(*hash).unwrap_or(BlockStatus::Unknown)
						}
					},
					max_blocks_per_request,
					metrics,
				) {
					trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}");
					peer.state = PeerSyncState::DownloadingStale(hash);
					Some((id, req))
				} else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
					peer_gap_block_request(
						&id,
						peer,
						&mut sync.blocks,
						mode.required_block_attributes(true, is_archive),
						sync.target,
						sync.best_queued_number,
						max_blocks_per_request,
					)
				}) {
					peer.state = PeerSyncState::DownloadingGap(range.start);
					trace!(
						target: LOG_TARGET,
						"New gap block request for {}, (best:{}, common:{}) {:?}",
						id,
						peer.best_number,
						peer.common_number,
						req,
					);
					Some((id, req))
				} else {
					None
				}
			})
			.collect::<Vec<_>>();

		if !requests.is_empty() {
			// Requests were handed out; pause further scheduling until new events arrive.
			self.allowed_requests.take();
		}

		requests
	}
2096
	/// Produce the next state request, if state sync is active and a suitable peer exists.
	///
	/// At most one state request is in flight at a time; a peer qualifies when it is idle
	/// and its common number covers the state sync target.
	fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
		if self.allowed_requests.is_empty() {
			return None;
		}
		// Only one in-flight state request at a time.
		if self.state_sync.is_some() &&
			self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
		{
			return None;
		}
		if let Some(sync) = &self.state_sync {
			if sync.is_complete() {
				return None;
			}

			for (id, peer) in self.peers.iter_mut() {
				if peer.state.is_available() &&
					peer.common_number >= sync.target_number() &&
					self.disconnected_peers.is_peer_available(&id)
				{
					peer.state = PeerSyncState::DownloadingState;
					let request = sync.next_request();
					trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request);
					self.allowed_requests.clear();
					return Some((*id, request));
				}
			}
		}
		None
	}
2128
	/// Process a state sync response payload from `peer_id`.
	///
	/// Decodes the protobuf [`StateResponse`], feeds it to the active state sync, and —
	/// once the full state has been assembled — queues a single block import carrying the
	/// downloaded state.
	#[must_use]
	fn on_state_data(&mut self, peer_id: &PeerId, response: &[u8]) -> Result<(), BadPeer> {
		let response = match StateResponse::decode(response) {
			Ok(response) => response,
			Err(error) => {
				debug!(
					target: LOG_TARGET,
					"Failed to decode state response from peer {peer_id:?}: {error:?}.",
				);

				return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
			},
		};

		// Free the peer for further requests regardless of the import outcome.
		if let Some(peer) = self.peers.get_mut(peer_id) {
			if let PeerSyncState::DownloadingState = peer.state {
				peer.state = PeerSyncState::Available;
				self.allowed_requests.set_all();
			}
		}
		let import_result = if let Some(sync) = &mut self.state_sync {
			debug!(
				target: LOG_TARGET,
				"Importing state data from {} with {} keys, {} proof nodes.",
				peer_id,
				response.entries.len(),
				response.proof.len(),
			);
			sync.import(response)
		} else {
			// No state sync running: the response arrived too late to matter.
			debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}");
			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
		};

		match import_result {
			ImportResult::Import(hash, header, state, body, justifications) => {
				let origin = BlockOrigin::NetworkInitialSync;
				// The target block is imported together with the complete state.
				let block = IncomingBlock {
					hash,
					header: Some(header),
					body,
					indexed_body: None,
					justifications,
					origin: None,
					allow_missing_state: true,
					import_existing: true,
					skip_execution: self.skip_execution(),
					state: Some(state),
				};
				debug!(target: LOG_TARGET, "State download is complete. Import is queued");
				self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
				Ok(())
			},
			ImportResult::Continue => Ok(()),
			ImportResult::BadResponse => {
				debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
				Err(BadPeer(*peer_id, rep::BAD_BLOCK))
			},
		}
	}
2189
2190 fn attempt_state_sync(
2191 &mut self,
2192 finalized_hash: B::Hash,
2193 finalized_number: NumberFor<B>,
2194 skip_proofs: bool,
2195 ) {
2196 let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect();
2197 heads.sort();
2198 let median = heads[heads.len() / 2];
2199 if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median {
2200 if let Ok(Some(header)) = self.client.header(finalized_hash) {
2201 log::debug!(
2202 target: LOG_TARGET,
2203 "Starting state sync for #{finalized_number} ({finalized_hash})",
2204 );
2205 self.state_sync =
2206 Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs));
2207 self.allowed_requests.set_all();
2208 } else {
2209 log::error!(
2210 target: LOG_TARGET,
2211 "Failed to start state sync: header for finalized block \
2212 #{finalized_number} ({finalized_hash}) is not available",
2213 );
2214 debug_assert!(false);
2215 }
2216 }
2217 }
2218
2219 #[cfg(test)]
2221 #[must_use]
2222 fn take_actions(&mut self) -> impl Iterator<Item = SyncingAction<B>> {
2223 std::mem::take(&mut self.actions).into_iter()
2224 }
2225}
2226
2227fn legacy_justification_mapping(
2232 justification: Option<EncodedJustification>,
2233) -> Option<Justifications> {
2234 justification.map(|just| (*b"FRNK", just).into())
2235}
2236
2237fn ancestry_request<B: BlockT>(block: NumberFor<B>) -> BlockRequest<B> {
2240 BlockRequest::<B> {
2241 id: 0,
2242 fields: BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
2243 from: FromBlock::Number(block),
2244 direction: Direction::Ascending,
2245 max: Some(1),
2246 }
2247}
2248
/// State of the search for the last block we share with a peer.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub(crate) enum AncestorSearchState<B: BlockT> {
	// Probing backwards from the tip with a growing stride; the payload is the next
	// distance to step back.
	ExponentialBackoff(NumberFor<B>),
	// Binary search between a lower and an upper bound block number.
	BinarySearch(NumberFor<B>, NumberFor<B>),
}
2260
/// Advance the ancestor search by one step.
///
/// `curr_block_num` is the block number just probed and `block_hash_match` tells whether
/// the peer's hash at that height matched ours. Returns the next search state together
/// with the next block number to probe, or `None` once the search has converged.
fn handle_ancestor_search_state<B: BlockT>(
	state: &AncestorSearchState<B>,
	curr_block_num: NumberFor<B>,
	block_hash_match: bool,
) -> Option<(AncestorSearchState<B>, NumberFor<B>)> {
	let two = <NumberFor<B>>::one() + <NumberFor<B>>::one();
	match state {
		AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => {
			let next_distance_to_tip = *next_distance_to_tip;
			// A match on the very first probe pins the common block exactly: done.
			if block_hash_match && next_distance_to_tip == One::one() {
				return None;
			}
			if block_hash_match {
				// Found a common block: binary-search within the last stride.
				let left = curr_block_num;
				let right = left + next_distance_to_tip / two;
				let middle = left + (right - left) / two;
				Some((AncestorSearchState::BinarySearch(left, right), middle))
			} else {
				// Still diverged: double the stride and probe further back (floored at 0).
				let next_block_num =
					curr_block_num.checked_sub(&next_distance_to_tip).unwrap_or_else(Zero::zero);
				let next_distance_to_tip = next_distance_to_tip * two;
				Some((
					AncestorSearchState::ExponentialBackoff(next_distance_to_tip),
					next_block_num,
				))
			}
		},
		AncestorSearchState::BinarySearch(mut left, mut right) => {
			if left >= curr_block_num {
				return None;
			}
			// Narrow the interval: a match raises the lower bound, a mismatch lowers the
			// upper bound.
			if block_hash_match {
				left = curr_block_num;
			} else {
				right = curr_block_num;
			}
			assert!(right >= left);
			let middle = left + (right - left) / two;
			if middle == curr_block_num {
				None
			} else {
				Some((AncestorSearchState::BinarySearch(left, right), middle))
			}
		},
	}
}
2316
2317fn peer_block_request<B: BlockT>(
2319 id: &PeerId,
2320 peer: &PeerSync<B>,
2321 blocks: &mut BlockCollection<B>,
2322 attrs: BlockAttributes,
2323 max_parallel_downloads: u32,
2324 max_blocks_per_request: u32,
2325 finalized: NumberFor<B>,
2326 best_num: NumberFor<B>,
2327) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2328 if best_num >= peer.best_number {
2329 return None;
2331 } else if peer.common_number < finalized {
2332 trace!(
2333 target: LOG_TARGET,
2334 "Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}",
2335 id, peer.common_number, finalized, peer.best_number, best_num,
2336 );
2337 }
2338 let range = blocks.needed_blocks(
2339 *id,
2340 max_blocks_per_request,
2341 peer.best_number,
2342 peer.common_number,
2343 max_parallel_downloads,
2344 MAX_DOWNLOAD_AHEAD,
2345 )?;
2346
2347 let last = range.end.saturating_sub(One::one());
2349
2350 let from = if peer.best_number == last {
2351 FromBlock::Hash(peer.best_hash)
2352 } else {
2353 FromBlock::Number(last)
2354 };
2355
2356 let request = BlockRequest::<B> {
2357 id: 0,
2358 fields: attrs,
2359 from,
2360 direction: Direction::Descending,
2361 max: Some((range.end - range.start).saturated_into::<u32>()),
2362 };
2363
2364 Some((range, request))
2365}
2366
2367fn peer_gap_block_request<B: BlockT>(
2369 id: &PeerId,
2370 peer: &PeerSync<B>,
2371 blocks: &mut BlockCollection<B>,
2372 attrs: BlockAttributes,
2373 target: NumberFor<B>,
2374 common_number: NumberFor<B>,
2375 max_blocks_per_request: u32,
2376) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2377 let range = blocks.needed_blocks(
2378 *id,
2379 max_blocks_per_request,
2380 std::cmp::min(peer.best_number, target),
2381 common_number,
2382 1,
2383 MAX_DOWNLOAD_AHEAD,
2384 )?;
2385
2386 let last = range.end.saturating_sub(One::one());
2388 let from = FromBlock::Number(last);
2389
2390 let request = BlockRequest::<B> {
2391 id: 0,
2392 fields: attrs,
2393 from,
2394 direction: Direction::Descending,
2395 max: Some((range.end - range.start).saturated_into::<u32>()),
2396 };
2397 Some((range, request))
2398}
2399
/// Pick a fork target that `id` can serve and build a descending block request for it.
///
/// Expired (at or below finality) and already-known fork targets are pruned first, and
/// the `fork_targets` gauge is refreshed. A fork is only requested when it is not
/// unreasonably far ahead of our best block.
fn fork_sync_request<B: BlockT>(
	id: &PeerId,
	fork_targets: &mut HashMap<B::Hash, ForkTarget<B>>,
	best_num: NumberFor<B>,
	finalized: NumberFor<B>,
	attributes: BlockAttributes,
	check_block: impl Fn(&B::Hash) -> BlockStatus,
	max_blocks_per_request: u32,
	metrics: Option<&Metrics>,
) -> Option<(B::Hash, BlockRequest<B>)> {
	fork_targets.retain(|hash, r| {
		// A fork at or below the finalized number can never be imported.
		if r.number <= finalized {
			trace!(
				target: LOG_TARGET,
				"Removed expired fork sync request {:?} (#{})",
				hash,
				r.number,
			);
			return false;
		}
		// Already known (queued or in chain): nothing left to download.
		if check_block(hash) != BlockStatus::Unknown {
			trace!(
				target: LOG_TARGET,
				"Removed obsolete fork sync request {:?} (#{})",
				hash,
				r.number,
			);
			return false;
		}
		true
	});
	if let Some(metrics) = metrics {
		metrics.fork_targets.set(fork_targets.len().try_into().unwrap_or(u64::MAX));
	}
	for (hash, r) in fork_targets {
		if !r.peers.contains(&id) {
			continue;
		}
		if r.number <= best_num ||
			(r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
		{
			// Without a known parent, request enough blocks to reach back to finality;
			// otherwise a single block suffices to connect the fork.
			let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
			let count = if parent_status == BlockStatus::Unknown {
				(r.number - finalized).saturated_into::<u32>()
			} else {
				1
			};
			trace!(
				target: LOG_TARGET,
				"Downloading requested fork {hash:?} from {id}, {count} blocks",
			);
			return Some((
				*hash,
				BlockRequest::<B> {
					id: 0,
					fields: attributes,
					from: FromBlock::Hash(*hash),
					direction: Direction::Descending,
					max: Some(count),
				},
			));
		} else {
			trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number);
		}
	}
	None
}
2471
2472fn is_descendent_of<Block, T>(
2474 client: &T,
2475 base: &Block::Hash,
2476 block: &Block::Hash,
2477) -> sp_blockchain::Result<bool>
2478where
2479 Block: BlockT,
2480 T: HeaderMetadata<Block, Error = sp_blockchain::Error> + ?Sized,
2481{
2482 if base == block {
2483 return Ok(false);
2484 }
2485
2486 let ancestor = sp_blockchain::lowest_common_ancestor(client, *block, *base)?;
2487
2488 Ok(ancestor.hash == *base)
2489}
2490
2491pub fn validate_blocks<Block: BlockT>(
2496 blocks: &Vec<BlockData<Block>>,
2497 peer_id: &PeerId,
2498 request: Option<BlockRequest<Block>>,
2499) -> Result<Option<NumberFor<Block>>, BadPeer> {
2500 if let Some(request) = request {
2501 if Some(blocks.len() as _) > request.max {
2502 debug!(
2503 target: LOG_TARGET,
2504 "Received more blocks than requested from {}. Expected in maximum {:?}, got {}.",
2505 peer_id,
2506 request.max,
2507 blocks.len(),
2508 );
2509
2510 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2511 }
2512
2513 let block_header =
2514 if request.direction == Direction::Descending { blocks.last() } else { blocks.first() }
2515 .and_then(|b| b.header.as_ref());
2516
2517 let expected_block = block_header.as_ref().map_or(false, |h| match request.from {
2518 FromBlock::Hash(hash) => h.hash() == hash,
2519 FromBlock::Number(n) => h.number() == &n,
2520 });
2521
2522 if !expected_block {
2523 debug!(
2524 target: LOG_TARGET,
2525 "Received block that was not requested. Requested {:?}, got {:?}.",
2526 request.from,
2527 block_header,
2528 );
2529
2530 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2531 }
2532
2533 if request.fields.contains(BlockAttributes::HEADER) &&
2534 blocks.iter().any(|b| b.header.is_none())
2535 {
2536 trace!(
2537 target: LOG_TARGET,
2538 "Missing requested header for a block in response from {peer_id}.",
2539 );
2540
2541 return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2542 }
2543
2544 if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none())
2545 {
2546 trace!(
2547 target: LOG_TARGET,
2548 "Missing requested body for a block in response from {peer_id}.",
2549 );
2550
2551 return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2552 }
2553 }
2554
2555 for b in blocks {
2556 if let Some(header) = &b.header {
2557 let hash = header.hash();
2558 if hash != b.hash {
2559 debug!(
2560 target: LOG_TARGET,
2561 "Bad header received from {}. Expected hash {:?}, got {:?}",
2562 peer_id,
2563 b.hash,
2564 hash,
2565 );
2566 return Err(BadPeer(*peer_id, rep::BAD_BLOCK));
2567 }
2568 }
2569 }
2570
2571 Ok(blocks.first().and_then(|b| b.header.as_ref()).map(|h| *h.number()))
2572}