sc_network/
discovery.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Discovery mechanisms of Substrate.
20//!
21//! The `DiscoveryBehaviour` struct implements the `NetworkBehaviour` trait of libp2p and is
22//! responsible for discovering other nodes that are part of the network.
23//!
24//! Substrate uses the following mechanisms in order to discover nodes that are part of the network:
25//!
26//! - Bootstrap nodes. These are hard-coded node identities and addresses passed in the constructor
27//! of the `DiscoveryBehaviour`. You can also call `add_known_address` later to add an entry.
28//!
29//! - mDNS. Discovers nodes on the local network by broadcasting UDP packets.
30//!
31//! - Kademlia random walk. Once connected, we perform random Kademlia `FIND_NODE` requests on the
32//! configured Kademlia DHTs in order for nodes to propagate to us their view of the network. This
33//! is performed automatically by the `DiscoveryBehaviour`.
34//!
//! Additionally, the `DiscoveryBehaviour` is also capable of storing and loading values in the
//! configured DHTs.
37//!
38//! ## Usage
39//!
40//! The `DiscoveryBehaviour` generates events of type `DiscoveryOut`, most notably
41//! `DiscoveryOut::Discovered` that is generated whenever we discover a node.
42//! Only the identity of the node is returned. The node's addresses are stored within the
43//! `DiscoveryBehaviour` and can be queried through the `NetworkBehaviour` trait.
44//!
45//! **Important**: In order for the discovery mechanism to work properly, there needs to be an
46//! active mechanism that asks nodes for the addresses they are listening on. Whenever we learn
47//! of a node's address, you must call `add_self_reported_address`.
48
49use crate::{config::ProtocolId, utils::LruHashSet};
50
51use array_bytes::bytes2hex;
52use futures::prelude::*;
53use futures_timer::Delay;
54use ip_network::IpNetwork;
55use libp2p::{
56	core::{transport::PortUse, Endpoint, Multiaddr},
57	kad::{
58		self,
59		store::{MemoryStore, RecordStore},
60		Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
61		Event, GetClosestPeersError, GetProvidersError, GetProvidersOk, GetRecordOk, PeerRecord,
62		QueryId, QueryResult, Quorum, Record, RecordKey,
63	},
64	mdns::{self, tokio::Behaviour as TokioMdns},
65	multiaddr::Protocol,
66	swarm::{
67		behaviour::{
68			toggle::{Toggle, ToggleConnectionHandler},
69			DialFailure, ExternalAddrConfirmed, FromSwarm,
70		},
71		ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, StreamProtocol, THandler,
72		THandlerInEvent, THandlerOutEvent, ToSwarm,
73	},
74	PeerId,
75};
76use linked_hash_set::LinkedHashSet;
77use log::{debug, info, trace, warn};
78use sp_core::hexdisplay::HexDisplay;
79use std::{
80	cmp,
81	collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
82	num::NonZeroUsize,
83	task::{Context, Poll},
84	time::{Duration, Instant},
85};
86
87/// Logging target for the file.
88const LOG_TARGET: &str = "sub-libp2p::discovery";
89
90/// Maximum number of known external addresses that we will cache.
91/// This only affects whether we will log whenever we (re-)discover
92/// a given address.
93const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32;
94
/// Default value for the Kademlia replication factor, which determines how many of the closest
/// peers a record is replicated to.
97pub const DEFAULT_KADEMLIA_REPLICATION_FACTOR: usize = 20;
98
/// The minimum number of peers we expect an answer from before we terminate the request.
100const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4;
101
/// `DiscoveryBehaviour` configuration.
///
/// Built with [`DiscoveryConfig::new`], adjusted through the setter methods and finally turned
/// into a [`DiscoveryBehaviour`] with [`DiscoveryConfig::finish`].
///
/// Note: In order to discover nodes or load and store values via Kademlia one has to add
///       Kademlia protocol via [`DiscoveryConfig::with_kademlia`].
pub struct DiscoveryConfig {
	/// Identity of the local node.
	local_peer_id: PeerId,
	/// Addresses that never expire, e.g. bootstrap or reserved nodes.
	permanent_addresses: Vec<(PeerId, Multiaddr)>,
	/// Whether to periodically perform a random walk on the DHT.
	dht_random_walk: bool,
	/// Whether private IPv4/IPv6 addresses should be reported.
	allow_private_ip: bool,
	/// Whether non-global addresses may be inserted into the DHT.
	allow_non_globals_in_dht: bool,
	/// Number of active connections at which discovery is paused.
	discovery_only_if_under_num: u64,
	/// Whether mDNS discovery is enabled.
	enable_mdns: bool,
	/// Whether iterative Kademlia queries must use disjoint paths.
	kademlia_disjoint_query_paths: bool,
	/// Genesis hash- and fork ID-based Kademlia protocol name; `None` until `with_kademlia` is
	/// called.
	kademlia_protocol: Option<StreamProtocol>,
	/// Legacy `ProtocolId`-based Kademlia protocol name; `None` until `with_kademlia` is called.
	kademlia_legacy_protocol: Option<StreamProtocol>,
	/// Kademlia replication factor.
	kademlia_replication_factor: NonZeroUsize,
}
120
121impl DiscoveryConfig {
122	/// Create a default configuration with the given public key.
123	pub fn new(local_peer_id: PeerId) -> Self {
124		Self {
125			local_peer_id,
126			permanent_addresses: Vec::new(),
127			dht_random_walk: true,
128			allow_private_ip: true,
129			allow_non_globals_in_dht: false,
130			discovery_only_if_under_num: std::u64::MAX,
131			enable_mdns: false,
132			kademlia_disjoint_query_paths: false,
133			kademlia_protocol: None,
134			kademlia_legacy_protocol: None,
135			kademlia_replication_factor: NonZeroUsize::new(DEFAULT_KADEMLIA_REPLICATION_FACTOR)
136				.expect("value is a constant; constant is non-zero; qed."),
137		}
138	}
139
140	/// Set the number of active connections at which we pause discovery.
141	pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
142		self.discovery_only_if_under_num = limit;
143		self
144	}
145
146	/// Set custom nodes which never expire, e.g. bootstrap or reserved nodes.
147	pub fn with_permanent_addresses<I>(&mut self, permanent_addresses: I) -> &mut Self
148	where
149		I: IntoIterator<Item = (PeerId, Multiaddr)>,
150	{
151		self.permanent_addresses.extend(permanent_addresses);
152		self
153	}
154
155	/// Whether the discovery behaviour should periodically perform a random
156	/// walk on the DHT to discover peers.
157	pub fn with_dht_random_walk(&mut self, value: bool) -> &mut Self {
158		self.dht_random_walk = value;
159		self
160	}
161
162	/// Should private IPv4/IPv6 addresses be reported?
163	pub fn allow_private_ip(&mut self, value: bool) -> &mut Self {
164		self.allow_private_ip = value;
165		self
166	}
167
168	/// Should non-global addresses be inserted to the DHT?
169	pub fn allow_non_globals_in_dht(&mut self, value: bool) -> &mut Self {
170		self.allow_non_globals_in_dht = value;
171		self
172	}
173
174	/// Should MDNS discovery be supported?
175	pub fn with_mdns(&mut self, value: bool) -> &mut Self {
176		self.enable_mdns = value;
177		self
178	}
179
180	/// Add discovery via Kademlia for the given protocol.
181	///
182	/// Currently accepts `protocol_id`. This should be removed once all the nodes
183	/// are upgraded to genesis hash- and fork ID-based Kademlia protocol name.
184	pub fn with_kademlia<Hash: AsRef<[u8]>>(
185		&mut self,
186		genesis_hash: Hash,
187		fork_id: Option<&str>,
188		protocol_id: &ProtocolId,
189	) -> &mut Self {
190		self.kademlia_protocol = Some(kademlia_protocol_name(genesis_hash, fork_id));
191		self.kademlia_legacy_protocol = Some(legacy_kademlia_protocol_name(protocol_id));
192		self
193	}
194
195	/// Require iterative Kademlia DHT queries to use disjoint paths for increased resiliency in the
196	/// presence of potentially adversarial nodes.
197	pub fn use_kademlia_disjoint_query_paths(&mut self, value: bool) -> &mut Self {
198		self.kademlia_disjoint_query_paths = value;
199		self
200	}
201
202	/// Sets Kademlia replication factor.
203	pub fn with_kademlia_replication_factor(&mut self, value: NonZeroUsize) -> &mut Self {
204		self.kademlia_replication_factor = value;
205		self
206	}
207
208	/// Create a `DiscoveryBehaviour` from this config.
209	pub fn finish(self) -> DiscoveryBehaviour {
210		let Self {
211			local_peer_id,
212			permanent_addresses,
213			dht_random_walk,
214			allow_private_ip,
215			allow_non_globals_in_dht,
216			discovery_only_if_under_num,
217			enable_mdns,
218			kademlia_disjoint_query_paths,
219			kademlia_protocol,
220			kademlia_legacy_protocol: _,
221			kademlia_replication_factor,
222		} = self;
223
224		let kademlia = if let Some(ref kademlia_protocol) = kademlia_protocol {
225			let mut config = KademliaConfig::new(kademlia_protocol.clone());
226
227			config.set_replication_factor(kademlia_replication_factor);
228
229			config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);
230
231			// By default Kademlia attempts to insert all peers into its routing table once a
232			// dialing attempt succeeds. In order to control which peer is added, disable the
233			// auto-insertion and instead add peers manually.
234			config.set_kbucket_inserts(BucketInserts::Manual);
235			config.disjoint_query_paths(kademlia_disjoint_query_paths);
236			let store = MemoryStore::new(local_peer_id);
237			let mut kad = Kademlia::with_config(local_peer_id, store, config);
238			kad.set_mode(Some(kad::Mode::Server));
239
240			for (peer_id, addr) in &permanent_addresses {
241				kad.add_address(peer_id, addr.clone());
242			}
243
244			Some(kad)
245		} else {
246			None
247		};
248
249		DiscoveryBehaviour {
250			permanent_addresses,
251			ephemeral_addresses: HashMap::new(),
252			kademlia: Toggle::from(kademlia),
253			next_kad_random_query: if dht_random_walk {
254				Some(Delay::new(Duration::new(0, 0)))
255			} else {
256				None
257			},
258			duration_to_next_kad: Duration::from_secs(1),
259			pending_events: VecDeque::new(),
260			local_peer_id,
261			num_connections: 0,
262			allow_private_ip,
263			discovery_only_if_under_num,
264			mdns: if enable_mdns {
265				match TokioMdns::new(mdns::Config::default(), local_peer_id) {
266					Ok(mdns) => Toggle::from(Some(mdns)),
267					Err(err) => {
268						warn!(target: LOG_TARGET, "Failed to initialize mDNS: {:?}", err);
269						Toggle::from(None)
270					},
271				}
272			} else {
273				Toggle::from(None)
274			},
275			allow_non_globals_in_dht,
276			known_external_addresses: LruHashSet::new(
277				NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
278					.expect("value is a constant; constant is non-zero; qed."),
279			),
280			records_to_publish: Default::default(),
281			kademlia_protocol,
282		}
283	}
284}
285
/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
pub struct DiscoveryBehaviour {
	/// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and
	/// reserved nodes.
	permanent_addresses: Vec<(PeerId, Multiaddr)>,
	/// Same as `permanent_addresses`, except that addresses that fail to reach a peer are
	/// removed.
	ephemeral_addresses: HashMap<PeerId, Vec<Multiaddr>>,
	/// Kademlia requests and answers. Even though it's wrapped in `Toggle`, currently
	/// it's always enabled in `NetworkWorker::new()`.
	kademlia: Toggle<Kademlia<MemoryStore>>,
	/// Discovers nodes on the local network.
	mdns: Toggle<TokioMdns>,
	/// Stream that fires when we need to perform the next random Kademlia query. `None` if
	/// random walking is disabled.
	next_kad_random_query: Option<Delay>,
	/// After `next_kad_random_query` triggers, the next one triggers after this duration.
	duration_to_next_kad: Duration,
	/// Events to return in priority when polled.
	pending_events: VecDeque<DiscoveryOut>,
	/// Identity of our local node.
	local_peer_id: PeerId,
	/// Number of nodes we're currently connected to.
	num_connections: u64,
	/// If false, `handle_pending_outbound_connection` won't return any private IPv4/IPv6
	/// address, except for the ones stored in `permanent_addresses` or `ephemeral_addresses`.
	allow_private_ip: bool,
	/// Number of active connections over which we interrupt the discovery process.
	discovery_only_if_under_num: u64,
	/// Should non-global addresses be added to the DHT?
	allow_non_globals_in_dht: bool,
	/// A cache of discovered external addresses. Only used for logging purposes.
	known_external_addresses: LruHashSet<Multiaddr>,
	/// Records to publish per QueryId.
	///
	/// After finishing a Kademlia query, libp2p will return us a list of the closest peers that
	/// did not return the record (in `FinishedWithNoAdditionalRecord`). We will then put the
	/// record to these peers.
	records_to_publish: HashMap<QueryId, Record>,
	/// The chain based kademlia protocol name (including genesis hash and fork id).
	///
	/// Remove when all nodes are upgraded to genesis hash and fork ID-based Kademlia:
	/// <https://github.com/paritytech/polkadot-sdk/issues/504>.
	kademlia_protocol: Option<StreamProtocol>,
}
331
332impl DiscoveryBehaviour {
333	/// Returns the list of nodes that we know exist in the network.
334	pub fn known_peers(&mut self) -> HashSet<PeerId> {
335		let mut peers = HashSet::new();
336		if let Some(k) = self.kademlia.as_mut() {
337			for b in k.kbuckets() {
338				for e in b.iter() {
339					if !peers.contains(e.node.key.preimage()) {
340						peers.insert(*e.node.key.preimage());
341					}
342				}
343			}
344		}
345		peers
346	}
347
348	/// Adds a hard-coded address for the given peer, that never expires.
349	///
350	/// This adds an entry to the parameter that was passed to `new`.
351	///
352	/// If we didn't know this address before, also generates a `Discovered` event.
353	pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
354		let addrs_list = self.ephemeral_addresses.entry(peer_id).or_default();
355		if addrs_list.contains(&addr) {
356			return
357		}
358
359		if let Some(k) = self.kademlia.as_mut() {
360			k.add_address(&peer_id, addr.clone());
361		}
362
363		self.pending_events.push_back(DiscoveryOut::Discovered(peer_id));
364		addrs_list.push(addr);
365	}
366
367	/// Add a self-reported address of a remote peer to the k-buckets of the DHT
368	/// if it has compatible `supported_protocols`.
369	///
370	/// **Note**: It is important that you call this method. The discovery mechanism will not
371	/// automatically add connecting peers to the Kademlia k-buckets.
372	pub fn add_self_reported_address(
373		&mut self,
374		peer_id: &PeerId,
375		supported_protocols: &[StreamProtocol],
376		addr: Multiaddr,
377	) {
378		if let Some(kademlia) = self.kademlia.as_mut() {
379			if !self.allow_non_globals_in_dht && !Self::can_add_to_dht(&addr) {
380				trace!(
381					target: LOG_TARGET,
382					"Ignoring self-reported non-global address {} from {}.", addr, peer_id
383				);
384				return
385			}
386
387			// The supported protocols must include the chain-based Kademlia protocol.
388			//
389			// Extract the chain-based Kademlia protocol from `kademlia.protocol_name()`
390			// when all nodes are upgraded to genesis hash and fork ID-based Kademlia:
391			// https://github.com/paritytech/polkadot-sdk/issues/504.
392			if !supported_protocols.iter().any(|p| {
393				p == self
394					.kademlia_protocol
395					.as_ref()
396					.expect("kademlia protocol was checked above to be enabled; qed")
397			}) {
398				trace!(
399					target: LOG_TARGET,
400					"Ignoring self-reported address {} from {} as remote node is not part of the \
401					 Kademlia DHT supported by the local node.", addr, peer_id,
402				);
403				return
404			}
405
406			trace!(
407				target: LOG_TARGET,
408				"Adding self-reported address {} from {} to Kademlia DHT.",
409				addr, peer_id
410			);
411			kademlia.add_address(peer_id, addr.clone());
412		}
413	}
414
415	/// Start fetching a record from the DHT.
416	///
417	/// A corresponding `ValueFound` or `ValueNotFound` event will later be generated.
418	pub fn get_value(&mut self, key: RecordKey) {
419		if let Some(k) = self.kademlia.as_mut() {
420			k.get_record(key.clone());
421		}
422	}
423
424	/// Start putting a record into the DHT. Other nodes can later fetch that value with
425	/// `get_value`.
426	///
427	/// A corresponding `ValuePut` or `ValuePutFailed` event will later be generated.
428	pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
429		if let Some(k) = self.kademlia.as_mut() {
430			if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) {
431				warn!(target: LOG_TARGET, "Libp2p => Failed to put record: {:?}", e);
432				self.pending_events
433					.push_back(DiscoveryOut::ValuePutFailed(key.clone(), Duration::from_secs(0)));
434			}
435		}
436	}
437
438	/// Puts a record into the DHT on the provided `peers`
439	///
440	/// If `update_local_storage` is true, the local storage is update as well.
441	pub fn put_record_to(
442		&mut self,
443		record: Record,
444		peers: HashSet<sc_network_types::PeerId>,
445		update_local_storage: bool,
446	) {
447		if let Some(kad) = self.kademlia.as_mut() {
448			if update_local_storage {
449				if let Err(_e) = kad.store_mut().put(record.clone()) {
450					warn!(target: LOG_TARGET, "Failed to update local starage");
451				}
452			}
453
454			if !peers.is_empty() {
455				kad.put_record_to(
456					record,
457					peers.into_iter().map(|peer_id| peer_id.into()),
458					Quorum::All,
459				);
460			}
461		}
462	}
463
464	/// Register as a content provider on the DHT for `key`.
465	pub fn start_providing(&mut self, key: RecordKey) {
466		if let Some(kad) = self.kademlia.as_mut() {
467			if let Err(e) = kad.start_providing(key.clone()) {
468				warn!(target: LOG_TARGET, "Libp2p => Failed to start providing {key:?}: {e}.");
469				self.pending_events.push_back(DiscoveryOut::StartProvidingFailed(key));
470			}
471		}
472	}
473
474	/// Deregister as a content provider on the DHT for `key`.
475	pub fn stop_providing(&mut self, key: &RecordKey) {
476		if let Some(kad) = self.kademlia.as_mut() {
477			kad.stop_providing(key);
478		}
479	}
480
481	/// Get content providers for `key` from the DHT.
482	pub fn get_providers(&mut self, key: RecordKey) {
483		if let Some(kad) = self.kademlia.as_mut() {
484			kad.get_providers(key);
485		}
486	}
487
488	/// Store a record in the Kademlia record store.
489	pub fn store_record(
490		&mut self,
491		record_key: RecordKey,
492		record_value: Vec<u8>,
493		publisher: Option<PeerId>,
494		expires: Option<Instant>,
495	) {
496		if let Some(k) = self.kademlia.as_mut() {
497			if let Err(err) = k.store_mut().put(Record {
498				key: record_key,
499				value: record_value,
500				publisher: publisher.map(|publisher| publisher.into()),
501				expires,
502			}) {
503				debug!(
504					target: LOG_TARGET,
505					"Failed to store record with key: {:?}",
506					err
507				);
508			}
509		}
510	}
511
512	/// Returns the number of nodes in each Kademlia kbucket for each Kademlia instance.
513	///
514	/// Identifies Kademlia instances by their [`ProtocolId`] and kbuckets by the base 2 logarithm
515	/// of their lower bound.
516	pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
517		self.kademlia.as_mut().map(|kad| {
518			kad.kbuckets()
519				.map(|bucket| (bucket.range().0.ilog2().unwrap_or(0), bucket.iter().count()))
520				.collect()
521		})
522	}
523
524	/// Returns the number of records in the Kademlia record stores.
525	pub fn num_kademlia_records(&mut self) -> Option<usize> {
526		// Note that this code is ok only because we use a `MemoryStore`.
527		self.kademlia.as_mut().map(|kad| kad.store_mut().records().count())
528	}
529
530	/// Returns the total size in bytes of all the records in the Kademlia record stores.
531	pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
532		// Note that this code is ok only because we use a `MemoryStore`. If the records were
533		// for example stored on disk, this would load every single one of them every single time.
534		self.kademlia
535			.as_mut()
536			.map(|kad| kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len()))
537	}
538
539	/// Can the given `Multiaddr` be put into the DHT?
540	///
541	/// This test is successful only for global IP addresses and DNS names.
542	// NB: Currently all DNS names are allowed and no check for TLD suffixes is done
543	// because the set of valid domains is highly dynamic and would require frequent
544	// updates, for example by utilising publicsuffix.org or IANA.
545	pub fn can_add_to_dht(addr: &Multiaddr) -> bool {
546		let ip = match addr.iter().next() {
547			Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
548			Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
549			Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) =>
550				return true,
551			_ => return false,
552		};
553		ip.is_global()
554	}
555}
556
/// Event generated by the `DiscoveryBehaviour`.
#[derive(Debug)]
pub enum DiscoveryOut {
	/// A peer has been discovered.
	///
	/// Generated when a new address is registered through
	/// [`DiscoveryBehaviour::add_known_address`], when Kademlia updates its routing table, and
	/// when Kademlia reports a routable peer. In the latter case a connection to the peer has
	/// been established but the peer has not been added to the routing table because
	/// [`BucketInserts::Manual`] is configured. If the peer is to be included in the routing
	/// table, it must be explicitly added via
	/// [`DiscoveryBehaviour::add_self_reported_address`].
	Discovered(PeerId),

