1use crate::{
11 sync::{
12 block_announce_validator::{
13 BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
14 },
15 pending_responses::{PendingResponses, ResponseEvent},
16 service::{
17 self,
18 syncing_service::{SyncingService, ToServiceCommand},
19 },
20 strategy::{SyncingAction, SyncingStrategy},
21 types::{BadPeer, ExtendedPeerInfo, SyncEvent},
22 },
23 LOG_TARGET,
24};
25
26use codec::{Decode, DecodeAll, Encode};
27use futures::{channel::oneshot, StreamExt};
28use log::{debug, error, trace, warn};
29use soil_prometheus::{
30 register, Counter, Gauge, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
31};
32use schnellru::{ByLength, LruMap};
33use tokio::time::{Interval, MissedTickBehavior};
34
35use soil_client::blockchain::{Error as ClientError, HeaderMetadata};
36use soil_client::client_api::{BlockBackend, HeaderBackend, ProofProvider};
37use soil_client::consensus::{block_validation::BlockAnnounceValidator, BlockOrigin};
38use soil_client::import::{ImportQueueService, IncomingBlock};
39use soil_client::utils::mpsc::{
40 tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender,
41};
42use soil_network::common::{
43 role::Roles,
44 sync::message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState},
45};
46use soil_network::types::PeerId;
47use soil_network::{
48 config::{FullNetworkConfiguration, NotificationHandshake, ProtocolId, SetConfig},
49 peer_store::PeerStoreProvider,
50 request_responses::{OutboundFailure, RequestFailure},
51 service::{
52 traits::{Direction, NotificationConfig, NotificationEvent, ValidationResult},
53 NotificationMetrics,
54 },
55 types::ProtocolName,
56 utils::LruHashSet,
57 NetworkBackend, NotificationService, ReputationChange,
58};
59use subsoil::runtime::{
60 traits::{Block as BlockT, Header, NumberFor, Zero},
61 Justifications,
62};
63
64use std::{
65 collections::{HashMap, HashSet},
66 iter,
67 num::NonZeroUsize,
68 sync::{
69 atomic::{AtomicBool, AtomicUsize, Ordering},
70 Arc,
71 },
72};
73
/// Period of the `tick_timeout` interval that `SyncingEngine::run` polls on every loop iteration.
const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100);

/// Capacity of the per-peer `known_blocks` LRU set of block hashes.
const MAX_KNOWN_BLOCKS: usize = 1024;

/// Size limit passed to `N::notification_config` for the block-announce protocol.
///
/// NOTE(review): presumably the maximum notification size in bytes (1 MiB) — confirm against
/// the `NetworkBackend::notification_config` documentation.
const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024;
82
/// Reputation changes the syncing engine applies to misbehaving peers.
mod rep {
    use soil_network::ReputationChange as Rep;
    /// Fatal change reported when a peer's handshake carries a different genesis hash.
    pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
    /// Reported when a block announcement from the peer fails validation.
    pub const BAD_BLOCK_ANNOUNCEMENT: Rep = Rep::new(-(1 << 12), "Bad block announcement");
    /// Fatal change reported when a request fails with `OutboundFailure::UnsupportedProtocols`.
    pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
    /// Reported when a request fails with `RequestFailure::Refused`.
    pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
    /// Reported when a request fails with `OutboundFailure::Timeout`.
    pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
    /// Reported when a request fails with `OutboundFailure::Io`.
    pub const IO: Rep = Rep::new(-(1 << 10), "IO error during request");
}
98
/// Prometheus metrics maintained by the syncing engine.
struct Metrics {
    /// Gauge `substrate_sync_peers`: incremented when a peer is added to `SyncingEngine::peers`
    /// and decremented when one is removed.
    peers: Gauge<U64>,
    /// Counter `substrate_sync_import_queue_blocks_submitted`, bumped by `import_blocks`.
    import_queue_blocks_submitted: Counter<U64>,
    /// Counter `substrate_sync_import_queue_justifications_submitted`, bumped by
    /// `import_justifications`.
    import_queue_justifications_submitted: Counter<U64>,
}
104
105impl Metrics {
106 fn register(r: &Registry, major_syncing: Arc<AtomicBool>) -> Result<Self, PrometheusError> {
107 MajorSyncingGauge::register(r, major_syncing)?;
108 Ok(Self {
109 peers: {
110 let g = Gauge::new("substrate_sync_peers", "Number of peers we sync with")?;
111 register(g, r)?
112 },
113 import_queue_blocks_submitted: {
114 let c = Counter::new(
115 "substrate_sync_import_queue_blocks_submitted",
116 "Number of blocks submitted to the import queue.",
117 )?;
118 register(c, r)?
119 },
120 import_queue_justifications_submitted: {
121 let c = Counter::new(
122 "substrate_sync_import_queue_justifications_submitted",
123 "Number of justifications submitted to the import queue.",
124 )?;
125 register(c, r)?
126 },
127 })
128 }
129}
130
/// Metric source that exposes a shared `AtomicBool` (the "is major syncing" flag) as a gauge.
#[derive(Clone)]
pub struct MajorSyncingGauge(Arc<AtomicBool>);
134
135impl MajorSyncingGauge {
136 fn register(registry: &Registry, value: Arc<AtomicBool>) -> Result<(), PrometheusError> {
139 soil_prometheus::register(
140 SourcedGauge::new(
141 &Opts::new(
142 "substrate_sub_libp2p_is_major_syncing",
143 "Whether the node is performing a major sync or not.",
144 ),
145 MajorSyncingGauge(value),
146 )?,
147 registry,
148 )?;
149
150 Ok(())
151 }
152}
153
154impl MetricSource for MajorSyncingGauge {
155 type N = u64;
156
157 fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
158 set(&[], self.0.load(Ordering::Relaxed) as u64);
159 }
160}
161
/// State the syncing engine keeps for one connected peer.
#[derive(Debug)]
pub struct Peer<B: BlockT> {
    /// Roles and best block (hash and number) of the peer, initialised from its handshake and
    /// refreshed by `update_peer_info`.
    pub info: ExtendedPeerInfo<B>,
    /// LRU set of block hashes this peer is known to have: hashes it announced to us and hashes
    /// we already announced to it.
    pub known_blocks: LruHashSet<B::Hash>,
    /// `true` when the substream direction reported at connection time was inbound.
    inbound: bool,
}
171
/// Event loop that connects the network (block-announce notifications and request responses),
/// the syncing strategy and the import queue.
pub struct SyncingEngine<B: BlockT, Client> {
    /// Syncing strategy that decides which requests to send and which blocks to import.
    strategy: Box<dyn SyncingStrategy<B>>,

