1use crate::{
23 block_announce_validator::{
24 BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
25 },
26 pending_responses::{PendingResponses, ResponseEvent},
27 service::{
28 self,
29 syncing_service::{SyncingService, ToServiceCommand},
30 },
31 strategy::{SyncingAction, SyncingStrategy},
32 types::{BadPeer, ExtendedPeerInfo, SyncEvent},
33 LOG_TARGET,
34};
35
36use codec::{Decode, DecodeAll, Encode};
37use futures::{channel::oneshot, StreamExt};
38use log::{debug, error, trace, warn};
39use prometheus_endpoint::{
40 register, Counter, Gauge, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
41};
42use schnellru::{ByLength, LruMap};
43use tokio::time::{Interval, MissedTickBehavior};
44
45use pezsc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
46use pezsc_consensus::{import_queue::ImportQueueService, IncomingBlock};
47use pezsc_network::{
48 config::{FullNetworkConfiguration, NotificationHandshake, ProtocolId, SetConfig},
49 peer_store::PeerStoreProvider,
50 request_responses::{OutboundFailure, RequestFailure},
51 service::{
52 traits::{Direction, NotificationConfig, NotificationEvent, ValidationResult},
53 NotificationMetrics,
54 },
55 types::ProtocolName,
56 utils::LruHashSet,
57 NetworkBackend, NotificationService, ReputationChange,
58};
59use pezsc_network_common::{
60 role::Roles,
61 sync::message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState},
62};
63use pezsc_network_types::PeerId;
64use pezsc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
65use pezsp_blockchain::{Error as ClientError, HeaderMetadata};
66use pezsp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin};
67use pezsp_runtime::{
68 traits::{Block as BlockT, Header, NumberFor, Zero},
69 Justifications,
70};
71
72use std::{
73 collections::{HashMap, HashSet},
74 iter,
75 num::NonZeroUsize,
76 sync::{
77 atomic::{AtomicBool, AtomicUsize, Ordering},
78 Arc,
79 },
80};
81
/// Period of the interval polled in `SyncingEngine::run`; every tick wakes the event loop even
/// when no other event is pending.
const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100);

/// Capacity of the per-peer LRU set of known block hashes (`Peer::known_blocks`).
const MAX_KNOWN_BLOCKS: usize = 1024;

/// Size limit passed to `NetworkBackend::notification_config` for the block-announces protocol.
///
/// NOTE(review): presumably the maximum notification size in bytes (1 MiB) — confirm against
/// the `notification_config` documentation.
const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024;
90
/// Reputation changes the syncing engine applies to misbehaving peers.
mod rep {
    use pezsc_network::ReputationChange as Rep;
    /// Reported when the genesis hash in a peer's handshake differs from ours (fatal).
    pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
    /// Reported when validation of a block announcement from the peer failed.
    pub const BAD_BLOCK_ANNOUNCEMENT: Rep = Rep::new(-(1 << 12), "Bad block announcement");
    /// Reported when a request failed with `OutboundFailure::UnsupportedProtocols` (fatal).
    pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
    /// Reported when the peer refused a request (`RequestFailure::Refused`).
    pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
    /// Reported when a request to the peer timed out (`OutboundFailure::Timeout`).
    pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
    /// Reported when a request failed with an I/O error (`OutboundFailure::Io`).
    pub const IO: Rep = Rep::new(-(1 << 10), "IO error during request");
}
106
/// Prometheus metrics maintained by the syncing engine.
struct Metrics {
    /// Gauge incremented when a peer is added to the engine and decremented when it is removed.
    peers: Gauge<U64>,
    /// Counter incremented once per `import_blocks` call.
    import_queue_blocks_submitted: Counter<U64>,
    /// Counter incremented once per `import_justifications` call.
    import_queue_justifications_submitted: Counter<U64>,
}
112
113impl Metrics {
114 fn register(r: &Registry, major_syncing: Arc<AtomicBool>) -> Result<Self, PrometheusError> {
115 MajorSyncingGauge::register(r, major_syncing)?;
116 Ok(Self {
117 peers: {
118 let g = Gauge::new("bizinikiwi_sync_peers", "Number of peers we sync with")?;
119 register(g, r)?
120 },
121 import_queue_blocks_submitted: {
122 let c = Counter::new(
123 "bizinikiwi_sync_import_queue_blocks_submitted",
124 "Number of blocks submitted to the import queue.",
125 )?;
126 register(c, r)?
127 },
128 import_queue_justifications_submitted: {
129 let c = Counter::new(
130 "bizinikiwi_sync_import_queue_justifications_submitted",
131 "Number of justifications submitted to the import queue.",
132 )?;
133 register(c, r)?
134 },
135 })
136 }
137}
138
/// Metric source that exposes a shared "is major syncing" flag as a Prometheus gauge.
#[derive(Clone)]
pub struct MajorSyncingGauge(Arc<AtomicBool>);
142
143impl MajorSyncingGauge {
144 fn register(registry: &Registry, value: Arc<AtomicBool>) -> Result<(), PrometheusError> {
147 prometheus_endpoint::register(
148 SourcedGauge::new(
149 &Opts::new(
150 "bizinikiwi_sub_libp2p_is_major_syncing",
151 "Whether the node is performing a major sync or not.",
152 ),
153 MajorSyncingGauge(value),
154 )?,
155 registry,
156 )?;
157
158 Ok(())
159 }
160}
161
162impl MetricSource for MajorSyncingGauge {
163 type N = u64;
164
165 fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
166 set(&[], self.0.load(Ordering::Relaxed) as u64);
167 }
168}
169
/// Per-peer state tracked by the syncing engine.
#[derive(Debug)]
pub struct Peer<B: BlockT> {
    /// Roles and best block (hash and number) last recorded for the peer.
    pub info: ExtendedPeerInfo<B>,
    /// LRU set of block hashes that the peer announced to us or that we announced to it.
    pub known_blocks: LruHashSet<B::Hash>,
    /// `true` when the notification substream was opened in the inbound direction.
    inbound: bool,
}
179
/// Event loop state that connects the network, the syncing strategy and the import queue.
pub struct SyncingEngine<B: BlockT, Client> {
    /// Syncing strategy that receives peer/response events and produces `SyncingAction`s.
    strategy: Box<dyn SyncingStrategy<B>>,