	/// A peer connected to this node for whom no listen address is known.
	///
	/// In order for the peer to be added to the Kademlia routing table, a known
	/// listen address must be added via
	/// [`DiscoveryBehaviour::add_self_reported_address`], e.g. obtained through
	/// the `identify` protocol.
	UnroutablePeer(PeerId),

	/// The DHT yielded results for the record request.
	///
	/// Returning the found record as well as the request duration.
	ValueFound(PeerRecord, Duration),

	/// The DHT received a put record request.
	///
	/// Carries the key, value, publisher and expiration time of the received record.
	PutRecordRequest(
		RecordKey,
		Vec<u8>,
		Option<sc_network_types::PeerId>,
		Option<std::time::Instant>,
	),

	/// The record requested was not found in the DHT.
	///
	/// Returning the corresponding key as well as the request duration.
	ValueNotFound(RecordKey, Duration),

	/// The record with a given key was successfully inserted into the DHT.
	///
	/// Returning the corresponding key as well as the request duration.
	ValuePut(RecordKey, Duration),

	/// Inserting a value into the DHT failed.
	///
	/// Returning the corresponding key as well as the request duration. The duration is zero
	/// when the request could not be started in [`DiscoveryBehaviour::put_value`].
	ValuePutFailed(RecordKey, Duration),