    /// Blockchain client, used to read headers and chain info.
    client: Arc<Client>,

    /// Number of connected peers, shared with `SyncingService`.
    num_connected: Arc<AtomicUsize>,

    /// Mirror of `strategy.is_major_syncing()`, shared with `SyncingService` and the metrics.
    is_major_syncing: Arc<AtomicBool>,

    /// Handle used to disconnect and report peers.
    network_service: service::network::NetworkServiceHandle,

    /// Receiving end of the command channel fed by `SyncingService`.
    service_rx: TracingUnboundedReceiver<ToServiceCommand<B>>,

    /// Roles of the local node, advertised in the block-announce handshake.
    roles: Roles,

    /// Genesis hash of the local chain; peers whose handshake differs are rejected.
    genesis_hash: B::Hash,

    /// Subscribers that receive `SyncEvent`s; a sender is dropped once sending to it fails.
    event_streams: Vec<TracingUnboundedSender<SyncEvent>>,

    /// Interval with period `TICK_TIMEOUT` that wakes the `run` loop.
    tick_timeout: Interval,

    /// All peers currently accepted by the engine (both full and light).
    peers: HashMap<PeerId, Peer<B>>,

    /// Reserved peers of the default set and of every notification protocol; their disconnection
    /// and genesis mismatches are logged at a higher level.
    important_peers: HashSet<PeerId>,

    /// Peers from `default_peers_set_no_slot_peers` that are currently connected.
    default_peers_set_no_slot_connected_peers: HashSet<PeerId>,

    /// Reserved peers of the default set; they are exempt from the inbound-slot accounting.
    default_peers_set_no_slot_peers: HashSet<PeerId>,

    /// Configured limit on the number of full peers.
    default_peers_set_num_full: usize,

    /// Limit on the number of light peers: `in_peers + out_peers` minus the full-peer limit.
    default_peers_set_num_light: usize,

    /// Upper bound on `num_in_peers`, derived in `new` from the full-peer and outbound limits.
    max_in_peers: usize,

    /// Number of connected inbound full peers that are not no-slot peers.
    num_in_peers: usize,

    /// Stream that validates pushed block announcements and yields the results.
    block_announce_validator: BlockAnnounceValidatorStream<B>,

    /// LRU cache of the non-empty data attached to validated block announcements, keyed by block
    /// hash; reused when the block is re-announced without explicit data.
    block_announce_data_cache: LruMap<B::Hash, Vec<u8>>,

    /// Peer ids of the configured boot nodes.
    boot_node_ids: HashSet<PeerId>,

    /// Name of the block-announce protocol, used when disconnecting peers.
    block_announce_protocol_name: ProtocolName,

    /// Prometheus metrics; `None` when no registry was given or registration failed.
    metrics: Option<Metrics>,

    /// Notification service of the block-announce protocol (events in, announcements out).
    notification_service: Box<dyn NotificationService>,

