1use crate::{
32 block_relay_protocol::{BlockDownloader, BlockResponseError},
33 blocks::BlockCollection,
34 justification_requests::ExtraRequests,
35 schema::v1::{StateRequest, StateResponse},
36 service::network::NetworkServiceHandle,
37 strategy::{
38 disconnected_peers::DisconnectedPeers,
39 state_sync::{ImportResult, StateSync, StateSyncProvider},
40 warp::{WarpSyncPhase, WarpSyncProgress},
41 StrategyKey, SyncingAction, SyncingStrategy,
42 },
43 types::{BadPeer, SyncState, SyncStatus},
44 LOG_TARGET,
45};
46
47use futures::{channel::oneshot, FutureExt};
48use log::{debug, error, info, trace, warn};
49use pezsc_client_api::{blockchain::BlockGap, BlockBackend, ProofProvider};
50use pezsc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
51use pezsc_network::{IfDisconnected, ProtocolName};
52use pezsc_network_common::sync::message::{
53 BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock,
54};
55use pezsc_network_types::PeerId;
56use pezsp_arithmetic::traits::Saturating;
57use pezsp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
58use pezsp_consensus::{BlockOrigin, BlockStatus};
59use pezsp_runtime::{
60 traits::{
61 Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One, SaturatedConversion, Zero,
62 },
63 EncodedJustification, Justifications,
64};
65use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
66use prost::Message;
67
68use std::{
69 any::Any,
70 collections::{HashMap, HashSet},
71 ops::Range,
72 sync::Arc,
73};
74
75#[cfg(test)]
76mod test;
77
78const MAX_IMPORTING_BLOCKS: usize = 2048;
80
81const MAX_DOWNLOAD_AHEAD: u32 = 2048;
83
84const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2;
87
88const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;
90
91const MAJOR_SYNC_BLOCKS: u8 = 5;
97
98mod rep {
99 use pezsc_network::ReputationChange as Rep;
100 pub const BLOCKCHAIN_READ_ERROR: Rep = Rep::new(-(1 << 16), "DB Error");
103
104 pub const GENESIS_MISMATCH: Rep = Rep::new(i32::MIN, "Genesis mismatch");
107
108 pub const INCOMPLETE_HEADER: Rep = Rep::new(-(1 << 20), "Incomplete header");
110
111 pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
113
114 pub const BAD_BLOCK: Rep = Rep::new(-(1 << 29), "Bad block");
116
117 pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
119
120 pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
122
123 pub const BAD_JUSTIFICATION: Rep = Rep::new(-(1 << 16), "Bad justification");
125
126 pub const UNKNOWN_ANCESTOR: Rep = Rep::new(-(1 << 16), "DB Error");
128
129 pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
131
132 pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
134}
135
136struct Metrics {
137 queued_blocks: Gauge<U64>,
138 fork_targets: Gauge<U64>,
139}
140
141impl Metrics {
142 fn register(r: &Registry) -> Result<Self, PrometheusError> {
143 Ok(Self {
144 queued_blocks: {
145 let g = Gauge::new(
146 "bizinikiwi_sync_queued_blocks",
147 "Number of blocks in import queue",
148 )?;
149 register(g, r)?
150 },
151 fork_targets: {
152 let g = Gauge::new("bizinikiwi_sync_fork_targets", "Number of fork sync targets")?;
153 register(g, r)?
154 },
155 })
156 }
157}
158
159#[derive(Debug, Clone)]
160enum AllowedRequests {
161 Some(HashSet<PeerId>),
162 All,
163}
164
165impl AllowedRequests {
166 fn add(&mut self, id: &PeerId) {
167 if let Self::Some(ref mut set) = self {
168 set.insert(*id);
169 }
170 }
171
172 fn take(&mut self) -> Self {
173 std::mem::take(self)
174 }
175
176 fn set_all(&mut self) {
177 *self = Self::All;
178 }
179
180 fn contains(&self, id: &PeerId) -> bool {
181 match self {
182 Self::Some(set) => set.contains(id),
183 Self::All => true,
184 }
185 }
186
187 fn is_empty(&self) -> bool {
188 match self {
189 Self::Some(set) => set.is_empty(),
190 Self::All => false,
191 }
192 }
193
194 fn clear(&mut self) {
195 std::mem::take(self);
196 }
197}
198
199impl Default for AllowedRequests {
200 fn default() -> Self {
201 Self::Some(HashSet::default())
202 }
203}
204
205struct GapSync<B: BlockT> {
206 blocks: BlockCollection<B>,
207 best_queued_number: NumberFor<B>,
208 target: NumberFor<B>,
209}
210
211#[derive(Copy, Clone, Debug, Eq, PartialEq)]
213pub enum ChainSyncMode {
214 Full,
216 LightState {
218 skip_proofs: bool,
220 storage_chain_mode: bool,
222 },
223}
224
225#[derive(Debug, Clone)]
227pub(crate) struct PeerSync<B: BlockT> {
228 pub peer_id: PeerId,
230 pub common_number: NumberFor<B>,
233 pub best_hash: B::Hash,
235 pub best_number: NumberFor<B>,
237 pub state: PeerSyncState<B>,
240}
241
242impl<B: BlockT> PeerSync<B> {
243 fn update_common_number(&mut self, new_common: NumberFor<B>) {
245 if self.common_number < new_common {
246 trace!(
247 target: LOG_TARGET,
248 "Updating peer {} common number from={} => to={}.",
249 self.peer_id,
250 self.common_number,
251 new_common,
252 );
253 self.common_number = new_common;
254 }
255 }
256}
257
258struct ForkTarget<B: BlockT> {
259 number: NumberFor<B>,
260 parent_hash: Option<B::Hash>,
261 peers: HashSet<PeerId>,
262}
263
264#[derive(Copy, Clone, Eq, PartialEq, Debug)]
269pub(crate) enum PeerSyncState<B: BlockT> {
270 Available,
272 AncestorSearch { start: NumberFor<B>, current: NumberFor<B>, state: AncestorSearchState<B> },
274 DownloadingNew(NumberFor<B>),
276 DownloadingStale(B::Hash),
280 DownloadingJustification(B::Hash),
282 DownloadingState,
284 DownloadingGap(NumberFor<B>),
286}
287
288impl<B: BlockT> PeerSyncState<B> {
289 pub fn is_available(&self) -> bool {
290 matches!(self, Self::Available)
291 }
292}
293
294pub struct ChainSync<B: BlockT, Client> {
297 client: Arc<Client>,
299 peers: HashMap<PeerId, PeerSync<B>>,
301 disconnected_peers: DisconnectedPeers,
302 blocks: BlockCollection<B>,
304 best_queued_number: NumberFor<B>,
306 best_queued_hash: B::Hash,
308 mode: ChainSyncMode,
310 extra_justifications: ExtraRequests<B>,
312 queue_blocks: HashSet<B::Hash>,
315 pending_state_sync_attempt: Option<(B::Hash, NumberFor<B>, bool)>,
323 fork_targets: HashMap<B::Hash, ForkTarget<B>>,
325 allowed_requests: AllowedRequests,
327 max_parallel_downloads: u32,
329 max_blocks_per_request: u32,
331 state_request_protocol_name: ProtocolName,
333 downloaded_blocks: usize,
335 state_sync: Option<StateSync<B, Client>>,
337 import_existing: bool,
340 block_downloader: Arc<dyn BlockDownloader<B>>,
342 gap_sync: Option<GapSync<B>>,
344 actions: Vec<SyncingAction<B>>,
346 metrics: Option<Metrics>,
348}
349
350impl<B, Client> SyncingStrategy<B> for ChainSync<B, Client>
351where
352 B: BlockT,
353 Client: HeaderBackend<B>
354 + BlockBackend<B>
355 + HeaderMetadata<B, Error = pezsp_blockchain::Error>
356 + ProofProvider<B>
357 + Send
358 + Sync
359 + 'static,
360{
361 fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
362 match self.add_peer_inner(peer_id, best_hash, best_number) {
363 Ok(Some(request)) => {
364 let action = self.create_block_request_action(peer_id, request);
365 self.actions.push(action);
366 },
367 Ok(None) => {},
368 Err(bad_peer) => self.actions.push(SyncingAction::DropPeer(bad_peer)),
369 }
370 }
371
372 fn remove_peer(&mut self, peer_id: &PeerId) {
373 self.blocks.clear_peer_download(peer_id);
374 if let Some(gap_sync) = &mut self.gap_sync {
375 gap_sync.blocks.clear_peer_download(peer_id)
376 }
377
378 if let Some(state) = self.peers.remove(peer_id) {
379 if !state.state.is_available() {
380 if let Some(bad_peer) =
381 self.disconnected_peers.on_disconnect_during_request(*peer_id)
382 {
383 self.actions.push(SyncingAction::DropPeer(bad_peer));
384 }
385 }
386 }
387
388 self.extra_justifications.peer_disconnected(peer_id);
389 self.allowed_requests.set_all();
390 self.fork_targets.retain(|_, target| {
391 target.peers.remove(peer_id);
392 !target.peers.is_empty()
393 });
394 if let Some(metrics) = &self.metrics {
395 metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX));
396 }
397
398 let blocks = self.ready_blocks();
399
400 if !blocks.is_empty() {
401 self.validate_and_queue_blocks(blocks, false);
402 }
403 }
404
405 fn on_validated_block_announce(
406 &mut self,
407 is_best: bool,
408 peer_id: PeerId,
409 announce: &BlockAnnounce<B::Header>,
410 ) -> Option<(B::Hash, NumberFor<B>)> {
411 let number = *announce.header.number();
412 let hash = announce.header.hash();
413 let parent_status =
414 self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
415 let known_parent = parent_status != BlockStatus::Unknown;
416 let ancient_parent = parent_status == BlockStatus::InChainPruned;
417
418 let known = self.is_known(&hash);
419 let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
420 peer
421 } else {
422 error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}");
423 return Some((hash, number));
424 };
425
426 if let PeerSyncState::AncestorSearch { .. } = peer.state {
427 trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id);
428 return None;
429 }
430
431 let peer_info = is_best.then(|| {
432 peer.best_number = number;
434 peer.best_hash = hash;
435
436 (hash, number)
437 });
438
439 if is_best {
442 if known && self.best_queued_number >= number {
443 self.update_peer_common_number(&peer_id, number);
444 } else if announce.header.parent_hash() == &self.best_queued_hash
445 || known_parent && self.best_queued_number >= number
446 {
447 self.update_peer_common_number(&peer_id, number.saturating_sub(One::one()));
448 }
449 }
450 self.allowed_requests.add(&peer_id);
451
452 if known || self.is_already_downloading(&hash) {
454 trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash);
455 if let Some(target) = self.fork_targets.get_mut(&hash) {
456 target.peers.insert(peer_id);
457 }
458 return peer_info;
459 }
460
461 if ancient_parent {
462 trace!(
463 target: LOG_TARGET,
464 "Ignored ancient block announced from {}: {} {:?}",
465 peer_id,
466 hash,
467 announce.header,
468 );
469 return peer_info;
470 }
471
472 if self.status().state == SyncState::Idle {
473 trace!(
474 target: LOG_TARGET,
475 "Added sync target for block announced from {}: {} {:?}",
476 peer_id,
477 hash,
478 announce.summary(),
479 );
480 self.fork_targets
481 .entry(hash)
482 .or_insert_with(|| {
483 if let Some(metrics) = &self.metrics {
484 metrics.fork_targets.inc();
485 }
486
487 ForkTarget {
488 number,
489 parent_hash: Some(*announce.header.parent_hash()),
490 peers: Default::default(),
491 }
492 })
493 .peers
494 .insert(peer_id);
495 }
496
497 peer_info
498 }
499
500 fn set_sync_fork_request(
502 &mut self,
503 mut peers: Vec<PeerId>,
504 hash: &B::Hash,
505 number: NumberFor<B>,
506 ) {
507 if peers.is_empty() {
508 peers = self
509 .peers
510 .iter()
511 .filter(|(_, peer)| peer.best_number >= number)
513 .map(|(id, _)| *id)
514 .collect();
515
516 debug!(
517 target: LOG_TARGET,
518 "Explicit sync request for block {hash:?} with no peers specified. \
519 Syncing from these peers {peers:?} instead.",
520 );
521 } else {
522 debug!(
523 target: LOG_TARGET,
524 "Explicit sync request for block {hash:?} with {peers:?}",
525 );
526 }
527
528 if self.is_known(hash) {
529 debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}");
530 return;
531 }
532
533 trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}");
534 for peer_id in &peers {
535 if let Some(peer) = self.peers.get_mut(peer_id) {
536 if let PeerSyncState::AncestorSearch { .. } = peer.state {
537 continue;
538 }
539
540 if number > peer.best_number {
541 peer.best_number = number;
542 peer.best_hash = *hash;
543 }
544 self.allowed_requests.add(peer_id);
545 }
546 }
547
548 self.fork_targets
549 .entry(*hash)
550 .or_insert_with(|| {
551 if let Some(metrics) = &self.metrics {
552 metrics.fork_targets.inc();
553 }
554
555 ForkTarget { number, peers: Default::default(), parent_hash: None }
556 })
557 .peers
558 .extend(peers);
559 }
560
561 fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
562 let client = &self.client;
563 self.extra_justifications
564 .schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block))
565 }
566
567 fn clear_justification_requests(&mut self) {
568 self.extra_justifications.reset();
569 }
570
571 fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
572 let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
573 self.extra_justifications
574 .try_finalize_root((hash, number), finalization_result, true);
575 self.allowed_requests.set_all();
576 }
577
578 fn on_generic_response(
579 &mut self,
580 peer_id: &PeerId,
581 key: StrategyKey,
582 protocol_name: ProtocolName,
583 response: Box<dyn Any + Send>,
584 ) {
585 if Self::STRATEGY_KEY != key {
586 warn!(
587 target: LOG_TARGET,
588 "Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
589 );
590 debug_assert!(false);
591 return;
592 }
593
594 if protocol_name == self.state_request_protocol_name {
595 let Ok(response) = response.downcast::<Vec<u8>>() else {
596 warn!(target: LOG_TARGET, "Failed to downcast state response");
597 debug_assert!(false);
598 return;
599 };
600
601 if let Err(bad_peer) = self.on_state_data(&peer_id, &response) {
602 self.actions.push(SyncingAction::DropPeer(bad_peer));
603 }
604 } else if &protocol_name == self.block_downloader.protocol_name() {
605 let Ok(response) = response
606 .downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
607 else {
608 warn!(target: LOG_TARGET, "Failed to downcast block response");
609 debug_assert!(false);
610 return;
611 };
612
613 let (request, response) = *response;
614 let blocks = match response {
615 Ok(blocks) => blocks,
616 Err(BlockResponseError::DecodeFailed(e)) => {
617 debug!(
618 target: LOG_TARGET,
619 "Failed to decode block response from peer {:?}: {:?}.",
620 peer_id,
621 e
622 );
623 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
624 return;
625 },
626 Err(BlockResponseError::ExtractionFailed(e)) => {
627 debug!(
628 target: LOG_TARGET,
629 "Failed to extract blocks from peer response {:?}: {:?}.",
630 peer_id,
631 e
632 );
633 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
634 return;
635 },
636 };
637
638 if let Err(bad_peer) = self.on_block_response(peer_id, key, request, blocks) {
639 self.actions.push(SyncingAction::DropPeer(bad_peer));
640 }
641 } else {
642 warn!(
643 target: LOG_TARGET,
644 "Unexpected generic response protocol {protocol_name}, strategy key \
645 {key:?}",
646 );
647 debug_assert!(false);
648 }
649 }
650
651 fn on_blocks_processed(
652 &mut self,
653 imported: usize,
654 count: usize,
655 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
656 ) {
657 trace!(target: LOG_TARGET, "Imported {imported} of {count}");
658
659 let mut has_error = false;
660 for (_, hash) in &results {
661 if self.queue_blocks.remove(hash) {
662 if let Some(metrics) = &self.metrics {
663 metrics.queued_blocks.dec();
664 }
665 }
666 self.blocks.clear_queued(hash);
667 if let Some(gap_sync) = &mut self.gap_sync {
668 gap_sync.blocks.clear_queued(hash);
669 }
670 }
671 for (result, hash) in results {
672 if has_error {
673 break;
674 }
675
676 has_error |= result.is_err();
677
678 match result {
679 Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => {
680 if let Some(peer) = peer_id {
681 self.update_peer_common_number(&peer, number);
682 }
683 self.complete_gap_if_target(number);
684 },
685 Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => {
686 if aux.clear_justification_requests {
687 trace!(
688 target: LOG_TARGET,
689 "Block imported clears all pending justification requests {number}: {hash:?}",
690 );
691 self.clear_justification_requests();
692 }
693
694 if aux.needs_justification {
695 trace!(
696 target: LOG_TARGET,
697 "Block imported but requires justification {number}: {hash:?}",
698 );
699 self.request_justification(&hash, number);
700 }
701
702 if aux.bad_justification {
703 if let Some(ref peer) = peer_id {
704 warn!("💔 Sent block with bad justification to import");
705 self.actions.push(SyncingAction::DropPeer(BadPeer(
706 *peer,
707 rep::BAD_JUSTIFICATION,
708 )));
709 }
710 }
711
712 if let Some(peer) = peer_id {
713 self.update_peer_common_number(&peer, number);
714 }
715 let state_sync_complete =
716 self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash);
717 if state_sync_complete {
718 info!(
719 target: LOG_TARGET,
720 "State sync is complete ({} MiB), restarting block sync.",
721 self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
722 );
723 self.state_sync = None;
724 self.mode = ChainSyncMode::Full;
725 self.restart();
726 }
727
728 self.complete_gap_if_target(number);
729 },
730 Err(BlockImportError::IncompleteHeader(peer_id)) => {
731 if let Some(peer) = peer_id {
732 warn!(
733 target: LOG_TARGET,
734 "💔 Peer sent block with incomplete header to import",
735 );
736 self.actions
737 .push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER)));
738 self.restart();
739 }
740 },
741 Err(BlockImportError::VerificationFailed(peer_id, e)) => {
742 let extra_message = peer_id
743 .map_or_else(|| "".into(), |peer| format!(" received from ({peer})"));
744
745 warn!(
746 target: LOG_TARGET,
747 "💔 Verification failed for block {hash:?}{extra_message}: {e:?}",
748 );
749
750 if let Some(peer) = peer_id {
751 self.actions
752 .push(SyncingAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL)));
753 }
754
755 self.restart();
756 },
757 Err(BlockImportError::BadBlock(peer_id)) => {
758 if let Some(peer) = peer_id {
759 warn!(
760 target: LOG_TARGET,
761 "💔 Block {hash:?} received from peer {peer} has been blacklisted",
762 );
763 self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK)));
764 }
765 },
766 Err(BlockImportError::MissingState) => {
767 trace!(target: LOG_TARGET, "Obsolete block {hash:?}");
771 },
772 e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
773 warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err());
774 self.state_sync = None;
775 self.restart();
776 },
777 Err(BlockImportError::Cancelled) => {},
778 };
779 }
780
781 self.allowed_requests.set_all();
782 }
783
784 fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
785 let client = &self.client;
786 let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| {
787 is_descendent_of(&**client, base, block)
788 });
789
790 if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode {
791 if self.state_sync.is_none() {
792 if !self.peers.is_empty() && self.queue_blocks.is_empty() {
793 self.attempt_state_sync(*hash, number, *skip_proofs);
794 } else {
795 self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs));
796 }
797 }
798 }
799
800 if let Err(err) = r {
801 warn!(
802 target: LOG_TARGET,
803 "💔 Error cleaning up pending extra justification data requests: {err}",
804 );
805 }
806 }
807
808 fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
809 self.on_block_queued(best_hash, best_number);
810 }
811
812 fn is_major_syncing(&self) -> bool {
813 self.status().state.is_major_syncing()
814 }
815
816 fn num_peers(&self) -> usize {
817 self.peers.len()
818 }
819
820 fn status(&self) -> SyncStatus<B> {
821 let median_seen = self.median_seen();
822 let best_seen_block =
823 median_seen.and_then(|median| (median > self.best_queued_number).then_some(median));
824 let sync_state = if let Some(target) = median_seen {
825 let best_block = self.client.info().best_number;
829 if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() {
830 if target > self.best_queued_number {
832 SyncState::Downloading { target }
833 } else {
834 SyncState::Importing { target }
835 }
836 } else {
837 SyncState::Idle
838 }
839 } else {
840 SyncState::Idle
841 };
842
843 let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress {
844 phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number),
845 total_bytes: 0,
846 });
847
848 SyncStatus {
849 state: sync_state,
850 best_seen_block,
851 num_peers: self.peers.len() as u32,
852 queued_blocks: self.queue_blocks.len() as u32,
853 state_sync: self.state_sync.as_ref().map(|s| s.progress()),
854 warp_sync: warp_sync_progress,
855 }
856 }
857
858 fn num_downloaded_blocks(&self) -> usize {
859 self.downloaded_blocks
860 }
861
862 fn num_sync_requests(&self) -> usize {
863 self.fork_targets
864 .values()
865 .filter(|f| f.number <= self.best_queued_number)
866 .count()
867 }
868
869 fn actions(
870 &mut self,
871 network_service: &NetworkServiceHandle,
872 ) -> Result<Vec<SyncingAction<B>>, ClientError> {
873 if !self.peers.is_empty() && self.queue_blocks.is_empty() {
874 if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() {
875 self.attempt_state_sync(hash, number, skip_proofs);
876 }
877 }
878
879 let block_requests = self
880 .block_requests()
881 .into_iter()
882 .map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
883 .collect::<Vec<_>>();
884 self.actions.extend(block_requests);
885
886 let justification_requests = self
887 .justification_requests()
888 .into_iter()
889 .map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
890 .collect::<Vec<_>>();
891 self.actions.extend(justification_requests);
892
893 let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
894 trace!(
895 target: LOG_TARGET,
896 "Created `StrategyRequest` to {peer_id}.",
897 );
898
899 let (tx, rx) = oneshot::channel();
900
901 network_service.start_request(
902 peer_id,
903 self.state_request_protocol_name.clone(),
904 request.encode_to_vec(),
905 tx,
906 IfDisconnected::ImmediateError,
907 );
908
909 SyncingAction::StartRequest {
910 peer_id,
911 key: Self::STRATEGY_KEY,
912 request: async move {
913 Ok(rx.await?.and_then(|(response, protocol_name)| {
914 Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
915 }))
916 }
917 .boxed(),
918 remove_obsolete: false,
919 }
920 });
921 self.actions.extend(state_request);
922
923 Ok(std::mem::take(&mut self.actions))
924 }
925}
926
927impl<B, Client> ChainSync<B, Client>
928where
929 B: BlockT,
930 Client: HeaderBackend<B>
931 + BlockBackend<B>
932 + HeaderMetadata<B, Error = pezsp_blockchain::Error>
933 + ProofProvider<B>
934 + Send
935 + Sync
936 + 'static,
937{
938 pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("ChainSync");
940
941 pub fn new(
943 mode: ChainSyncMode,
944 client: Arc<Client>,
945 max_parallel_downloads: u32,
946 max_blocks_per_request: u32,
947 state_request_protocol_name: ProtocolName,
948 block_downloader: Arc<dyn BlockDownloader<B>>,
949 metrics_registry: Option<&Registry>,
950 initial_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
951 ) -> Result<Self, ClientError> {
952 let mut sync = Self {
953 client,
954 peers: HashMap::new(),
955 disconnected_peers: DisconnectedPeers::new(),
956 blocks: BlockCollection::new(),
957 best_queued_hash: Default::default(),
958 best_queued_number: Zero::zero(),
959 extra_justifications: ExtraRequests::new("justification", metrics_registry),
960 mode,
961 queue_blocks: Default::default(),
962 pending_state_sync_attempt: None,
963 fork_targets: Default::default(),
964 allowed_requests: Default::default(),
965 max_parallel_downloads,
966 max_blocks_per_request,
967 state_request_protocol_name,
968 downloaded_blocks: 0,
969 state_sync: None,
970 import_existing: false,
971 block_downloader,
972 gap_sync: None,
973 actions: Vec::new(),
974 metrics: metrics_registry.and_then(|r| match Metrics::register(r) {
975 Ok(metrics) => Some(metrics),
976 Err(err) => {
977 log::error!(
978 target: LOG_TARGET,
979 "Failed to register `ChainSync` metrics {err:?}",
980 );
981 None
982 },
983 }),
984 };
985
986 sync.reset_sync_start_point()?;
987 initial_peers.for_each(|(peer_id, best_hash, best_number)| {
988 sync.add_peer(peer_id, best_hash, best_number);
989 });
990
991 Ok(sync)
992 }
993
994 fn complete_gap_if_target(&mut self, number: NumberFor<B>) {
996 let gap_sync_complete = self.gap_sync.as_ref().map_or(false, |s| s.target == number);
997 if gap_sync_complete {
998 info!(
999 target: LOG_TARGET,
1000 "Block history download is complete."
1001 );
1002 self.gap_sync = None;
1003 }
1004 }
1005
1006 #[must_use]
1007 fn add_peer_inner(
1008 &mut self,
1009 peer_id: PeerId,
1010 best_hash: B::Hash,
1011 best_number: NumberFor<B>,
1012 ) -> Result<Option<BlockRequest<B>>, BadPeer> {
1013 match self.block_status(&best_hash) {
1015 Err(e) => {
1016 debug!(target: LOG_TARGET, "Error reading blockchain: {e}");
1017 Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR))
1018 },
1019 Ok(BlockStatus::KnownBad) => {
1020 info!(
1021 "💔 New peer {peer_id} with known bad best block {best_hash} ({best_number})."
1022 );
1023 Err(BadPeer(peer_id, rep::BAD_BLOCK))
1024 },
1025 Ok(BlockStatus::Unknown) => {
1026 if best_number.is_zero() {
1027 info!(
1028 "💔 New peer {} with unknown genesis hash {} ({}).",
1029 peer_id, best_hash, best_number,
1030 );
1031 return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH));
1032 }
1033
1034 if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS as usize {
1038 debug!(
1039 target: LOG_TARGET,
1040 "New peer {} with unknown best hash {} ({}), assuming common block.",
1041 peer_id,
1042 self.best_queued_hash,
1043 self.best_queued_number
1044 );
1045 self.peers.insert(
1046 peer_id,
1047 PeerSync {
1048 peer_id,
1049 common_number: self.best_queued_number,
1050 best_hash,
1051 best_number,
1052 state: PeerSyncState::Available,
1053 },
1054 );
1055 return Ok(None);
1056 }
1057
1058 let (state, req) = if self.best_queued_number.is_zero() {
1060 debug!(
1061 target: LOG_TARGET,
1062 "New peer {peer_id} with best hash {best_hash} ({best_number}).",
1063 );
1064
1065 (PeerSyncState::Available, None)
1066 } else {
1067 let common_best = std::cmp::min(self.best_queued_number, best_number);
1068
1069 debug!(
1070 target: LOG_TARGET,
1071 "New peer {} with unknown best hash {} ({}), searching for common ancestor.",
1072 peer_id,
1073 best_hash,
1074 best_number
1075 );
1076
1077 (
1078 PeerSyncState::AncestorSearch {
1079 current: common_best,
1080 start: self.best_queued_number,
1081 state: AncestorSearchState::ExponentialBackoff(One::one()),
1082 },
1083 Some(ancestry_request::<B>(common_best)),
1084 )
1085 };
1086
1087 self.allowed_requests.add(&peer_id);
1088 self.peers.insert(
1089 peer_id,
1090 PeerSync {
1091 peer_id,
1092 common_number: Zero::zero(),
1093 best_hash,
1094 best_number,
1095 state,
1096 },
1097 );
1098
1099 Ok(req)
1100 },
1101 Ok(BlockStatus::Queued)
1102 | Ok(BlockStatus::InChainWithState)
1103 | Ok(BlockStatus::InChainPruned) => {
1104 debug!(
1105 target: LOG_TARGET,
1106 "New peer {peer_id} with known best hash {best_hash} ({best_number}).",
1107 );
1108 self.peers.insert(
1109 peer_id,
1110 PeerSync {
1111 peer_id,
1112 common_number: std::cmp::min(self.best_queued_number, best_number),
1113 best_hash,
1114 best_number,
1115 state: PeerSyncState::Available,
1116 },
1117 );
1118 self.allowed_requests.add(&peer_id);
1119 Ok(None)
1120 },
1121 }
1122 }
1123
1124 fn create_block_request_action(
1125 &mut self,
1126 peer_id: PeerId,
1127 request: BlockRequest<B>,
1128 ) -> SyncingAction<B> {
1129 let downloader = self.block_downloader.clone();
1130
1131 SyncingAction::StartRequest {
1132 peer_id,
1133 key: Self::STRATEGY_KEY,
1134 request: async move {
1135 Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
1136 |(response, protocol_name)| {
1137 let decoded_response =
1138 downloader.block_response_into_blocks(&request, response);
1139 let result = Box::new((request, decoded_response)) as Box<dyn Any + Send>;
1140 Ok((result, protocol_name))
1141 },
1142 ))
1143 }
1144 .boxed(),
1145 remove_obsolete: true,
1148 }
1149 }
1150
1151 #[must_use]
1153 fn on_block_data(
1154 &mut self,
1155 peer_id: &PeerId,
1156 request: Option<BlockRequest<B>>,
1157 response: BlockResponse<B>,
1158 ) -> Result<(), BadPeer> {
1159 self.downloaded_blocks += response.blocks.len();
1160 let mut gap = false;
1161 let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(peer_id) {
1162 let mut blocks = response.blocks;
1163 if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) {
1164 trace!(target: LOG_TARGET, "Reversing incoming block list");
1165 blocks.reverse()
1166 }
1167 self.allowed_requests.add(peer_id);
1168 if let Some(request) = request {
1169 match &mut peer.state {
1170 PeerSyncState::DownloadingNew(_) => {
1171 self.blocks.clear_peer_download(peer_id);
1172 peer.state = PeerSyncState::Available;
1173 if let Some(start_block) =
1174 validate_blocks::<B>(&blocks, peer_id, Some(request))?
1175 {
1176 self.blocks.insert(start_block, blocks, *peer_id);
1177 }
1178 self.ready_blocks()
1179 },
1180 PeerSyncState::DownloadingGap(_) => {
1181 peer.state = PeerSyncState::Available;
1182 if let Some(gap_sync) = &mut self.gap_sync {
1183 gap_sync.blocks.clear_peer_download(peer_id);
1184 if let Some(start_block) =
1185 validate_blocks::<B>(&blocks, peer_id, Some(request))?
1186 {
1187 gap_sync.blocks.insert(start_block, blocks, *peer_id);
1188 }
1189 gap = true;
1190 let blocks: Vec<_> = gap_sync
1191 .blocks
1192 .ready_blocks(gap_sync.best_queued_number + One::one())
1193 .into_iter()
1194 .map(|block_data| {
1195 let justifications =
1196 block_data.block.justifications.or_else(|| {
1197 legacy_justification_mapping(
1198 block_data.block.justification,
1199 )
1200 });
1201 IncomingBlock {
1202 hash: block_data.block.hash,
1203 header: block_data.block.header,
1204 body: block_data.block.body,
1205 indexed_body: block_data.block.indexed_body,
1206 justifications,
1207 origin: block_data.origin,
1208 allow_missing_state: true,
1209 import_existing: self.import_existing,
1210 skip_execution: true,
1211 state: None,
1212 }
1213 })
1214 .collect();
1215 debug!(
1216 target: LOG_TARGET,
1217 "Drained {} gap blocks from {}",
1218 blocks.len(),
1219 gap_sync.best_queued_number,
1220 );
1221 blocks
1222 } else {
1223 debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}");
1224 return Err(BadPeer(*peer_id, rep::NO_BLOCK));
1225 }
1226 },
1227 PeerSyncState::DownloadingStale(_) => {
1228 peer.state = PeerSyncState::Available;
1229 if blocks.is_empty() {
1230 debug!(target: LOG_TARGET, "Empty block response from {peer_id}");
1231 return Err(BadPeer(*peer_id, rep::NO_BLOCK));
1232 }
1233 validate_blocks::<B>(&blocks, peer_id, Some(request))?;
1234 blocks
1235 .into_iter()
1236 .map(|b| {
1237 let justifications = b
1238 .justifications
1239 .or_else(|| legacy_justification_mapping(b.justification));
1240 IncomingBlock {
1241 hash: b.hash,
1242 header: b.header,
1243 body: b.body,
1244 indexed_body: None,
1245 justifications,
1246 origin: Some(*peer_id),
1247 allow_missing_state: true,
1248 import_existing: self.import_existing,
1249 skip_execution: self.skip_execution(),
1250 state: None,
1251 }
1252 })
1253 .collect()
1254 },
1255 PeerSyncState::AncestorSearch { current, start, state } => {
1256 let matching_hash = match (blocks.get(0), self.client.hash(*current)) {
1257 (Some(block), Ok(maybe_our_block_hash)) => {
1258 trace!(
1259 target: LOG_TARGET,
1260 "Got ancestry block #{} ({}) from peer {}",
1261 current,
1262 block.hash,
1263 peer_id,
1264 );
1265 maybe_our_block_hash.filter(|x| x == &block.hash)
1266 },
1267 (None, _) => {
1268 debug!(
1269 target: LOG_TARGET,
1270 "Invalid response when searching for ancestor from {peer_id}",
1271 );
1272 return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR));
1273 },
1274 (_, Err(e)) => {
1275 info!(
1276 target: LOG_TARGET,
1277 "❌ Error answering legitimate blockchain query: {e}",
1278 );
1279 return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR));
1280 },
1281 };
1282 if matching_hash.is_some() {
1283 if *start < self.best_queued_number
1284 && self.best_queued_number <= peer.best_number
1285 {
1286 trace!(
1290 target: LOG_TARGET,
1291 "Ancestry search: opportunistically updating peer {} common number from={} => to={}.",
1292 *peer_id,
1293 peer.common_number,
1294 self.best_queued_number,
1295 );
1296 peer.common_number = self.best_queued_number;
1297 } else if peer.common_number < *current {
1298 trace!(
1299 target: LOG_TARGET,
1300 "Ancestry search: updating peer {} common number from={} => to={}.",
1301 *peer_id,
1302 peer.common_number,
1303 *current,
1304 );
1305 peer.common_number = *current;
1306 }
1307 }
1308 if matching_hash.is_none() && current.is_zero() {
1309 trace!(
1310 target: LOG_TARGET,
1311 "Ancestry search: genesis mismatch for peer {peer_id}",
1312 );
1313 return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH));
1314 }
1315 if let Some((next_state, next_num)) =
1316 handle_ancestor_search_state(state, *current, matching_hash.is_some())
1317 {
1318 peer.state = PeerSyncState::AncestorSearch {
1319 current: next_num,
1320 start: *start,
1321 state: next_state,
1322 };
1323 let request = ancestry_request::<B>(next_num);
1324 let action = self.create_block_request_action(*peer_id, request);
1325 self.actions.push(action);
1326 return Ok(());
1327 } else {
1328 trace!(
1331 target: LOG_TARGET,
1332 "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
1333 self.best_queued_hash,
1334 self.best_queued_number,
1335 peer.best_hash,
1336 peer.best_number,
1337 matching_hash,
1338 peer.common_number,
1339 );
1340 if peer.common_number < peer.best_number
1341 && peer.best_number < self.best_queued_number
1342 {
1343 trace!(
1344 target: LOG_TARGET,
1345 "Added fork target {} for {}",
1346 peer.best_hash,
1347 peer_id,
1348 );
1349 self.fork_targets
1350 .entry(peer.best_hash)
1351 .or_insert_with(|| {
1352 if let Some(metrics) = &self.metrics {
1353 metrics.fork_targets.inc();
1354 }
1355
1356 ForkTarget {
1357 number: peer.best_number,
1358 parent_hash: None,
1359 peers: Default::default(),
1360 }
1361 })
1362 .peers
1363 .insert(*peer_id);
1364 }
1365 peer.state = PeerSyncState::Available;
1366 return Ok(());
1367 }
1368 },
1369 PeerSyncState::Available
1370 | PeerSyncState::DownloadingJustification(..)
1371 | PeerSyncState::DownloadingState => Vec::new(),
1372 }
1373 } else {
1374 validate_blocks::<B>(&blocks, peer_id, None)?;
1376 blocks
1377 .into_iter()
1378 .map(|b| {
1379 let justifications = b
1380 .justifications
1381 .or_else(|| legacy_justification_mapping(b.justification));
1382 IncomingBlock {
1383 hash: b.hash,
1384 header: b.header,
1385 body: b.body,
1386 indexed_body: None,
1387 justifications,
1388 origin: Some(*peer_id),
1389 allow_missing_state: true,
1390 import_existing: false,
1391 skip_execution: true,
1392 state: None,
1393 }
1394 })
1395 .collect()
1396 }
1397 } else {
1398 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
1400 };
1401
1402 self.validate_and_queue_blocks(new_blocks, gap);
1403
1404 Ok(())
1405 }
1406
1407 fn on_block_response(
1408 &mut self,
1409 peer_id: &PeerId,
1410 key: StrategyKey,
1411 request: BlockRequest<B>,
1412 blocks: Vec<BlockData<B>>,
1413 ) -> Result<(), BadPeer> {
1414 if key != Self::STRATEGY_KEY {
1415 error!(
1416 target: LOG_TARGET,
1417 "`on_block_response()` called with unexpected key {key:?} for chain sync",
1418 );
1419 debug_assert!(false);
1420 }
1421 let block_response = BlockResponse::<B> { id: request.id, blocks };
1422
1423 let blocks_range = || match (
1424 block_response
1425 .blocks
1426 .first()
1427 .and_then(|b| b.header.as_ref().map(|h| h.number())),
1428 block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
1429 ) {
1430 (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
1431 (Some(first), Some(_)) => format!(" ({})", first),
1432 _ => Default::default(),
1433 };
1434 trace!(
1435 target: LOG_TARGET,
1436 "BlockResponse {} from {} with {} blocks {}",
1437 block_response.id,
1438 peer_id,
1439 block_response.blocks.len(),
1440 blocks_range(),
1441 );
1442
1443 if request.fields == BlockAttributes::JUSTIFICATION {
1444 self.on_block_justification(*peer_id, block_response)
1445 } else {
1446 self.on_block_data(peer_id, Some(request), block_response)
1447 }
1448 }
1449
1450 #[must_use]
1452 fn on_block_justification(
1453 &mut self,
1454 peer_id: PeerId,
1455 response: BlockResponse<B>,
1456 ) -> Result<(), BadPeer> {
1457 let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
1458 peer
1459 } else {
1460 error!(
1461 target: LOG_TARGET,
1462 "💔 Called on_block_justification with a peer ID of an unknown peer",
1463 );
1464 return Ok(());
1465 };
1466
1467 self.allowed_requests.add(&peer_id);
1468 if let PeerSyncState::DownloadingJustification(hash) = peer.state {
1469 peer.state = PeerSyncState::Available;
1470
1471 let justification = if let Some(block) = response.blocks.into_iter().next() {
1473 if hash != block.hash {
1474 warn!(
1475 target: LOG_TARGET,
1476 "💔 Invalid block justification provided by {}: requested: {:?} got: {:?}",
1477 peer_id,
1478 hash,
1479 block.hash,
1480 );
1481 return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION));
1482 }
1483
1484 block
1485 .justifications
1486 .or_else(|| legacy_justification_mapping(block.justification))
1487 } else {
1488 trace!(
1491 target: LOG_TARGET,
1492 "Peer {peer_id:?} provided empty response for justification request {hash:?}",
1493 );
1494
1495 None
1496 };
1497
1498 if let Some((peer_id, hash, number, justifications)) =
1499 self.extra_justifications.on_response(peer_id, justification)
1500 {
1501 self.actions.push(SyncingAction::ImportJustifications {
1502 peer_id,
1503 hash,
1504 number,
1505 justifications,
1506 });
1507 return Ok(());
1508 }
1509 }
1510
1511 Ok(())
1512 }
1513
1514 fn median_seen(&self) -> Option<NumberFor<B>> {
1516 let mut best_seens = self.peers.values().map(|p| p.best_number).collect::<Vec<_>>();
1517
1518 if best_seens.is_empty() {
1519 None
1520 } else {
1521 let middle = best_seens.len() / 2;
1522
1523 Some(*best_seens.select_nth_unstable(middle).1)
1525 }
1526 }
1527
1528 fn required_block_attributes(&self) -> BlockAttributes {
1529 match self.mode {
1530 ChainSyncMode::Full => {
1531 BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY
1532 },
1533 ChainSyncMode::LightState { storage_chain_mode: false, .. } => {
1534 BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY
1535 },
1536 ChainSyncMode::LightState { storage_chain_mode: true, .. } => {
1537 BlockAttributes::HEADER
1538 | BlockAttributes::JUSTIFICATION
1539 | BlockAttributes::INDEXED_BODY
1540 },
1541 }
1542 }
1543
1544 fn skip_execution(&self) -> bool {
1545 match self.mode {
1546 ChainSyncMode::Full => false,
1547 ChainSyncMode::LightState { .. } => true,
1548 }
1549 }
1550
1551 fn validate_and_queue_blocks(&mut self, mut new_blocks: Vec<IncomingBlock<B>>, gap: bool) {
1552 let orig_len = new_blocks.len();
1553 new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
1554 if new_blocks.len() != orig_len {
1555 debug!(
1556 target: LOG_TARGET,
1557 "Ignoring {} blocks that are already queued",
1558 orig_len - new_blocks.len(),
1559 );
1560 }
1561
1562 let origin = if !gap && !self.status().state.is_major_syncing() {
1563 BlockOrigin::NetworkBroadcast
1564 } else {
1565 BlockOrigin::NetworkInitialSync
1566 };
1567
1568 if let Some((h, n)) = new_blocks
1569 .last()
1570 .and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number())))
1571 {
1572 trace!(
1573 target: LOG_TARGET,
1574 "Accepted {} blocks ({:?}) with origin {:?}",
1575 new_blocks.len(),
1576 h,
1577 origin,
1578 );
1579 self.on_block_queued(h, n)
1580 }
1581 self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
1582 if let Some(metrics) = &self.metrics {
1583 metrics
1584 .queued_blocks
1585 .set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX));
1586 }
1587
1588 self.actions.push(SyncingAction::ImportBlocks { origin, blocks: new_blocks })
1589 }
1590
1591 fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor<B>) {
1592 if let Some(peer) = self.peers.get_mut(peer_id) {
1593 peer.update_common_number(new_common);
1594 }
1595 }
1596
1597 fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
1602 if self.fork_targets.remove(hash).is_some() {
1603 if let Some(metrics) = &self.metrics {
1604 metrics.fork_targets.dec();
1605 }
1606 trace!(target: LOG_TARGET, "Completed fork sync {hash:?}");
1607 }
1608 if let Some(gap_sync) = &mut self.gap_sync {
1609 if number > gap_sync.best_queued_number && number <= gap_sync.target {
1610 gap_sync.best_queued_number = number;
1611 }
1612 }
1613 if number > self.best_queued_number {
1614 self.best_queued_number = number;
1615 self.best_queued_hash = *hash;
1616 for (n, peer) in self.peers.iter_mut() {
1618 if let PeerSyncState::AncestorSearch { .. } = peer.state {
1619 continue;
1621 }
1622 let new_common_number =
1623 if peer.best_number >= number { number } else { peer.best_number };
1624 trace!(
1625 target: LOG_TARGET,
1626 "Updating peer {} info, ours={}, common={}->{}, their best={}",
1627 n,
1628 number,
1629 peer.common_number,
1630 new_common_number,
1631 peer.best_number,
1632 );
1633 peer.common_number = new_common_number;
1634 }
1635 }
1636 self.allowed_requests.set_all();
1637 }
1638
1639 fn restart(&mut self) {
1643 self.blocks.clear();
1644 if let Err(e) = self.reset_sync_start_point() {
1645 warn!(target: LOG_TARGET, "💔 Unable to restart sync: {e}");
1646 }
1647 self.allowed_requests.set_all();
1648 debug!(
1649 target: LOG_TARGET,
1650 "Restarted with {} ({})",
1651 self.best_queued_number,
1652 self.best_queued_hash,
1653 );
1654 let old_peers = std::mem::take(&mut self.peers);
1655
1656 old_peers.into_iter().for_each(|(peer_id, mut peer_sync)| {
1657 match peer_sync.state {
1658 PeerSyncState::Available => {
1659 self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1660 },
1661 PeerSyncState::AncestorSearch { .. }
1662 | PeerSyncState::DownloadingNew(_)
1663 | PeerSyncState::DownloadingStale(_)
1664 | PeerSyncState::DownloadingGap(_)
1665 | PeerSyncState::DownloadingState => {
1666 self.actions
1668 .push(SyncingAction::CancelRequest { peer_id, key: Self::STRATEGY_KEY });
1669 self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1670 },
1671 PeerSyncState::DownloadingJustification(_) => {
1672 trace!(
1676 target: LOG_TARGET,
1677 "Keeping peer {} after restart, updating common number from={} => to={} (our best).",
1678 peer_id,
1679 peer_sync.common_number,
1680 self.best_queued_number,
1681 );
1682 peer_sync.common_number = self.best_queued_number;
1683 self.peers.insert(peer_id, peer_sync);
1684 },
1685 }
1686 });
1687 }
1688
1689 fn reset_sync_start_point(&mut self) -> Result<(), ClientError> {
1692 let info = self.client.info();
1693 debug!(target: LOG_TARGET, "Restarting sync with client info {info:?}");
1694
1695 if matches!(self.mode, ChainSyncMode::LightState { .. }) && info.finalized_state.is_some() {
1696 warn!(
1697 target: LOG_TARGET,
1698 "Can't use fast sync mode with a partially synced database. Reverting to full sync mode."
1699 );
1700 self.mode = ChainSyncMode::Full;
1701 }
1702
1703 self.import_existing = false;
1704 self.best_queued_hash = info.best_hash;
1705 self.best_queued_number = info.best_number;
1706
1707 if self.mode == ChainSyncMode::Full
1708 && self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState
1709 {
1710 self.import_existing = true;
1711 if let Some((hash, number)) = info.finalized_state {
1713 debug!(target: LOG_TARGET, "Starting from finalized state #{number}");
1714 self.best_queued_hash = hash;
1715 self.best_queued_number = number;
1716 } else {
1717 debug!(target: LOG_TARGET, "Restarting from genesis");
1718 self.best_queued_hash = Default::default();
1719 self.best_queued_number = Zero::zero();
1720 }
1721 }
1722
1723 if let Some(BlockGap { start, end, .. }) = info.block_gap {
1724 let old_gap = self.gap_sync.take().map(|g| (g.best_queued_number, g.target));
1725 debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end} (old gap best and target: {old_gap:?})");
1726 self.gap_sync = Some(GapSync {
1727 best_queued_number: start - One::one(),
1728 target: end,
1729 blocks: BlockCollection::new(),
1730 });
1731 }
1732 trace!(
1733 target: LOG_TARGET,
1734 "Restarted sync at #{} ({:?})",
1735 self.best_queued_number,
1736 self.best_queued_hash,
1737 );
1738 Ok(())
1739 }
1740
1741 fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
1743 if self.queue_blocks.contains(hash) {
1744 return Ok(BlockStatus::Queued);
1745 }
1746 self.client.block_status(*hash)
1747 }
1748
1749 fn is_known(&self, hash: &B::Hash) -> bool {
1751 self.block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
1752 }
1753
1754 fn is_already_downloading(&self, hash: &B::Hash) -> bool {
1756 self.peers
1757 .iter()
1758 .any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
1759 }
1760
1761 fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
1763 self.blocks
1764 .ready_blocks(self.best_queued_number + One::one())
1765 .into_iter()
1766 .map(|block_data| {
1767 let justifications = block_data
1768 .block
1769 .justifications
1770 .or_else(|| legacy_justification_mapping(block_data.block.justification));
1771 IncomingBlock {
1772 hash: block_data.block.hash,
1773 header: block_data.block.header,
1774 body: block_data.block.body,
1775 indexed_body: block_data.block.indexed_body,
1776 justifications,
1777 origin: block_data.origin,
1778 allow_missing_state: true,
1779 import_existing: self.import_existing,
1780 skip_execution: self.skip_execution(),
1781 state: None,
1782 }
1783 })
1784 .collect()
1785 }
1786
1787 fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1789 let peers = &mut self.peers;
1790 let mut matcher = self.extra_justifications.matcher();
1791 std::iter::from_fn(move || {
1792 if let Some((peer, request)) = matcher.next(peers) {
1793 peers
1794 .get_mut(&peer)
1795 .expect(
1796 "`Matcher::next` guarantees the `PeerId` comes from the given peers; qed",
1797 )
1798 .state = PeerSyncState::DownloadingJustification(request.0);
1799 let req = BlockRequest::<B> {
1800 id: 0,
1801 fields: BlockAttributes::JUSTIFICATION,
1802 from: FromBlock::Hash(request.0),
1803 direction: Direction::Ascending,
1804 max: Some(1),
1805 };
1806 Some((peer, req))
1807 } else {
1808 None
1809 }
1810 })
1811 .collect()
1812 }
1813
1814 fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1816 if self.allowed_requests.is_empty() || self.state_sync.is_some() {
1817 return Vec::new();
1818 }
1819
1820 if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
1821 trace!(target: LOG_TARGET, "Too many blocks in the queue.");
1822 return Vec::new();
1823 }
1824 let is_major_syncing = self.status().state.is_major_syncing();
1825 let attrs = self.required_block_attributes();
1826 let blocks = &mut self.blocks;
1827 let fork_targets = &mut self.fork_targets;
1828 let last_finalized =
1829 std::cmp::min(self.best_queued_number, self.client.info().finalized_number);
1830 let best_queued = self.best_queued_number;
1831 let client = &self.client;
1832 let queue_blocks = &self.queue_blocks;
1833 let allowed_requests = self.allowed_requests.clone();
1834 let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
1835 let max_blocks_per_request = self.max_blocks_per_request;
1836 let gap_sync = &mut self.gap_sync;
1837 let disconnected_peers = &mut self.disconnected_peers;
1838 let metrics = self.metrics.as_ref();
1839 let requests = self
1840 .peers
1841 .iter_mut()
1842 .filter_map(move |(&id, peer)| {
1843 if !peer.state.is_available()
1844 || !allowed_requests.contains(&id)
1845 || !disconnected_peers.is_peer_available(&id)
1846 {
1847 return None;
1848 }
1849
1850 if best_queued.saturating_sub(peer.common_number)
1856 > MAX_BLOCKS_TO_LOOK_BACKWARDS.into()
1857 && best_queued < peer.best_number
1858 && peer.common_number < last_finalized
1859 && queue_blocks.len() <= MAJOR_SYNC_BLOCKS as usize
1860 {
1861 trace!(
1862 target: LOG_TARGET,
1863 "Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.",
1864 id,
1865 peer.common_number,
1866 best_queued,
1867 );
1868 let current = std::cmp::min(peer.best_number, best_queued);
1869 peer.state = PeerSyncState::AncestorSearch {
1870 current,
1871 start: best_queued,
1872 state: AncestorSearchState::ExponentialBackoff(One::one()),
1873 };
1874 Some((id, ancestry_request::<B>(current)))
1875 } else if let Some((range, req)) = peer_block_request(
1876 &id,
1877 peer,
1878 blocks,
1879 attrs,
1880 max_parallel,
1881 max_blocks_per_request,
1882 last_finalized,
1883 best_queued,
1884 ) {
1885 peer.state = PeerSyncState::DownloadingNew(range.start);
1886 trace!(
1887 target: LOG_TARGET,
1888 "New block request for {}, (best:{}, common:{}) {:?}",
1889 id,
1890 peer.best_number,
1891 peer.common_number,
1892 req,
1893 );
1894 Some((id, req))
1895 } else if let Some((hash, req)) = fork_sync_request(
1896 &id,
1897 fork_targets,
1898 best_queued,
1899 last_finalized,
1900 attrs,
1901 |hash| {
1902 if queue_blocks.contains(hash) {
1903 BlockStatus::Queued
1904 } else {
1905 client.block_status(*hash).unwrap_or(BlockStatus::Unknown)
1906 }
1907 },
1908 max_blocks_per_request,
1909 metrics,
1910 ) {
1911 trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}");
1912 peer.state = PeerSyncState::DownloadingStale(hash);
1913 Some((id, req))
1914 } else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
1915 peer_gap_block_request(
1916 &id,
1917 peer,
1918 &mut sync.blocks,
1919 attrs,
1920 sync.target,
1921 sync.best_queued_number,
1922 max_blocks_per_request,
1923 )
1924 }) {
1925 peer.state = PeerSyncState::DownloadingGap(range.start);
1926 trace!(
1927 target: LOG_TARGET,
1928 "New gap block request for {}, (best:{}, common:{}) {:?}",
1929 id,
1930 peer.best_number,
1931 peer.common_number,
1932 req,
1933 );
1934 Some((id, req))
1935 } else {
1936 None
1937 }
1938 })
1939 .collect::<Vec<_>>();
1940
1941 if !requests.is_empty() {
1944 self.allowed_requests.take();
1945 }
1946
1947 requests
1948 }
1949
1950 fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
1952 if self.allowed_requests.is_empty() {
1953 return None;
1954 }
1955 if self.state_sync.is_some()
1956 && self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
1957 {
1958 return None;
1960 }
1961 if let Some(sync) = &self.state_sync {
1962 if sync.is_complete() {
1963 return None;
1964 }
1965
1966 for (id, peer) in self.peers.iter_mut() {
1967 if peer.state.is_available()
1968 && peer.common_number >= sync.target_number()
1969 && self.disconnected_peers.is_peer_available(&id)
1970 {
1971 peer.state = PeerSyncState::DownloadingState;
1972 let request = sync.next_request();
1973 trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request);
1974 self.allowed_requests.clear();
1975 return Some((*id, request));
1976 }
1977 }
1978 }
1979 None
1980 }
1981
1982 #[must_use]
1983 fn on_state_data(&mut self, peer_id: &PeerId, response: &[u8]) -> Result<(), BadPeer> {
1984 let response = match StateResponse::decode(response) {
1985 Ok(response) => response,
1986 Err(error) => {
1987 debug!(
1988 target: LOG_TARGET,
1989 "Failed to decode state response from peer {peer_id:?}: {error:?}.",
1990 );
1991
1992 return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
1993 },
1994 };
1995
1996 if let Some(peer) = self.peers.get_mut(peer_id) {
1997 if let PeerSyncState::DownloadingState = peer.state {
1998 peer.state = PeerSyncState::Available;
1999 self.allowed_requests.set_all();
2000 }
2001 }
2002 let import_result = if let Some(sync) = &mut self.state_sync {
2003 debug!(
2004 target: LOG_TARGET,
2005 "Importing state data from {} with {} keys, {} proof nodes.",
2006 peer_id,
2007 response.entries.len(),
2008 response.proof.len(),
2009 );
2010 sync.import(response)
2011 } else {
2012 debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}");
2013 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2014 };
2015
2016 match import_result {
2017 ImportResult::Import(hash, header, state, body, justifications) => {
2018 let origin = BlockOrigin::NetworkInitialSync;
2019 let block = IncomingBlock {
2020 hash,
2021 header: Some(header),
2022 body,
2023 indexed_body: None,
2024 justifications,
2025 origin: None,
2026 allow_missing_state: true,
2027 import_existing: true,
2028 skip_execution: self.skip_execution(),
2029 state: Some(state),
2030 };
2031 debug!(target: LOG_TARGET, "State download is complete. Import is queued");
2032 self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
2033 Ok(())
2034 },
2035 ImportResult::Continue => Ok(()),
2036 ImportResult::BadResponse => {
2037 debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
2038 Err(BadPeer(*peer_id, rep::BAD_BLOCK))
2039 },
2040 }
2041 }
2042
2043 fn attempt_state_sync(
2044 &mut self,
2045 finalized_hash: B::Hash,
2046 finalized_number: NumberFor<B>,
2047 skip_proofs: bool,
2048 ) {
2049 let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect();
2050 heads.sort();
2051 let median = heads[heads.len() / 2];
2052 if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median {
2053 if let Ok(Some(header)) = self.client.header(finalized_hash) {
2054 log::debug!(
2055 target: LOG_TARGET,
2056 "Starting state sync for #{finalized_number} ({finalized_hash})",
2057 );
2058 self.state_sync =
2059 Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs));
2060 self.allowed_requests.set_all();
2061 } else {
2062 log::error!(
2063 target: LOG_TARGET,
2064 "Failed to start state sync: header for finalized block \
2065 #{finalized_number} ({finalized_hash}) is not available",
2066 );
2067 debug_assert!(false);
2068 }
2069 }
2070 }
2071
2072 #[cfg(test)]
2074 #[must_use]
2075 fn take_actions(&mut self) -> impl Iterator<Item = SyncingAction<B>> {
2076 std::mem::take(&mut self.actions).into_iter()
2077 }
2078}
2079
2080fn legacy_justification_mapping(
2085 justification: Option<EncodedJustification>,
2086) -> Option<Justifications> {
2087 justification.map(|just| (*b"FRNK", just).into())
2088}
2089
2090fn ancestry_request<B: BlockT>(block: NumberFor<B>) -> BlockRequest<B> {
2093 BlockRequest::<B> {
2094 id: 0,
2095 fields: BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
2096 from: FromBlock::Number(block),
2097 direction: Direction::Ascending,
2098 max: Some(1),
2099 }
2100}
2101
2102#[derive(Copy, Clone, Eq, PartialEq, Debug)]
2105pub(crate) enum AncestorSearchState<B: BlockT> {
2106 ExponentialBackoff(NumberFor<B>),
2109 BinarySearch(NumberFor<B>, NumberFor<B>),
2112}
2113
2114fn handle_ancestor_search_state<B: BlockT>(
2122 state: &AncestorSearchState<B>,
2123 curr_block_num: NumberFor<B>,
2124 block_hash_match: bool,
2125) -> Option<(AncestorSearchState<B>, NumberFor<B>)> {
2126 let two = <NumberFor<B>>::one() + <NumberFor<B>>::one();
2127 match state {
2128 AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => {
2129 let next_distance_to_tip = *next_distance_to_tip;
2130 if block_hash_match && next_distance_to_tip == One::one() {
2131 return None;
2134 }
2135 if block_hash_match {
2136 let left = curr_block_num;
2137 let right = left + next_distance_to_tip / two;
2138 let middle = left + (right - left) / two;
2139 Some((AncestorSearchState::BinarySearch(left, right), middle))
2140 } else {
2141 let next_block_num =
2142 curr_block_num.checked_sub(&next_distance_to_tip).unwrap_or_else(Zero::zero);
2143 let next_distance_to_tip = next_distance_to_tip * two;
2144 Some((
2145 AncestorSearchState::ExponentialBackoff(next_distance_to_tip),
2146 next_block_num,
2147 ))
2148 }
2149 },
2150 AncestorSearchState::BinarySearch(mut left, mut right) => {
2151 if left >= curr_block_num {
2152 return None;
2153 }
2154 if block_hash_match {
2155 left = curr_block_num;
2156 } else {
2157 right = curr_block_num;
2158 }
2159 assert!(right >= left);
2160 let middle = left + (right - left) / two;
2161 if middle == curr_block_num {
2162 None
2163 } else {
2164 Some((AncestorSearchState::BinarySearch(left, right), middle))
2165 }
2166 },
2167 }
2168}
2169
2170fn peer_block_request<B: BlockT>(
2172 id: &PeerId,
2173 peer: &PeerSync<B>,
2174 blocks: &mut BlockCollection<B>,
2175 attrs: BlockAttributes,
2176 max_parallel_downloads: u32,
2177 max_blocks_per_request: u32,
2178 finalized: NumberFor<B>,
2179 best_num: NumberFor<B>,
2180) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2181 if best_num >= peer.best_number {
2182 return None;
2184 } else if peer.common_number < finalized {
2185 trace!(
2186 target: LOG_TARGET,
2187 "Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}",
2188 id, peer.common_number, finalized, peer.best_number, best_num,
2189 );
2190 }
2191 let range = blocks.needed_blocks(
2192 *id,
2193 max_blocks_per_request,
2194 peer.best_number,
2195 peer.common_number,
2196 max_parallel_downloads,
2197 MAX_DOWNLOAD_AHEAD,
2198 )?;
2199
2200 let last = range.end.saturating_sub(One::one());
2202
2203 let from = if peer.best_number == last {
2204 FromBlock::Hash(peer.best_hash)
2205 } else {
2206 FromBlock::Number(last)
2207 };
2208
2209 let request = BlockRequest::<B> {
2210 id: 0,
2211 fields: attrs,
2212 from,
2213 direction: Direction::Descending,
2214 max: Some((range.end - range.start).saturated_into::<u32>()),
2215 };
2216
2217 Some((range, request))
2218}
2219
2220fn peer_gap_block_request<B: BlockT>(
2222 id: &PeerId,
2223 peer: &PeerSync<B>,
2224 blocks: &mut BlockCollection<B>,
2225 attrs: BlockAttributes,
2226 target: NumberFor<B>,
2227 common_number: NumberFor<B>,
2228 max_blocks_per_request: u32,
2229) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2230 let range = blocks.needed_blocks(
2231 *id,
2232 max_blocks_per_request,
2233 std::cmp::min(peer.best_number, target),
2234 common_number,
2235 1,
2236 MAX_DOWNLOAD_AHEAD,
2237 )?;
2238
2239 let last = range.end.saturating_sub(One::one());
2241 let from = FromBlock::Number(last);
2242
2243 let request = BlockRequest::<B> {
2244 id: 0,
2245 fields: attrs,
2246 from,
2247 direction: Direction::Descending,
2248 max: Some((range.end - range.start).saturated_into::<u32>()),
2249 };
2250 Some((range, request))
2251}
2252
2253fn fork_sync_request<B: BlockT>(
2255 id: &PeerId,
2256 fork_targets: &mut HashMap<B::Hash, ForkTarget<B>>,
2257 best_num: NumberFor<B>,
2258 finalized: NumberFor<B>,
2259 attributes: BlockAttributes,
2260 check_block: impl Fn(&B::Hash) -> BlockStatus,
2261 max_blocks_per_request: u32,
2262 metrics: Option<&Metrics>,
2263) -> Option<(B::Hash, BlockRequest<B>)> {
2264 fork_targets.retain(|hash, r| {
2265 if r.number <= finalized {
2266 trace!(
2267 target: LOG_TARGET,
2268 "Removed expired fork sync request {:?} (#{})",
2269 hash,
2270 r.number,
2271 );
2272 return false;
2273 }
2274 if check_block(hash) != BlockStatus::Unknown {
2275 trace!(
2276 target: LOG_TARGET,
2277 "Removed obsolete fork sync request {:?} (#{})",
2278 hash,
2279 r.number,
2280 );
2281 return false;
2282 }
2283 true
2284 });
2285 if let Some(metrics) = metrics {
2286 metrics.fork_targets.set(fork_targets.len().try_into().unwrap_or(u64::MAX));
2287 }
2288 for (hash, r) in fork_targets {
2289 if !r.peers.contains(&id) {
2290 continue;
2291 }
2292 if r.number <= best_num
2295 || (r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
2296 {
2297 let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
2298 let count = if parent_status == BlockStatus::Unknown {
2299 (r.number - finalized).saturated_into::<u32>() } else {
2301 1
2303 };
2304 trace!(
2305 target: LOG_TARGET,
2306 "Downloading requested fork {hash:?} from {id}, {count} blocks",
2307 );
2308 return Some((
2309 *hash,
2310 BlockRequest::<B> {
2311 id: 0,
2312 fields: attributes,
2313 from: FromBlock::Hash(*hash),
2314 direction: Direction::Descending,
2315 max: Some(count),
2316 },
2317 ));
2318 } else {
2319 trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number);
2320 }
2321 }
2322 None
2323}
2324
2325fn is_descendent_of<Block, T>(
2327 client: &T,
2328 base: &Block::Hash,
2329 block: &Block::Hash,
2330) -> pezsp_blockchain::Result<bool>
2331where
2332 Block: BlockT,
2333 T: HeaderMetadata<Block, Error = pezsp_blockchain::Error> + ?Sized,
2334{
2335 if base == block {
2336 return Ok(false);
2337 }
2338
2339 let ancestor = pezsp_blockchain::lowest_common_ancestor(client, *block, *base)?;
2340
2341 Ok(ancestor.hash == *base)
2342}
2343
2344pub fn validate_blocks<Block: BlockT>(
2349 blocks: &Vec<BlockData<Block>>,
2350 peer_id: &PeerId,
2351 request: Option<BlockRequest<Block>>,
2352) -> Result<Option<NumberFor<Block>>, BadPeer> {
2353 if let Some(request) = request {
2354 if Some(blocks.len() as _) > request.max {
2355 debug!(
2356 target: LOG_TARGET,
2357 "Received more blocks than requested from {}. Expected in maximum {:?}, got {}.",
2358 peer_id,
2359 request.max,
2360 blocks.len(),
2361 );
2362
2363 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2364 }
2365
2366 let block_header =
2367 if request.direction == Direction::Descending { blocks.last() } else { blocks.first() }
2368 .and_then(|b| b.header.as_ref());
2369
2370 let expected_block = block_header.as_ref().map_or(false, |h| match request.from {
2371 FromBlock::Hash(hash) => h.hash() == hash,
2372 FromBlock::Number(n) => h.number() == &n,
2373 });
2374
2375 if !expected_block {
2376 debug!(
2377 target: LOG_TARGET,
2378 "Received block that was not requested. Requested {:?}, got {:?}.",
2379 request.from,
2380 block_header,
2381 );
2382
2383 return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2384 }
2385
2386 if request.fields.contains(BlockAttributes::HEADER)
2387 && blocks.iter().any(|b| b.header.is_none())
2388 {
2389 trace!(
2390 target: LOG_TARGET,
2391 "Missing requested header for a block in response from {peer_id}.",
2392 );
2393
2394 return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2395 }
2396
2397 if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none())
2398 {
2399 trace!(
2400 target: LOG_TARGET,
2401 "Missing requested body for a block in response from {peer_id}.",
2402 );
2403
2404 return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2405 }
2406 }
2407
2408 for b in blocks {
2409 if let Some(header) = &b.header {
2410 let hash = header.hash();
2411 if hash != b.hash {
2412 debug!(
2413 target: LOG_TARGET,
2414 "Bad header received from {}. Expected hash {:?}, got {:?}",
2415 peer_id,
2416 b.hash,
2417 hash,
2418 );
2419 return Err(BadPeer(*peer_id, rep::BAD_BLOCK));
2420 }
2421 }
2422 }
2423
2424 Ok(blocks.first().and_then(|b| b.header.as_ref()).map(|h| *h.number()))
2425}