	/// Starting providing a key failed.
	StartProvidingFailed(RecordKey),

	/// The DHT yielded results for the providers request.
	ProvidersFound(RecordKey, HashSet<PeerId>, Duration),

	/// Providers for the requested key were not found in the DHT.
	ProvidersNotFound(RecordKey, Duration),

	/// Started a random Kademlia query.
	///
	/// Only happens if [`DiscoveryConfig::with_dht_random_walk`] has been configured to `true`.
	RandomKademliaStarted,
}
617
618impl NetworkBehaviour for DiscoveryBehaviour {
619	type ConnectionHandler =
620		ToggleConnectionHandler<<Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler>;
621	type ToSwarm = DiscoveryOut;
622
	/// Builds the connection handler for a newly established inbound connection.
	///
	/// Purely delegates to the (toggleable) Kademlia behaviour; the mDNS behaviour is not
	/// consulted here.
	fn handle_established_inbound_connection(
		&mut self,
		connection_id: ConnectionId,
		peer: PeerId,
		local_addr: &Multiaddr,
		remote_addr: &Multiaddr,
	) -> Result<THandler<Self>, ConnectionDenied> {
		self.kademlia.handle_established_inbound_connection(
			connection_id,
			peer,
			local_addr,
			remote_addr,
		)
	}
637
	/// Builds the connection handler for a newly established outbound connection.
	///
	/// Purely delegates to the (toggleable) Kademlia behaviour; the mDNS behaviour is not
	/// consulted here.
	fn handle_established_outbound_connection(
		&mut self,
		connection_id: ConnectionId,
		peer: PeerId,
		addr: &Multiaddr,
		role_override: Endpoint,
		port_use: PortUse,
	) -> Result<THandler<Self>, ConnectionDenied> {
		self.kademlia.handle_established_outbound_connection(
			connection_id,
			peer,
			addr,
			role_override,
			port_use,
		)
	}
654
	/// Called for a pending inbound connection; whether it is accepted or denied is decided
	/// solely by the (toggleable) Kademlia behaviour this call is forwarded to.
	fn handle_pending_inbound_connection(
		&mut self,
		connection_id: ConnectionId,
		local_addr: &Multiaddr,
		remote_addr: &Multiaddr,
	) -> Result<(), ConnectionDenied> {
		self.kademlia
			.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
	}
664
665	fn handle_pending_outbound_connection(
666		&mut self,
667		connection_id: ConnectionId,
668		maybe_peer: Option<PeerId>,
669		addresses: &[Multiaddr],
670		effective_role: Endpoint,
671	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
672		let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };
673
674		// Collect addresses into [`LinkedHashSet`] to eliminate duplicate entries preserving the
675		// order of addresses. Give priority to `permanent_addresses` (used with reserved nodes) and
676		// `ephemeral_addresses` (used for addresses discovered from other sources, like authority
677		// discovery DHT records).
678		let mut list: LinkedHashSet<_> = self
679			.permanent_addresses
680			.iter()
681			.filter_map(|(p, a)| (*p == peer_id).then(|| a.clone()))
682			.collect();
683
684		if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
685			ephemeral_addresses.iter().for_each(|address| {
686				list.insert_if_absent(address.clone());
687			});
688		}
689
690		{
691			let mut list_to_filter = self.kademlia.handle_pending_outbound_connection(
692				connection_id,
693				maybe_peer,
694				addresses,
695				effective_role,
696			)?;
697
698			list_to_filter.extend(self.mdns.handle_pending_outbound_connection(
699				connection_id,
700				maybe_peer,
701				addresses,
702				effective_role,
703			)?);
704
705			if !self.allow_private_ip {
706				list_to_filter.retain(|addr| match addr.iter().next() {
707					Some(Protocol::Ip4(addr)) if !IpNetwork::from(addr).is_global() => false,
708					Some(Protocol::Ip6(addr)) if !IpNetwork::from(addr).is_global() => false,
709					_ => true,
710				});
711			}
712
713			list_to_filter.into_iter().for_each(|address| {
714				list.insert_if_absent(address);
715			});
716		}
717
718		trace!(target: LOG_TARGET, "Addresses of {:?}: {:?}", peer_id, list);
719
720		Ok(list.into_iter().collect())
721	}
722
723	fn on_swarm_event(&mut self, event: FromSwarm) {
724		match event {
725			FromSwarm::ConnectionEstablished(e) => {
726				self.num_connections += 1;
727				self.kademlia.on_swarm_event(FromSwarm::ConnectionEstablished(e));
728			},
729			FromSwarm::ConnectionClosed(e) => {
730				self.num_connections -= 1;
731				self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
732			},
733			FromSwarm::DialFailure(e @ DialFailure { peer_id, error, .. }) => {
734				if let Some(peer_id) = peer_id {
735					if let DialError::Transport(errors) = error {
736						if let Entry::Occupied(mut entry) = self.ephemeral_addresses.entry(peer_id)
737						{
738							for (addr, _error) in errors {
739								entry.get_mut().retain(|a| a != addr);
740							}
741							if entry.get().is_empty() {
742								entry.remove();
743							}
744						}
745					}
746				}
747
748				self.kademlia.on_swarm_event(FromSwarm::DialFailure(e));
749			},
750			FromSwarm::ListenerClosed(e) => {
751				self.kademlia.on_swarm_event(FromSwarm::ListenerClosed(e));
752			},
753			FromSwarm::ListenFailure(e) => {
754				self.kademlia.on_swarm_event(FromSwarm::ListenFailure(e));
755			},
756			FromSwarm::ListenerError(e) => {
757				self.kademlia.on_swarm_event(FromSwarm::ListenerError(e));
758			},
759			FromSwarm::ExternalAddrExpired(e) => {
760				// We intentionally don't remove the element from `known_external_addresses` in
761				// order to not print the log line again.
762
763				self.kademlia.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
764			},
765			FromSwarm::NewListener(e) => {
766				self.kademlia.on_swarm_event(FromSwarm::NewListener(e));
767			},
768			FromSwarm::ExpiredListenAddr(e) => {
769				self.kademlia.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
770			},
771			FromSwarm::NewExternalAddrCandidate(e) => {
772				self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
773			},
774			FromSwarm::AddressChange(e) => {
775				self.kademlia.on_swarm_event(FromSwarm::AddressChange(e));
776			},
777			FromSwarm::NewListenAddr(e) => {
778				self.kademlia.on_swarm_event(FromSwarm::NewListenAddr(e));
779				self.mdns.on_swarm_event(FromSwarm::NewListenAddr(e));
780			},
781			FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => {
782				let mut address = addr.clone();
783
784				if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
785					if peer_id != self.local_peer_id {
786						warn!(
787							target: LOG_TARGET,
788							"🔍 Discovered external address for a peer that is not us: {addr}",
789						);
790						// Ensure this address is not propagated to kademlia.
791						return
792					}
793				} else {
794					address.push(Protocol::P2p(self.local_peer_id));
795				}
796
797				if Self::can_add_to_dht(&address) {
798					// NOTE: we might re-discover the same address multiple times
799					// in which case we just want to refrain from logging.
800					if self.known_external_addresses.insert(address.clone()) {
801						info!(
802						  target: LOG_TARGET,
803						  "🔍 Discovered new external address for our node: {address}",
804						);
805					}
806				}
807
808				self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
809			},
810			event => {
811				debug!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
812				self.kademlia.on_swarm_event(event);
813			},
814		}
815	}
816
	/// Forwards an event produced by a connection handler to the (toggleable) Kademlia
	/// behaviour, which is the only behaviour contributing a connection handler here.
	fn on_connection_handler_event(
		&mut self,
		peer_id: PeerId,
		connection_id: ConnectionId,
		event: THandlerOutEvent<Self>,
	) {
		self.kademlia.on_connection_handler_event(peer_id, connection_id, event);
	}