    /// Handle to the peer store, used to record peer roles and genesis mismatches.
    peer_store_handle: Arc<dyn PeerStoreProvider>,

    /// In-flight requests whose responses are awaited.
    pending_responses: PendingResponses,

    /// Handle to the import queue that receives blocks and justifications.
    import_queue: Box<dyn ImportQueueService<B>>,
}
256
257impl<B: BlockT, Client> SyncingEngine<B, Client>
258where
259 B: BlockT,
260 Client: HeaderBackend<B>
261 + BlockBackend<B>
262 + HeaderMetadata<B, Error = soil_client::blockchain::Error>
263 + ProofProvider<B>
264 + Send
265 + Sync
266 + 'static,
267{
268 pub fn new<N>(
269 roles: Roles,
270 client: Arc<Client>,
271 metrics_registry: Option<&Registry>,
272 network_metrics: NotificationMetrics,
273 net_config: &FullNetworkConfiguration<B, <B as BlockT>::Hash, N>,
274 protocol_id: ProtocolId,
275 fork_id: Option<&str>,
276 block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
277 syncing_strategy: Box<dyn SyncingStrategy<B>>,
278 network_service: service::network::NetworkServiceHandle,
279 import_queue: Box<dyn ImportQueueService<B>>,
280 peer_store_handle: Arc<dyn PeerStoreProvider>,
281 ) -> Result<(Self, SyncingService<B>, N::NotificationProtocolConfig), ClientError>
282 where
283 N: NetworkBackend<B, <B as BlockT>::Hash>,
284 {
285 let cache_capacity = (net_config.network_config.default_peers_set.in_peers
286 + net_config.network_config.default_peers_set.out_peers)
287 .max(1);
288 let important_peers = {
289 let mut imp_p = HashSet::new();
290 for reserved in &net_config.network_config.default_peers_set.reserved_nodes {
291 imp_p.insert(reserved.peer_id);
292 }
293 for config in net_config.notification_protocols() {
294 let peer_ids = config.set_config().reserved_nodes.iter().map(|info| info.peer_id);
295 imp_p.extend(peer_ids);
296 }
297
298 imp_p.shrink_to_fit();
299 imp_p
300 };
301 let boot_node_ids = {
302 let mut list = HashSet::new();
303 for node in &net_config.network_config.boot_nodes {
304 list.insert(node.peer_id);
305 }
306 list.shrink_to_fit();
307 list
308 };
309 let default_peers_set_no_slot_peers = {
310 let mut no_slot_p: HashSet<PeerId> = net_config
311 .network_config
312 .default_peers_set
313 .reserved_nodes
314 .iter()
315 .map(|reserved| reserved.peer_id)
316 .collect();
317 no_slot_p.shrink_to_fit();
318 no_slot_p
319 };
320 let default_peers_set_num_full =
321 net_config.network_config.default_peers_set_num_full as usize;
322 let default_peers_set_num_light = {
323 let total = net_config.network_config.default_peers_set.out_peers
324 + net_config.network_config.default_peers_set.in_peers;
325 total.saturating_sub(net_config.network_config.default_peers_set_num_full) as usize
326 };
327
328 let info = client.info();
329
330 let (block_announce_config, notification_service) =
331 Self::get_block_announce_proto_config::<N>(
332 protocol_id,
333 fork_id,
334 roles,
335 info.best_number,
336 info.best_hash,
337 info.genesis_hash,
338 &net_config.network_config.default_peers_set,
339 network_metrics,
340 Arc::clone(&peer_store_handle),
341 );
342
343 let block_announce_protocol_name = block_announce_config.protocol_name().clone();
344 let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
345 let num_connected = Arc::new(AtomicUsize::new(0));
346 let is_major_syncing = Arc::new(AtomicBool::new(false));
347
348 let max_full_peers = net_config.network_config.default_peers_set_num_full;
351 let max_out_peers = net_config.network_config.default_peers_set.out_peers;
352 let max_in_peers = (max_full_peers - max_out_peers) as usize;
353
354 let tick_timeout = {
355 let mut interval = tokio::time::interval(TICK_TIMEOUT);
356 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
357 interval
358 };
359
360 Ok((
361 Self {
362 roles,
363 client,
364 strategy: syncing_strategy,
365 network_service,
366 peers: HashMap::new(),
367 block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
368 block_announce_protocol_name,
369 block_announce_validator: BlockAnnounceValidatorStream::new(
370 block_announce_validator,
371 ),
372 num_connected: num_connected.clone(),
373 is_major_syncing: is_major_syncing.clone(),
374 service_rx,
375 genesis_hash: info.genesis_hash,
376 important_peers,
377 default_peers_set_no_slot_connected_peers: HashSet::new(),
378 boot_node_ids,
379 default_peers_set_no_slot_peers,
380 default_peers_set_num_full,
381 default_peers_set_num_light,
382 num_in_peers: 0usize,
383 max_in_peers,
384 event_streams: Vec::new(),
385 notification_service,
386 tick_timeout,
387 peer_store_handle,
388 metrics: if let Some(r) = metrics_registry {
389 match Metrics::register(r, is_major_syncing.clone()) {
390 Ok(metrics) => Some(metrics),
391 Err(err) => {
392 log::error!(target: LOG_TARGET, "Failed to register metrics {err:?}");
393 None
394 },
395 }
396 } else {
397 None
398 },
399 pending_responses: PendingResponses::new(),
400 import_queue,
401 },
402 SyncingService::new(tx, num_connected, is_major_syncing),
403 block_announce_config,
404 ))
405 }
406
407 fn update_peer_info(
408 &mut self,
409 peer_id: &PeerId,
410 best_hash: B::Hash,
411 best_number: NumberFor<B>,
412 ) {
413 if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
414 peer.info.best_hash = best_hash;
415 peer.info.best_number = best_number;
416 }
417 }
418
419 fn process_block_announce_validation_result(
421 &mut self,
422 validation_result: BlockAnnounceValidationResult<B::Header>,
423 ) {
424 match validation_result {
425 BlockAnnounceValidationResult::Skip { peer_id: _ } => {},
426 BlockAnnounceValidationResult::Process { is_new_best, peer_id, announce } => {
427 if let Some((best_hash, best_number)) =
428 self.strategy.on_validated_block_announce(is_new_best, peer_id, &announce)
429 {
430 self.update_peer_info(&peer_id, best_hash, best_number);
431 }
432
433 if let Some(data) = announce.data {
434 if !data.is_empty() {
435 self.block_announce_data_cache.insert(announce.header.hash(), data);
436 }
437 }
438 },
439 BlockAnnounceValidationResult::Failure { peer_id, disconnect } => {
440 if disconnect {
441 log::debug!(
442 target: LOG_TARGET,
443 "Disconnecting peer {peer_id} due to block announce validation failure",
444 );
445 self.network_service
446 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
447 }
448
449 self.network_service.report_peer(peer_id, rep::BAD_BLOCK_ANNOUNCEMENT);
450 },
451 }
452 }
453
454 pub fn push_block_announce_validation(
456 &mut self,
457 peer_id: PeerId,
458 announce: BlockAnnounce<B::Header>,
459 ) {
460 let hash = announce.header.hash();
461
462 let peer = match self.peers.get_mut(&peer_id) {
463 Some(p) => p,
464 None => {
465 log::error!(
466 target: LOG_TARGET,
467 "Received block announce from disconnected peer {peer_id}",
468 );
469 debug_assert!(false);
470 return;
471 },
472 };
473 peer.known_blocks.insert(hash);
474
475 if peer.info.roles.is_full() {
476 let is_best = match announce.state.unwrap_or(BlockState::Best) {
477 BlockState::Best => true,
478 BlockState::Normal => false,
479 };
480
481 self.block_announce_validator
482 .push_block_announce_validation(peer_id, hash, announce, is_best);
483 }
484 }
485
486 pub fn announce_block(&mut self, hash: B::Hash, data: Option<Vec<u8>>) {
491 let header = match self.client.header(hash) {
492 Ok(Some(header)) => header,
493 Ok(None) => {
494 log::warn!(target: LOG_TARGET, "Trying to announce unknown block: {hash}");
495 return;
496 },
497 Err(e) => {
498 log::warn!(target: LOG_TARGET, "Error reading block header {hash}: {e}");
499 return;
500 },
501 };
502
503 if header.number().is_zero() {
505 return;
506 }
507
508 let is_best = self.client.info().best_hash == hash;
509 log::debug!(target: LOG_TARGET, "Reannouncing block {hash:?} is_best: {is_best}");
510
511 let data = data
512 .or_else(|| self.block_announce_data_cache.get(&hash).cloned())
513 .unwrap_or_default();
514
515 for (peer_id, ref mut peer) in self.peers.iter_mut() {
516 let inserted = peer.known_blocks.insert(hash);
517 if inserted {
518 log::trace!(target: LOG_TARGET, "Announcing block {hash:?} to {peer_id}");
519 let message = BlockAnnounce {
520 header: header.clone(),
521 state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
522 data: Some(data.clone()),
523 };
524
525 let _ = self.notification_service.send_sync_notification(peer_id, message.encode());
526 }
527 }
528 }
529
    /// Runs the engine's event loop until the notification service terminates or the strategy
    /// reports a fatal error.
    ///
    /// Each iteration waits for one of: a timer tick, a service command, a notification event,
    /// a request response or a block-announce validation result. After handling it, the shared
    /// `is_major_syncing` flag is refreshed and the actions requested by the strategy are
    /// processed.
    pub async fn run(mut self) {
        loop {
            tokio::select! {
                _ = self.tick_timeout.tick() => {
                    // Nothing to do for the tick itself: waking up is enough, because the code
                    // after `select!` runs on every iteration.
                },
                command = self.service_rx.select_next_some() =>
                    self.process_service_command(command),
                notification_event = self.notification_service.next_event() => match notification_event {
                    Some(event) => self.process_notification_event(event),
                    None => {
                        error!(
                            target: LOG_TARGET,
                            "Terminating `SyncingEngine` because `NotificationService` has terminated.",
                        );

                        return;
                    }
                },
                response_event = self.pending_responses.select_next_some() =>
                    self.process_response_event(response_event),
                validation_result = self.block_announce_validator.select_next_some() =>
                    self.process_block_announce_validation_result(validation_result),
            }

            // Publish the strategy's view of major syncing to the service handle and metrics.
            self.is_major_syncing.store(self.strategy.is_major_syncing(), Ordering::Relaxed);

            // Carry out the actions requested by the strategy; an error here is fatal.
            if let Err(e) = self.process_strategy_actions() {
                error!(
                    target: LOG_TARGET,
                    "Terminating `SyncingEngine` due to fatal error: {e:?}.",
                );
                return;
            }
        }
    }