    /// Blockchain client, used to read headers and chain info.
    client: Arc<Client>,

    /// Number of peers currently tracked in `peers`; shared with `SyncingService`.
    num_connected: Arc<AtomicUsize>,

    /// Flag refreshed from the strategy after every event; shared with `SyncingService` and the
    /// major-syncing gauge.
    is_major_syncing: Arc<AtomicBool>,

    /// Handle used to disconnect and report peers.
    network_service: service::network::NetworkServiceHandle,

    /// Receiver of commands sent through `SyncingService`.
    service_rx: TracingUnboundedReceiver<ToServiceCommand<B>>,

    /// Roles of the local node, advertised in the block-announces handshake.
    roles: Roles,

    /// Genesis hash of the local chain; handshakes with a different genesis are rejected.
    genesis_hash: B::Hash,

    /// Subscribers that are notified about peer connections and disconnections.
    event_streams: Vec<TracingUnboundedSender<SyncEvent>>,

    /// Interval (period `TICK_TIMEOUT`) that periodically wakes up the event loop.
    tick_timeout: Interval,

    /// All peers accepted by the engine, keyed by peer ID.
    peers: HashMap<PeerId, Peer<B>>,

    /// Reserved peers of the default set and of every notification protocol. In this file it is
    /// only consulted to pick the log severity.
    important_peers: HashSet<PeerId>,

    /// Peers from `default_peers_set_no_slot_peers` that are currently connected.
    default_peers_set_no_slot_connected_peers: HashSet<PeerId>,

    /// Reserved peers of the default set; they are exempt from the inbound slot limit.
    default_peers_set_no_slot_peers: HashSet<PeerId>,

    /// Limit on the number of full peers (no-slot peers are accounted for separately).
    default_peers_set_num_full: usize,

    /// Limit on the number of light peers: `in_peers + out_peers - num_full`, saturating at 0.
    default_peers_set_num_light: usize,

    /// Maximum number of inbound full peers that occupy a slot.
    max_in_peers: usize,

    /// Current number of inbound full peers that occupy a slot.
    num_in_peers: usize,

    /// Stream that validates pushed block announcements and yields the results.
    block_announce_validator: BlockAnnounceValidatorStream<B>,

    /// LRU cache of non-empty block announce data keyed by block hash; reused when the block is
    /// announced to other peers without explicit data.
    block_announce_data_cache: LruMap<B::Hash, Vec<u8>>,

    /// Peer IDs of the configured boot nodes. In this file it is only consulted to pick the log
    /// severity.
    boot_node_ids: HashSet<PeerId>,

    /// Name of the block-announces protocol, used when disconnecting peers.
    block_announce_protocol_name: ProtocolName,

    /// Prometheus metrics; `None` when no registry was given or registration failed.
    metrics: Option<Metrics>,

    /// Notification service of the block-announces protocol.
    notification_service: Box<dyn NotificationService>,

    /// Handle to the peer store, used to report peers and record their roles.
    peer_store_handle: Arc<dyn PeerStoreProvider>,

    /// Responses to requests started on behalf of the strategy that have not arrived yet.
    pending_responses: PendingResponses,