825
826	fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
827		// Immediately process the content of `discovered`.
828		if let Some(ev) = self.pending_events.pop_front() {
829			return Poll::Ready(ToSwarm::GenerateEvent(ev))
830		}
831
832		// Poll the stream that fires when we need to start a random Kademlia query.
833		if let Some(kademlia) = self.kademlia.as_mut() {
834			if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() {
835				while next_kad_random_query.poll_unpin(cx).is_ready() {
836					let actually_started =
837						if self.num_connections < self.discovery_only_if_under_num {
838							let random_peer_id = PeerId::random();
839							debug!(
840								target: LOG_TARGET,
841								"Libp2p <= Starting random Kademlia request for {:?}",
842								random_peer_id,
843							);
844							kademlia.get_closest_peers(random_peer_id);
845							true
846						} else {
847							debug!(
848								target: LOG_TARGET,
849								"Kademlia paused due to high number of connections ({})",
850								self.num_connections
851							);
852							false
853						};
854
855					// Schedule the next random query with exponentially increasing delay,
856					// capped at 60 seconds.
857					*next_kad_random_query = Delay::new(self.duration_to_next_kad);
858					self.duration_to_next_kad =
859						cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));
860
861					if actually_started {
862						let ev = DiscoveryOut::RandomKademliaStarted;
863						return Poll::Ready(ToSwarm::GenerateEvent(ev))
864					}
865				}
866			}
867		}
868
869		while let Poll::Ready(ev) = self.kademlia.poll(cx) {
870			match ev {
871				ToSwarm::GenerateEvent(ev) => match ev {
872					KademliaEvent::RoutingUpdated { peer, .. } => {
873						let ev = DiscoveryOut::Discovered(peer);
874						return Poll::Ready(ToSwarm::GenerateEvent(ev))
875					},
876					KademliaEvent::UnroutablePeer { peer, .. } => {
877						let ev = DiscoveryOut::UnroutablePeer(peer);
878						return Poll::Ready(ToSwarm::GenerateEvent(ev))
879					},
880					KademliaEvent::RoutablePeer { peer, .. } => {
881						let ev = DiscoveryOut::Discovered(peer);
882						return Poll::Ready(ToSwarm::GenerateEvent(ev))
883					},
884					KademliaEvent::PendingRoutablePeer { .. } => {
885						// We are not interested in this event at the moment.
886					},
887					KademliaEvent::InboundRequest { request } => match request {
888						libp2p::kad::InboundRequest::PutRecord { record: Some(record), .. } =>
889							return Poll::Ready(ToSwarm::GenerateEvent(
890								DiscoveryOut::PutRecordRequest(
891									record.key,
892									record.value,
893									record.publisher.map(Into::into),
894									record.expires,
895								),
896							)),
897						_ => {},
898					},
899					KademliaEvent::OutboundQueryProgressed {
900						result: QueryResult::GetClosestPeers(res),
901						..
902					} => match res {
903						Err(GetClosestPeersError::Timeout { key, peers }) => {
904							debug!(
905								target: LOG_TARGET,
906								"Libp2p => Query for {:?} timed out with {} results",
907								HexDisplay::from(&key), peers.len(),
908							);
909						},
910						Ok(ok) => {
911							trace!(
912								target: LOG_TARGET,
913								"Libp2p => Query for {:?} yielded {:?} results",
914								HexDisplay::from(&ok.key), ok.peers.len(),
915							);
916							if ok.peers.is_empty() && self.num_connections != 0 {
917								debug!(
918									target: LOG_TARGET,
919									"Libp2p => Random Kademlia query has yielded empty results",
920								);
921							}
922						},
923					},
924					KademliaEvent::OutboundQueryProgressed {
925						result: QueryResult::GetRecord(res),
926						stats,
927						id,
928						..
929					} => {
930						let ev = match res {
931							Ok(GetRecordOk::FoundRecord(r)) => {
932								debug!(
933									target: LOG_TARGET,
934									"Libp2p => Found record ({:?}) with value: {:?} id {:?} stats {:?}",
935									r.record.key,
936									r.record.value,
937									id,
938									stats,
939								);
940
941								// Let's directly finish the query if we are above 4.
942								// This number is small enough to make sure we don't
943								// unnecessarily flood the network with queries, but high
944								// enough to make sure we also touch peers which might have
945								// old record, so that we can update them once we notice
946								// they have old records.
947								if stats.num_successes() > GET_RECORD_REDUNDANCY_FACTOR {
948									if let Some(kad) = self.kademlia.as_mut() {
949										if let Some(mut query) = kad.query_mut(&id) {
950											query.finish();
951										}
952									}
953								}
954
955								// Will be removed below when we receive
956								// `FinishedWithNoAdditionalRecord`.
957								self.records_to_publish.insert(id, r.record.clone());
958
959								DiscoveryOut::ValueFound(r, stats.duration().unwrap_or_default())
960							},
961							Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
962								cache_candidates,
963							}) => {
964								debug!(
965									target: LOG_TARGET,
966									"Libp2p => Finished with no-additional-record {:?} stats {:?} took {:?} ms",
967									id,
968									stats,
969									stats.duration().map(|val| val.as_millis())
970								);
971								// We always need to remove the record to not leak any data!
972								if let Some(record) = self.records_to_publish.remove(&id) {
973									if cache_candidates.is_empty() {
974										continue
975									}
976
977									// Put the record to the `cache_candidates` that are nearest to
978									// the record key from our point of view of the network.
979									if let Some(kad) = self.kademlia.as_mut() {
980										kad.put_record_to(
981											record,
982											cache_candidates.into_iter().map(|v| v.1),
983											Quorum::One,
984										);
985									}
986								}
987
988								continue
989							},
990							Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
991								trace!(
992									target: LOG_TARGET,
993									"Libp2p => Failed to get record: {:?}",
994									e,
995								);
996								DiscoveryOut::ValueNotFound(
997									e.into_key(),
998									stats.duration().unwrap_or_default(),
999								)
1000							},
1001							Err(e) => {
1002								debug!(
1003									target: LOG_TARGET,
1004									"Libp2p => Failed to get record: {:?}",
1005									e,
1006								);
1007								DiscoveryOut::ValueNotFound(
1008									e.into_key(),
1009									stats.duration().unwrap_or_default(),
1010								)
1011							},
1012						};
1013						return Poll::Ready(ToSwarm::GenerateEvent(ev))
1014					},
1015					KademliaEvent::OutboundQueryProgressed {
1016						result: QueryResult::GetProviders(res),
1017						stats,
1018						id,
1019						..
1020					} => {
1021						let ev = match res {
1022							Ok(GetProvidersOk::FoundProviders { key, providers }) => {
1023								debug!(
1024									target: LOG_TARGET,
1025									"Libp2p => Found providers {:?} for key {:?}, id {:?}, stats {:?}",
1026									providers,
1027									key,
1028									id,
1029									stats,
1030								);
1031
1032								DiscoveryOut::ProvidersFound(
1033									key,
1034									providers,
1035									stats.duration().unwrap_or_default(),
1036								)
1037							},
1038							Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
1039								closest_peers: _,
1040							}) => {
1041								debug!(
1042									target: LOG_TARGET,
1043									"Libp2p => Finished with no additional providers {:?}, stats {:?}, took {:?} ms",
1044									id,
1045									stats,
1046									stats.duration().map(|val| val.as_millis())
1047								);
1048
1049								continue
1050							},
1051							Err(GetProvidersError::Timeout { key, closest_peers: _ }) => {
1052								debug!(
1053									target: LOG_TARGET,
1054									"Libp2p => Failed to get providers for {key:?} due to timeout.",
1055								);
1056
1057								DiscoveryOut::ProvidersNotFound(
1058									key,
1059									stats.duration().unwrap_or_default(),
1060								)
1061							},
1062						};
1063						return Poll::Ready(ToSwarm::GenerateEvent(ev))
1064					},
1065					KademliaEvent::OutboundQueryProgressed {
1066						result: QueryResult::PutRecord(res),
1067						stats,
1068						..
1069					} => {
1070						let ev = match res {
1071							Ok(ok) =>
1072								DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_default()),
1073							Err(e) => {
1074								debug!(
1075									target: LOG_TARGET,
1076									"Libp2p => Failed to put record: {:?}",
1077									e,
1078								);
1079								DiscoveryOut::ValuePutFailed(
1080									e.into_key(),
1081									stats.duration().unwrap_or_default(),
1082								)
1083							},
1084						};
1085						return Poll::Ready(ToSwarm::GenerateEvent(ev))
1086					},
1087					KademliaEvent::OutboundQueryProgressed {
1088						result: QueryResult::RepublishRecord(res),
1089						..
1090					} => match res {
1091						Ok(ok) => debug!(
1092							target: LOG_TARGET,
1093							"Libp2p => Record republished: {:?}",
1094							ok.key,
1095						),
1096						Err(e) => debug!(
1097							target: LOG_TARGET,
1098							"Libp2p => Republishing of record {:?} failed with: {:?}",
1099							e.key(), e,
1100						),
1101					},
1102					KademliaEvent::OutboundQueryProgressed {
1103						result: QueryResult::Bootstrap(res),
1104						..
1105					} => match res {
1106						Ok(ok) => debug!(
1107							target: LOG_TARGET,
1108							"Libp2p => DHT bootstrap progressed: {ok:?}",
1109						),
1110						Err(e) => warn!(
1111							target: LOG_TARGET,
1112							"Libp2p => DHT bootstrap error: {e:?}",
1113						),
1114					},
1115					// We never start any other type of query.
1116					KademliaEvent::OutboundQueryProgressed { result: e, .. } => {
1117						warn!(target: LOG_TARGET, "Libp2p => Unhandled Kademlia event: {:?}", e)
1118					},
1119					Event::ModeChanged { new_mode } => {
1120						debug!(target: LOG_TARGET, "Libp2p => Kademlia mode changed: {new_mode}")
1121					},
1122				},
1123				ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }),
1124				event => {
1125					return Poll::Ready(event.map_out(|_| {
1126						unreachable!("`GenerateEvent` is handled in a branch above; qed")
1127					}));
1128				},
1129			}
1130		}
1131
1132		// Poll mDNS.
1133		while let Poll::Ready(ev) = self.mdns.poll(cx) {
1134			match ev {
1135				ToSwarm::GenerateEvent(event) => match event {
1136					mdns::Event::Discovered(list) => {
1137						if self.num_connections >= self.discovery_only_if_under_num {
1138							continue
1139						}
1140
1141						self.pending_events.extend(
1142							list.into_iter().map(|(peer_id, _)| DiscoveryOut::Discovered(peer_id)),
1143						);
1144						if let Some(ev) = self.pending_events.pop_front() {
1145							return Poll::Ready(ToSwarm::GenerateEvent(ev))
1146						}
1147					},
1148					mdns::Event::Expired(_) => {},
1149				},
1150				ToSwarm::Dial { .. } => {
1151					unreachable!("mDNS never dials!");
1152				},
1153				// `event` is an enum with no variant
1154				ToSwarm::NotifyHandler { event, .. } => match event {},
1155				event => {
1156					return Poll::Ready(
1157						event
1158							.map_in(|_| {
1159								unreachable!("`NotifyHandler` is handled in a branch above; qed")
1160							})
1161							.map_out(|_| {
1162								unreachable!("`GenerateEvent` is handled in a branch above; qed")
1163							}),
1164					);
1165				},
1166			}
1167		}
1168
1169		Poll::Pending
1170	}
1171}
1172
1173/// Legacy (fallback) Kademlia protocol name based on `protocol_id`.
1174fn legacy_kademlia_protocol_name(id: &ProtocolId) -> StreamProtocol {
1175	let name = format!("/{}/kad", id.as_ref());
1176	StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1177}
1178
1179/// Kademlia protocol name based on `genesis_hash` and `fork_id`.
1180fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
1181	genesis_hash: Hash,
1182	fork_id: Option<&str>,
1183) -> StreamProtocol {
1184	let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
1185	let name = if let Some(fork_id) = fork_id {
1186		format!("/{genesis_hash_hex}/{fork_id}/kad")
1187	} else {
1188		format!("/{genesis_hash_hex}/kad")
1189	};
1190
1191	StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1192}
1193
#[cfg(test)]
mod tests {
	use super::{
		kademlia_protocol_name, legacy_kademlia_protocol_name, DiscoveryConfig, DiscoveryOut,
	};
	use crate::config::ProtocolId;
	use futures::prelude::*;
	use libp2p::{
		core::{
			transport::{MemoryTransport, Transport},
			upgrade,
		},
		identity::Keypair,
		noise,
		swarm::{Swarm, SwarmEvent},
		yamux, Multiaddr,
	};
	use sp_core::hash::H256;
	use std::{collections::HashSet, task::Poll, time::Duration};