570
571 fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
572 for action in self.strategy.actions(&self.network_service)? {
573 match action {
574 SyncingAction::StartRequest { peer_id, key, request, remove_obsolete } => {
575 if !self.peers.contains_key(&peer_id) {
576 trace!(
577 target: LOG_TARGET,
578 "Cannot start request with strategy key {key:?} to unknown peer \
579 {peer_id}",
580 );
581 debug_assert!(false);
582 continue;
583 }
584 if remove_obsolete {
585 if self.pending_responses.remove(peer_id, key) {
586 warn!(
587 target: LOG_TARGET,
588 "Processed `SyncingAction::StartRequest` to {peer_id} with \
589 strategy key {key:?}. Stale response removed!",
590 )
591 } else {
592 trace!(
593 target: LOG_TARGET,
594 "Processed `SyncingAction::StartRequest` to {peer_id} with \
595 strategy key {key:?}.",
596 )
597 }
598 }
599
600 self.pending_responses.insert(peer_id, key, request);
601 },
602 SyncingAction::CancelRequest { peer_id, key } => {
603 let removed = self.pending_responses.remove(peer_id, key);
604
605 trace!(
606 target: LOG_TARGET,
607 "Processed `SyncingAction::CancelRequest`, response removed: {removed}.",
608 );
609 },
610 SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
611 self.pending_responses.remove_all(&peer_id);
612 self.network_service
613 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
614 self.network_service.report_peer(peer_id, rep);
615
616 trace!(target: LOG_TARGET, "{peer_id:?} dropped: {rep:?}.");
617 },
618 SyncingAction::ImportBlocks { origin, blocks } => {
619 let count = blocks.len();
620 self.import_blocks(origin, blocks);
621
622 trace!(
623 target: LOG_TARGET,
624 "Processed `ChainSyncAction::ImportBlocks` with {count} blocks.",
625 );
626 },
627 SyncingAction::ImportJustifications { peer_id, hash, number, justifications } => {
628 self.import_justifications(peer_id, hash, number, justifications);
629
630 trace!(
631 target: LOG_TARGET,
632 "Processed `ChainSyncAction::ImportJustifications` from peer {} for block {} ({}).",
633 peer_id,
634 hash,
635 number,
636 )
637 },
638 SyncingAction::Finished => {},
640 }
641 }
642
643 Ok(())
644 }
645
646 fn process_service_command(&mut self, command: ToServiceCommand<B>) {
647 match command {
648 ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
649 self.strategy.set_sync_fork_request(peers, &hash, number);
650 },
651 ToServiceCommand::EventStream(tx) => {
652 for peer_id in self.peers.keys() {
654 let _ = tx.unbounded_send(SyncEvent::PeerConnected(*peer_id));
655 }
656 self.event_streams.push(tx);
657 },
658 ToServiceCommand::RequestJustification(hash, number) => {
659 self.strategy.request_justification(&hash, number)
660 },
661 ToServiceCommand::ClearJustificationRequests => {
662 self.strategy.clear_justification_requests()
663 },
664 ToServiceCommand::BlocksProcessed(imported, count, results) => {
665 self.strategy.on_blocks_processed(imported, count, results);
666 },
667 ToServiceCommand::JustificationImported(peer_id, hash, number, import_result) => {
668 let success = matches!(
669 import_result,
670 soil_client::import::JustificationImportResult::Success
671 );
672 self.strategy.on_justification_import(hash, number, success);
673
674 match import_result {
675 soil_client::import::JustificationImportResult::OutdatedJustification => {
676 log::info!(
677 target: LOG_TARGET,
678 "💔 Outdated justification provided by {peer_id} for #{hash}",
679 );
680 },
681 soil_client::import::JustificationImportResult::Failure => {
682 log::info!(
683 target: LOG_TARGET,
684 "💔 Invalid justification provided by {peer_id} for #{hash}",
685 );
686 self.network_service
687 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
688 self.network_service.report_peer(
689 peer_id,
690 ReputationChange::new_fatal("Invalid justification"),
691 );
692 },
693 soil_client::import::JustificationImportResult::Success => {
694 log::debug!(
695 target: LOG_TARGET,
696 "Justification for block #{hash} ({number}) imported from {peer_id} successfully",
697 );
698 },
699 }
700 },
701 ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data),
702 ToServiceCommand::NewBestBlockImported(hash, number) => {
703 log::debug!(target: LOG_TARGET, "New best block imported {:?}/#{}", hash, number);
704
705 self.strategy.update_chain_info(&hash, number);
706 let _ = self.notification_service.try_set_handshake(
707 BlockAnnouncesHandshake::<B>::build(
708 self.roles,
709 number,
710 hash,
711 self.genesis_hash,
712 )
713 .encode(),
714 );
715 },
716 ToServiceCommand::Status(tx) => {
717 let _ = tx.send(self.strategy.status());
718 },
719 ToServiceCommand::NumActivePeers(tx) => {
720 let _ = tx.send(self.num_active_peers());
721 },
722 ToServiceCommand::NumDownloadedBlocks(tx) => {
723 let _ = tx.send(self.strategy.num_downloaded_blocks());
724 },
725 ToServiceCommand::NumSyncRequests(tx) => {
726 let _ = tx.send(self.strategy.num_sync_requests());
727 },
728 ToServiceCommand::PeersInfo(tx) => {
729 let peers_info =
730 self.peers.iter().map(|(peer_id, peer)| (*peer_id, peer.info)).collect();
731 let _ = tx.send(peers_info);
732 },
733 ToServiceCommand::OnBlockFinalized(hash, header) => {
734 self.strategy.on_block_finalized(&hash, *header.number())
735 },
736 }
737 }
738
739 fn process_notification_event(&mut self, event: NotificationEvent) {
740 match event {
741 NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } => {
742 let validation_result = self
743 .validate_connection(&peer, handshake, Direction::Inbound)
744 .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
745
746 let _ = result_tx.send(validation_result);
747 },
748 NotificationEvent::NotificationStreamOpened { peer, handshake, direction, .. } => {
749 log::debug!(
750 target: LOG_TARGET,
751 "Substream opened for {peer}, handshake {handshake:?}"
752 );
753
754 match self.validate_connection(&peer, handshake, direction) {
755 Ok(handshake) => {
756 if self.on_sync_peer_connected(peer, &handshake, direction).is_err() {
757 log::debug!(target: LOG_TARGET, "Failed to register peer {peer}");
758 self.network_service
759 .disconnect_peer(peer, self.block_announce_protocol_name.clone());
760 }
761 },
762 Err(wrong_genesis) => {
763 log::debug!(target: LOG_TARGET, "`SyncingEngine` rejected {peer}");
764
765 if wrong_genesis {
766 self.peer_store_handle.report_peer(peer, rep::GENESIS_MISMATCH);
767 }
768
769 self.network_service
770 .disconnect_peer(peer, self.block_announce_protocol_name.clone());
771 },
772 }
773 },
774 NotificationEvent::NotificationStreamClosed { peer } => {
775 self.on_sync_peer_disconnected(peer);
776 },
777 NotificationEvent::NotificationReceived { peer, notification } => {
778 if !self.peers.contains_key(&peer) {
779 log::error!(
780 target: LOG_TARGET,
781 "received notification from {peer} who had been earlier refused by `SyncingEngine`",
782 );
783 return;
784 }
785
786 let Ok(announce) = BlockAnnounce::decode(&mut notification.as_ref()) else {
787 log::warn!(target: LOG_TARGET, "failed to decode block announce");
788 return;
789 };
790
791 self.push_block_announce_validation(peer, announce);
792 },
793 }
794 }
795
796 fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) {
800 let Some(info) = self.peers.remove(&peer_id) else {
801 log::debug!(target: LOG_TARGET, "{peer_id} does not exist in `SyncingEngine`");
802 return;
803 };
804 if let Some(metrics) = &self.metrics {
805 metrics.peers.dec();
806 }
807 self.num_connected.fetch_sub(1, Ordering::AcqRel);
808
809 if self.important_peers.contains(&peer_id) {
810 log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
811 } else {
812 log::debug!(target: LOG_TARGET, "{peer_id} disconnected");
813 }
814
815 if !self.default_peers_set_no_slot_connected_peers.remove(&peer_id)
816 && info.inbound
817 && info.info.roles.is_full()
818 {
819 match self.num_in_peers.checked_sub(1) {
820 Some(value) => {
821 self.num_in_peers = value;
822 },
823 None => {
824 log::error!(
825 target: LOG_TARGET,
826 "trying to disconnect an inbound node which is not counted as inbound"
827 );
828 debug_assert!(false);
829 },
830 }
831 }
832
833 self.strategy.remove_peer(&peer_id);
834 self.pending_responses.remove_all(&peer_id);
835 self.event_streams
836 .retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok());
837 }
838
839 fn validate_handshake(
841 &mut self,
842 peer_id: &PeerId,
843 handshake: Vec<u8>,
844 ) -> Result<BlockAnnouncesHandshake<B>, bool> {
845 log::trace!(target: LOG_TARGET, "Validate handshake for {peer_id}");
846
847 let handshake = <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(&mut &handshake[..])
848 .map_err(|error| {
849 log::debug!(target: LOG_TARGET, "Failed to decode handshake for {peer_id}: {error:?}");
850 false
851 })?;
852
853 if handshake.genesis_hash != self.genesis_hash {
854 if self.important_peers.contains(&peer_id) {
855 log::error!(
856 target: LOG_TARGET,
857 "Reserved peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
858 self.genesis_hash,
859 handshake.genesis_hash,
860 );
861 } else if self.boot_node_ids.contains(&peer_id) {
862 log::error!(
863 target: LOG_TARGET,
864 "Bootnode with peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
865 self.genesis_hash,
866 handshake.genesis_hash,
867 );
868 } else {
869 log::debug!(
870 target: LOG_TARGET,
871 "Peer is on different chain (our genesis: {} theirs: {})",
872 self.genesis_hash,
873 handshake.genesis_hash
874 );
875 }
876
877 return Err(true);
878 }
879
880 Ok(handshake)
881 }
882
883 fn validate_connection(
897 &mut self,
898 peer_id: &PeerId,
899 handshake: Vec<u8>,
900 direction: Direction,
901 ) -> Result<BlockAnnouncesHandshake<B>, bool> {
902 log::trace!(target: LOG_TARGET, "New peer {peer_id} {handshake:?}");
903
904 let handshake = self.validate_handshake(peer_id, handshake)?;
905
906 if self.peers.contains_key(&peer_id) {
907 log::error!(
908 target: LOG_TARGET,
909 "Called `validate_connection()` with already connected peer {peer_id}",
910 );
911 debug_assert!(false);
912 return Err(false);
913 }
914
915 let no_slot_peer = self.default_peers_set_no_slot_peers.contains(&peer_id);
916 let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 };
917
918 if handshake.roles.is_full()
919 && self.strategy.num_peers()
920 >= self.default_peers_set_num_full
921 + self.default_peers_set_no_slot_connected_peers.len()
922 + this_peer_reserved_slot
923 {
924 log::debug!(target: LOG_TARGET, "Too many full nodes, rejecting {peer_id}");
925 return Err(false);
926 }
927
928 if !no_slot_peer
930 && handshake.roles.is_full()
931 && direction.is_inbound()
932 && self.num_in_peers == self.max_in_peers
933 {
934 log::debug!(target: LOG_TARGET, "All inbound slots have been consumed, rejecting {peer_id}");
935 return Err(false);
936 }
937
938 if handshake.roles.is_light()
943 && (self.peers.len() - self.strategy.num_peers()) >= self.default_peers_set_num_light
944 {
945 log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {peer_id}");
946 return Err(false);
947 }
948
949 Ok(handshake)
950 }
951
952 fn on_sync_peer_connected(
958 &mut self,
959 peer_id: PeerId,
960 status: &BlockAnnouncesHandshake<B>,
961 direction: Direction,
962 ) -> Result<(), ()> {
963 log::trace!(target: LOG_TARGET, "New peer {peer_id} {status:?}");
964
965 let peer = Peer {
966 info: ExtendedPeerInfo {
967 roles: status.roles,
968 best_hash: status.best_hash,
969 best_number: status.best_number,
970 },
971 known_blocks: LruHashSet::new(
972 NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
973 ),
974 inbound: direction.is_inbound(),
975 };
976
977 if status.roles.is_full() {
979 self.strategy.add_peer(peer_id, peer.info.best_hash, peer.info.best_number);
980 }
981
982 log::debug!(target: LOG_TARGET, "Connected {peer_id}");
983
984 if self.peers.insert(peer_id, peer).is_none() {
985 if let Some(metrics) = &self.metrics {
986 metrics.peers.inc();
987 }
988 self.num_connected.fetch_add(1, Ordering::AcqRel);
989 }
990 self.peer_store_handle.set_peer_role(&peer_id, status.roles.into());
991
992 if self.default_peers_set_no_slot_peers.contains(&peer_id) {
993 self.default_peers_set_no_slot_connected_peers.insert(peer_id);
994 } else if direction.is_inbound() && status.roles.is_full() {
995 self.num_in_peers += 1;
996 }
997
998 self.event_streams
999 .retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(peer_id)).is_ok());
1000
1001 Ok(())
1002 }
1003
1004 fn process_response_event(&mut self, response_event: ResponseEvent) {
1005 let ResponseEvent { peer_id, key, response: response_result } = response_event;
1006
1007 match response_result {
1008 Ok(Ok((response, protocol_name))) => {
1009 self.strategy.on_generic_response(&peer_id, key, protocol_name, response);
1010 },
1011 Ok(Err(e)) => {
1012 debug!(target: LOG_TARGET, "Request to peer {peer_id:?} failed: {e:?}.");
1013
1014 match e {
1015 RequestFailure::Network(OutboundFailure::Timeout) => {
1016 self.network_service.report_peer(peer_id, rep::TIMEOUT);
1017 self.network_service
1018 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1019 },
1020 RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
1021 self.network_service.report_peer(peer_id, rep::BAD_PROTOCOL);
1022 self.network_service
1023 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1024 },
1025 RequestFailure::Network(OutboundFailure::DialFailure) => {
1026 self.network_service
1027 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1028 },
1029 RequestFailure::Refused => {
1030 self.network_service.report_peer(peer_id, rep::REFUSED);
1031 self.network_service
1032 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1033 },
1034 RequestFailure::Network(OutboundFailure::ConnectionClosed)
1035 | RequestFailure::NotConnected => {
1036 self.network_service
1037 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1038 },
1039 RequestFailure::UnknownProtocol => {
1040 debug_assert!(false, "Block request protocol should always be known.");
1041 },
1042 RequestFailure::Obsolete => {
1043 debug_assert!(
1044 false,
1045 "Can not receive `RequestFailure::Obsolete` after dropping the \
1046 response receiver.",
1047 );
1048 },
1049 RequestFailure::Network(OutboundFailure::Io(_)) => {
1050 self.network_service.report_peer(peer_id, rep::IO);
1051 self.network_service
1052 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1053 },
1054 }
1055 },
1056 Err(oneshot::Canceled) => {
1057 trace!(
1058 target: LOG_TARGET,
1059 "Request to peer {peer_id:?} failed due to oneshot being canceled.",
1060 );
1061 self.network_service
1062 .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1063 },
1064 }
1065 }
1066
    /// Returns the number of entries in `pending_responses`, i.e. requests still awaiting a
    /// response. Used to answer `ToServiceCommand::NumActivePeers`.
    fn num_active_peers(&self) -> usize {
        self.pending_responses.len()
    }