    /// Import queue that receives downloaded blocks and justifications.
    import_queue: Box<dyn ImportQueueService<B>>,
}
264
265impl<B: BlockT, Client> SyncingEngine<B, Client>
266where
267 B: BlockT,
268 Client: HeaderBackend<B>
269 + BlockBackend<B>
270 + HeaderMetadata<B, Error = pezsp_blockchain::Error>
271 + ProofProvider<B>
272 + Send
273 + Sync
274 + 'static,
275{
276 pub fn new<N>(
277 roles: Roles,
278 client: Arc<Client>,
279 metrics_registry: Option<&Registry>,
280 network_metrics: NotificationMetrics,
281 net_config: &FullNetworkConfiguration<B, <B as BlockT>::Hash, N>,
282 protocol_id: ProtocolId,
283 fork_id: Option<&str>,
284 block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
285 syncing_strategy: Box<dyn SyncingStrategy<B>>,
286 network_service: service::network::NetworkServiceHandle,
287 import_queue: Box<dyn ImportQueueService<B>>,
288 peer_store_handle: Arc<dyn PeerStoreProvider>,
289 ) -> Result<(Self, SyncingService<B>, N::NotificationProtocolConfig), ClientError>
290 where
291 N: NetworkBackend<B, <B as BlockT>::Hash>,
292 {
293 let cache_capacity = (net_config.network_config.default_peers_set.in_peers
294 + net_config.network_config.default_peers_set.out_peers)
295 .max(1);
296 let important_peers = {
297 let mut imp_p = HashSet::new();
298 for reserved in &net_config.network_config.default_peers_set.reserved_nodes {
299 imp_p.insert(reserved.peer_id);
300 }
301 for config in net_config.notification_protocols() {
302 let peer_ids = config.set_config().reserved_nodes.iter().map(|info| info.peer_id);
303 imp_p.extend(peer_ids);
304 }
305
306 imp_p.shrink_to_fit();
307 imp_p
308 };
309 let boot_node_ids = {
310 let mut list = HashSet::new();
311 for node in &net_config.network_config.boot_nodes {
312 list.insert(node.peer_id);
313 }
314 list.shrink_to_fit();
315 list
316 };
317 let default_peers_set_no_slot_peers = {
318 let mut no_slot_p: HashSet<PeerId> = net_config
319 .network_config
320 .default_peers_set
321 .reserved_nodes
322 .iter()
323 .map(|reserved| reserved.peer_id)
324 .collect();
325 no_slot_p.shrink_to_fit();
326 no_slot_p
327 };
328 let default_peers_set_num_full =
329 net_config.network_config.default_peers_set_num_full as usize;
330 let default_peers_set_num_light = {
331 let total = net_config.network_config.default_peers_set.out_peers
332 + net_config.network_config.default_peers_set.in_peers;
333 total.saturating_sub(net_config.network_config.default_peers_set_num_full) as usize
334 };
335
336 let info = client.info();
337
338 let (block_announce_config, notification_service) =
339 Self::get_block_announce_proto_config::<N>(
340 protocol_id,
341 fork_id,
342 roles,
343 info.best_number,
344 info.best_hash,
345 info.genesis_hash,
346 &net_config.network_config.default_peers_set,
347 network_metrics,
348 Arc::clone(&peer_store_handle),
349 );
350
351 let block_announce_protocol_name = block_announce_config.protocol_name().clone();
352 let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
353 let num_connected = Arc::new(AtomicUsize::new(0));
354 let is_major_syncing = Arc::new(AtomicBool::new(false));
355
356 let max_full_peers = net_config.network_config.default_peers_set_num_full;
359 let max_out_peers = net_config.network_config.default_peers_set.out_peers;
360 let max_in_peers = (max_full_peers - max_out_peers) as usize;
361
362 let tick_timeout = {
363 let mut interval = tokio::time::interval(TICK_TIMEOUT);
364 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
365 interval
366 };
367
368 Ok((
369 Self {
370 roles,
371 client,
372 strategy: syncing_strategy,
373 network_service,
374 peers: HashMap::new(),
375 block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
376 block_announce_protocol_name,
377 block_announce_validator: BlockAnnounceValidatorStream::new(
378 block_announce_validator,
379 ),
380 num_connected: num_connected.clone(),
381 is_major_syncing: is_major_syncing.clone(),
382 service_rx,
383 genesis_hash: info.genesis_hash,
384 important_peers,
385 default_peers_set_no_slot_connected_peers: HashSet::new(),
386 boot_node_ids,
387 default_peers_set_no_slot_peers,
388 default_peers_set_num_full,
389 default_peers_set_num_light,
390 num_in_peers: 0usize,
391 max_in_peers,
392 event_streams: Vec::new(),
393 notification_service,
394 tick_timeout,
395 peer_store_handle,
396 metrics: if let Some(r) = metrics_registry {
397 match Metrics::register(r, is_major_syncing.clone()) {
398 Ok(metrics) => Some(metrics),
399 Err(err) => {
400 log::error!(target: LOG_TARGET, "Failed to register metrics {err:?}");
401 None
402 },
403 }
404 } else {
405 None
406 },
407 pending_responses: PendingResponses::new(),
408 import_queue,
409 },
410 SyncingService::new(tx, num_connected, is_major_syncing),
411 block_announce_config,
412 ))
413 }
414
415 fn update_peer_info(
416 &mut self,
417 peer_id: &PeerId,
418 best_hash: B::Hash,
419 best_number: NumberFor<B>,
420 ) {
421 if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
422 peer.info.best_hash = best_hash;
423 peer.info.best_number = best_number;
424 }
425 }
426
427 fn process_block_announce_validation_result(
429 &mut self,
430 validation_result: BlockAnnounceValidationResult<B::Header>,
431 ) {
432 match validation_result {
433 BlockAnnounceValidationResult::Skip { peer_id: _ } => {},
434 BlockAnnounceValidationResult::Process { is_new_best, peer_id, announce } => {
435 if let Some((best_hash, best_number)) =
436 self.strategy.on_validated_block_announce(is_new_best, peer_id, &announce)
437 {
438 self.update_peer_info(&peer_id, best_hash, best_number);
439 }
440
441 if let Some(data) = announce.data {
442 if !data.is_empty() {
443 self.block_announce_data_cache.insert(announce.header.hash(), data);
444 }
445 }
446 },
447 BlockAnnounceValidationResult::Failure { peer_id, disconnect } => {
448 if disconnect {
449 log::debug!(
450 target: LOG_TARGET,
451 "Disconnecting peer {peer_id} due to block announce validation failure",
452 );
453 self.network_service
454 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
455 }
456
457 self.network_service.report_peer(peer_id, rep::BAD_BLOCK_ANNOUNCEMENT);
458 },
459 }
460 }
461
462 pub fn push_block_announce_validation(
464 &mut self,
465 peer_id: PeerId,
466 announce: BlockAnnounce<B::Header>,
467 ) {
468 let hash = announce.header.hash();
469
470 let peer = match self.peers.get_mut(&peer_id) {
471 Some(p) => p,
472 None => {
473 log::error!(
474 target: LOG_TARGET,
475 "Received block announce from disconnected peer {peer_id}",
476 );
477 debug_assert!(false);
478 return;
479 },
480 };
481 peer.known_blocks.insert(hash);
482
483 if peer.info.roles.is_full() {
484 let is_best = match announce.state.unwrap_or(BlockState::Best) {
485 BlockState::Best => true,
486 BlockState::Normal => false,
487 };
488
489 self.block_announce_validator
490 .push_block_announce_validation(peer_id, hash, announce, is_best);
491 }
492 }
493
494 pub fn announce_block(&mut self, hash: B::Hash, data: Option<Vec<u8>>) {
499 let header = match self.client.header(hash) {
500 Ok(Some(header)) => header,
501 Ok(None) => {
502 log::warn!(target: LOG_TARGET, "Trying to announce unknown block: {hash}");
503 return;
504 },
505 Err(e) => {
506 log::warn!(target: LOG_TARGET, "Error reading block header {hash}: {e}");
507 return;
508 },
509 };
510
511 if header.number().is_zero() {
513 return;
514 }
515
516 let is_best = self.client.info().best_hash == hash;
517 log::debug!(target: LOG_TARGET, "Reannouncing block {hash:?} is_best: {is_best}");
518
519 let data = data
520 .or_else(|| self.block_announce_data_cache.get(&hash).cloned())
521 .unwrap_or_default();
522
523 for (peer_id, ref mut peer) in self.peers.iter_mut() {
524 let inserted = peer.known_blocks.insert(hash);
525 if inserted {
526 log::trace!(target: LOG_TARGET, "Announcing block {hash:?} to {peer_id}");
527 let message = BlockAnnounce {
528 header: header.clone(),
529 state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
530 data: Some(data.clone()),
531 };
532
533 let _ = self.notification_service.send_sync_notification(peer_id, message.encode());
534 }
535 }
536 }
537
/// Runs the engine's event loop until the notification service terminates or the strategy
/// reports a fatal error.
///
/// Each iteration waits for exactly one event (tick, service command, notification event,
/// request response or block announce validation result), handles it, then refreshes the
/// shared major-syncing flag and executes the actions requested by the strategy.
pub async fn run(mut self) {
    loop {
        tokio::select! {
            // The tick has no handler of its own: it only guarantees that the steps after
            // `select!` run at least once per `TICK_TIMEOUT`.
            _ = self.tick_timeout.tick() => {
            },
            command = self.service_rx.select_next_some() =>
                self.process_service_command(command),
            notification_event = self.notification_service.next_event() => match notification_event {
                Some(event) => self.process_notification_event(event),
                // `None` means the notification service is gone; stop the engine.
                None => {
                    error!(
                        target: LOG_TARGET,
                        "Terminating `SyncingEngine` because `NotificationService` has terminated.",
                    );

                    return;
                }
            },
            response_event = self.pending_responses.select_next_some() =>
                self.process_response_event(response_event),
            validation_result = self.block_announce_validator.select_next_some() =>
                self.process_block_announce_validation_result(validation_result),
        }

        // Publish the strategy's view of "major syncing" to the shared atomic flag.
        self.is_major_syncing.store(self.strategy.is_major_syncing(), Ordering::Relaxed);

        // Execute whatever the strategy requested in reaction to the event handled above.
        if let Err(e) = self.process_strategy_actions() {
            error!(
                target: LOG_TARGET,
                "Terminating `SyncingEngine` due to fatal error: {e:?}.",
            );
            return;
        }
    }
}
578
579 fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
580 for action in self.strategy.actions(&self.network_service)? {
581 match action {
582 SyncingAction::StartRequest { peer_id, key, request, remove_obsolete } => {
583 if !self.peers.contains_key(&peer_id) {
584 trace!(
585 target: LOG_TARGET,
586 "Cannot start request with strategy key {key:?} to unknown peer \
587 {peer_id}",
588 );
589 debug_assert!(false);
590 continue;
591 }
592 if remove_obsolete {
593 if self.pending_responses.remove(peer_id, key) {
594 warn!(
595 target: LOG_TARGET,
596 "Processed `SyncingAction::StartRequest` to {peer_id} with \
597 strategy key {key:?}. Stale response removed!",
598 )
599 } else {
600 trace!(
601 target: LOG_TARGET,
602 "Processed `SyncingAction::StartRequest` to {peer_id} with \
603 strategy key {key:?}.",
604 )
605 }
606 }
607
608 self.pending_responses.insert(peer_id, key, request);
609 },
610 SyncingAction::CancelRequest { peer_id, key } => {
611 let removed = self.pending_responses.remove(peer_id, key);
612
613 trace!(
614 target: LOG_TARGET,
615 "Processed `SyncingAction::CancelRequest`, response removed: {removed}.",
616 );
617 },
618 SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
619 self.pending_responses.remove_all(&peer_id);
620 self.network_service
621 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
622 self.network_service.report_peer(peer_id, rep);
623
624 trace!(target: LOG_TARGET, "{peer_id:?} dropped: {rep:?}.");
625 },
626 SyncingAction::ImportBlocks { origin, blocks } => {
627 let count = blocks.len();
628 self.import_blocks(origin, blocks);
629
630 trace!(
631 target: LOG_TARGET,
632 "Processed `ChainSyncAction::ImportBlocks` with {count} blocks.",
633 );
634 },
635 SyncingAction::ImportJustifications { peer_id, hash, number, justifications } => {
636 self.import_justifications(peer_id, hash, number, justifications);
637
638 trace!(
639 target: LOG_TARGET,
640 "Processed `ChainSyncAction::ImportJustifications` from peer {} for block {} ({}).",
641 peer_id,
642 hash,
643 number,
644 )
645 },
646 SyncingAction::Finished => {},
648 }
649 }
650
651 Ok(())
652 }
653
654 fn process_service_command(&mut self, command: ToServiceCommand<B>) {
655 match command {
656 ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
657 self.strategy.set_sync_fork_request(peers, &hash, number);
658 },
659 ToServiceCommand::EventStream(tx) => {
660 for peer_id in self.peers.keys() {
662 let _ = tx.unbounded_send(SyncEvent::PeerConnected(*peer_id));
663 }
664 self.event_streams.push(tx);
665 },
666 ToServiceCommand::RequestJustification(hash, number) => {
667 self.strategy.request_justification(&hash, number)
668 },
669 ToServiceCommand::ClearJustificationRequests => {
670 self.strategy.clear_justification_requests()
671 },
672 ToServiceCommand::BlocksProcessed(imported, count, results) => {
673 self.strategy.on_blocks_processed(imported, count, results);
674 },
675 ToServiceCommand::JustificationImported(peer_id, hash, number, import_result) => {
676 let success =
677 matches!(import_result, pezsc_consensus::JustificationImportResult::Success);
678 self.strategy.on_justification_import(hash, number, success);
679
680 match import_result {
681 pezsc_consensus::JustificationImportResult::OutdatedJustification => {
682 log::info!(
683 target: LOG_TARGET,
684 "💔 Outdated justification provided by {peer_id} for #{hash}",
685 );
686 },
687 pezsc_consensus::JustificationImportResult::Failure => {
688 log::info!(
689 target: LOG_TARGET,
690 "💔 Invalid justification provided by {peer_id} for #{hash}",
691 );
692 self.network_service
693 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
694 self.network_service.report_peer(
695 peer_id,
696 ReputationChange::new_fatal("Invalid justification"),
697 );
698 },
699 pezsc_consensus::JustificationImportResult::Success => {
700 log::debug!(
701 target: LOG_TARGET,
702 "Justification for block #{hash} ({number}) imported from {peer_id} successfully",
703 );
704 },
705 }
706 },
707 ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data),
708 ToServiceCommand::NewBestBlockImported(hash, number) => {
709 log::debug!(target: LOG_TARGET, "New best block imported {:?}/#{}", hash, number);
710
711 self.strategy.update_chain_info(&hash, number);
712 let _ = self.notification_service.try_set_handshake(
713 BlockAnnouncesHandshake::<B>::build(
714 self.roles,
715 number,
716 hash,
717 self.genesis_hash,
718 )
719 .encode(),
720 );
721 },
722 ToServiceCommand::Status(tx) => {
723 let _ = tx.send(self.strategy.status());
724 },
725 ToServiceCommand::NumActivePeers(tx) => {
726 let _ = tx.send(self.num_active_peers());
727 },
728 ToServiceCommand::NumDownloadedBlocks(tx) => {
729 let _ = tx.send(self.strategy.num_downloaded_blocks());
730 },
731 ToServiceCommand::NumSyncRequests(tx) => {
732 let _ = tx.send(self.strategy.num_sync_requests());
733 },
734 ToServiceCommand::PeersInfo(tx) => {
735 let peers_info =
736 self.peers.iter().map(|(peer_id, peer)| (*peer_id, peer.info)).collect();
737 let _ = tx.send(peers_info);
738 },
739 ToServiceCommand::OnBlockFinalized(hash, header) => {
740 self.strategy.on_block_finalized(&hash, *header.number())
741 },
742 }
743 }
744
745 fn process_notification_event(&mut self, event: NotificationEvent) {
746 match event {
747 NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } => {
748 let validation_result = self
749 .validate_connection(&peer, handshake, Direction::Inbound)
750 .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
751
752 let _ = result_tx.send(validation_result);
753 },
754 NotificationEvent::NotificationStreamOpened { peer, handshake, direction, .. } => {
755 log::debug!(
756 target: LOG_TARGET,
757 "Substream opened for {peer}, handshake {handshake:?}"
758 );
759
760 match self.validate_connection(&peer, handshake, direction) {
761 Ok(handshake) => {
762 if self.on_sync_peer_connected(peer, &handshake, direction).is_err() {
763 log::debug!(target: LOG_TARGET, "Failed to register peer {peer}");
764 self.network_service
765 .disconnect_peer(peer, self.block_announce_protocol_name.clone());
766 }
767 },
768 Err(wrong_genesis) => {
769 log::debug!(target: LOG_TARGET, "`SyncingEngine` rejected {peer}");
770
771 if wrong_genesis {
772 self.peer_store_handle.report_peer(peer, rep::GENESIS_MISMATCH);
773 }
774
775 self.network_service
776 .disconnect_peer(peer, self.block_announce_protocol_name.clone());
777 },
778 }
779 },
780 NotificationEvent::NotificationStreamClosed { peer } => {
781 self.on_sync_peer_disconnected(peer);
782 },
783 NotificationEvent::NotificationReceived { peer, notification } => {
784 if !self.peers.contains_key(&peer) {
785 log::error!(
786 target: LOG_TARGET,
787 "received notification from {peer} who had been earlier refused by `SyncingEngine`",
788 );
789 return;
790 }
791
792 let Ok(announce) = BlockAnnounce::decode(&mut notification.as_ref()) else {
793 log::warn!(target: LOG_TARGET, "failed to decode block announce");
794 return;
795 };
796
797 self.push_block_announce_validation(peer, announce);
798 },
799 }
800 }
801
802 fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) {
806 let Some(info) = self.peers.remove(&peer_id) else {
807 log::debug!(target: LOG_TARGET, "{peer_id} does not exist in `SyncingEngine`");
808 return;
809 };
810 if let Some(metrics) = &self.metrics {
811 metrics.peers.dec();
812 }
813 self.num_connected.fetch_sub(1, Ordering::AcqRel);
814
815 if self.important_peers.contains(&peer_id) {
816 log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
817 } else {
818 log::debug!(target: LOG_TARGET, "{peer_id} disconnected");
819 }
820
821 if !self.default_peers_set_no_slot_connected_peers.remove(&peer_id)
822 && info.inbound
823 && info.info.roles.is_full()
824 {
825 match self.num_in_peers.checked_sub(1) {
826 Some(value) => {
827 self.num_in_peers = value;
828 },
829 None => {
830 log::error!(
831 target: LOG_TARGET,
832 "trying to disconnect an inbound node which is not counted as inbound"
833 );
834 debug_assert!(false);
835 },
836 }
837 }
838
839 self.strategy.remove_peer(&peer_id);
840 self.pending_responses.remove_all(&peer_id);
841 self.event_streams
842 .retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok());
843 }
844
845 fn validate_handshake(
847 &mut self,
848 peer_id: &PeerId,
849 handshake: Vec<u8>,
850 ) -> Result<BlockAnnouncesHandshake<B>, bool> {
851 log::trace!(target: LOG_TARGET, "Validate handshake for {peer_id}");
852
853 let handshake = <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(&mut &handshake[..])
854 .map_err(|error| {
855 log::debug!(target: LOG_TARGET, "Failed to decode handshake for {peer_id}: {error:?}");
856 false
857 })?;
858
859 if handshake.genesis_hash != self.genesis_hash {
860 if self.important_peers.contains(&peer_id) {
861 log::error!(
862 target: LOG_TARGET,
863 "Reserved peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
864 self.genesis_hash,
865 handshake.genesis_hash,
866 );
867 } else if self.boot_node_ids.contains(&peer_id) {
868 log::error!(
869 target: LOG_TARGET,
870 "Bootnode with peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
871 self.genesis_hash,
872 handshake.genesis_hash,
873 );
874 } else {
875 log::debug!(
876 target: LOG_TARGET,
877 "Peer is on different chain (our genesis: {} theirs: {})",
878 self.genesis_hash,
879 handshake.genesis_hash
880 );
881 }
882
883 return Err(true);
884 }
885
886 Ok(handshake)
887 }
888
889 fn validate_connection(
903 &mut self,
904 peer_id: &PeerId,
905 handshake: Vec<u8>,
906 direction: Direction,
907 ) -> Result<BlockAnnouncesHandshake<B>, bool> {
908 log::trace!(target: LOG_TARGET, "New peer {peer_id} {handshake:?}");
909
910 let handshake = self.validate_handshake(peer_id, handshake)?;
911
912 if self.peers.contains_key(&peer_id) {
913 log::error!(
914 target: LOG_TARGET,
915 "Called `validate_connection()` with already connected peer {peer_id}",
916 );
917 debug_assert!(false);
918 return Err(false);
919 }
920
921 let no_slot_peer = self.default_peers_set_no_slot_peers.contains(&peer_id);
922 let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 };
923
924 if handshake.roles.is_full()
925 && self.strategy.num_peers()
926 >= self.default_peers_set_num_full
927 + self.default_peers_set_no_slot_connected_peers.len()
928 + this_peer_reserved_slot
929 {
930 log::debug!(target: LOG_TARGET, "Too many full nodes, rejecting {peer_id}");
931 return Err(false);
932 }
933
934 if !no_slot_peer
936 && handshake.roles.is_full()
937 && direction.is_inbound()
938 && self.num_in_peers == self.max_in_peers
939 {
940 log::debug!(target: LOG_TARGET, "All inbound slots have been consumed, rejecting {peer_id}");
941 return Err(false);
942 }
943
944 if handshake.roles.is_light()
949 && (self.peers.len() - self.strategy.num_peers()) >= self.default_peers_set_num_light
950 {
951 log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {peer_id}");
952 return Err(false);
953 }
954
955 Ok(handshake)
956 }
957
958 fn on_sync_peer_connected(
964 &mut self,
965 peer_id: PeerId,
966 status: &BlockAnnouncesHandshake<B>,
967 direction: Direction,
968 ) -> Result<(), ()> {
969 log::trace!(target: LOG_TARGET, "New peer {peer_id} {status:?}");
970
971 let peer = Peer {
972 info: ExtendedPeerInfo {
973 roles: status.roles,
974 best_hash: status.best_hash,
975 best_number: status.best_number,
976 },
977 known_blocks: LruHashSet::new(
978 NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
979 ),
980 inbound: direction.is_inbound(),
981 };
982
983 if status.roles.is_full() {
985 self.strategy.add_peer(peer_id, peer.info.best_hash, peer.info.best_number);
986 }
987
988 log::debug!(target: LOG_TARGET, "Connected {peer_id}");
989
990 if self.peers.insert(peer_id, peer).is_none() {
991 if let Some(metrics) = &self.metrics {
992 metrics.peers.inc();
993 }
994 self.num_connected.fetch_add(1, Ordering::AcqRel);
995 }
996 self.peer_store_handle.set_peer_role(&peer_id, status.roles.into());
997
998 if self.default_peers_set_no_slot_peers.contains(&peer_id) {
999 self.default_peers_set_no_slot_connected_peers.insert(peer_id);
1000 } else if direction.is_inbound() && status.roles.is_full() {
1001 self.num_in_peers += 1;
1002 }
1003
1004 self.event_streams
1005 .retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(peer_id)).is_ok());
1006
1007 Ok(())
1008 }
1009
1010 fn process_response_event(&mut self, response_event: ResponseEvent) {
1011 let ResponseEvent { peer_id, key, response: response_result } = response_event;
1012
1013 match response_result {
1014 Ok(Ok((response, protocol_name))) => {
1015 self.strategy.on_generic_response(&peer_id, key, protocol_name, response);
1016 },
1017 Ok(Err(e)) => {
1018 debug!(target: LOG_TARGET, "Request to peer {peer_id:?} failed: {e:?}.");
1019
1020 match e {
1021 RequestFailure::Network(OutboundFailure::Timeout) => {
1022 self.network_service.report_peer(peer_id, rep::TIMEOUT);
1023 self.network_service
1024 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1025 },
1026 RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
1027 self.network_service.report_peer(peer_id, rep::BAD_PROTOCOL);
1028 self.network_service
1029 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1030 },
1031 RequestFailure::Network(OutboundFailure::DialFailure) => {
1032 self.network_service
1033 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1034 },
1035 RequestFailure::Refused => {
1036 self.network_service.report_peer(peer_id, rep::REFUSED);
1037 self.network_service
1038 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1039 },
1040 RequestFailure::Network(OutboundFailure::ConnectionClosed)
1041 | RequestFailure::NotConnected => {
1042 self.network_service
1043 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1044 },
1045 RequestFailure::UnknownProtocol => {
1046 debug_assert!(false, "Block request protocol should always be known.");
1047 },
1048 RequestFailure::Obsolete => {
1049 debug_assert!(
1050 false,
1051 "Can not receive `RequestFailure::Obsolete` after dropping the \
1052 response receiver.",
1053 );
1054 },
1055 RequestFailure::Network(OutboundFailure::Io(_)) => {
1056 self.network_service.report_peer(peer_id, rep::IO);
1057 self.network_service
1058 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1059 },
1060 }
1061 },
1062 Err(oneshot::Canceled) => {
1063 trace!(
1064 target: LOG_TARGET,
1065 "Request to peer {peer_id:?} failed due to oneshot being canceled.",
1066 );
1067 self.network_service
1068 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1069 },
1070 }
1071 }
1072
/// Returns the number of entries in `pending_responses`, i.e. requests that were started and
/// whose response has not been processed yet.
///
/// NOTE(review): reported to callers as the number of "active peers" — this presumably relies
/// on at most one pending response per peer; confirm against `PendingResponses`.
fn num_active_peers(&self) -> usize {
    self.pending_responses.len()
}
1077
1078 fn get_block_announce_proto_config<N: NetworkBackend<B, <B as BlockT>::Hash>>(
1080 protocol_id: ProtocolId,
1081 fork_id: Option<&str>,
1082 roles: Roles,
1083 best_number: NumberFor<B>,
1084 best_hash: B::Hash,
1085 genesis_hash: B::Hash,
1086 set_config: &SetConfig,
1087 metrics: NotificationMetrics,
1088 peer_store_handle: Arc<dyn PeerStoreProvider>,
1089 ) -> (N::NotificationProtocolConfig, Box<dyn NotificationService>) {
1090 let block_announces_protocol = {
1091 let genesis_hash = genesis_hash.as_ref();
1092 if let Some(fork_id) = fork_id {
1093 format!(
1094 "/{}/{}/block-announces/1",
1095 array_bytes::bytes2hex("", genesis_hash),
1096 fork_id
1097 )
1098 } else {
1099 format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash))
1100 }
1101 };
1102
1103 N::notification_config(
1104 block_announces_protocol.into(),
1105 iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into()).collect(),
1106 MAX_BLOCK_ANNOUNCE_SIZE,
1107 Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
1108 roles,
1109 best_number,
1110 best_hash,
1111 genesis_hash,
1112 ))),
1113 set_config.clone(),
1114 metrics,
1115 peer_store_handle,
1116 )
1117 }
1118
1119 fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
1121 if let Some(metrics) = &self.metrics {
1122 metrics.import_queue_blocks_submitted.inc();
1123 }
1124
1125 self.import_queue.import_blocks(origin, blocks);
1126 }
1127
1128 fn import_justifications(
1130 &mut self,
1131 peer_id: PeerId,
1132 hash: B::Hash,
1133 number: NumberFor<B>,
1134 justifications: Justifications,
1135 ) {
1136 if let Some(metrics) = &self.metrics {
1137 metrics.import_queue_justifications_submitted.inc();
1138 }
1139
1140 self.import_queue.import_justifications(peer_id, hash, number, justifications);
1141 }
1142}