	/// Builds 25 swarms over the in-memory transport, where every swarm built after the
	/// first is given the first one as a permanent address, and completes once every swarm
	/// has reported all the peers it still had to discover.
	#[tokio::test]
	async fn discovery_working() {
		let genesis_hash = H256::from_low_u64_be(1);
		let fork_id = Some("test-fork-id");
		let protocol_id = ProtocolId::from("dot");

		// Set while building the first swarm; still `None` when that swarm is configured.
		let mut first_swarm_peer_id_and_addr = None;

		// Build swarms whose behaviour is `DiscoveryBehaviour`, each aware of
		// the first swarm via `with_permanent_addresses`.
		let mut swarms = Vec::new();
		for i in 0..25 {
			let mut swarm = libp2p::SwarmBuilder::with_new_identity()
				.with_tokio()
				.with_other_transport(|keypair| {
					MemoryTransport::new()
						.upgrade(upgrade::Version::V1)
						.authenticate(noise::Config::new(&keypair).unwrap())
						.multiplex(yamux::Config::default())
						.boxed()
				})
				.unwrap()
				.with_behaviour(|keypair| {
					let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
					config
						.with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
						.allow_private_ip(true)
						.allow_non_globals_in_dht(true)
						.discovery_limit(50)
						.with_kademlia(genesis_hash, fork_id, &protocol_id);

					config.finish()
				})
				.unwrap()
				.with_swarm_config(|config| {
					// This is taken care of by notification protocols in non-test environment
					config.with_idle_connection_timeout(Duration::from_secs(10))
				})
				.build();

			let memory_addr: Multiaddr =
				format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

			if i == 0 {
				first_swarm_peer_id_and_addr =
					Some((*swarm.local_peer_id(), memory_addr.clone()));
			}

			swarm.listen_on(memory_addr.clone()).unwrap();
			swarms.push((swarm, memory_addr));
		}

		// For every swarm, the set of peer ids it still has to discover.
		let mut to_discover: Vec<HashSet<_>> = (0..swarms.len())
			.map(|own_index| {
				swarms
					.iter()
					.enumerate()
					// Skip the first swarm as all other swarms already know it.
					.skip(1)
					.filter(|(index, _)| *index != own_index)
					.map(|(_, (swarm, _))| *Swarm::local_peer_id(swarm))
					.collect()
			})
			.collect();

		futures::future::poll_fn(move |cx| {
			'polling: loop {
				for swarm_n in 0..swarms.len() {
					let event = match swarms[swarm_n].0.poll_next_unpin(cx) {
						Poll::Ready(Some(event)) => event,
						// Nothing from this swarm right now; try the next one.
						_ => continue,
					};

					match event {
						SwarmEvent::Behaviour(
							DiscoveryOut::UnroutablePeer(other) | DiscoveryOut::Discovered(other),
						) => {
							// Call `add_self_reported_address` to simulate identify
							// happening.
							let addr = swarms
								.iter()
								.find_map(|(s, a)| {
									(s.behaviour().local_peer_id == other).then(|| a.clone())
								})
								.unwrap();

							// Test both genesis hash-based and legacy protocol names:
							// odd-numbered swarms additionally report the legacy name first.
							let mut protocol_names = Vec::with_capacity(2);
							if swarm_n % 2 != 0 {
								protocol_names.push(legacy_kademlia_protocol_name(&protocol_id));
							}
							protocol_names.push(kademlia_protocol_name(genesis_hash, fork_id));

							swarms[swarm_n].0.behaviour_mut().add_self_reported_address(
								&other,
								protocol_names.as_slice(),
								addr,
							);

							to_discover[swarm_n].remove(&other);
						},
						SwarmEvent::Behaviour(DiscoveryOut::RandomKademliaStarted) => {},
						SwarmEvent::Behaviour(e) => {
							panic!("Unexpected event: {:?}", e)
						},
						// ignore non Behaviour events
						_ => {},
					}

					// An event was consumed: start over from the first swarm.
					continue 'polling
				}

				// A full pass over the swarms produced no event.
				break
			}

			if to_discover.iter().all(|remaining| remaining.is_empty()) {
				Poll::Ready(())
			} else {
				Poll::Pending
			}
		})
		.await
	}

	/// Checks that `add_self_reported_address` only puts a peer into a k-bucket when the
	/// reported protocols include the genesis-hash based name Kademlia was configured with.
	#[test]
	fn discovery_ignores_peers_with_unknown_protocols() {
		let supported_genesis_hash = H256::from_low_u64_be(1);
		let unsupported_genesis_hash = H256::from_low_u64_be(2);
		let supported_protocol_id = ProtocolId::from("a");
		let unsupported_protocol_id = ProtocolId::from("b");

		let mut discovery = {
			let local_peer_id = Keypair::generate_ed25519().public().to_peer_id();
			let mut config = DiscoveryConfig::new(local_peer_id);
			config
				.allow_private_ip(true)
				.allow_non_globals_in_dht(true)
				.discovery_limit(50)
				.with_kademlia(supported_genesis_hash, None, &supported_protocol_id);
			config.finish()
		};

		// Derives a peer id from a fixed 32-byte ed25519 secret.
		let predictable_peer_id =
			|bytes: &[u8; 32]| Keypair::ed25519_from_bytes(*bytes).unwrap().public().to_peer_id();

		let remote_peer_id = predictable_peer_id(b"00000000000000000000000000000001");
		let remote_addr: Multiaddr = "/memory/1".parse().unwrap();
		let another_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
		let another_addr: Multiaddr = "/memory/2".parse().unwrap();

		// Try adding remote peers with unsupported protocols.
		discovery.add_self_reported_address(
			&remote_peer_id,
			&[kademlia_protocol_name(unsupported_genesis_hash, None)],
			remote_addr.clone(),
		);
		discovery.add_self_reported_address(
			&another_peer_id,
			&[legacy_kademlia_protocol_name(&unsupported_protocol_id)],
			another_addr.clone(),
		);

		assert!(
			discovery
				.kademlia
				.as_mut()
				.unwrap()
				.kbucket(remote_peer_id)
				.expect("Remote peer id not to be equal to local peer id.")
				.is_empty(),
			"Expect peer with unsupported protocol not to be added."
		);
		assert!(
			discovery
				.kademlia
				.as_mut()
				.unwrap()
				.kbucket(another_peer_id)
				.expect("Remote peer id not to be equal to local peer id.")
				.is_empty(),
			"Expect peer with unsupported protocol not to be added."
		);

		// Add remote peers with supported protocols.
		discovery.add_self_reported_address(
			&remote_peer_id,
			&[kademlia_protocol_name(supported_genesis_hash, None)],
			remote_addr.clone(),
		);
		assert!(
			!discovery
				.kademlia
				.as_mut()
				.unwrap()
				.kbucket(remote_peer_id)
				.expect("Remote peer id not to be equal to local peer id.")
				.is_empty(),
			"Expect peer with supported protocol to be added."
		);

		// Built from the same secret bytes and address as `another_peer_id` above.
		let unsupported_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
		let unsupported_peer_addr: Multiaddr = "/memory/2".parse().unwrap();

		// Check the unsupported peer is not present before and after the call.
		assert!(
			discovery
				.kademlia
				.as_mut()
				.unwrap()
				.kbucket(unsupported_peer_id)
				.expect("Remote peer id not to be equal to local peer id.")
				.is_empty(),
			"Expect unsupported peer not to be added."
		);
		// Note: legacy protocol is not supported without genesis hash and fork ID,
		// if the legacy is the only protocol supported, then the peer will not be added.
		discovery.add_self_reported_address(
			&unsupported_peer_id,
			&[legacy_kademlia_protocol_name(&supported_protocol_id)],
			unsupported_peer_addr.clone(),
		);
		assert!(
			discovery
				.kademlia
				.as_mut()
				.unwrap()
				.kbucket(unsupported_peer_id)
				.expect("Remote peer id not to be equal to local peer id.")
				.is_empty(),
			"Expect unsupported peer not to be added."
		);

		// Supported legacy and genesis based protocols are allowed to be added.
		discovery.add_self_reported_address(
			&another_peer_id,
			&[
				legacy_kademlia_protocol_name(&supported_protocol_id),
				kademlia_protocol_name(supported_genesis_hash, None),
			],
			another_addr.clone(),
		);

		let total_entries = discovery
			.kademlia
			.as_mut()
			.unwrap()
			.kbuckets()
			.map(|bucket| bucket.num_entries())
			.sum::<usize>();
		assert_eq!(2, total_entries, "Expect peers with supported protocol to be added.");
		assert!(
			!discovery
				.kademlia
				.as_mut()
				.unwrap()
				.kbucket(another_peer_id)
				.expect("Remote peer id not to be equal to local peer id.")
				.is_empty(),
			"Expect peer with supported protocol to be added."
		);
	}
}