1071
1072 fn get_block_announce_proto_config<N: NetworkBackend<B, <B as BlockT>::Hash>>(
1074 protocol_id: ProtocolId,
1075 fork_id: Option<&str>,
1076 roles: Roles,
1077 best_number: NumberFor<B>,
1078 best_hash: B::Hash,
1079 genesis_hash: B::Hash,
1080 set_config: &SetConfig,
1081 metrics: NotificationMetrics,
1082 peer_store_handle: Arc<dyn PeerStoreProvider>,
1083 ) -> (N::NotificationProtocolConfig, Box<dyn NotificationService>) {
1084 let block_announces_protocol = {
1085 let genesis_hash = genesis_hash.as_ref();
1086 if let Some(fork_id) = fork_id {
1087 format!(
1088 "/{}/{}/block-announces/1",
1089 array_bytes::bytes2hex("", genesis_hash),
1090 fork_id
1091 )
1092 } else {
1093 format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash))
1094 }
1095 };
1096
1097 N::notification_config(
1098 block_announces_protocol.into(),
1099 iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into()).collect(),
1100 MAX_BLOCK_ANNOUNCE_SIZE,
1101 Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
1102 roles,
1103 best_number,
1104 best_hash,
1105 genesis_hash,
1106 ))),
1107 set_config.clone(),
1108 metrics,
1109 peer_store_handle,
1110 )
1111 }
1112
1113 fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
1115 if let Some(metrics) = &self.metrics {
1116 metrics.import_queue_blocks_submitted.inc();
1117 }
1118
1119 self.import_queue.import_blocks(origin, blocks);
1120 }
1121
1122 fn import_justifications(
1124 &mut self,
1125 peer_id: PeerId,
1126 hash: B::Hash,
1127 number: NumberFor<B>,
1128 justifications: Justifications,
1129 ) {
1130 if let Some(metrics) = &self.metrics {
1131 metrics.import_queue_justifications_submitted.inc();
1132 }
1133
1134 self.import_queue
1135 .import_justifications(peer_id.into(), hash, number, justifications);
1136 }
1137}