1use crate::{
23 block_announce_validator::{
24 BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
25 },
26 pending_responses::{PendingResponses, ResponseEvent},
27 service::{
28 self,
29 syncing_service::{SyncingService, ToServiceCommand},
30 },
31 strategy::{SyncingAction, SyncingStrategy},
32 types::{BadPeer, ExtendedPeerInfo, SyncEvent},
33 LOG_TARGET,
34};
35
36use codec::{Decode, DecodeAll, Encode};
37use futures::{channel::oneshot, StreamExt};
38use log::{debug, error, trace, warn};
39use prometheus_endpoint::{
40 register, Counter, Gauge, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
41};
42use schnellru::{ByLength, LruMap};
43use tokio::time::{Interval, MissedTickBehavior};
44
45use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
46use sc_consensus::{import_queue::ImportQueueService, IncomingBlock};
47use sc_network::{
48 config::{FullNetworkConfiguration, NotificationHandshake, ProtocolId, SetConfig},
49 peer_store::PeerStoreProvider,
50 request_responses::{OutboundFailure, RequestFailure},
51 service::{
52 traits::{Direction, NotificationConfig, NotificationEvent, ValidationResult},
53 NotificationMetrics,
54 },
55 types::ProtocolName,
56 utils::LruHashSet,
57 NetworkBackend, NotificationService, ReputationChange,
58};
59use sc_network_common::{
60 role::Roles,
61 sync::message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState},
62};
63use sc_network_types::PeerId;
64use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
65use sp_blockchain::{Error as ClientError, HeaderMetadata};
66use sp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin};
67use sp_runtime::{
68 traits::{Block as BlockT, Header, NumberFor, Zero},
69 Justifications,
70};
71
72use std::{
73 collections::{HashMap, HashSet},
74 iter,
75 num::NonZeroUsize,
76 sync::{
77 atomic::{AtomicBool, AtomicUsize, Ordering},
78 Arc,
79 },
80};
81
/// Period of the interval timer that `SyncingEngine::new` creates and `SyncingEngine::run`
/// polls.
const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1_100);

/// Capacity of the per-peer LRU set of block hashes the peer is known to have.
const MAX_KNOWN_BLOCKS: usize = 1 << 10;

