//! `ChainSync` strategy: tracks connected peers and their best blocks, schedules block,
//! justification and state requests, and queues validated blocks for import.

use crate::{
	block_relay_protocol::{BlockDownloader, BlockResponseError},
	blocks::BlockCollection,
	justification_requests::ExtraRequests,
	schema::v1::{StateRequest, StateResponse},
	service::network::NetworkServiceHandle,
	strategy::{
		disconnected_peers::DisconnectedPeers,
		state_sync::{ImportResult, StateSync, StateSyncProvider},
		warp::{WarpSyncPhase, WarpSyncProgress},
		StrategyKey, SyncingAction, SyncingStrategy,
	},
	types::{BadPeer, SyncState, SyncStatus},
	LOG_TARGET,
};

use futures::{channel::oneshot, FutureExt};
use log::{debug, error, info, trace, warn};
use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
use prost::Message;
use sc_client_api::{blockchain::BlockGap, BlockBackend, ProofProvider};
use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
use sc_network::{IfDisconnected, ProtocolName};
use sc_network_common::sync::message::{
	BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock,
};
use sc_network_types::PeerId;
use sp_arithmetic::traits::Saturating;
use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::{
	traits::{
		Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One, SaturatedConversion, Zero,
	},
	EncodedJustification, Justifications,
};

use std::{
	any::Any,
	collections::{HashMap, HashSet},
	ops::Range,
	sync::Arc,
};

#[cfg(test)]
mod test;

/// Maximum blocks to store in the import queue.
const MAX_IMPORTING_BLOCKS: usize = 2048;

/// Maximum blocks to download ahead of any gap.
const MAX_DOWNLOAD_AHEAD: u32 = 2048;

/// If a peer's common number falls more than this many blocks behind our best queued block, run
/// an ancestry search instead of assuming the common block.
const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2;

/// Pick the state to sync as the latest finalized number minus this threshold.
const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;

/// Threshold (in blocks) beyond our best block above which we consider the node to be in major
/// sync; also used as the import-queue threshold for delaying ancestry searches.
const MAJOR_SYNC_BLOCKS: u8 = 5;

mod rep {
	use sc_network::ReputationChange as Rep;
	/// Reputation change when a peer sent us a message that led to a database read error.
	pub const BLOCKCHAIN_READ_ERROR: Rep = Rep::new(-(1 << 16), "DB Error");

	/// Reputation change when a peer sent us a status message with a different genesis than ours.
	pub const GENESIS_MISMATCH: Rep = Rep::new(i32::MIN, "Genesis mismatch");

	/// Reputation change for peers which send us a block with an incomplete header.
	pub const INCOMPLETE_HEADER: Rep = Rep::new(-(1 << 20), "Incomplete header");

	/// Reputation change for peers which send us a block which we fail to verify.
	pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");

	/// Reputation change for peers which send us a known bad block.
	pub const BAD_BLOCK: Rep = Rep::new(-(1 << 29), "Bad block");

	/// Reputation change for peers which did not return the block data we requested.
	pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");

	/// Reputation change for peers which send us block data we did not request.
	pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");

	/// Reputation change for peers which send us a block with bad justifications.
	pub const BAD_JUSTIFICATION: Rep = Rep::new(-(1 << 16), "Bad justification");

	/// Reputation change when a peer sent us an invalid ancestry search result.
	pub const UNKNOWN_ANCESTOR: Rep = Rep::new(-(1 << 16), "DB Error");

	/// Reputation change when a peer response is missing the requested data.
	pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");

	/// Reputation change when a peer sent us a message we could not decode.
	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}

/// Prometheus metrics kept by [`ChainSync`].
struct Metrics {
	queued_blocks: Gauge<U64>,
	fork_targets: Gauge<U64>,
}

impl Metrics {
	fn register(r: &Registry) -> Result<Self, PrometheusError> {
		Ok(Self {
			queued_blocks: {
				let g =
					Gauge::new("substrate_sync_queued_blocks", "Number of blocks in import queue")?;
				register(g, r)?
			},
			fork_targets: {
				let g = Gauge::new("substrate_sync_fork_targets", "Number of fork sync targets")?;
				register(g, r)?
			},
		})
	}
}

/// The set of peers we are currently allowed to send new requests to.
#[derive(Debug, Clone)]
enum AllowedRequests {
	Some(HashSet<PeerId>),
	All,
}

impl AllowedRequests {
	fn add(&mut self, id: &PeerId) {
		if let Self::Some(ref mut set) = self {
			set.insert(*id);
		}
	}

	fn take(&mut self) -> Self {
		std::mem::take(self)
	}

	fn set_all(&mut self) {
		*self = Self::All;
	}

	fn contains(&self, id: &PeerId) -> bool {
		match self {
			Self::Some(set) => set.contains(id),
			Self::All => true,
		}
	}

	fn is_empty(&self) -> bool {
		match self {
			Self::Some(set) => set.is_empty(),
			Self::All => false,
		}
	}

	fn clear(&mut self) {
		std::mem::take(self);
	}
}

impl Default for AllowedRequests {
	fn default() -> Self {
		Self::Some(HashSet::default())
	}
}

/// Gap-sync state: downloading the range of historic blocks reported as missing by the backend.
struct GapSync<B: BlockT> {
	blocks: BlockCollection<B>,
	best_queued_number: NumberFor<B>,
	target: NumberFor<B>,
}

/// Sync operation mode.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ChainSyncMode {
	/// Full block download and verification.
	Full,
	/// Download blocks without executing them and fetch the latest state instead.
	LightState {
		/// Skip state proof download and verification.
		skip_proofs: bool,
		/// Download indexed transactions for indexed storage chains.
		storage_chain_mode: bool,
	},
}

/// All the data we have about a peer that we are trying to sync with.
#[derive(Debug, Clone)]
pub(crate) struct PeerSync<B: BlockT> {
	/// Peer id of this peer.
	pub peer_id: PeerId,
	/// The common number is the block number that is a common point of ancestry for both our
	/// chains (as far as we know).
	pub common_number: NumberFor<B>,
	/// The hash of the best block that we've seen for this peer.
	pub best_hash: B::Hash,
	/// The number of the best block that we've seen for this peer.
	pub best_number: NumberFor<B>,
	/// The state of syncing this peer is in for us, generally either `Available` or "busy" with
	/// something as defined by [`PeerSyncState`].
	pub state: PeerSyncState<B>,
}

impl<B: BlockT> PeerSync<B> {
	/// Update the common number, but only if the new value is higher than the current one.
	fn update_common_number(&mut self, new_common: NumberFor<B>) {
		if self.common_number < new_common {
			trace!(
				target: LOG_TARGET,
				"Updating peer {} common number from={} => to={}.",
				self.peer_id,
				self.common_number,
				new_common,
			);
			self.common_number = new_common;
		}
	}
}

struct ForkTarget<B: BlockT> {
	number: NumberFor<B>,
	parent_hash: Option<B::Hash>,
	peers: HashSet<PeerId>,
}

/// The state of syncing between a peer and ourselves. Either `Available` or "busy" with the
/// activity named by the variant.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub(crate) enum PeerSyncState<B: BlockT> {
	/// Available for sync requests.
	Available,
	/// Searching for ancestors the peer has in common with us.
	AncestorSearch { start: NumberFor<B>, current: NumberFor<B>, state: AncestorSearchState<B> },
	/// Actively downloading new blocks, starting from the given number.
	DownloadingNew(NumberFor<B>),
	/// Downloading a stale block with the given hash. Stale means that it is a block with a
	/// number lower than our best; it might be from a fork and not necessarily already imported.
	DownloadingStale(B::Hash),
	/// Downloading a justification for the given block hash.
	DownloadingJustification(B::Hash),
	/// Downloading state.
	DownloadingState,
	/// Downloading block history to fill a gap in the chain database.
	DownloadingGap(NumberFor<B>),
}

impl<B: BlockT> PeerSyncState<B> {
	pub fn is_available(&self) -> bool {
		matches!(self, Self::Available)
	}
}

/// The main data structure holding the state of the chain syncing strategy.
pub struct ChainSync<B: BlockT, Client> {
	/// Chain client.
	client: Arc<Client>,
	/// The active peers that we are using to sync and their [`PeerSync`] status.
	peers: HashMap<PeerId, PeerSync<B>>,
	/// Backed-off peers that recently disconnected while servicing a request.
	disconnected_peers: DisconnectedPeers,
	/// A `BlockCollection` of blocks that are being downloaded from peers.
	blocks: BlockCollection<B>,
	/// The best block number in our queue of blocks to import.
	best_queued_number: NumberFor<B>,
	/// The best block hash in our queue of blocks to import.
	best_queued_hash: B::Hash,
	/// Current sync mode (full or light state).
	mode: ChainSyncMode,
	/// Any extra justification requests.
	extra_justifications: ExtraRequests<B>,
	/// A set of hashes of blocks that are being downloaded or have been downloaded and are queued
	/// for import.
	queue_blocks: HashSet<B::Hash>,
	/// A pending attempt to start state sync.
	///
	/// State sync may be deferred when the finalized block notification arrives while other
	/// conditions are not yet met, e.g. `queue_blocks` is not empty or there are no peers. This
	/// field holds the information needed to attempt it later.
	pending_state_sync_attempt: Option<(B::Hash, NumberFor<B>, bool)>,
	/// Fork sync targets.
	fork_targets: HashMap<B::Hash, ForkTarget<B>>,
	/// A set of peers for which there might be potential block requests.
	allowed_requests: AllowedRequests,
	/// Maximum number of peers to ask for the same blocks in parallel.
	max_parallel_downloads: u32,
	/// Maximum blocks per request.
	max_blocks_per_request: u32,
	/// Protocol name used to send out state requests.
	state_request_protocol_name: ProtocolName,
	/// Total number of downloaded blocks.
	downloaded_blocks: usize,
	/// State sync in progress, if any.
	state_sync: Option<StateSync<B, Client>>,
	/// Enable importing existing blocks. Used after the state download to catch up to the latest
	/// state while re-importing blocks.
	import_existing: bool,
	/// Block downloader.
	block_downloader: Arc<dyn BlockDownloader<B>>,
	/// Gap download process.
	gap_sync: Option<GapSync<B>>,
	/// Pending actions to be returned from [`SyncingStrategy::actions`].
	actions: Vec<SyncingAction<B>>,
	/// Prometheus metrics.
	metrics: Option<Metrics>,
}

348impl<B, Client> SyncingStrategy<B> for ChainSync<B, Client>
349where
350 B: BlockT,
351 Client: HeaderBackend<B>
352 + BlockBackend<B>
353 + HeaderMetadata<B, Error = sp_blockchain::Error>
354 + ProofProvider<B>
355 + Send
356 + Sync
357 + 'static,
358{
359 fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
360 match self.add_peer_inner(peer_id, best_hash, best_number) {
361 Ok(Some(request)) => {
362 let action = self.create_block_request_action(peer_id, request);
363 self.actions.push(action);
364 },
365 Ok(None) => {},
366 Err(bad_peer) => self.actions.push(SyncingAction::DropPeer(bad_peer)),
367 }
368 }
369
370 fn remove_peer(&mut self, peer_id: &PeerId) {
371 self.blocks.clear_peer_download(peer_id);
372 if let Some(gap_sync) = &mut self.gap_sync {
373 gap_sync.blocks.clear_peer_download(peer_id)
374 }
375
376 if let Some(state) = self.peers.remove(peer_id) {
377 if !state.state.is_available() {
378 if let Some(bad_peer) =
379 self.disconnected_peers.on_disconnect_during_request(*peer_id)
380 {
381 self.actions.push(SyncingAction::DropPeer(bad_peer));
382 }
383 }
384 }
385
386 self.extra_justifications.peer_disconnected(peer_id);
387 self.allowed_requests.set_all();
388 self.fork_targets.retain(|_, target| {
389 target.peers.remove(peer_id);
390 !target.peers.is_empty()
391 });
392 if let Some(metrics) = &self.metrics {
393 metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX));
394 }
395
396 let blocks = self.ready_blocks();
397
398 if !blocks.is_empty() {
399 self.validate_and_queue_blocks(blocks, false);
400 }
401 }
402
403 fn on_validated_block_announce(
404 &mut self,
405 is_best: bool,
406 peer_id: PeerId,
407 announce: &BlockAnnounce<B::Header>,
408 ) -> Option<(B::Hash, NumberFor<B>)> {
409 let number = *announce.header.number();
410 let hash = announce.header.hash();
411 let parent_status =
412 self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
413 let known_parent = parent_status != BlockStatus::Unknown;
414 let ancient_parent = parent_status == BlockStatus::InChainPruned;
415
416 let known = self.is_known(&hash);
417 let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
418 peer
419 } else {
420 error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}");
421 return Some((hash, number))
422 };
423
424 if let PeerSyncState::AncestorSearch { .. } = peer.state {
425 trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id);
426 return None
427 }
428
429 let peer_info = is_best.then(|| {
430 peer.best_number = number;
432 peer.best_hash = hash;
433
434 (hash, number)
435 });
436
437 if is_best {
440 if known && self.best_queued_number >= number {
441 self.update_peer_common_number(&peer_id, number);
442 } else if announce.header.parent_hash() == &self.best_queued_hash ||
443 known_parent && self.best_queued_number >= number
444 {
445 self.update_peer_common_number(&peer_id, number.saturating_sub(One::one()));
446 }
447 }
448 self.allowed_requests.add(&peer_id);
449
450 if known || self.is_already_downloading(&hash) {
452 trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash);
453 if let Some(target) = self.fork_targets.get_mut(&hash) {
454 target.peers.insert(peer_id);
455 }
456 return peer_info
457 }
458
459 if ancient_parent {
460 trace!(
461 target: LOG_TARGET,
462 "Ignored ancient block announced from {}: {} {:?}",
463 peer_id,
464 hash,
465 announce.header,
466 );
467 return peer_info
468 }
469
470 if self.status().state == SyncState::Idle {
471 trace!(
472 target: LOG_TARGET,
473 "Added sync target for block announced from {}: {} {:?}",
474 peer_id,
475 hash,
476 announce.summary(),
477 );
478 self.fork_targets
479 .entry(hash)
480 .or_insert_with(|| {
481 if let Some(metrics) = &self.metrics {
482 metrics.fork_targets.inc();
483 }
484
485 ForkTarget {
486 number,
487 parent_hash: Some(*announce.header.parent_hash()),
488 peers: Default::default(),
489 }
490 })
491 .peers
492 .insert(peer_id);
493 }
494
495 peer_info
496 }
497
498 fn set_sync_fork_request(
500 &mut self,
501 mut peers: Vec<PeerId>,
502 hash: &B::Hash,
503 number: NumberFor<B>,
504 ) {
505 if peers.is_empty() {
506 peers = self
507 .peers
508 .iter()
509 .filter(|(_, peer)| peer.best_number >= number)
511 .map(|(id, _)| *id)
512 .collect();
513
514 debug!(
515 target: LOG_TARGET,
516 "Explicit sync request for block {hash:?} with no peers specified. \
517 Syncing from these peers {peers:?} instead.",
518 );
519 } else {
520 debug!(
521 target: LOG_TARGET,
522 "Explicit sync request for block {hash:?} with {peers:?}",
523 );
524 }
525
526 if self.is_known(hash) {
527 debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}");
528 return
529 }
530
531 trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}");
532 for peer_id in &peers {
533 if let Some(peer) = self.peers.get_mut(peer_id) {
534 if let PeerSyncState::AncestorSearch { .. } = peer.state {
535 continue
536 }
537
538 if number > peer.best_number {
539 peer.best_number = number;
540 peer.best_hash = *hash;
541 }
542 self.allowed_requests.add(peer_id);
543 }
544 }
545
546 self.fork_targets
547 .entry(*hash)
548 .or_insert_with(|| {
549 if let Some(metrics) = &self.metrics {
550 metrics.fork_targets.inc();
551 }
552
553 ForkTarget { number, peers: Default::default(), parent_hash: None }
554 })
555 .peers
556 .extend(peers);
557 }
558
559 fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
560 let client = &self.client;
561 self.extra_justifications
562 .schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block))
563 }
564
565 fn clear_justification_requests(&mut self) {
566 self.extra_justifications.reset();
567 }
568
569 fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
570 let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
571 self.extra_justifications
572 .try_finalize_root((hash, number), finalization_result, true);
573 self.allowed_requests.set_all();
574 }
575
576 fn on_generic_response(
577 &mut self,
578 peer_id: &PeerId,
579 key: StrategyKey,
580 protocol_name: ProtocolName,
581 response: Box<dyn Any + Send>,
582 ) {
583 if Self::STRATEGY_KEY != key {
584 warn!(
585 target: LOG_TARGET,
586 "Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
587 );
588 debug_assert!(false);
589 return;
590 }
591
592 if protocol_name == self.state_request_protocol_name {
593 let Ok(response) = response.downcast::<Vec<u8>>() else {
594 warn!(target: LOG_TARGET, "Failed to downcast state response");
595 debug_assert!(false);
596 return;
597 };
598
599 if let Err(bad_peer) = self.on_state_data(&peer_id, &response) {
600 self.actions.push(SyncingAction::DropPeer(bad_peer));
601 }
602 } else if &protocol_name == self.block_downloader.protocol_name() {
603 let Ok(response) = response
604 .downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
605 else {
606 warn!(target: LOG_TARGET, "Failed to downcast block response");
607 debug_assert!(false);
608 return;
609 };
610
611 let (request, response) = *response;
612 let blocks = match response {
613 Ok(blocks) => blocks,
614 Err(BlockResponseError::DecodeFailed(e)) => {
615 debug!(
616 target: LOG_TARGET,
617 "Failed to decode block response from peer {:?}: {:?}.",
618 peer_id,
619 e
620 );
621 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
622 return;
623 },
624 Err(BlockResponseError::ExtractionFailed(e)) => {
625 debug!(
626 target: LOG_TARGET,
627 "Failed to extract blocks from peer response {:?}: {:?}.",
628 peer_id,
629 e
630 );
631 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
632 return;
633 },
634 };
635
636 if let Err(bad_peer) = self.on_block_response(peer_id, key, request, blocks) {
637 self.actions.push(SyncingAction::DropPeer(bad_peer));
638 }
639 } else {
640 warn!(
641 target: LOG_TARGET,
642 "Unexpected generic response protocol {protocol_name}, strategy key \
643 {key:?}",
644 );
645 debug_assert!(false);
646 }
647 }
648
649 fn on_blocks_processed(
650 &mut self,
651 imported: usize,
652 count: usize,
653 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
654 ) {
655 trace!(target: LOG_TARGET, "Imported {imported} of {count}");
656
657 let mut has_error = false;
658 for (_, hash) in &results {
659 if self.queue_blocks.remove(hash) {
660 if let Some(metrics) = &self.metrics {
661 metrics.queued_blocks.dec();
662 }
663 }
664 self.blocks.clear_queued(hash);
665 if let Some(gap_sync) = &mut self.gap_sync {
666 gap_sync.blocks.clear_queued(hash);
667 }
668 }
669 for (result, hash) in results {
670 if has_error {
671 break
672 }
673
674 has_error |= result.is_err();
675
676 match result {
677 Ok(BlockImportStatus::ImportedKnown(number, peer_id)) =>
678 if let Some(peer) = peer_id {
679 self.update_peer_common_number(&peer, number);
680 },
681 Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => {
682 if aux.clear_justification_requests {
683 trace!(
684 target: LOG_TARGET,
685 "Block imported clears all pending justification requests {number}: {hash:?}",
686 );
687 self.clear_justification_requests();
688 }
689
690 if aux.needs_justification {
691 trace!(
692 target: LOG_TARGET,
693 "Block imported but requires justification {number}: {hash:?}",
694 );
695 self.request_justification(&hash, number);
696 }
697
698 if aux.bad_justification {
699 if let Some(ref peer) = peer_id {
700 warn!("💔 Sent block with bad justification to import");
701 self.actions.push(SyncingAction::DropPeer(BadPeer(
702 *peer,
703 rep::BAD_JUSTIFICATION,
704 )));
705 }
706 }
707
708 if let Some(peer) = peer_id {
709 self.update_peer_common_number(&peer, number);
710 }
711 let state_sync_complete =
712 self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash);
713 if state_sync_complete {
714 info!(
715 target: LOG_TARGET,
716 "State sync is complete ({} MiB), restarting block sync.",
717 self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
718 );
719 self.state_sync = None;
720 self.mode = ChainSyncMode::Full;
721 self.restart();
722 }
723 let gap_sync_complete =
724 self.gap_sync.as_ref().map_or(false, |s| s.target == number);
725 if gap_sync_complete {
726 info!(
727 target: LOG_TARGET,
728 "Block history download is complete."
729 );
730 self.gap_sync = None;
731 }
732 },
733 Err(BlockImportError::IncompleteHeader(peer_id)) =>
734 if let Some(peer) = peer_id {
735 warn!(
736 target: LOG_TARGET,
737 "💔 Peer sent block with incomplete header to import",
738 );
739 self.actions
740 .push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER)));
741 self.restart();
742 },
743 Err(BlockImportError::VerificationFailed(peer_id, e)) => {
744 let extra_message = peer_id
745 .map_or_else(|| "".into(), |peer| format!(" received from ({peer})"));
746
747 warn!(
748 target: LOG_TARGET,
749 "💔 Verification failed for block {hash:?}{extra_message}: {e:?}",
750 );
751
752 if let Some(peer) = peer_id {
753 self.actions
754 .push(SyncingAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL)));
755 }
756
757 self.restart();
758 },
759 Err(BlockImportError::BadBlock(peer_id)) =>
760 if let Some(peer) = peer_id {
761 warn!(
762 target: LOG_TARGET,
763 "💔 Block {hash:?} received from peer {peer} has been blacklisted",
764 );
765 self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK)));
766 },
767 Err(BlockImportError::MissingState) => {
768 trace!(target: LOG_TARGET, "Obsolete block {hash:?}");
772 },
773 e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
774 warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err());
775 self.state_sync = None;
776 self.restart();
777 },
778 Err(BlockImportError::Cancelled) => {},
779 };
780 }
781
782 self.allowed_requests.set_all();
783 }
784
785 fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
786 let client = &self.client;
787 let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| {
788 is_descendent_of(&**client, base, block)
789 });
790
791 if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode {
792 if self.state_sync.is_none() {
793 if !self.peers.is_empty() && self.queue_blocks.is_empty() {
794 self.attempt_state_sync(*hash, number, *skip_proofs);
795 } else {
796 self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs));
797 }
798 }
799 }
800
801 if let Err(err) = r {
802 warn!(
803 target: LOG_TARGET,
804 "💔 Error cleaning up pending extra justification data requests: {err}",
805 );
806 }
807 }
808
809 fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
810 self.on_block_queued(best_hash, best_number);
811 }
812
813 fn is_major_syncing(&self) -> bool {
814 self.status().state.is_major_syncing()
815 }
816
817 fn num_peers(&self) -> usize {
818 self.peers.len()
819 }
820
821 fn status(&self) -> SyncStatus<B> {
822 let median_seen = self.median_seen();
823 let best_seen_block =
824 median_seen.and_then(|median| (median > self.best_queued_number).then_some(median));
825 let sync_state = if let Some(target) = median_seen {
826 let best_block = self.client.info().best_number;
830 if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() {
831 if target > self.best_queued_number {
833 SyncState::Downloading { target }
834 } else {
835 SyncState::Importing { target }
836 }
837 } else {
838 SyncState::Idle
839 }
840 } else {
841 SyncState::Idle
842 };
843
844 let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress {
845 phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number),
846 total_bytes: 0,
847 });
848
849 SyncStatus {
850 state: sync_state,
851 best_seen_block,
852 num_peers: self.peers.len() as u32,
853 queued_blocks: self.queue_blocks.len() as u32,
854 state_sync: self.state_sync.as_ref().map(|s| s.progress()),
855 warp_sync: warp_sync_progress,
856 }
857 }
858
859 fn num_downloaded_blocks(&self) -> usize {
860 self.downloaded_blocks
861 }
862
863 fn num_sync_requests(&self) -> usize {
864 self.fork_targets
865 .values()
866 .filter(|f| f.number <= self.best_queued_number)
867 .count()
868 }
869
870 fn actions(
871 &mut self,
872 network_service: &NetworkServiceHandle,
873 ) -> Result<Vec<SyncingAction<B>>, ClientError> {
874 if !self.peers.is_empty() && self.queue_blocks.is_empty() {
875 if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() {
876 self.attempt_state_sync(hash, number, skip_proofs);
877 }
878 }
879
880 let block_requests = self
881 .block_requests()
882 .into_iter()
883 .map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
884 .collect::<Vec<_>>();
885 self.actions.extend(block_requests);
886
887 let justification_requests = self
888 .justification_requests()
889 .into_iter()
890 .map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
891 .collect::<Vec<_>>();
892 self.actions.extend(justification_requests);
893
894 let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
895 trace!(
896 target: LOG_TARGET,
897 "Created `StrategyRequest` to {peer_id}.",
898 );
899
900 let (tx, rx) = oneshot::channel();
901
902 network_service.start_request(
903 peer_id,
904 self.state_request_protocol_name.clone(),
905 request.encode_to_vec(),
906 tx,
907 IfDisconnected::ImmediateError,
908 );
909
910 SyncingAction::StartRequest {
911 peer_id,
912 key: Self::STRATEGY_KEY,
913 request: async move {
914 Ok(rx.await?.and_then(|(response, protocol_name)| {
915 Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
916 }))
917 }
918 .boxed(),
919 remove_obsolete: false,
920 }
921 });
922 self.actions.extend(state_request);
923
924 Ok(std::mem::take(&mut self.actions))
925 }
926}
927
928impl<B, Client> ChainSync<B, Client>
929where
930 B: BlockT,
931 Client: HeaderBackend<B>
932 + BlockBackend<B>
933 + HeaderMetadata<B, Error = sp_blockchain::Error>
934 + ProofProvider<B>
935 + Send
936 + Sync
937 + 'static,
938{
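	/// Strategy key used to identify requests issued by [`ChainSync`].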
939 pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("ChainSync");
941
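	/// Create a new syncing strategy instance, registering Prometheus metrics (if a registry is
	/// provided) and adding the initial set of peers.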
942 pub fn new(
944 mode: ChainSyncMode,
945 client: Arc<Client>,
946 max_parallel_downloads: u32,
947 max_blocks_per_request: u32,
948 state_request_protocol_name: ProtocolName,
949 block_downloader: Arc<dyn BlockDownloader<B>>,
950 metrics_registry: Option<&Registry>,
951 initial_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
952 ) -> Result<Self, ClientError> {
953 let mut sync = Self {
954 client,
955 peers: HashMap::new(),
956 disconnected_peers: DisconnectedPeers::new(),
957 blocks: BlockCollection::new(),
958 best_queued_hash: Default::default(),
959 best_queued_number: Zero::zero(),
960 extra_justifications: ExtraRequests::new("justification", metrics_registry),
961 mode,
962 queue_blocks: Default::default(),
963 pending_state_sync_attempt: None,
964 fork_targets: Default::default(),
965 allowed_requests: Default::default(),
966 max_parallel_downloads,
967 max_blocks_per_request,
968 state_request_protocol_name,
969 downloaded_blocks: 0,
970 state_sync: None,
971 import_existing: false,
972 block_downloader,
973 gap_sync: None,
974 actions: Vec::new(),
975 metrics: metrics_registry.and_then(|r| match Metrics::register(r) {
976 Ok(metrics) => Some(metrics),
977 Err(err) => {
978 log::error!(
979 target: LOG_TARGET,
980 "Failed to register `ChainSync` metrics {err:?}",
981 );
982 None
983 },
984 }),
985 };
986
987 sync.reset_sync_start_point()?;
988 initial_peers.for_each(|(peer_id, best_hash, best_number)| {
989 sync.add_peer(peer_id, best_hash, best_number);
990 });
991
992 Ok(sync)
993 }
994
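	/// Add a new peer. Returns an optional ancestry-search block request to send to it, or a
	/// [`BadPeer`] if the peer should be disconnected.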
995 #[must_use]
996 fn add_peer_inner(
997 &mut self,
998 peer_id: PeerId,
999 best_hash: B::Hash,
1000 best_number: NumberFor<B>,
1001 ) -> Result<Option<BlockRequest<B>>, BadPeer> {
1002 match self.block_status(&best_hash) {
1004 Err(e) => {
1005 debug!(target: LOG_TARGET, "Error reading blockchain: {e}");
1006 Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR))
1007 },
1008 Ok(BlockStatus::KnownBad) => {
1009 info!(
1010 "💔 New peer {peer_id} with known bad best block {best_hash} ({best_number})."
1011 );
1012 Err(BadPeer(peer_id, rep::BAD_BLOCK))
1013 },
1014 Ok(BlockStatus::Unknown) => {
1015 if best_number.is_zero() {
1016 info!(
1017 "💔 New peer {} with unknown genesis hash {} ({}).",
1018 peer_id, best_hash, best_number,
1019 );
1020 return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH));
1021 }
1022
1023 if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS.into() {
1027 debug!(
1028 target: LOG_TARGET,
1029 "New peer {} with unknown best hash {} ({}), assuming common block.",
1030 peer_id,
1031 self.best_queued_hash,
1032 self.best_queued_number
1033 );
1034 self.peers.insert(
1035 peer_id,
1036 PeerSync {
1037 peer_id,
1038 common_number: self.best_queued_number,
1039 best_hash,
1040 best_number,
1041 state: PeerSyncState::Available,
1042 },
1043 );
1044 return Ok(None);
1045 }
1046
1047 let (state, req) = if self.best_queued_number.is_zero() {
1049 debug!(
1050 target: LOG_TARGET,
1051 "New peer {peer_id} with best hash {best_hash} ({best_number}).",
1052 );
1053
1054 (PeerSyncState::Available, None)
1055 } else {
1056 let common_best = std::cmp::min(self.best_queued_number, best_number);
1057
1058 debug!(
1059 target: LOG_TARGET,
1060 "New peer {} with unknown best hash {} ({}), searching for common ancestor.",
1061 peer_id,
1062 best_hash,
1063 best_number
1064 );
1065
1066 (
1067 PeerSyncState::AncestorSearch {
1068 current: common_best,
1069 start: self.best_queued_number,
1070 state: AncestorSearchState::ExponentialBackoff(One::one()),
1071 },
1072 Some(ancestry_request::<B>(common_best)),
1073 )
1074 };
1075
1076 self.allowed_requests.add(&peer_id);
1077 self.peers.insert(
1078 peer_id,
1079 PeerSync {
1080 peer_id,
1081 common_number: Zero::zero(),
1082 best_hash,
1083 best_number,
1084 state,
1085 },
1086 );
1087
1088 Ok(req)
1089 },
1090 Ok(BlockStatus::Queued) |
1091 Ok(BlockStatus::InChainWithState) |
1092 Ok(BlockStatus::InChainPruned) => {
1093 debug!(
1094 target: LOG_TARGET,
1095 "New peer {peer_id} with known best hash {best_hash} ({best_number}).",
1096 );
1097 self.peers.insert(
1098 peer_id,
1099 PeerSync {
1100 peer_id,
1101 common_number: std::cmp::min(self.best_queued_number, best_number),
1102 best_hash,
1103 best_number,
1104 state: PeerSyncState::Available,
1105 },
1106 );
1107 self.allowed_requests.add(&peer_id);
1108 Ok(None)
1109 },
1110 }
1111 }
1112
1113 fn create_block_request_action(
1114 &mut self,
1115 peer_id: PeerId,
1116 request: BlockRequest<B>,
1117 ) -> SyncingAction<B> {
1118 let downloader = self.block_downloader.clone();
1119
1120 SyncingAction::StartRequest {
1121 peer_id,
1122 key: Self::STRATEGY_KEY,
1123 request: async move {
1124 Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
1125 |(response, protocol_name)| {
1126 let decoded_response =
1127 downloader.block_response_into_blocks(&request, response);
1128 let result = Box::new((request, decoded_response)) as Box<dyn Any + Send>;
1129 Ok((result, protocol_name))
1130 },
1131 ))
1132 }
1133 .boxed(),
1134 remove_obsolete: true,
1137 }
1138 }
1139
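	/// Submit a block response for processing.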
1140 #[must_use]
1142 fn on_block_data(
1143 &mut self,
1144 peer_id: &PeerId,
1145 request: Option<BlockRequest<B>>,
1146 response: BlockResponse<B>,
1147 ) -> Result<(), BadPeer> {
1148 self.downloaded_blocks += response.blocks.len();
1149 let mut gap = false;
1150 let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(peer_id) {
1151 let mut blocks = response.blocks;
1152 if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) {
1153 trace!(target: LOG_TARGET, "Reversing incoming block list");
1154 blocks.reverse()
1155 }
1156 self.allowed_requests.add(peer_id);
1157 if let Some(request) = request {
1158 match &mut peer.state {
1159 PeerSyncState::DownloadingNew(_) => {
1160 self.blocks.clear_peer_download(peer_id);
1161 peer.state = PeerSyncState::Available;
1162 if let Some(start_block) =
1163 validate_blocks::<B>(&blocks, peer_id, Some(request))?
1164 {
1165 self.blocks.insert(start_block, blocks, *peer_id);
1166 }
1167 self.ready_blocks()
1168 },
1169 PeerSyncState::DownloadingGap(_) => {
1170 peer.state = PeerSyncState::Available;
1171 if let Some(gap_sync) = &mut self.gap_sync {
1172 gap_sync.blocks.clear_peer_download(peer_id);
1173 if let Some(start_block) =
1174 validate_blocks::<B>(&blocks, peer_id, Some(request))?
1175 {
1176 gap_sync.blocks.insert(start_block, blocks, *peer_id);
1177 }
1178 gap = true;
1179 let blocks: Vec<_> = gap_sync
1180 .blocks
1181 .ready_blocks(gap_sync.best_queued_number + One::one())
1182 .into_iter()
1183 .map(|block_data| {
1184 let justifications =
1185 block_data.block.justifications.or_else(|| {
1186 legacy_justification_mapping(
1187 block_data.block.justification,
1188 )
1189 });
1190 IncomingBlock {
1191 hash: block_data.block.hash,
1192 header: block_data.block.header,
1193 body: block_data.block.body,
1194 indexed_body: block_data.block.indexed_body,
1195 justifications,
1196 origin: block_data.origin,
1197 allow_missing_state: true,
1198 import_existing: self.import_existing,
1199 skip_execution: true,
1200 state: None,
1201 }
1202 })
1203 .collect();
1204 debug!(
1205 target: LOG_TARGET,
1206 "Drained {} gap blocks from {}",
1207 blocks.len(),
1208 gap_sync.best_queued_number,
1209 );
1210 blocks
1211 } else {
1212 debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}");
1213 return Err(BadPeer(*peer_id, rep::NO_BLOCK));
1214 }
1215 },
1216 PeerSyncState::DownloadingStale(_) => {
1217 peer.state = PeerSyncState::Available;
1218 if blocks.is_empty() {
1219 debug!(target: LOG_TARGET, "Empty block response from {peer_id}");
1220 return Err(BadPeer(*peer_id, rep::NO_BLOCK));
1221 }
1222 validate_blocks::<B>(&blocks, peer_id, Some(request))?;
1223 blocks
1224 .into_iter()
1225 .map(|b| {
1226 let justifications = b
1227 .justifications
1228 .or_else(|| legacy_justification_mapping(b.justification));
1229 IncomingBlock {
1230 hash: b.hash,
1231 header: b.header,
1232 body: b.body,
1233 indexed_body: None,
1234 justifications,
1235 origin: Some(*peer_id),
1236 allow_missing_state: true,
1237 import_existing: self.import_existing,
1238 skip_execution: self.skip_execution(),
1239 state: None,
1240 }
1241 })
1242 .collect()
1243 },
1244 PeerSyncState::AncestorSearch { current, start, state } => {
1245 let matching_hash = match (blocks.get(0), self.client.hash(*current)) {
1246 (Some(block), Ok(maybe_our_block_hash)) => {
1247 trace!(
1248 target: LOG_TARGET,
1249 "Got ancestry block #{} ({}) from peer {}",
1250 current,
1251 block.hash,
1252 peer_id,
1253 );
1254 maybe_our_block_hash.filter(|x| x == &block.hash)
1255 },
1256 (None, _) => {
1257 debug!(
1258 target: LOG_TARGET,
1259 "Invalid response when searching for ancestor from {peer_id}",
1260 );
1261 return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR));
1262 },
1263 (_, Err(e)) => {
1264 info!(
1265 target: LOG_TARGET,
1266 "❌ Error answering legitimate blockchain query: {e}",
1267 );
1268 return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR));
1269 },
1270 };
1271 if matching_hash.is_some() {
1272 if *start < self.best_queued_number &&
1273 self.best_queued_number <= peer.best_number
1274 {
1275 trace!(
1279 target: LOG_TARGET,
1280 "Ancestry search: opportunistically updating peer {} common number from={} => to={}.",
1281 *peer_id,
1282 peer.common_number,
1283 self.best_queued_number,
1284 );
1285 peer.common_number = self.best_queued_number;
1286 } else if peer.common_number < *current {
1287 trace!(
1288 target: LOG_TARGET,
1289 "Ancestry search: updating peer {} common number from={} => to={}.",
1290 *peer_id,
1291 peer.common_number,
1292 *current,
1293 );
1294 peer.common_number = *current;
1295 }
1296 }
1297 if matching_hash.is_none() && current.is_zero() {
1298 trace!(
1299 target: LOG_TARGET,
1300 "Ancestry search: genesis mismatch for peer {peer_id}",
1301 );
1302 return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH));
1303 }
1304 if let Some((next_state, next_num)) =
1305 handle_ancestor_search_state(state, *current, matching_hash.is_some())
1306 {
1307 peer.state = PeerSyncState::AncestorSearch {
1308 current: next_num,
1309 start: *start,
1310 state: next_state,
1311 };
1312 let request = ancestry_request::<B>(next_num);
1313 let action = self.create_block_request_action(*peer_id, request);
1314 self.actions.push(action);
1315 return Ok(());
1316 } else {
1317 trace!(
1320 target: LOG_TARGET,
1321 "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
1322 self.best_queued_hash,
1323 self.best_queued_number,
1324 peer.best_hash,
1325 peer.best_number,
1326 matching_hash,
1327 peer.common_number,
1328 );
1329 if peer.common_number < peer.best_number &&
1330 peer.best_number < self.best_queued_number
1331 {
1332 trace!(
1333 target: LOG_TARGET,
1334 "Added fork target {} for {}",
1335 peer.best_hash,
1336 peer_id,
1337 );
1338 self.fork_targets
1339 .entry(peer.best_hash)
1340 .or_insert_with(|| {
1341 if let Some(metrics) = &self.metrics {
1342 metrics.fork_targets.inc();
1343 }
1344
1345 ForkTarget {
1346 number: peer.best_number,
1347 parent_hash: None,
1348 peers: Default::default(),
1349 }
1350 })
1351 .peers
1352 .insert(*peer_id);
1353 }
1354 peer.state = PeerSyncState::Available;
1355 return Ok(());
1356 }
1357 },
1358 PeerSyncState::Available |
1359 PeerSyncState::DownloadingJustification(..) |
1360 PeerSyncState::DownloadingState => Vec::new(),
1361 }
1362 } else {
1363 validate_blocks::<B>(&blocks, peer_id, None)?;
1365 blocks
1366 .into_iter()
1367 .map(|b| {
1368 let justifications = b
1369 .justifications
1370 .or_else(|| legacy_justification_mapping(b.justification));
1371 IncomingBlock {
1372 hash: b.hash,
1373 header: b.header,
1374 body: b.body,
1375 indexed_body: None,
1376 justifications,
1377 origin: Some(*peer_id),
1378 allow_missing_state: true,
1379 import_existing: false,
1380 skip_execution: true,
1381 state: None,
1382 }
1383 })
1384 .collect()
1385 }
1386 } else {
1387 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
1389 };
1390
1391 self.validate_and_queue_blocks(new_blocks, gap);
1392
1393 Ok(())
1394 }
1395
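	/// Process a block response received for an earlier request, routing it to either the
	/// justification handler or the block-data handler depending on the requested fields.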
1396 fn on_block_response(
1397 &mut self,
1398 peer_id: &PeerId,
1399 key: StrategyKey,
1400 request: BlockRequest<B>,
1401 blocks: Vec<BlockData<B>>,
1402 ) -> Result<(), BadPeer> {
1403 if key != Self::STRATEGY_KEY {
1404 error!(
1405 target: LOG_TARGET,
1406 "`on_block_response()` called with unexpected key {key:?} for chain sync",
1407 );
1408 debug_assert!(false);
1409 }
1410 let block_response = BlockResponse::<B> { id: request.id, blocks };
1411
1412 let blocks_range = || match (
1413 block_response
1414 .blocks
1415 .first()
1416 .and_then(|b| b.header.as_ref().map(|h| h.number())),
1417 block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
1418 ) {
1419 (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
1420 (Some(first), Some(_)) => format!(" ({})", first),
1421 _ => Default::default(),
1422 };
1423 trace!(
1424 target: LOG_TARGET,
1425 "BlockResponse {} from {} with {} blocks {}",
1426 block_response.id,
1427 peer_id,
1428 block_response.blocks.len(),
1429 blocks_range(),
1430 );
1431
1432 if request.fields == BlockAttributes::JUSTIFICATION {
1433 self.on_block_justification(*peer_id, block_response)
1434 } else {
1435 self.on_block_data(peer_id, Some(request), block_response)
1436 }
1437 }
1438
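	/// Handle a response from the remote to a justification request that we made.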
1439 #[must_use]
1441 fn on_block_justification(
1442 &mut self,
1443 peer_id: PeerId,
1444 response: BlockResponse<B>,
1445 ) -> Result<(), BadPeer> {
1446 let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
1447 peer
1448 } else {
1449 error!(
1450 target: LOG_TARGET,
1451 "💔 Called on_block_justification with a peer ID of an unknown peer",
1452 );
1453 return Ok(());
1454 };
1455
1456 self.allowed_requests.add(&peer_id);
1457 if let PeerSyncState::DownloadingJustification(hash) = peer.state {
1458 peer.state = PeerSyncState::Available;
1459
1460 let justification = if let Some(block) = response.blocks.into_iter().next() {
1462 if hash != block.hash {
1463 warn!(
1464 target: LOG_TARGET,
1465 "💔 Invalid block justification provided by {}: requested: {:?} got: {:?}",
1466 peer_id,
1467 hash,
1468 block.hash,
1469 );
1470 return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION));
1471 }
1472
1473 block
1474 .justifications
1475 .or_else(|| legacy_justification_mapping(block.justification))
1476 } else {
1477 trace!(
1480 target: LOG_TARGET,
1481 "Peer {peer_id:?} provided empty response for justification request {hash:?}",
1482 );
1483
1484 None
1485 };
1486
1487 if let Some((peer_id, hash, number, justifications)) =
1488 self.extra_justifications.on_response(peer_id, justification)
1489 {
1490 self.actions.push(SyncingAction::ImportJustifications {
1491 peer_id,
1492 hash,
1493 number,
1494 justifications,
1495 });
1496 return Ok(());
1497 }
1498 }
1499
1500 Ok(())
1501 }
1502
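	/// Returns the median best block number of our connected peers, used as the sync target.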
1503 fn median_seen(&self) -> Option<NumberFor<B>> {
1505 let mut best_seens = self.peers.values().map(|p| p.best_number).collect::<Vec<_>>();
1506
1507 if best_seens.is_empty() {
1508 None
1509 } else {
1510 let middle = best_seens.len() / 2;
1511
1512 Some(*best_seens.select_nth_unstable(middle).1)
1514 }
1515 }
1516
1517 fn required_block_attributes(&self) -> BlockAttributes {
1518 match self.mode {
1519 ChainSyncMode::Full =>
1520 BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
1521 ChainSyncMode::LightState { storage_chain_mode: false, .. } =>
1522 BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
1523 ChainSyncMode::LightState { storage_chain_mode: true, .. } =>
1524 BlockAttributes::HEADER |
1525 BlockAttributes::JUSTIFICATION |
1526 BlockAttributes::INDEXED_BODY,
1527 }
1528 }
1529
1530 fn skip_execution(&self) -> bool {
1531 match self.mode {
1532 ChainSyncMode::Full => false,
1533 ChainSyncMode::LightState { .. } => true,
1534 }
1535 }
1536
1537 fn validate_and_queue_blocks(&mut self, mut new_blocks: Vec<IncomingBlock<B>>, gap: bool) {
1538 let orig_len = new_blocks.len();
1539 new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
1540 if new_blocks.len() != orig_len {
1541 debug!(
1542 target: LOG_TARGET,
1543 "Ignoring {} blocks that are already queued",
1544 orig_len - new_blocks.len(),
1545 );
1546 }
1547
1548 let origin = if !gap && !self.status().state.is_major_syncing() {
1549 BlockOrigin::NetworkBroadcast
1550 } else {
1551 BlockOrigin::NetworkInitialSync
1552 };
1553
1554 if let Some((h, n)) = new_blocks
1555 .last()
1556 .and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number())))
1557 {
1558 trace!(
1559 target: LOG_TARGET,
1560 "Accepted {} blocks ({:?}) with origin {:?}",
1561 new_blocks.len(),
1562 h,
1563 origin,
1564 );
1565 self.on_block_queued(h, n)
1566 }
1567 self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
1568 if let Some(metrics) = &self.metrics {
1569 metrics
1570 .queued_blocks
1571 .set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX));
1572 }
1573
1574 self.actions.push(SyncingAction::ImportBlocks { origin, blocks: new_blocks })
1575 }
1576
1577 fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor<B>) {
1578 if let Some(peer) = self.peers.get_mut(peer_id) {
1579 peer.update_common_number(new_common);
1580 }
1581 }
1582
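	/// Called when a block has been queued for import.
	///
	/// Updates our internal state for the best queued block and then goes through all peers to
	/// update our view of their state as well.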
1583 fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
1588 if self.fork_targets.remove(hash).is_some() {
1589 if let Some(metrics) = &self.metrics {
1590 metrics.fork_targets.dec();
1591 }
1592 trace!(target: LOG_TARGET, "Completed fork sync {hash:?}");
1593 }
1594 if let Some(gap_sync) = &mut self.gap_sync {
1595 if number > gap_sync.best_queued_number && number <= gap_sync.target {
1596 gap_sync.best_queued_number = number;
1597 }
1598 }
1599 if number > self.best_queued_number {
1600 self.best_queued_number = number;
1601 self.best_queued_hash = *hash;
1602 for (n, peer) in self.peers.iter_mut() {
1604 if let PeerSyncState::AncestorSearch { .. } = peer.state {
1605 continue;
1607 }
1608 let new_common_number =
1609 if peer.best_number >= number { number } else { peer.best_number };
1610 trace!(
1611 target: LOG_TARGET,
1612 "Updating peer {} info, ours={}, common={}->{}, their best={}",
1613 n,
1614 number,
1615 peer.common_number,
1616 new_common_number,
1617 peer.best_number,
1618 );
1619 peer.common_number = new_common_number;
1620 }
1621 }
1622 self.allowed_requests.set_all();
1623 }
1624
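	/// Restart the sync: clear pending block ranges, reset the sync start point, and re-register
	/// all known peers, cancelling their in-flight requests where needed.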
1625 fn restart(&mut self) {
1629 self.blocks.clear();
1630 if let Err(e) = self.reset_sync_start_point() {
1631 warn!(target: LOG_TARGET, "💔 Unable to restart sync: {e}");
1632 }
1633 self.allowed_requests.set_all();
1634 debug!(
1635 target: LOG_TARGET,
1636 "Restarted with {} ({})",
1637 self.best_queued_number,
1638 self.best_queued_hash,
1639 );
1640 let old_peers = std::mem::take(&mut self.peers);
1641
1642 old_peers.into_iter().for_each(|(peer_id, mut peer_sync)| {
1643 match peer_sync.state {
1644 PeerSyncState::Available => {
1645 self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1646 },
1647 PeerSyncState::AncestorSearch { .. } |
1648 PeerSyncState::DownloadingNew(_) |
1649 PeerSyncState::DownloadingStale(_) |
1650 PeerSyncState::DownloadingGap(_) |
1651 PeerSyncState::DownloadingState => {
1652 self.actions
1654 .push(SyncingAction::CancelRequest { peer_id, key: Self::STRATEGY_KEY });
1655 self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1656 },
1657 PeerSyncState::DownloadingJustification(_) => {
1658 trace!(
1662 target: LOG_TARGET,
1663 "Keeping peer {} after restart, updating common number from={} => to={} (our best).",
1664 peer_id,
1665 peer_sync.common_number,
1666 self.best_queued_number,
1667 );
1668 peer_sync.common_number = self.best_queued_number;
1669 self.peers.insert(peer_id, peer_sync);
1670 },
1671 }
1672 });
1673 }
1674
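	/// Find a block to start syncing from, falling back to the finalized state or genesis when the
	/// best block has no state, and pick up any block gap reported by the backend.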
1675 fn reset_sync_start_point(&mut self) -> Result<(), ClientError> {
1678 let info = self.client.info();
1679 if matches!(self.mode, ChainSyncMode::LightState { .. }) && info.finalized_state.is_some() {
1680 warn!(
1681 target: LOG_TARGET,
1682 "Can't use fast sync mode with a partially synced database. Reverting to full sync mode."
1683 );
1684 self.mode = ChainSyncMode::Full;
1685 }
1686
1687 self.import_existing = false;
1688 self.best_queued_hash = info.best_hash;
1689 self.best_queued_number = info.best_number;
1690
1691 if self.mode == ChainSyncMode::Full &&
1692 self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState
1693 {
1694 self.import_existing = true;
1695 if let Some((hash, number)) = info.finalized_state {
1697 debug!(target: LOG_TARGET, "Starting from finalized state #{number}");
1698 self.best_queued_hash = hash;
1699 self.best_queued_number = number;
1700 } else {
1701 debug!(target: LOG_TARGET, "Restarting from genesis");
1702 self.best_queued_hash = Default::default();
1703 self.best_queued_number = Zero::zero();
1704 }
1705 }
1706
1707 if let Some(BlockGap { start, end, .. }) = info.block_gap {
1708 debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end}");
1709 self.gap_sync = Some(GapSync {
1710 best_queued_number: start - One::one(),
1711 target: end,
1712 blocks: BlockCollection::new(),
1713 });
1714 }
1715 trace!(
1716 target: LOG_TARGET,
1717 "Restarted sync at #{} ({:?})",
1718 self.best_queued_number,
1719 self.best_queued_hash,
1720 );
1721 Ok(())
1722 }
1723
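	/// What is the status of the block corresponding to the given hash, taking the import queue
	/// into account?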
1724 fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
1726 if self.queue_blocks.contains(hash) {
1727 return Ok(BlockStatus::Queued);
1728 }
1729 self.client.block_status(*hash)
1730 }
1731
1732 fn is_known(&self, hash: &B::Hash) -> bool {
1734 self.block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
1735 }
1736
1737 fn is_already_downloading(&self, hash: &B::Hash) -> bool {
1739 self.peers
1740 .iter()
1741 .any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
1742 }
1743
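	/// Get the set of downloaded blocks that are ready to be queued for import.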
1744 fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
1746 self.blocks
1747 .ready_blocks(self.best_queued_number + One::one())
1748 .into_iter()
1749 .map(|block_data| {
1750 let justifications = block_data
1751 .block
1752 .justifications
1753 .or_else(|| legacy_justification_mapping(block_data.block.justification));
1754 IncomingBlock {
1755 hash: block_data.block.hash,
1756 header: block_data.block.header,
1757 body: block_data.block.body,
1758 indexed_body: block_data.block.indexed_body,
1759 justifications,
1760 origin: block_data.origin,
1761 allow_missing_state: true,
1762 import_existing: self.import_existing,
1763 skip_execution: self.skip_execution(),
1764 state: None,
1765 }
1766 })
1767 .collect()
1768 }
1769
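	/// Get justification requests scheduled by the extra-justifications tracker, marking the
	/// selected peers as busy downloading a justification.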
1770 fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1772 let peers = &mut self.peers;
1773 let mut matcher = self.extra_justifications.matcher();
1774 std::iter::from_fn(move || {
1775 if let Some((peer, request)) = matcher.next(peers) {
1776 peers
1777 .get_mut(&peer)
1778 .expect(
1779 "`Matcher::next` guarantees the `PeerId` comes from the given peers; qed",
1780 )
1781 .state = PeerSyncState::DownloadingJustification(request.0);
1782 let req = BlockRequest::<B> {
1783 id: 0,
1784 fields: BlockAttributes::JUSTIFICATION,
1785 from: FromBlock::Hash(request.0),
1786 direction: Direction::Ascending,
1787 max: Some(1),
1788 };
1789 Some((peer, req))
1790 } else {
1791 None
1792 }
1793 })
1794 .collect()
1795 }
1796
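	/// Get block requests scheduled for download, taking per-peer availability, fork targets and
	/// gap sync into account.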
1797 fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1799 if self.allowed_requests.is_empty() || self.state_sync.is_some() {
1800 return Vec::new();
1801 }
1802
1803 if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
1804 trace!(target: LOG_TARGET, "Too many blocks in the queue.");
1805 return Vec::new();
1806 }
1807 let is_major_syncing = self.status().state.is_major_syncing();
1808 let attrs = self.required_block_attributes();
1809 let blocks = &mut self.blocks;
1810 let fork_targets = &mut self.fork_targets;
1811 let last_finalized =
1812 std::cmp::min(self.best_queued_number, self.client.info().finalized_number);
1813 let best_queued = self.best_queued_number;
1814 let client = &self.client;
1815 let queue_blocks = &self.queue_blocks;
1816 let allowed_requests = self.allowed_requests.clone();
1817 let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
1818 let max_blocks_per_request = self.max_blocks_per_request;
1819 let gap_sync = &mut self.gap_sync;
1820 let disconnected_peers = &mut self.disconnected_peers;
1821 let metrics = self.metrics.as_ref();
1822 let requests = self
1823 .peers
1824 .iter_mut()
1825 .filter_map(move |(&id, peer)| {
1826 if !peer.state.is_available() ||
1827 !allowed_requests.contains(&id) ||
1828 !disconnected_peers.is_peer_available(&id)
1829 {
1830 return None;
1831 }
1832
1833 if best_queued.saturating_sub(peer.common_number) >
1839 MAX_BLOCKS_TO_LOOK_BACKWARDS.into() &&
1840 best_queued < peer.best_number &&
1841 peer.common_number < last_finalized &&
1842 queue_blocks.len() <= MAJOR_SYNC_BLOCKS.into()
1843 {
1844 trace!(
1845 target: LOG_TARGET,
1846 "Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.",
1847 id,
1848 peer.common_number,
1849 best_queued,
1850 );
1851 let current = std::cmp::min(peer.best_number, best_queued);
1852 peer.state = PeerSyncState::AncestorSearch {
1853 current,
1854 start: best_queued,
1855 state: AncestorSearchState::ExponentialBackoff(One::one()),
1856 };
1857 Some((id, ancestry_request::<B>(current)))
1858 } else if let Some((range, req)) = peer_block_request(
1859 &id,
1860 peer,
1861 blocks,
1862 attrs,
1863 max_parallel,
1864 max_blocks_per_request,
1865 last_finalized,
1866 best_queued,
1867 ) {
1868 peer.state = PeerSyncState::DownloadingNew(range.start);
1869 trace!(
1870 target: LOG_TARGET,
1871 "New block request for {}, (best:{}, common:{}) {:?}",
1872 id,
1873 peer.best_number,
1874 peer.common_number,
1875 req,
1876 );
1877 Some((id, req))
1878 } else if let Some((hash, req)) = fork_sync_request(
1879 &id,
1880 fork_targets,
1881 best_queued,
1882 last_finalized,
1883 attrs,
1884 |hash| {
1885 if queue_blocks.contains(hash) {
1886 BlockStatus::Queued
1887 } else {
1888 client.block_status(*hash).unwrap_or(BlockStatus::Unknown)
1889 }
1890 },
1891 max_blocks_per_request,
1892 metrics,
1893 ) {
1894 trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}");
1895 peer.state = PeerSyncState::DownloadingStale(hash);
1896 Some((id, req))
1897 } else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
1898 peer_gap_block_request(
1899 &id,
1900 peer,
1901 &mut sync.blocks,
1902 attrs,
1903 sync.target,
1904 sync.best_queued_number,
1905 max_blocks_per_request,
1906 )
1907 }) {
1908 peer.state = PeerSyncState::DownloadingGap(range.start);
1909 trace!(
1910 target: LOG_TARGET,
1911 "New gap block request for {}, (best:{}, common:{}) {:?}",
1912 id,
1913 peer.best_number,
1914 peer.common_number,
1915 req,
1916 );
1917 Some((id, req))
1918 } else {
1919 None
1920 }
1921 })
1922 .collect::<Vec<_>>();
1923
1924 if !requests.is_empty() {
1927 self.allowed_requests.take();
1928 }
1929
1930 requests
1931 }
1932
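	/// Get a state request scheduled for download, if any. Only one state request is kept in
	/// flight at a time.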
1933 fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
1935 if self.allowed_requests.is_empty() {
1936 return None;
1937 }
1938 if self.state_sync.is_some() &&
1939 self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
1940 {
1941 return None;
1943 }
1944 if let Some(sync) = &self.state_sync {
1945 if sync.is_complete() {
1946 return None;
1947 }
1948
1949 for (id, peer) in self.peers.iter_mut() {
1950 if peer.state.is_available() &&
1951 peer.common_number >= sync.target_number() &&
1952 self.disconnected_peers.is_peer_available(&id)
1953 {
1954 peer.state = PeerSyncState::DownloadingState;
1955 let request = sync.next_request();
1956 trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request);
1957 self.allowed_requests.clear();
1958 return Some((*id, request));
1959 }
1960 }
1961 }
1962 None
1963 }
1964
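	/// Process a state response: decode it and feed it to the active state sync, queueing the
	/// target block for import once the state download completes.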
1965 #[must_use]
1966 fn on_state_data(&mut self, peer_id: &PeerId, response: &[u8]) -> Result<(), BadPeer> {
1967 let response = match StateResponse::decode(response) {
1968 Ok(response) => response,
1969 Err(error) => {
1970 debug!(
1971 target: LOG_TARGET,
1972 "Failed to decode state response from peer {peer_id:?}: {error:?}.",
1973 );
1974
1975 return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
1976 },
1977 };
1978
1979 if let Some(peer) = self.peers.get_mut(peer_id) {
1980 if let PeerSyncState::DownloadingState = peer.state {
1981 peer.state = PeerSyncState::Available;
1982 self.allowed_requests.set_all();
1983 }
1984 }
1985 let import_result = if let Some(sync) = &mut self.state_sync {
1986 debug!(
1987 target: LOG_TARGET,
1988 "Importing state data from {} with {} keys, {} proof nodes.",
1989 peer_id,
1990 response.entries.len(),
1991 response.proof.len(),
1992 );
1993 sync.import(response)
1994 } else {
1995 debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}");
1996 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
1997 };
1998
1999 match import_result {
2000 ImportResult::Import(hash, header, state, body, justifications) => {
2001 let origin = BlockOrigin::NetworkInitialSync;
2002 let block = IncomingBlock {
2003 hash,
2004 header: Some(header),
2005 body,
2006 indexed_body: None,
2007 justifications,
2008 origin: None,
2009 allow_missing_state: true,
2010 import_existing: true,
2011 skip_execution: self.skip_execution(),
2012 state: Some(state),
2013 };
2014 debug!(target: LOG_TARGET, "State download is complete. Import is queued");
2015 self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
2016 Ok(())
2017 },
2018 ImportResult::Continue => Ok(()),
2019 ImportResult::BadResponse => {
2020 debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
2021 Err(BadPeer(*peer_id, rep::BAD_BLOCK))
2022 },
2023 }
2024 }
2025
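	/// Start state sync for the given finalized block, provided the median peer best block is
	/// within `STATE_SYNC_FINALITY_THRESHOLD` blocks of it.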
2026 fn attempt_state_sync(
2027 &mut self,
2028 finalized_hash: B::Hash,
2029 finalized_number: NumberFor<B>,
2030 skip_proofs: bool,
2031 ) {
2032 let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect();
2033 heads.sort();
2034 let median = heads[heads.len() / 2];
2035 if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median {
2036 if let Ok(Some(header)) = self.client.header(finalized_hash) {
2037 log::debug!(
2038 target: LOG_TARGET,
2039 "Starting state sync for #{finalized_number} ({finalized_hash})",
2040 );
2041 self.state_sync =
2042 Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs));
2043 self.allowed_requests.set_all();
2044 } else {
2045 log::error!(
2046 target: LOG_TARGET,
2047 "Failed to start state sync: header for finalized block \
2048 #{finalized_number} ({finalized_hash}) is not available",
2049 );
2050 debug_assert!(false);
2051 }
2052 }
2053 }
2054
2055 #[cfg(test)]
2057 #[must_use]
2058 fn take_actions(&mut self) -> impl Iterator<Item = SyncingAction<B>> {
2059 std::mem::take(&mut self.actions).into_iter()
2060 }
2061}
2062
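/// Map a legacy pre-`Justifications` justification to the new format, attributing it to the
/// GRANDPA (`FRNK`) consensus engine.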
2063fn legacy_justification_mapping(
2068 justification: Option<EncodedJustification>,
2069) -> Option<Justifications> {
2070 justification.map(|just| (*b"FRNK", just).into())
2071}
2072
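/// Build a request for a single header (and justification) at the given block number, used during
/// ancestor search.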
2073fn ancestry_request<B: BlockT>(block: NumberFor<B>) -> BlockRequest<B> {
2076 BlockRequest::<B> {
2077 id: 0,
2078 fields: BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
2079 from: FromBlock::Number(block),
2080 direction: Direction::Ascending,
2081 max: Some(1),
2082 }
2083}
2084
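/// The state of an ancestor search: first an exponential backoff from the tip, then a binary
/// search within the narrowed range.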
2085#[derive(Copy, Clone, Eq, PartialEq, Debug)]
2088pub(crate) enum AncestorSearchState<B: BlockT> {
2089 ExponentialBackoff(NumberFor<B>),
2092 BinarySearch(NumberFor<B>, NumberFor<B>),
2095}
2096
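/// Advance an ancestor search given the result of probing `curr_block_num`.
///
/// The search first walks backwards from the tip with exponentially growing steps until it finds
/// a block hash we agree on, then switches to a binary search between the last mismatch and the
/// last match. Returns the next state and the next block number to probe, or `None` once the
/// common ancestor has been pinned down.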
2097fn handle_ancestor_search_state<B: BlockT>(
2105 state: &AncestorSearchState<B>,
2106 curr_block_num: NumberFor<B>,
2107 block_hash_match: bool,
2108) -> Option<(AncestorSearchState<B>, NumberFor<B>)> {
2109 let two = <NumberFor<B>>::one() + <NumberFor<B>>::one();
2110 match state {
2111 AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => {
2112 let next_distance_to_tip = *next_distance_to_tip;
2113 if block_hash_match && next_distance_to_tip == One::one() {
2114 return None;
2117 }
2118 if block_hash_match {
2119 let left = curr_block_num;
2120 let right = left + next_distance_to_tip / two;
2121 let middle = left + (right - left) / two;
2122 Some((AncestorSearchState::BinarySearch(left, right), middle))
2123 } else {
2124 let next_block_num =
2125 curr_block_num.checked_sub(&next_distance_to_tip).unwrap_or_else(Zero::zero);
2126 let next_distance_to_tip = next_distance_to_tip * two;
2127 Some((
2128 AncestorSearchState::ExponentialBackoff(next_distance_to_tip),
2129 next_block_num,
2130 ))
2131 }
2132 },
2133 AncestorSearchState::BinarySearch(mut left, mut right) => {
2134 if left >= curr_block_num {
2135 return None;
2136 }
2137 if block_hash_match {
2138 left = curr_block_num;
2139 } else {
2140 right = curr_block_num;
2141 }
2142 assert!(right >= left);
2143 let middle = left + (right - left) / two;
2144 if middle == curr_block_num {
2145 None
2146 } else {
2147 Some((AncestorSearchState::BinarySearch(left, right), middle))
2148 }
2149 },
2150 }
2151}
2152
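/// Get a new block request for the peer, if any.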
2153fn peer_block_request<B: BlockT>(
2155 id: &PeerId,
2156 peer: &PeerSync<B>,
2157 blocks: &mut BlockCollection<B>,
2158 attrs: BlockAttributes,
2159 max_parallel_downloads: u32,
2160 max_blocks_per_request: u32,
2161 finalized: NumberFor<B>,
2162 best_num: NumberFor<B>,
2163) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2164 if best_num >= peer.best_number {
2165 return None;
2167 } else if peer.common_number < finalized {
2168 trace!(
2169 target: LOG_TARGET,
2170 "Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}",
2171 id, peer.common_number, finalized, peer.best_number, best_num,
2172 );
2173 }
2174 let range = blocks.needed_blocks(
2175 *id,
2176 max_blocks_per_request,
2177 peer.best_number,
2178 peer.common_number,
2179 max_parallel_downloads,
2180 MAX_DOWNLOAD_AHEAD,
2181 )?;
2182
2183 let last = range.end.saturating_sub(One::one());
2185
2186 let from = if peer.best_number == last {
2187 FromBlock::Hash(peer.best_hash)
2188 } else {
2189 FromBlock::Number(last)
2190 };
2191
2192 let request = BlockRequest::<B> {
2193 id: 0,
2194 fields: attrs,
2195 from,
2196 direction: Direction::Descending,
2197 max: Some((range.end - range.start).saturated_into::<u32>()),
2198 };
2199
2200 Some((range, request))
2201}
2202
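/// Get a new block request for the peer to fill the block-history gap, if any.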
2203fn peer_gap_block_request<B: BlockT>(
2205 id: &PeerId,
2206 peer: &PeerSync<B>,
2207 blocks: &mut BlockCollection<B>,
2208 attrs: BlockAttributes,
2209 target: NumberFor<B>,
2210 common_number: NumberFor<B>,
2211 max_blocks_per_request: u32,
2212) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2213 let range = blocks.needed_blocks(
2214 *id,
2215 max_blocks_per_request,
2216 std::cmp::min(peer.best_number, target),
2217 common_number,
2218 1,
2219 MAX_DOWNLOAD_AHEAD,
2220 )?;
2221
2222 let last = range.end.saturating_sub(One::one());
2224 let from = FromBlock::Number(last);
2225
2226 let request = BlockRequest::<B> {
2227 id: 0,
2228 fields: attrs,
2229 from,
2230 direction: Direction::Descending,
2231 max: Some((range.end - range.start).saturated_into::<u32>()),
2232 };
2233 Some((range, request))
2234}
2235
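/// Get a pending fork sync target for the peer, pruning targets that have expired (at or below
/// the finalized block) or become known to us in the meantime.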
2236fn fork_sync_request<B: BlockT>(
2238 id: &PeerId,
2239 fork_targets: &mut HashMap<B::Hash, ForkTarget<B>>,
2240 best_num: NumberFor<B>,
2241 finalized: NumberFor<B>,
2242 attributes: BlockAttributes,
2243 check_block: impl Fn(&B::Hash) -> BlockStatus,
2244 max_blocks_per_request: u32,
2245 metrics: Option<&Metrics>,
2246) -> Option<(B::Hash, BlockRequest<B>)> {
2247 fork_targets.retain(|hash, r| {
2248 if r.number <= finalized {
2249 trace!(
2250 target: LOG_TARGET,
2251 "Removed expired fork sync request {:?} (#{})",
2252 hash,
2253 r.number,
2254 );
2255 return false;
2256 }
2257 if check_block(hash) != BlockStatus::Unknown {
2258 trace!(
2259 target: LOG_TARGET,
2260 "Removed obsolete fork sync request {:?} (#{})",
2261 hash,
2262 r.number,
2263 );
2264 return false;
2265 }
2266 true
2267 });
2268 if let Some(metrics) = metrics {
2269 metrics.fork_targets.set(fork_targets.len().try_into().unwrap_or(u64::MAX));
2270 }
2271 for (hash, r) in fork_targets {
2272 if !r.peers.contains(&id) {
2273 continue;
2274 }
2275 if r.number <= best_num ||
2278 (r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
2279 {
2280 let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
			let count = if parent_status == BlockStatus::Unknown {
				// The parent is unknown: request enough blocks to connect the fork back to
				// something we know, i.e. at most back to the last finalized block.
				(r.number - finalized).saturated_into::<u32>()
			} else {
				// The parent is already known, requesting the fork head alone is enough.
				1
			};
2287 trace!(
2288 target: LOG_TARGET,
2289 "Downloading requested fork {hash:?} from {id}, {count} blocks",
2290 );
2291 return Some((
2292 *hash,
2293 BlockRequest::<B> {
2294 id: 0,
2295 fields: attributes,
2296 from: FromBlock::Hash(*hash),
2297 direction: Direction::Descending,
2298 max: Some(count),
2299 },
2300 ));
2301 } else {
2302 trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number);
2303 }
2304 }
2305 None
2306}
2307
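/// Returns `true` if `block` is a strict descendent of `base`; a block is not considered a
/// descendent of itself.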
2308fn is_descendent_of<Block, T>(
2310 client: &T,
2311 base: &Block::Hash,
2312 block: &Block::Hash,
2313) -> sp_blockchain::Result<bool>
2314where
2315 Block: BlockT,
2316 T: HeaderMetadata<Block, Error = sp_blockchain::Error> + ?Sized,
2317{
2318 if base == block {
2319 return Ok(false);
2320 }
2321
2322 let ancestor = sp_blockchain::lowest_common_ancestor(client, *block, *base)?;
2323
2324 Ok(ancestor.hash == *base)
2325}
2326
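/// Validate a block response against the original request.
///
/// Returns the number of the first block in the response on success, or a [`BadPeer`] describing
/// how the response is malformed.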
2327pub fn validate_blocks<Block: BlockT>(
2332 blocks: &Vec<BlockData<Block>>,
2333 peer_id: &PeerId,
2334 request: Option<BlockRequest<Block>>,
2335) -> Result<Option<NumberFor<Block>>, BadPeer> {
2336 if let Some(request) = request {
2337 if Some(blocks.len() as _) > request.max {
2338 debug!(
2339 target: LOG_TARGET,
2340 "Received more blocks than requested from {}. Expected in maximum {:?}, got {}.",
2341 peer_id,
2342 request.max,
2343 blocks.len(),
2344 );
2345
2346 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2347 }
2348
2349 let block_header =
2350 if request.direction == Direction::Descending { blocks.last() } else { blocks.first() }
2351 .and_then(|b| b.header.as_ref());
2352
2353 let expected_block = block_header.as_ref().map_or(false, |h| match request.from {
2354 FromBlock::Hash(hash) => h.hash() == hash,
2355 FromBlock::Number(n) => h.number() == &n,
2356 });
2357
2358 if !expected_block {
2359 debug!(
2360 target: LOG_TARGET,
2361 "Received block that was not requested. Requested {:?}, got {:?}.",
2362 request.from,
2363 block_header,
2364 );
2365
2366 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2367 }
2368
2369 if request.fields.contains(BlockAttributes::HEADER) &&
2370 blocks.iter().any(|b| b.header.is_none())
2371 {
2372 trace!(
2373 target: LOG_TARGET,
2374 "Missing requested header for a block in response from {peer_id}.",
2375 );
2376
2377 return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2378 }
2379
2380 if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none())
2381 {
2382 trace!(
2383 target: LOG_TARGET,
2384 "Missing requested body for a block in response from {peer_id}.",
2385 );
2386
2387 return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2388 }
2389 }
2390
2391 for b in blocks {
2392 if let Some(header) = &b.header {
2393 let hash = header.hash();
2394 if hash != b.hash {
2395 debug!(
2396 target: LOG_TARGET,
2397 "Bad header received from {}. Expected hash {:?}, got {:?}",
2398 peer_id,
2399 b.hash,
2400 hash,
2401 );
2402 return Err(BadPeer(*peer_id, rep::BAD_BLOCK));
2403 }
2404 }
2405 }
2406
2407 Ok(blocks.first().and_then(|b| b.header.as_ref()).map(|h| *h.number()))
2408}