/// Size limit handed to the notification configuration of the block-announces protocol
/// (1 MiB).
const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1 << 20;
90
91mod rep {
92 use sc_network::ReputationChange as Rep;
93 pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
95 pub const BAD_BLOCK_ANNOUNCEMENT: Rep = Rep::new(-(1 << 12), "Bad block announcement");
97 pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
99 pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
101 pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
103}
104
105struct Metrics {
106 peers: Gauge<U64>,
107 import_queue_blocks_submitted: Counter<U64>,
108 import_queue_justifications_submitted: Counter<U64>,
109}
110
111impl Metrics {
112 fn register(r: &Registry, major_syncing: Arc<AtomicBool>) -> Result<Self, PrometheusError> {
113 let _ = MajorSyncingGauge::register(r, major_syncing)?;
114 Ok(Self {
115 peers: {
116 let g = Gauge::new("substrate_sync_peers", "Number of peers we sync with")?;
117 register(g, r)?
118 },
119 import_queue_blocks_submitted: {
120 let c = Counter::new(
121 "substrate_sync_import_queue_blocks_submitted",
122 "Number of blocks submitted to the import queue.",
123 )?;
124 register(c, r)?
125 },
126 import_queue_justifications_submitted: {
127 let c = Counter::new(
128 "substrate_sync_import_queue_justifications_submitted",
129 "Number of justifications submitted to the import queue.",
130 )?;
131 register(c, r)?
132 },
133 })
134 }
135}
136
137#[derive(Clone)]
139pub struct MajorSyncingGauge(Arc<AtomicBool>);
140
141impl MajorSyncingGauge {
142 fn register(registry: &Registry, value: Arc<AtomicBool>) -> Result<(), PrometheusError> {
145 prometheus_endpoint::register(
146 SourcedGauge::new(
147 &Opts::new(
148 "substrate_sub_libp2p_is_major_syncing",
149 "Whether the node is performing a major sync or not.",
150 ),
151 MajorSyncingGauge(value),
152 )?,
153 registry,
154 )?;
155
156 Ok(())
157 }
158}
159
160impl MetricSource for MajorSyncingGauge {
161 type N = u64;
162
163 fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
164 set(&[], self.0.load(Ordering::Relaxed) as u64);
165 }
166}
167
168#[derive(Debug)]
170pub struct Peer<B: BlockT> {
171 pub info: ExtendedPeerInfo<B>,
172 pub known_blocks: LruHashSet<B::Hash>,
174 inbound: bool,
176}
177
178pub struct SyncingEngine<B: BlockT, Client> {
179 strategy: Box<dyn SyncingStrategy<B>>,
181
182 client: Arc<Client>,
184
185 num_connected: Arc<AtomicUsize>,
187
188 is_major_syncing: Arc<AtomicBool>,
190
191 network_service: service::network::NetworkServiceHandle,
193
194 service_rx: TracingUnboundedReceiver<ToServiceCommand<B>>,
196
197 roles: Roles,
199
200 genesis_hash: B::Hash,
202
203 event_streams: Vec<TracingUnboundedSender<SyncEvent>>,
205
206 tick_timeout: Interval,
208
209 peers: HashMap<PeerId, Peer<B>>,
211
212 important_peers: HashSet<PeerId>,
215
216 default_peers_set_no_slot_connected_peers: HashSet<PeerId>,
218
219 default_peers_set_no_slot_peers: HashSet<PeerId>,
221
222 default_peers_set_num_full: usize,
225
226 default_peers_set_num_light: usize,
228
229 max_in_peers: usize,
231
232 num_in_peers: usize,
234
235 block_announce_validator: BlockAnnounceValidatorStream<B>,
237
238 block_announce_data_cache: LruMap<B::Hash, Vec<u8>>,
240
241 boot_node_ids: HashSet<PeerId>,
243
244 block_announce_protocol_name: ProtocolName,
246
247 metrics: Option<Metrics>,
249
250 notification_service: Box<dyn NotificationService>,
252
253 peer_store_handle: Arc<dyn PeerStoreProvider>,
255
256 pending_responses: PendingResponses,
258
259 import_queue: Box<dyn ImportQueueService<B>>,
261}
262
263impl<B: BlockT, Client> SyncingEngine<B, Client>
264where
265 B: BlockT,
266 Client: HeaderBackend<B>
267 + BlockBackend<B>
268 + HeaderMetadata<B, Error = sp_blockchain::Error>
269 + ProofProvider<B>
270 + Send
271 + Sync
272 + 'static,
273{
274 pub fn new<N>(
275 roles: Roles,
276 client: Arc<Client>,
277 metrics_registry: Option<&Registry>,
278 network_metrics: NotificationMetrics,
279 net_config: &FullNetworkConfiguration<B, <B as BlockT>::Hash, N>,
280 protocol_id: ProtocolId,
281 fork_id: Option<&str>,
282 block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
283 syncing_strategy: Box<dyn SyncingStrategy<B>>,
284 network_service: service::network::NetworkServiceHandle,
285 import_queue: Box<dyn ImportQueueService<B>>,
286 peer_store_handle: Arc<dyn PeerStoreProvider>,
287 ) -> Result<(Self, SyncingService<B>, N::NotificationProtocolConfig), ClientError>
288 where
289 N: NetworkBackend<B, <B as BlockT>::Hash>,
290 {
291 let cache_capacity = (net_config.network_config.default_peers_set.in_peers +
292 net_config.network_config.default_peers_set.out_peers)
293 .max(1);
294 let important_peers = {
295 let mut imp_p = HashSet::new();
296 for reserved in &net_config.network_config.default_peers_set.reserved_nodes {
297 imp_p.insert(reserved.peer_id);
298 }
299 for config in net_config.notification_protocols() {
300 let peer_ids = config.set_config().reserved_nodes.iter().map(|info| info.peer_id);
301 imp_p.extend(peer_ids);
302 }
303
304 imp_p.shrink_to_fit();
305 imp_p
306 };
307 let boot_node_ids = {
308 let mut list = HashSet::new();
309 for node in &net_config.network_config.boot_nodes {
310 list.insert(node.peer_id);
311 }
312 list.shrink_to_fit();
313 list
314 };
315 let default_peers_set_no_slot_peers = {
316 let mut no_slot_p: HashSet<PeerId> = net_config
317 .network_config
318 .default_peers_set
319 .reserved_nodes
320 .iter()
321 .map(|reserved| reserved.peer_id)
322 .collect();
323 no_slot_p.shrink_to_fit();
324 no_slot_p
325 };
326 let default_peers_set_num_full =
327 net_config.network_config.default_peers_set_num_full as usize;
328 let default_peers_set_num_light = {
329 let total = net_config.network_config.default_peers_set.out_peers +
330 net_config.network_config.default_peers_set.in_peers;
331 total.saturating_sub(net_config.network_config.default_peers_set_num_full) as usize
332 };
333
334 let info = client.info();
335
336 let (block_announce_config, notification_service) =
337 Self::get_block_announce_proto_config::<N>(
338 protocol_id,
339 fork_id,
340 roles,
341 info.best_number,
342 info.best_hash,
343 info.genesis_hash,
344 &net_config.network_config.default_peers_set,
345 network_metrics,
346 Arc::clone(&peer_store_handle),
347 );
348
349 let block_announce_protocol_name = block_announce_config.protocol_name().clone();
350 let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
351 let num_connected = Arc::new(AtomicUsize::new(0));
352 let is_major_syncing = Arc::new(AtomicBool::new(false));
353
354 let max_full_peers = net_config.network_config.default_peers_set_num_full;
357 let max_out_peers = net_config.network_config.default_peers_set.out_peers;
358 let max_in_peers = (max_full_peers - max_out_peers) as usize;
359
360 let tick_timeout = {
361 let mut interval = tokio::time::interval(TICK_TIMEOUT);
362 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
363 interval
364 };
365
366 Ok((
367 Self {
368 roles,
369 client,
370 strategy: syncing_strategy,
371 network_service,
372 peers: HashMap::new(),
373 block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
374 block_announce_protocol_name,
375 block_announce_validator: BlockAnnounceValidatorStream::new(
376 block_announce_validator,
377 ),
378 num_connected: num_connected.clone(),
379 is_major_syncing: is_major_syncing.clone(),
380 service_rx,
381 genesis_hash: info.genesis_hash,
382 important_peers,
383 default_peers_set_no_slot_connected_peers: HashSet::new(),
384 boot_node_ids,
385 default_peers_set_no_slot_peers,
386 default_peers_set_num_full,
387 default_peers_set_num_light,
388 num_in_peers: 0usize,
389 max_in_peers,
390 event_streams: Vec::new(),
391 notification_service,
392 tick_timeout,
393 peer_store_handle,
394 metrics: if let Some(r) = metrics_registry {
395 match Metrics::register(r, is_major_syncing.clone()) {
396 Ok(metrics) => Some(metrics),
397 Err(err) => {
398 log::error!(target: LOG_TARGET, "Failed to register metrics {err:?}");
399 None
400 },
401 }
402 } else {
403 None
404 },
405 pending_responses: PendingResponses::new(),
406 import_queue,
407 },
408 SyncingService::new(tx, num_connected, is_major_syncing),
409 block_announce_config,
410 ))
411 }
412
413 fn update_peer_info(
414 &mut self,
415 peer_id: &PeerId,
416 best_hash: B::Hash,
417 best_number: NumberFor<B>,
418 ) {
419 if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
420 peer.info.best_hash = best_hash;
421 peer.info.best_number = best_number;
422 }
423 }
424
425 fn process_block_announce_validation_result(
427 &mut self,
428 validation_result: BlockAnnounceValidationResult<B::Header>,
429 ) {
430 match validation_result {
431 BlockAnnounceValidationResult::Skip { peer_id: _ } => {},
432 BlockAnnounceValidationResult::Process { is_new_best, peer_id, announce } => {
433 if let Some((best_hash, best_number)) =
434 self.strategy.on_validated_block_announce(is_new_best, peer_id, &announce)
435 {
436 self.update_peer_info(&peer_id, best_hash, best_number);
437 }
438
439 if let Some(data) = announce.data {
440 if !data.is_empty() {
441 self.block_announce_data_cache.insert(announce.header.hash(), data);
442 }
443 }
444 },
445 BlockAnnounceValidationResult::Failure { peer_id, disconnect } => {
446 if disconnect {
447 log::debug!(
448 target: LOG_TARGET,
449 "Disconnecting peer {peer_id} due to block announce validation failure",
450 );
451 self.network_service
452 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
453 }
454
455 self.network_service.report_peer(peer_id, rep::BAD_BLOCK_ANNOUNCEMENT);
456 },
457 }
458 }
459
460 pub fn push_block_announce_validation(
462 &mut self,
463 peer_id: PeerId,
464 announce: BlockAnnounce<B::Header>,
465 ) {
466 let hash = announce.header.hash();
467
468 let peer = match self.peers.get_mut(&peer_id) {
469 Some(p) => p,
470 None => {
471 log::error!(
472 target: LOG_TARGET,
473 "Received block announce from disconnected peer {peer_id}",
474 );
475 debug_assert!(false);
476 return;
477 },
478 };
479 peer.known_blocks.insert(hash);
480
481 if peer.info.roles.is_full() {
482 let is_best = match announce.state.unwrap_or(BlockState::Best) {
483 BlockState::Best => true,
484 BlockState::Normal => false,
485 };
486
487 self.block_announce_validator
488 .push_block_announce_validation(peer_id, hash, announce, is_best);
489 }
490 }
491
492 pub fn announce_block(&mut self, hash: B::Hash, data: Option<Vec<u8>>) {
497 let header = match self.client.header(hash) {
498 Ok(Some(header)) => header,
499 Ok(None) => {
500 log::warn!(target: LOG_TARGET, "Trying to announce unknown block: {hash}");
501 return;
502 },
503 Err(e) => {
504 log::warn!(target: LOG_TARGET, "Error reading block header {hash}: {e}");
505 return;
506 },
507 };
508
509 if header.number().is_zero() {
511 return;
512 }
513
514 let is_best = self.client.info().best_hash == hash;
515 log::debug!(target: LOG_TARGET, "Reannouncing block {hash:?} is_best: {is_best}");
516
517 let data = data
518 .or_else(|| self.block_announce_data_cache.get(&hash).cloned())
519 .unwrap_or_default();
520
521 for (peer_id, ref mut peer) in self.peers.iter_mut() {
522 let inserted = peer.known_blocks.insert(hash);
523 if inserted {
524 log::trace!(target: LOG_TARGET, "Announcing block {hash:?} to {peer_id}");
525 let message = BlockAnnounce {
526 header: header.clone(),
527 state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
528 data: Some(data.clone()),
529 };
530
531 let _ = self.notification_service.send_sync_notification(peer_id, message.encode());
532 }
533 }
534 }
535
536 pub async fn run(mut self) {
537 loop {
538 tokio::select! {
539 _ = self.tick_timeout.tick() => {
540 },
544 command = self.service_rx.select_next_some() =>
545 self.process_service_command(command),
546 notification_event = self.notification_service.next_event() => match notification_event {
547 Some(event) => self.process_notification_event(event),
548 None => return,
549 },
550 response_event = self.pending_responses.select_next_some() =>
551 self.process_response_event(response_event),
552 validation_result = self.block_announce_validator.select_next_some() =>
553 self.process_block_announce_validation_result(validation_result),
554 }
555
556 self.is_major_syncing.store(self.strategy.is_major_syncing(), Ordering::Relaxed);
558
559 if let Err(e) = self.process_strategy_actions() {
561 error!(
562 target: LOG_TARGET,
563 "Terminating `SyncingEngine` due to fatal error: {e:?}.",
564 );
565 return;
566 }
567 }
568 }
569
570 fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
571 for action in self.strategy.actions(&self.network_service)? {
572 match action {
573 SyncingAction::StartRequest { peer_id, key, request, remove_obsolete } => {
574 if !self.peers.contains_key(&peer_id) {
575 trace!(
576 target: LOG_TARGET,
577 "Cannot start request with strategy key {key:?} to unknown peer \
578 {peer_id}",
579 );
580 debug_assert!(false);
581 continue;
582 }
583 if remove_obsolete {
584 if self.pending_responses.remove(peer_id, key) {
585 warn!(
586 target: LOG_TARGET,
587 "Processed `SyncingAction::StartRequest` to {peer_id} with \
588 strategy key {key:?}. Stale response removed!",
589 )
590 } else {
591 trace!(
592 target: LOG_TARGET,
593 "Processed `SyncingAction::StartRequest` to {peer_id} with \
594 strategy key {key:?}.",
595 )
596 }
597 }
598
599 self.pending_responses.insert(peer_id, key, request);
600 },
601 SyncingAction::CancelRequest { peer_id, key } => {
602 let removed = self.pending_responses.remove(peer_id, key);
603
604 trace!(
605 target: LOG_TARGET,
606 "Processed `SyncingAction::CancelRequest`, response removed: {removed}.",
607 );
608 },
609 SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
610 self.pending_responses.remove_all(&peer_id);
611 self.network_service
612 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
613 self.network_service.report_peer(peer_id, rep);
614
615 trace!(target: LOG_TARGET, "{peer_id:?} dropped: {rep:?}.");
616 },
617 SyncingAction::ImportBlocks { origin, blocks } => {
618 let count = blocks.len();
619 self.import_blocks(origin, blocks);
620
621 trace!(
622 target: LOG_TARGET,
623 "Processed `ChainSyncAction::ImportBlocks` with {count} blocks.",
624 );
625 },
626 SyncingAction::ImportJustifications { peer_id, hash, number, justifications } => {
627 self.import_justifications(peer_id, hash, number, justifications);
628
629 trace!(
630 target: LOG_TARGET,
631 "Processed `ChainSyncAction::ImportJustifications` from peer {} for block {} ({}).",
632 peer_id,
633 hash,
634 number,
635 )
636 },
637 SyncingAction::Finished => {},
639 }
640 }
641
642 Ok(())
643 }
644
645 fn process_service_command(&mut self, command: ToServiceCommand<B>) {
646 match command {
647 ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
648 self.strategy.set_sync_fork_request(peers, &hash, number);
649 },
650 ToServiceCommand::EventStream(tx) => {
651 for peer_id in self.peers.keys() {
653 let _ = tx.unbounded_send(SyncEvent::PeerConnected(*peer_id));
654 }
655 self.event_streams.push(tx);
656 },
657 ToServiceCommand::RequestJustification(hash, number) =>
658 self.strategy.request_justification(&hash, number),
659 ToServiceCommand::ClearJustificationRequests =>
660 self.strategy.clear_justification_requests(),
661 ToServiceCommand::BlocksProcessed(imported, count, results) => {
662 self.strategy.on_blocks_processed(imported, count, results);
663 },
664 ToServiceCommand::JustificationImported(peer_id, hash, number, success) => {
665 self.strategy.on_justification_import(hash, number, success);
666 if !success {
667 log::info!(
668 target: LOG_TARGET,
669 "💔 Invalid justification provided by {peer_id} for #{hash}",
670 );
671 self.network_service
672 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
673 self.network_service
674 .report_peer(peer_id, ReputationChange::new_fatal("Invalid justification"));
675 }
676 },
677 ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data),
678 ToServiceCommand::NewBestBlockImported(hash, number) => {
679 log::debug!(target: LOG_TARGET, "New best block imported {:?}/#{}", hash, number);
680
681 self.strategy.update_chain_info(&hash, number);
682 let _ = self.notification_service.try_set_handshake(
683 BlockAnnouncesHandshake::<B>::build(
684 self.roles,
685 number,
686 hash,
687 self.genesis_hash,
688 )
689 .encode(),
690 );
691 },
692 ToServiceCommand::Status(tx) => {
693 let _ = tx.send(self.strategy.status());
694 },
695 ToServiceCommand::NumActivePeers(tx) => {
696 let _ = tx.send(self.num_active_peers());
697 },
698 ToServiceCommand::NumDownloadedBlocks(tx) => {
699 let _ = tx.send(self.strategy.num_downloaded_blocks());
700 },
701 ToServiceCommand::NumSyncRequests(tx) => {
702 let _ = tx.send(self.strategy.num_sync_requests());
703 },
704 ToServiceCommand::PeersInfo(tx) => {
705 let peers_info =
706 self.peers.iter().map(|(peer_id, peer)| (*peer_id, peer.info)).collect();
707 let _ = tx.send(peers_info);
708 },
709 ToServiceCommand::OnBlockFinalized(hash, header) =>
710 self.strategy.on_block_finalized(&hash, *header.number()),
711 }
712 }
713
714 fn process_notification_event(&mut self, event: NotificationEvent) {
715 match event {
716 NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } => {
717 let validation_result = self
718 .validate_connection(&peer, handshake, Direction::Inbound)
719 .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
720
721 let _ = result_tx.send(validation_result);
722 },
723 NotificationEvent::NotificationStreamOpened { peer, handshake, direction, .. } => {
724 log::debug!(
725 target: LOG_TARGET,
726 "Substream opened for {peer}, handshake {handshake:?}"
727 );
728
729 match self.validate_connection(&peer, handshake, direction) {
730 Ok(handshake) => {
731 if self.on_sync_peer_connected(peer, &handshake, direction).is_err() {
732 log::debug!(target: LOG_TARGET, "Failed to register peer {peer}");
733 self.network_service
734 .disconnect_peer(peer, self.block_announce_protocol_name.clone());
735 }
736 },
737 Err(wrong_genesis) => {
738 log::debug!(target: LOG_TARGET, "`SyncingEngine` rejected {peer}");
739
740 if wrong_genesis {
741 self.peer_store_handle.report_peer(peer, rep::GENESIS_MISMATCH);
742 }
743
744 self.network_service
745 .disconnect_peer(peer, self.block_announce_protocol_name.clone());
746 },
747 }
748 },
749 NotificationEvent::NotificationStreamClosed { peer } => {
750 self.on_sync_peer_disconnected(peer);
751 },
752 NotificationEvent::NotificationReceived { peer, notification } => {
753 if !self.peers.contains_key(&peer) {
754 log::error!(
755 target: LOG_TARGET,
756 "received notification from {peer} who had been earlier refused by `SyncingEngine`",
757 );
758 return;
759 }
760
761 let Ok(announce) = BlockAnnounce::decode(&mut notification.as_ref()) else {
762 log::warn!(target: LOG_TARGET, "failed to decode block announce");
763 return;
764 };
765
766 self.push_block_announce_validation(peer, announce);
767 },
768 }
769 }
770
771 fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) {
775 let Some(info) = self.peers.remove(&peer_id) else {
776 log::debug!(target: LOG_TARGET, "{peer_id} does not exist in `SyncingEngine`");
777 return;
778 };
779 if let Some(metrics) = &self.metrics {
780 metrics.peers.dec();
781 }
782 self.num_connected.fetch_sub(1, Ordering::AcqRel);
783
784 if self.important_peers.contains(&peer_id) {
785 log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
786 } else {
787 log::debug!(target: LOG_TARGET, "{peer_id} disconnected");
788 }
789
790 if !self.default_peers_set_no_slot_connected_peers.remove(&peer_id) &&
791 info.inbound &&
792 info.info.roles.is_full()
793 {
794 match self.num_in_peers.checked_sub(1) {
795 Some(value) => {
796 self.num_in_peers = value;
797 },
798 None => {
799 log::error!(
800 target: LOG_TARGET,
801 "trying to disconnect an inbound node which is not counted as inbound"
802 );
803 debug_assert!(false);
804 },
805 }
806 }
807
808 self.strategy.remove_peer(&peer_id);
809 self.pending_responses.remove_all(&peer_id);
810 self.event_streams
811 .retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok());
812 }
813
814 fn validate_handshake(
816 &mut self,
817 peer_id: &PeerId,
818 handshake: Vec<u8>,
819 ) -> Result<BlockAnnouncesHandshake<B>, bool> {
820 log::trace!(target: LOG_TARGET, "Validate handshake for {peer_id}");
821
822 let handshake = <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(&mut &handshake[..])
823 .map_err(|error| {
824 log::debug!(target: LOG_TARGET, "Failed to decode handshake for {peer_id}: {error:?}");
825 false
826 })?;
827
828 if handshake.genesis_hash != self.genesis_hash {
829 if self.important_peers.contains(&peer_id) {
830 log::error!(
831 target: LOG_TARGET,
832 "Reserved peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
833 self.genesis_hash,
834 handshake.genesis_hash,
835 );
836 } else if self.boot_node_ids.contains(&peer_id) {
837 log::error!(
838 target: LOG_TARGET,
839 "Bootnode with peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
840 self.genesis_hash,
841 handshake.genesis_hash,
842 );
843 } else {
844 log::debug!(
845 target: LOG_TARGET,
846 "Peer is on different chain (our genesis: {} theirs: {})",
847 self.genesis_hash,
848 handshake.genesis_hash
849 );
850 }
851
852 return Err(true);
853 }
854
855 Ok(handshake)
856 }
857
858 fn validate_connection(
872 &mut self,
873 peer_id: &PeerId,
874 handshake: Vec<u8>,
875 direction: Direction,
876 ) -> Result<BlockAnnouncesHandshake<B>, bool> {
877 log::trace!(target: LOG_TARGET, "New peer {peer_id} {handshake:?}");
878
879 let handshake = self.validate_handshake(peer_id, handshake)?;
880
881 if self.peers.contains_key(&peer_id) {
882 log::error!(
883 target: LOG_TARGET,
884 "Called `validate_connection()` with already connected peer {peer_id}",
885 );
886 debug_assert!(false);
887 return Err(false);
888 }
889
890 let no_slot_peer = self.default_peers_set_no_slot_peers.contains(&peer_id);
891 let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 };
892
893 if handshake.roles.is_full() &&
894 self.strategy.num_peers() >=
895 self.default_peers_set_num_full +
896 self.default_peers_set_no_slot_connected_peers.len() +
897 this_peer_reserved_slot
898 {
899 log::debug!(target: LOG_TARGET, "Too many full nodes, rejecting {peer_id}");
900 return Err(false);
901 }
902
903 if !no_slot_peer &&
905 handshake.roles.is_full() &&
906 direction.is_inbound() &&
907 self.num_in_peers == self.max_in_peers
908 {
909 log::debug!(target: LOG_TARGET, "All inbound slots have been consumed, rejecting {peer_id}");
910 return Err(false);
911 }
912
913 if handshake.roles.is_light() &&
918 (self.peers.len() - self.strategy.num_peers()) >= self.default_peers_set_num_light
919 {
920 log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {peer_id}");
921 return Err(false);
922 }
923
924 Ok(handshake)
925 }
926
927 fn on_sync_peer_connected(
933 &mut self,
934 peer_id: PeerId,
935 status: &BlockAnnouncesHandshake<B>,
936 direction: Direction,
937 ) -> Result<(), ()> {
938 log::trace!(target: LOG_TARGET, "New peer {peer_id} {status:?}");
939
940 let peer = Peer {
941 info: ExtendedPeerInfo {
942 roles: status.roles,
943 best_hash: status.best_hash,
944 best_number: status.best_number,
945 },
946 known_blocks: LruHashSet::new(
947 NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
948 ),
949 inbound: direction.is_inbound(),
950 };
951
952 if status.roles.is_full() {
954 self.strategy.add_peer(peer_id, peer.info.best_hash, peer.info.best_number);
955 }
956
957 log::debug!(target: LOG_TARGET, "Connected {peer_id}");
958
959 if self.peers.insert(peer_id, peer).is_none() {
960 if let Some(metrics) = &self.metrics {
961 metrics.peers.inc();
962 }
963 self.num_connected.fetch_add(1, Ordering::AcqRel);
964 }
965 self.peer_store_handle.set_peer_role(&peer_id, status.roles.into());
966
967 if self.default_peers_set_no_slot_peers.contains(&peer_id) {
968 self.default_peers_set_no_slot_connected_peers.insert(peer_id);
969 } else if direction.is_inbound() && status.roles.is_full() {
970 self.num_in_peers += 1;
971 }
972
973 self.event_streams
974 .retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(peer_id)).is_ok());
975
976 Ok(())
977 }
978
979 fn process_response_event(&mut self, response_event: ResponseEvent) {
980 let ResponseEvent { peer_id, key, response: response_result } = response_event;
981
982 match response_result {
983 Ok(Ok((response, protocol_name))) => {
984 self.strategy.on_generic_response(&peer_id, key, protocol_name, response);
985 },
986 Ok(Err(e)) => {
987 debug!(target: LOG_TARGET, "Request to peer {peer_id:?} failed: {e:?}.");
988
989 match e {
990 RequestFailure::Network(OutboundFailure::Timeout) => {
991 self.network_service.report_peer(peer_id, rep::TIMEOUT);
992 self.network_service
993 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
994 },
995 RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
996 self.network_service.report_peer(peer_id, rep::BAD_PROTOCOL);
997 self.network_service
998 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
999 },
1000 RequestFailure::Network(OutboundFailure::DialFailure) => {
1001 self.network_service
1002 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1003 },
1004 RequestFailure::Refused => {
1005 self.network_service.report_peer(peer_id, rep::REFUSED);
1006 self.network_service
1007 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1008 },
1009 RequestFailure::Network(OutboundFailure::ConnectionClosed) |
1010 RequestFailure::NotConnected => {
1011 self.network_service
1012 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1013 },
1014 RequestFailure::UnknownProtocol => {
1015 debug_assert!(false, "Block request protocol should always be known.");
1016 },
1017 RequestFailure::Obsolete => {
1018 debug_assert!(
1019 false,
1020 "Can not receive `RequestFailure::Obsolete` after dropping the \
1021 response receiver.",
1022 );
1023 },
1024 }
1025 },
1026 Err(oneshot::Canceled) => {
1027 trace!(
1028 target: LOG_TARGET,
1029 "Request to peer {peer_id:?} failed due to oneshot being canceled.",
1030 );
1031 self.network_service
1032 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1033 },
1034 }
1035 }
1036
1037 fn num_active_peers(&self) -> usize {
1039 self.pending_responses.len()
1040 }
1041
1042 fn get_block_announce_proto_config<N: NetworkBackend<B, <B as BlockT>::Hash>>(
1044 protocol_id: ProtocolId,
1045 fork_id: Option<&str>,
1046 roles: Roles,
1047 best_number: NumberFor<B>,
1048 best_hash: B::Hash,
1049 genesis_hash: B::Hash,
1050 set_config: &SetConfig,
1051 metrics: NotificationMetrics,
1052 peer_store_handle: Arc<dyn PeerStoreProvider>,
1053 ) -> (N::NotificationProtocolConfig, Box<dyn NotificationService>) {
1054 let block_announces_protocol = {
1055 let genesis_hash = genesis_hash.as_ref();
1056 if let Some(fork_id) = fork_id {
1057 format!(
1058 "/{}/{}/block-announces/1",
1059 array_bytes::bytes2hex("", genesis_hash),
1060 fork_id
1061 )
1062 } else {
1063 format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash))
1064 }
1065 };
1066
1067 N::notification_config(
1068 block_announces_protocol.into(),
1069 iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into()).collect(),
1070 MAX_BLOCK_ANNOUNCE_SIZE,
1071 Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
1072 roles,
1073 best_number,
1074 best_hash,
1075 genesis_hash,
1076 ))),
1077 set_config.clone(),
1078 metrics,
1079 peer_store_handle,
1080 )
1081 }
1082
1083 fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
1085 if let Some(metrics) = &self.metrics {
1086 metrics.import_queue_blocks_submitted.inc();
1087 }
1088
1089 self.import_queue.import_blocks(origin, blocks);
1090 }
1091
1092 fn import_justifications(
1094 &mut self,
1095 peer_id: PeerId,
1096 hash: B::Hash,
1097 number: NumberFor<B>,
1098 justifications: Justifications,
1099 ) {
1100 if let Some(metrics) = &self.metrics {
1101 metrics.import_queue_justifications_submitted.inc();
1102 }
1103
1104 self.import_queue.import_justifications(peer_id, hash, number, justifications);
1105 }
1106}