// sc_network/discovery.rs
// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Discovery mechanisms of Substrate.
//!
//! The `DiscoveryBehaviour` struct implements the `NetworkBehaviour` trait of libp2p and is
//! responsible for discovering other nodes that are part of the network.
//!
//! Substrate uses the following mechanisms in order to discover nodes that are part of the network:
//!
//! - Bootstrap nodes. These are hard-coded node identities and addresses passed in the constructor
//! of the `DiscoveryBehaviour`. You can also call `add_known_address` later to add an entry.
//!
//! - mDNS. Discovers nodes on the local network by broadcasting UDP packets.
//!
//! - Kademlia random walk. Once connected, we perform random Kademlia `FIND_NODE` requests on the
//! configured Kademlia DHTs in order for nodes to propagate to us their view of the network. This
//! is performed automatically by the `DiscoveryBehaviour`.
//!
//! Additionally, the `DiscoveryBehaviour` is also capable of storing and loading values in the
//! configured DHTs.
//!
//! ## Usage
//!
//! The `DiscoveryBehaviour` generates events of type `DiscoveryOut`, most notably
//! `DiscoveryOut::Discovered` that is generated whenever we discover a node.
//! Only the identity of the node is returned. The node's addresses are stored within the
//! `DiscoveryBehaviour` and can be queried through the `NetworkBehaviour` trait.
//!
//! **Important**: In order for the discovery mechanism to work properly, there needs to be an
//! active mechanism that asks nodes for the addresses they are listening on. Whenever we learn
//! of a node's address, you must call `add_self_reported_address`.

use crate::{
	config::{
		ProtocolId, KADEMLIA_MAX_PROVIDER_KEYS, KADEMLIA_PROVIDER_RECORD_TTL,
		KADEMLIA_PROVIDER_REPUBLISH_INTERVAL,
	},
	utils::LruHashSet,
};

use array_bytes::bytes2hex;
use futures::prelude::*;
use futures_timer::Delay;
use ip_network::IpNetwork;
use libp2p::{
	core::{transport::PortUse, Endpoint, Multiaddr},
	kad::{
		self,
		store::{MemoryStore, MemoryStoreConfig, RecordStore},
		Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
		Event, GetClosestPeersError, GetClosestPeersOk, GetProvidersError, GetProvidersOk,
		GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record, RecordKey,
	},
	mdns::{self, tokio::Behaviour as TokioMdns},
	multiaddr::Protocol,
	swarm::{
		behaviour::{
			toggle::{Toggle, ToggleConnectionHandler},
			DialFailure, ExternalAddrConfirmed, FromSwarm,
		},
		ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, StreamProtocol, THandler,
		THandlerInEvent, THandlerOutEvent, ToSwarm,
	},
	PeerId,
};
use linked_hash_set::LinkedHashSet;
use log::{debug, error, info, trace, warn};
use sp_core::hexdisplay::HexDisplay;
use std::{
	cmp,
	collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
	num::NonZeroUsize,
	task::{Context, Poll},
	time::{Duration, Instant},
};

/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p::discovery";

/// Maximum number of known external addresses that we will cache.
/// This only affects whether we will log whenever we (re-)discover
/// a given address.
const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32;

/// Default value for Kademlia replication factor which determines to how many closest peers a
/// record is replicated to.
pub const DEFAULT_KADEMLIA_REPLICATION_FACTOR: usize = 20;

/// The minimum number of peers we expect an answer before we terminate the request.
const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4;

/// Query timeout for Kademlia requests. We need to increase this for record/provider publishing
/// to not timeout most of the time.
const KAD_QUERY_TIMEOUT: Duration = Duration::from_secs(300);

112/// `DiscoveryBehaviour` configuration.
113///
114///
115/// Note: In order to discover nodes or load and store values via Kademlia one has to add
116///       Kademlia protocol via [`DiscoveryConfig::with_kademlia`].
117pub struct DiscoveryConfig {
118	local_peer_id: PeerId,
119	permanent_addresses: Vec<(PeerId, Multiaddr)>,
120	dht_random_walk: bool,
121	allow_private_ip: bool,
122	allow_non_globals_in_dht: bool,
123	discovery_only_if_under_num: u64,
124	enable_mdns: bool,
125	kademlia_disjoint_query_paths: bool,
126	kademlia_protocol: Option<StreamProtocol>,
127	kademlia_legacy_protocol: Option<StreamProtocol>,
128	kademlia_replication_factor: NonZeroUsize,
129}
130
131impl DiscoveryConfig {
132	/// Create a default configuration with the given public key.
133	pub fn new(local_peer_id: PeerId) -> Self {
134		Self {
135			local_peer_id,
136			permanent_addresses: Vec::new(),
137			dht_random_walk: true,
138			allow_private_ip: true,
139			allow_non_globals_in_dht: false,
140			discovery_only_if_under_num: std::u64::MAX,
141			enable_mdns: false,
142			kademlia_disjoint_query_paths: false,
143			kademlia_protocol: None,
144			kademlia_legacy_protocol: None,
145			kademlia_replication_factor: NonZeroUsize::new(DEFAULT_KADEMLIA_REPLICATION_FACTOR)
146				.expect("value is a constant; constant is non-zero; qed."),
147		}
148	}
149
150	/// Set the number of active connections at which we pause discovery.
151	pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
152		self.discovery_only_if_under_num = limit;
153		self
154	}
155
156	/// Set custom nodes which never expire, e.g. bootstrap or reserved nodes.
157	pub fn with_permanent_addresses<I>(&mut self, permanent_addresses: I) -> &mut Self
158	where
159		I: IntoIterator<Item = (PeerId, Multiaddr)>,
160	{
161		self.permanent_addresses.extend(permanent_addresses);
162		self
163	}
164
165	/// Whether the discovery behaviour should periodically perform a random
166	/// walk on the DHT to discover peers.
167	pub fn with_dht_random_walk(&mut self, value: bool) -> &mut Self {
168		self.dht_random_walk = value;
169		self
170	}
171
172	/// Should private IPv4/IPv6 addresses be reported?
173	pub fn allow_private_ip(&mut self, value: bool) -> &mut Self {
174		self.allow_private_ip = value;
175		self
176	}
177
178	/// Should non-global addresses be inserted to the DHT?
179	pub fn allow_non_globals_in_dht(&mut self, value: bool) -> &mut Self {
180		self.allow_non_globals_in_dht = value;
181		self
182	}
183
184	/// Should MDNS discovery be supported?
185	pub fn with_mdns(&mut self, value: bool) -> &mut Self {
186		self.enable_mdns = value;
187		self
188	}
189
190	/// Add discovery via Kademlia for the given protocol.
191	///
192	/// Currently accepts `protocol_id`. This should be removed once all the nodes
193	/// are upgraded to genesis hash- and fork ID-based Kademlia protocol name.
194	pub fn with_kademlia<Hash: AsRef<[u8]>>(
195		&mut self,
196		genesis_hash: Hash,
197		fork_id: Option<&str>,
198		protocol_id: &ProtocolId,
199	) -> &mut Self {
200		self.kademlia_protocol = Some(kademlia_protocol_name(genesis_hash, fork_id));
201		self.kademlia_legacy_protocol = Some(legacy_kademlia_protocol_name(protocol_id));
202		self
203	}
204
205	/// Require iterative Kademlia DHT queries to use disjoint paths for increased resiliency in the
206	/// presence of potentially adversarial nodes.
207	pub fn use_kademlia_disjoint_query_paths(&mut self, value: bool) -> &mut Self {
208		self.kademlia_disjoint_query_paths = value;
209		self
210	}
211
212	/// Sets Kademlia replication factor.
213	pub fn with_kademlia_replication_factor(&mut self, value: NonZeroUsize) -> &mut Self {
214		self.kademlia_replication_factor = value;
215		self
216	}
217
218	/// Create a `DiscoveryBehaviour` from this config.
219	pub fn finish(self) -> DiscoveryBehaviour {
220		let Self {
221			local_peer_id,
222			permanent_addresses,
223			dht_random_walk,
224			allow_private_ip,
225			allow_non_globals_in_dht,
226			discovery_only_if_under_num,
227			enable_mdns,
228			kademlia_disjoint_query_paths,
229			kademlia_protocol,
230			kademlia_legacy_protocol: _,
231			kademlia_replication_factor,
232		} = self;
233
234		let kademlia = if let Some(ref kademlia_protocol) = kademlia_protocol {
235			let mut config = KademliaConfig::new(kademlia_protocol.clone());
236
237			config.set_replication_factor(kademlia_replication_factor);
238
239			config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);
240
241			config.set_query_timeout(KAD_QUERY_TIMEOUT);
242
243			// By default Kademlia attempts to insert all peers into its routing table once a
244			// dialing attempt succeeds. In order to control which peer is added, disable the
245			// auto-insertion and instead add peers manually.
246			config.set_kbucket_inserts(BucketInserts::Manual);
247			config.disjoint_query_paths(kademlia_disjoint_query_paths);
248
249			config.set_provider_record_ttl(Some(KADEMLIA_PROVIDER_RECORD_TTL));
250			config.set_provider_publication_interval(Some(KADEMLIA_PROVIDER_REPUBLISH_INTERVAL));
251
252			let store = MemoryStore::with_config(
253				local_peer_id,
254				MemoryStoreConfig {
255					max_provided_keys: KADEMLIA_MAX_PROVIDER_KEYS,
256					..Default::default()
257				},
258			);
259
260			let mut kad = Kademlia::with_config(local_peer_id, store, config);
261			kad.set_mode(Some(kad::Mode::Server));
262
263			for (peer_id, addr) in &permanent_addresses {
264				kad.add_address(peer_id, addr.clone());
265			}
266
267			Some(kad)
268		} else {
269			None
270		};
271
272		DiscoveryBehaviour {
273			permanent_addresses,
274			ephemeral_addresses: HashMap::new(),
275			kademlia: Toggle::from(kademlia),
276			next_kad_random_query: if dht_random_walk {
277				Some(Delay::new(Duration::new(0, 0)))
278			} else {
279				None
280			},
281			duration_to_next_kad: Duration::from_secs(1),
282			pending_events: VecDeque::new(),
283			local_peer_id,
284			num_connections: 0,
285			allow_private_ip,
286			discovery_only_if_under_num,
287			mdns: if enable_mdns {
288				match TokioMdns::new(mdns::Config::default(), local_peer_id) {
289					Ok(mdns) => Toggle::from(Some(mdns)),
290					Err(err) => {
291						warn!(target: LOG_TARGET, "Failed to initialize mDNS: {:?}", err);
292						Toggle::from(None)
293					},
294				}
295			} else {
296				Toggle::from(None)
297			},
298			allow_non_globals_in_dht,
299			known_external_addresses: LruHashSet::new(
300				NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
301					.expect("value is a constant; constant is non-zero; qed."),
302			),
303			records_to_publish: Default::default(),
304			kademlia_protocol,
305			provider_keys_requested: HashMap::new(),
306		}
307	}
308}
309
310/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
311pub struct DiscoveryBehaviour {
312	/// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and
313	/// reserved nodes.
314	permanent_addresses: Vec<(PeerId, Multiaddr)>,
315	/// Same as `permanent_addresses`, except that addresses that fail to reach a peer are
316	/// removed.
317	ephemeral_addresses: HashMap<PeerId, Vec<Multiaddr>>,
318	/// Kademlia requests and answers. Even though it's wrapped in `Toggle`, currently
319	/// it's always enabled in `NetworkWorker::new()`.
320	kademlia: Toggle<Kademlia<MemoryStore>>,
321	/// Discovers nodes on the local network.
322	mdns: Toggle<TokioMdns>,
323	/// Stream that fires when we need to perform the next random Kademlia query. `None` if
324	/// random walking is disabled.
325	next_kad_random_query: Option<Delay>,
326	/// After `next_kad_random_query` triggers, the next one triggers after this duration.
327	duration_to_next_kad: Duration,
328	/// Events to return in priority when polled.
329	pending_events: VecDeque<DiscoveryOut>,
330	/// Identity of our local node.
331	local_peer_id: PeerId,
332	/// Number of nodes we're currently connected to.
333	num_connections: u64,
334	/// If false, `addresses_of_peer` won't return any private IPv4/IPv6 address, except for the
335	/// ones stored in `permanent_addresses` or `ephemeral_addresses`.
336	allow_private_ip: bool,
337	/// Number of active connections over which we interrupt the discovery process.
338	discovery_only_if_under_num: u64,
339	/// Should non-global addresses be added to the DHT?
340	allow_non_globals_in_dht: bool,
341	/// A cache of discovered external addresses. Only used for logging purposes.
342	known_external_addresses: LruHashSet<Multiaddr>,
343	/// Records to publish per QueryId.
344	///
345	/// After finishing a Kademlia query, libp2p will return us a list of the closest peers that
346	/// did not return the record(in `FinishedWithNoAdditionalRecord`). We will then put the record
347	/// to these peers.
348	records_to_publish: HashMap<QueryId, Record>,
349	/// The chain based kademlia protocol name (including genesis hash and fork id).
350	///
351	/// Remove when all nodes are upgraded to genesis hash and fork ID-based Kademlia:
352	/// <https://github.com/paritytech/polkadot-sdk/issues/504>.
353	kademlia_protocol: Option<StreamProtocol>,
354	/// Provider keys requested with `GET_PROVIDERS` queries.
355	provider_keys_requested: HashMap<QueryId, RecordKey>,
356}
357
358impl DiscoveryBehaviour {
359	/// Returns the list of nodes that we know exist in the network.
360	pub fn known_peers(&mut self) -> HashSet<PeerId> {
361		let mut peers = HashSet::new();
362		if let Some(k) = self.kademlia.as_mut() {
363			for b in k.kbuckets() {
364				for e in b.iter() {
365					if !peers.contains(e.node.key.preimage()) {
366						peers.insert(*e.node.key.preimage());
367					}
368				}
369			}
370		}
371		peers
372	}
373
374	/// Adds a hard-coded address for the given peer, that never expires.
375	///
376	/// This adds an entry to the parameter that was passed to `new`.
377	///
378	/// If we didn't know this address before, also generates a `Discovered` event.
379	pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
380		let addrs_list = self.ephemeral_addresses.entry(peer_id).or_default();
381		if addrs_list.contains(&addr) {
382			return;
383		}
384
385		if let Some(k) = self.kademlia.as_mut() {
386			k.add_address(&peer_id, addr.clone());
387		}
388
389		self.pending_events.push_back(DiscoveryOut::Discovered(peer_id));
390		addrs_list.push(addr);
391	}
392
393	/// Add a self-reported address of a remote peer to the k-buckets of the DHT
394	/// if it has compatible `supported_protocols`.
395	///
396	/// **Note**: It is important that you call this method. The discovery mechanism will not
397	/// automatically add connecting peers to the Kademlia k-buckets.
398	pub fn add_self_reported_address(
399		&mut self,
400		peer_id: &PeerId,
401		supported_protocols: &[StreamProtocol],
402		addr: Multiaddr,
403	) {
404		if let Some(kademlia) = self.kademlia.as_mut() {
405			if !self.allow_non_globals_in_dht && !Self::can_add_to_dht(&addr) {
406				trace!(
407					target: LOG_TARGET,
408					"Ignoring self-reported non-global address {} from {}.", addr, peer_id
409				);
410				return;
411			}
412
413			// The supported protocols must include the chain-based Kademlia protocol.
414			//
415			// Extract the chain-based Kademlia protocol from `kademlia.protocol_name()`
416			// when all nodes are upgraded to genesis hash and fork ID-based Kademlia:
417			// https://github.com/paritytech/polkadot-sdk/issues/504.
418			if !supported_protocols.iter().any(|p| {
419				p == self
420					.kademlia_protocol
421					.as_ref()
422					.expect("kademlia protocol was checked above to be enabled; qed")
423			}) {
424				trace!(
425					target: LOG_TARGET,
426					"Ignoring self-reported address {} from {} as remote node is not part of the \
427					 Kademlia DHT supported by the local node.", addr, peer_id,
428				);
429				return;
430			}
431
432			trace!(
433				target: LOG_TARGET,
434				"Adding self-reported address {} from {} to Kademlia DHT.",
435				addr, peer_id
436			);
437			kademlia.add_address(peer_id, addr.clone());
438		}
439	}
440
441	/// Start finding the closest peers to the given `PeerId`.
442	///
443	/// A corresponding `ClosestPeersFound` or `ClosestPeersNotFound` event will later be generated.
444	pub fn find_closest_peers(&mut self, target: PeerId) {
445		if let Some(k) = self.kademlia.as_mut() {
446			k.get_closest_peers(target);
447		}
448	}
449
450	/// Start fetching a record from the DHT.
451	///
452	/// A corresponding `ValueFound` or `ValueNotFound` event will later be generated.
453	pub fn get_value(&mut self, key: RecordKey) {
454		if let Some(k) = self.kademlia.as_mut() {
455			k.get_record(key.clone());
456		}
457	}
458
459	/// Start putting a record into the DHT. Other nodes can later fetch that value with
460	/// `get_value`.
461	///
462	/// A corresponding `ValuePut` or `ValuePutFailed` event will later be generated.
463	pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
464		if let Some(k) = self.kademlia.as_mut() {
465			if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) {
466				warn!(target: LOG_TARGET, "Libp2p => Failed to put record: {:?}", e);
467				self.pending_events
468					.push_back(DiscoveryOut::ValuePutFailed(key.clone(), Duration::from_secs(0)));
469			}
470		}
471	}
472
473	/// Puts a record into the DHT on the provided `peers`
474	///
475	/// If `update_local_storage` is true, the local storage is update as well.
476	pub fn put_record_to(
477		&mut self,
478		record: Record,
479		peers: HashSet<sc_network_types::PeerId>,
480		update_local_storage: bool,
481	) {
482		if let Some(kad) = self.kademlia.as_mut() {
483			if update_local_storage {
484				if let Err(_e) = kad.store_mut().put(record.clone()) {
485					warn!(target: LOG_TARGET, "Failed to update local starage");
486				}
487			}
488
489			if !peers.is_empty() {
490				kad.put_record_to(
491					record,
492					peers.into_iter().map(|peer_id| peer_id.into()),
493					Quorum::All,
494				);
495			}
496		}
497	}
498
499	/// Register as a content provider on the DHT for `key`.
500	pub fn start_providing(&mut self, key: RecordKey) {
501		if let Some(kad) = self.kademlia.as_mut() {
502			if let Err(e) = kad.start_providing(key.clone()) {
503				warn!(target: LOG_TARGET, "Libp2p => Failed to start providing {key:?}: {e}.");
504				self.pending_events
505					.push_back(DiscoveryOut::StartProvidingFailed(key, Duration::from_secs(0)));
506			}
507		}
508	}
509
510	/// Deregister as a content provider on the DHT for `key`.
511	pub fn stop_providing(&mut self, key: &RecordKey) {
512		if let Some(kad) = self.kademlia.as_mut() {
513			kad.stop_providing(key);
514		}
515	}
516
517	/// Get content providers for `key` from the DHT.
518	pub fn get_providers(&mut self, key: RecordKey) {
519		if let Some(kad) = self.kademlia.as_mut() {
520			let query_id = kad.get_providers(key.clone());
521			self.provider_keys_requested.insert(query_id, key);
522		}
523	}
524
525	/// Store a record in the Kademlia record store.
526	pub fn store_record(
527		&mut self,
528		record_key: RecordKey,
529		record_value: Vec<u8>,
530		publisher: Option<PeerId>,
531		expires: Option<Instant>,
532	) {
533		if let Some(k) = self.kademlia.as_mut() {
534			if let Err(err) = k.store_mut().put(Record {
535				key: record_key,
536				value: record_value,
537				publisher: publisher.map(|publisher| publisher.into()),
538				expires,
539			}) {
540				debug!(
541					target: LOG_TARGET,
542					"Failed to store record with key: {:?}",
543					err
544				);
545			}
546		}
547	}
548
549	/// Returns the number of nodes in each Kademlia kbucket for each Kademlia instance.
550	///
551	/// Identifies Kademlia instances by their [`ProtocolId`] and kbuckets by the base 2 logarithm
552	/// of their lower bound.
553	pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
554		self.kademlia.as_mut().map(|kad| {
555			kad.kbuckets()
556				.map(|bucket| (bucket.range().0.ilog2().unwrap_or(0), bucket.iter().count()))
557				.collect()
558		})
559	}
560
561	/// Returns the number of records in the Kademlia record stores.
562	pub fn num_kademlia_records(&mut self) -> Option<usize> {
563		// Note that this code is ok only because we use a `MemoryStore`.
564		self.kademlia.as_mut().map(|kad| kad.store_mut().records().count())
565	}
566
567	/// Returns the total size in bytes of all the records in the Kademlia record stores.
568	pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
569		// Note that this code is ok only because we use a `MemoryStore`. If the records were
570		// for example stored on disk, this would load every single one of them every single time.
571		self.kademlia
572			.as_mut()
573			.map(|kad| kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len()))
574	}
575
576	/// Can the given `Multiaddr` be put into the DHT?
577	///
578	/// This test is successful only for global IP addresses and DNS names.
579	// NB: Currently all DNS names are allowed and no check for TLD suffixes is done
580	// because the set of valid domains is highly dynamic and would require frequent
581	// updates, for example by utilising publicsuffix.org or IANA.
582	pub fn can_add_to_dht(addr: &Multiaddr) -> bool {
583		let ip = match addr.iter().next() {
584			Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
585			Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
586			Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) => {
587				return true
588			},
589			_ => return false,
590		};
591		ip.is_global()
592	}
593}
594
595/// Event generated by the `DiscoveryBehaviour`.
596#[derive(Debug)]
597pub enum DiscoveryOut {
598	/// We discovered a peer and currenlty have it's addresses stored either in the routing
599	/// table or in the ephemeral addresses list, so a connection can be established.
600	Discovered(PeerId),
601
602	/// A peer connected to this node for whom no listen address is known.
603	///
604	/// In order for the peer to be added to the Kademlia routing table, a known
605	/// listen address must be added via
606	/// [`DiscoveryBehaviour::add_self_reported_address`], e.g. obtained through
607	/// the `identify` protocol.
608	UnroutablePeer(PeerId),
609
610	/// `FIND_NODE` query yielded closest peers with their addresses. This event also delivers
611	/// a partial result in case the query timed out, because it can contain the target peer's
612	/// address.
613	ClosestPeersFound(PeerId, Vec<(PeerId, Vec<Multiaddr>)>, Duration),
614
615	/// The closest peers to the target `PeerId` have not been found.
616	ClosestPeersNotFound(PeerId, Duration),
617
618	/// The DHT yielded results for the record request.
619	///
620	/// Returning the result grouped in (key, value) pairs as well as the request duration.
621	ValueFound(PeerRecord, Duration),
622
623	/// The DHT received a put record request.
624	PutRecordRequest(
625		RecordKey,
626		Vec<u8>,
627		Option<sc_network_types::PeerId>,
628		Option<std::time::Instant>,
629	),
630
631	/// The record requested was not found in the DHT.
632	///
633	/// Returning the corresponding key as well as the request duration.
634	ValueNotFound(RecordKey, Duration),
635
636	/// The record with a given key was successfully inserted into the DHT.
637	///
638	/// Returning the corresponding key as well as the request duration.
639	ValuePut(RecordKey, Duration),
640
641	/// Inserting a value into the DHT failed.
642	///
643	/// Returning the corresponding key as well as the request duration.
644	ValuePutFailed(RecordKey, Duration),
645
646	/// The content provider for a given key was successfully published.
647	StartedProviding(RecordKey, Duration),
648
649	/// Starting providing a key failed.
650	StartProvidingFailed(RecordKey, Duration),
651
652	/// The DHT yielded results for the providers request.
653	ProvidersFound(RecordKey, HashSet<PeerId>, Duration),
654
655	/// The DHT yielded no more providers for the key (`GET_PROVIDERS` query finished).
656	NoMoreProviders(RecordKey, Duration),
657
658	/// Providers for the requested key were not found in the DHT.
659	ProvidersNotFound(RecordKey, Duration),
660
661	/// Started a random Kademlia query.
662	///
663	/// Only happens if [`DiscoveryConfig::with_dht_random_walk`] has been configured to `true`.
664	RandomKademliaStarted,
665}
666
667impl NetworkBehaviour for DiscoveryBehaviour {
668	type ConnectionHandler =
669		ToggleConnectionHandler<<Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler>;
670	type ToSwarm = DiscoveryOut;
671
672	fn handle_established_inbound_connection(
673		&mut self,
674		connection_id: ConnectionId,
675		peer: PeerId,
676		local_addr: &Multiaddr,
677		remote_addr: &Multiaddr,
678	) -> Result<THandler<Self>, ConnectionDenied> {
679		self.kademlia.handle_established_inbound_connection(
680			connection_id,
681			peer,
682			local_addr,
683			remote_addr,
684		)
685	}
686
687	fn handle_established_outbound_connection(
688		&mut self,
689		connection_id: ConnectionId,
690		peer: PeerId,
691		addr: &Multiaddr,
692		role_override: Endpoint,
693		port_use: PortUse,
694	) -> Result<THandler<Self>, ConnectionDenied> {
695		self.kademlia.handle_established_outbound_connection(
696			connection_id,
697			peer,
698			addr,
699			role_override,
700			port_use,
701		)
702	}
703
704	fn handle_pending_inbound_connection(
705		&mut self,
706		connection_id: ConnectionId,
707		local_addr: &Multiaddr,
708		remote_addr: &Multiaddr,
709	) -> Result<(), ConnectionDenied> {
710		self.kademlia
711			.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
712	}
713
714	fn handle_pending_outbound_connection(
715		&mut self,
716		connection_id: ConnectionId,
717		maybe_peer: Option<PeerId>,
718		addresses: &[Multiaddr],
719		effective_role: Endpoint,
720	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
721		let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };
722
723		// Collect addresses into [`LinkedHashSet`] to eliminate duplicate entries preserving the
724		// order of addresses. Give priority to `permanent_addresses` (used with reserved nodes) and
725		// `ephemeral_addresses` (used for addresses discovered from other sources, like authority
726		// discovery DHT records).
727		let mut list: LinkedHashSet<_> = self
728			.permanent_addresses
729			.iter()
730			.filter_map(|(p, a)| (*p == peer_id).then(|| a.clone()))
731			.collect();
732
733		if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
734			ephemeral_addresses.iter().for_each(|address| {
735				list.insert_if_absent(address.clone());
736			});
737		}
738
739		{
740			let mut list_to_filter = self.kademlia.handle_pending_outbound_connection(
741				connection_id,
742				maybe_peer,
743				addresses,
744				effective_role,
745			)?;
746
747			list_to_filter.extend(self.mdns.handle_pending_outbound_connection(
748				connection_id,
749				maybe_peer,
750				addresses,
751				effective_role,
752			)?);
753
754			if !self.allow_private_ip {
755				list_to_filter.retain(|addr| match addr.iter().next() {
756					Some(Protocol::Ip4(addr)) if !IpNetwork::from(addr).is_global() => false,
757					Some(Protocol::Ip6(addr)) if !IpNetwork::from(addr).is_global() => false,
758					_ => true,
759				});
760			}
761
762			list_to_filter.into_iter().for_each(|address| {
763				list.insert_if_absent(address);
764			});
765		}
766
767		trace!(target: LOG_TARGET, "Addresses of {:?}: {:?}", peer_id, list);
768
769		Ok(list.into_iter().collect())
770	}
771
772	fn on_swarm_event(&mut self, event: FromSwarm) {
773		match event {
774			FromSwarm::ConnectionEstablished(e) => {
775				self.num_connections += 1;
776				self.kademlia.on_swarm_event(FromSwarm::ConnectionEstablished(e));
777			},
778			FromSwarm::ConnectionClosed(e) => {
779				self.num_connections -= 1;
780				self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
781			},
782			FromSwarm::DialFailure(e @ DialFailure { peer_id, error, .. }) => {
783				if let Some(peer_id) = peer_id {
784					if let DialError::Transport(errors) = error {
785						if let Entry::Occupied(mut entry) = self.ephemeral_addresses.entry(peer_id)
786						{
787							for (addr, _error) in errors {
788								entry.get_mut().retain(|a| a != addr);
789							}
790							if entry.get().is_empty() {
791								entry.remove();
792							}
793						}
794					}
795				}
796
797				self.kademlia.on_swarm_event(FromSwarm::DialFailure(e));
798			},
799			FromSwarm::ListenerClosed(e) => {
800				self.kademlia.on_swarm_event(FromSwarm::ListenerClosed(e));
801			},
802			FromSwarm::ListenFailure(e) => {
803				self.kademlia.on_swarm_event(FromSwarm::ListenFailure(e));
804			},
805			FromSwarm::ListenerError(e) => {
806				self.kademlia.on_swarm_event(FromSwarm::ListenerError(e));
807			},
808			FromSwarm::ExternalAddrExpired(e) => {
809				// We intentionally don't remove the element from `known_external_addresses` in
810				// order to not print the log line again.
811
812				self.kademlia.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
813			},
814			FromSwarm::NewListener(e) => {
815				self.kademlia.on_swarm_event(FromSwarm::NewListener(e));
816			},
817			FromSwarm::ExpiredListenAddr(e) => {
818				self.kademlia.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
819			},
820			FromSwarm::NewExternalAddrCandidate(e) => {
821				self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
822			},
823			FromSwarm::AddressChange(e) => {
824				self.kademlia.on_swarm_event(FromSwarm::AddressChange(e));
825			},
826			FromSwarm::NewListenAddr(e) => {
827				self.kademlia.on_swarm_event(FromSwarm::NewListenAddr(e));
828				self.mdns.on_swarm_event(FromSwarm::NewListenAddr(e));
829			},
830			FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => {
831				let mut address = addr.clone();
832
833				if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
834					if peer_id != self.local_peer_id {
835						warn!(
836							target: LOG_TARGET,
837							"🔍 Discovered external address for a peer that is not us: {addr}",
838						);
839						// Ensure this address is not propagated to kademlia.
840						return;
841					}
842				} else {
843					address.push(Protocol::P2p(self.local_peer_id));
844				}
845
846				if Self::can_add_to_dht(&address) {
847					// NOTE: we might re-discover the same address multiple times
848					// in which case we just want to refrain from logging.
849					if self.known_external_addresses.insert(address.clone()) {
850						info!(
851						  target: LOG_TARGET,
852						  "🔍 Discovered new external address for our node: {address}",
853						);
854					}
855				}
856
857				self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
858			},
859			FromSwarm::NewExternalAddrOfPeer(e) => {
860				self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
861				self.mdns.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
862			},
863			event => {
864				debug!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
865				self.kademlia.on_swarm_event(event);
866				self.mdns.on_swarm_event(event);
867			},
868		}
869	}
870
	/// Forwards connection-handler events to the Kademlia behaviour.
	///
	/// Only Kademlia is given these events here; mDNS is not (its handler event
	/// enum has no variants — see the `match event {}` in `poll`).
	fn on_connection_handler_event(
		&mut self,
		peer_id: PeerId,
		connection_id: ConnectionId,
		event: THandlerOutEvent<Self>,
	) {
		self.kademlia.on_connection_handler_event(peer_id, connection_id, event);
	}
879
	/// Drives the discovery behaviour: flushes queued `DiscoveryOut` events, kicks off
	/// periodic random Kademlia walks, and translates Kademlia/mDNS behaviour events
	/// into `DiscoveryOut`, forwarding everything else (dials, handler commands) to the
	/// swarm untouched.
	fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
		// Immediately process the content of `discovered`.
		if let Some(ev) = self.pending_events.pop_front() {
			return Poll::Ready(ToSwarm::GenerateEvent(ev));
		}

		// Poll the stream that fires when we need to start a random Kademlia query.
		if let Some(kademlia) = self.kademlia.as_mut() {
			if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() {
				while next_kad_random_query.poll_unpin(cx).is_ready() {
					// Random walks are only started while the connection count is below the
					// configured discovery limit; otherwise the walk is skipped but the
					// timer is still re-armed below.
					let actually_started =
						if self.num_connections < self.discovery_only_if_under_num {
							let random_peer_id = PeerId::random();
							debug!(
								target: LOG_TARGET,
								"Libp2p <= Starting random Kademlia request for {:?}",
								random_peer_id,
							);
							kademlia.get_closest_peers(random_peer_id);
							true
						} else {
							debug!(
								target: LOG_TARGET,
								"Kademlia paused due to high number of connections ({})",
								self.num_connections
							);
							false
						};

					// Schedule the next random query with exponentially increasing delay,
					// capped at 60 seconds.
					*next_kad_random_query = Delay::new(self.duration_to_next_kad);
					self.duration_to_next_kad =
						cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));

					if actually_started {
						let ev = DiscoveryOut::RandomKademliaStarted;
						return Poll::Ready(ToSwarm::GenerateEvent(ev));
					}
				}
			}
		}

		// Drain Kademlia events, converting each relevant one into a `DiscoveryOut`.
		// Note: events we don't surface use `continue`, so a single `poll` call can
		// consume several Kademlia events before returning.
		while let Poll::Ready(ev) = self.kademlia.poll(cx) {
			match ev {
				ToSwarm::GenerateEvent(ev) => match ev {
					KademliaEvent::RoutingUpdated { peer, .. } => {
						let ev = DiscoveryOut::Discovered(peer);
						return Poll::Ready(ToSwarm::GenerateEvent(ev));
					},
					KademliaEvent::UnroutablePeer { peer, .. } => {
						let ev = DiscoveryOut::UnroutablePeer(peer);
						return Poll::Ready(ToSwarm::GenerateEvent(ev));
					},
					KademliaEvent::RoutablePeer { .. } => {
						// Generate nothing, because the address was not added to the routing table,
						// so we will not be able to connect to the peer.
					},
					KademliaEvent::PendingRoutablePeer { .. } => {
						// We are not interested in this event at the moment.
					},
					KademliaEvent::InboundRequest { request } => match request {
						// Surface inbound PUT_VALUE requests so upper layers can decide
						// whether/how to store the record.
						libp2p::kad::InboundRequest::PutRecord { record: Some(record), .. } => {
							return Poll::Ready(ToSwarm::GenerateEvent(
								DiscoveryOut::PutRecordRequest(
									record.key,
									record.value,
									record.publisher.map(Into::into),
									record.expires,
								),
							))
						},
						_ => {},
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::GetClosestPeers(res),
						stats,
						..
					} => {
						// A timed-out query may still have yielded a partial peer list,
						// so both arms carry `peers`.
						let (key, peers, timeout) = match res {
							Ok(GetClosestPeersOk { key, peers }) => (key, peers, false),
							Err(GetClosestPeersError::Timeout { key, peers }) => (key, peers, true),
						};

						let target = match PeerId::from_bytes(&key.clone()) {
							Ok(peer_id) => peer_id,
							Err(_) => {
								warn!(
									target: LOG_TARGET,
									"Libp2p => FIND_NODE query finished for target that is not \
									 a peer ID: {:?}",
									HexDisplay::from(&key),
								);
								continue;
							},
						};

						if timeout {
							debug!(
								target: LOG_TARGET,
								"Libp2p => Query for target {target:?} timed out and yielded {} peers",
								peers.len(),
							);
						} else {
							debug!(
								target: LOG_TARGET,
								"Libp2p => Query for target {target:?} yielded {} peers",
								peers.len(),
							);
						}

						let ev = if peers.is_empty() {
							DiscoveryOut::ClosestPeersNotFound(
								target,
								stats.duration().unwrap_or_default(),
							)
						} else {
							DiscoveryOut::ClosestPeersFound(
								target,
								peers.into_iter().map(|p| (p.peer_id, p.addrs)).collect(),
								stats.duration().unwrap_or_default(),
							)
						};

						return Poll::Ready(ToSwarm::GenerateEvent(ev));
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::GetRecord(res),
						stats,
						id,
						..
					} => {
						let ev = match res {
							Ok(GetRecordOk::FoundRecord(r)) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Found record ({:?}) with value: {:?} id {:?} stats {:?}",
									r.record.key,
									r.record.value,
									id,
									stats,
								);

								// Let's directly finish the query if we are above 4.
								// This number is small enough to make sure we don't
								// unnecessarily flood the network with queries, but high
								// enough to make sure we also touch peers which might have
								// old record, so that we can update them once we notice
								// they have old records.
								if stats.num_successes() > GET_RECORD_REDUNDANCY_FACTOR {
									if let Some(kad) = self.kademlia.as_mut() {
										if let Some(mut query) = kad.query_mut(&id) {
											query.finish();
										}
									}
								}

								// Will be removed below when we receive
								// `FinishedWithNoAdditionalRecord`.
								self.records_to_publish.insert(id, r.record.clone());

								DiscoveryOut::ValueFound(r, stats.duration().unwrap_or_default())
							},
							Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
								cache_candidates,
							}) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Finished with no-additional-record {:?} stats {:?} took {:?} ms",
									id,
									stats,
									stats.duration().map(|val| val.as_millis())
								);
								// We always need to remove the record to not leak any data!
								if let Some(record) = self.records_to_publish.remove(&id) {
									if cache_candidates.is_empty() {
										continue;
									}

									// Put the record to the `cache_candidates` that are nearest to
									// the record key from our point of view of the network.
									if let Some(kad) = self.kademlia.as_mut() {
										kad.put_record_to(
											record,
											cache_candidates.into_iter().map(|v| v.1),
											Quorum::One,
										);
									}
								}

								continue;
							},
							Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
								// `NotFound` is an expected outcome, hence `trace!` rather
								// than `debug!` as in the arm below.
								trace!(
									target: LOG_TARGET,
									"Libp2p => Failed to get record: {:?}",
									e,
								);
								DiscoveryOut::ValueNotFound(
									e.into_key(),
									stats.duration().unwrap_or_default(),
								)
							},
							Err(e) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Failed to get record: {:?}",
									e,
								);
								DiscoveryOut::ValueNotFound(
									e.into_key(),
									stats.duration().unwrap_or_default(),
								)
							},
						};
						return Poll::Ready(ToSwarm::GenerateEvent(ev));
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::GetProviders(res),
						stats,
						id,
						..
					} => {
						let ev = match res {
							Ok(GetProvidersOk::FoundProviders { key, providers }) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Found providers {:?} for key {:?}, id {:?}, stats {:?}",
									providers,
									key,
									id,
									stats,
								);

								DiscoveryOut::ProvidersFound(
									key,
									providers,
									stats.duration().unwrap_or_default(),
								)
							},
							Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
								closest_peers: _,
							}) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Finished with no additional providers {:?}, stats {:?}, took {:?} ms",
									id,
									stats,
									stats.duration().map(|val| val.as_millis())
								);

								// The key is recorded in `provider_keys_requested` when the
								// query is started; a miss here means that bookkeeping broke.
								if let Some(key) = self.provider_keys_requested.remove(&id) {
									DiscoveryOut::NoMoreProviders(
										key,
										stats.duration().unwrap_or_default(),
									)
								} else {
									error!(
										target: LOG_TARGET,
										"No key found for `GET_PROVIDERS` query {id:?}. This is a bug.",
									);
									continue;
								}
							},
							Err(GetProvidersError::Timeout { key, closest_peers: _ }) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Failed to get providers for {key:?} due to timeout.",
								);

								// Drop the bookkeeping entry for the failed query so it
								// does not linger.
								self.provider_keys_requested.remove(&id);

								DiscoveryOut::ProvidersNotFound(
									key,
									stats.duration().unwrap_or_default(),
								)
							},
						};
						return Poll::Ready(ToSwarm::GenerateEvent(ev));
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::PutRecord(res),
						stats,
						..
					} => {
						let ev = match res {
							Ok(ok) => {
								trace!(
									target: LOG_TARGET,
									"Libp2p => Put record for key: {:?}",
									ok.key,
								);
								DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_default())
							},
							Err(e) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Failed to put record for key {:?}: {:?}",
									e.key(),
									e,
								);
								DiscoveryOut::ValuePutFailed(
									e.into_key(),
									stats.duration().unwrap_or_default(),
								)
							},
						};
						return Poll::Ready(ToSwarm::GenerateEvent(ev));
					},
					// Republishing outcomes are only logged; no `DiscoveryOut` is emitted.
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::RepublishRecord(res),
						..
					} => match res {
						Ok(ok) => debug!(
							target: LOG_TARGET,
							"Libp2p => Record republished: {:?}",
							ok.key,
						),
						Err(e) => debug!(
							target: LOG_TARGET,
							"Libp2p => Republishing of record {:?} failed with: {:?}",
							e.key(), e,
						),
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::StartProviding(res),
						stats,
						..
					} => {
						let ev = match res {
							Ok(ok) => {
								trace!(
									target: LOG_TARGET,
									"Libp2p => Started providing key {:?}",
									ok.key,
								);
								DiscoveryOut::StartedProviding(
									ok.key,
									stats.duration().unwrap_or_default(),
								)
							},
							Err(e) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Failed to start providing key {:?}: {:?}",
									e.key(),
									e,
								);
								DiscoveryOut::StartProvidingFailed(
									e.into_key(),
									stats.duration().unwrap_or_default(),
								)
							},
						};
						return Poll::Ready(ToSwarm::GenerateEvent(ev));
					},
					// Bootstrap progress is only logged; no `DiscoveryOut` is emitted.
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::Bootstrap(res),
						..
					} => match res {
						Ok(ok) => debug!(
							target: LOG_TARGET,
							"Libp2p => DHT bootstrap progressed: {ok:?}",
						),
						Err(e) => warn!(
							target: LOG_TARGET,
							"Libp2p => DHT bootstrap error: {e:?}",
						),
					},
					// We never start any other type of query.
					KademliaEvent::OutboundQueryProgressed { result: e, .. } => {
						warn!(target: LOG_TARGET, "Libp2p => Unhandled Kademlia event: {:?}", e)
					},
					Event::ModeChanged { new_mode } => {
						debug!(target: LOG_TARGET, "Libp2p => Kademlia mode changed: {new_mode}")
					},
				},
				ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }),
				// Any other command (handler notifications, address reports, ...) is
				// passed through to the swarm unchanged.
				event => {
					return Poll::Ready(event.map_out(|_| {
						unreachable!("`GenerateEvent` is handled in a branch above; qed")
					}));
				},
			}
		}

		// Poll mDNS.
		while let Poll::Ready(ev) = self.mdns.poll(cx) {
			match ev {
				ToSwarm::GenerateEvent(event) => match event {
					mdns::Event::Discovered(list) => {
						// Obey the discovery cap: ignore mDNS discoveries once we already
						// have enough connections.
						if self.num_connections >= self.discovery_only_if_under_num {
							continue;
						}

						self.pending_events.extend(
							list.into_iter().map(|(peer_id, _)| DiscoveryOut::Discovered(peer_id)),
						);
						if let Some(ev) = self.pending_events.pop_front() {
							return Poll::Ready(ToSwarm::GenerateEvent(ev));
						}
					},
					mdns::Event::Expired(_) => {},
				},
				ToSwarm::Dial { .. } => {
					unreachable!("mDNS never dials!");
				},
				// `event` is an enum with no variant
				ToSwarm::NotifyHandler { event, .. } => match event {},
				event => {
					return Poll::Ready(
						event
							.map_in(|_| {
								unreachable!("`NotifyHandler` is handled in a branch above; qed")
							})
							.map_out(|_| {
								unreachable!("`GenerateEvent` is handled in a branch above; qed")
							}),
					);
				},
			}
		}

		// Neither the timer nor either sub-behaviour produced anything actionable.
		Poll::Pending
	}
1305}
1306
1307/// Legacy (fallback) Kademlia protocol name based on `protocol_id`.
1308fn legacy_kademlia_protocol_name(id: &ProtocolId) -> StreamProtocol {
1309	let name = format!("/{}/kad", id.as_ref());
1310	StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1311}
1312
1313/// Kademlia protocol name based on `genesis_hash` and `fork_id`.
1314fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
1315	genesis_hash: Hash,
1316	fork_id: Option<&str>,
1317) -> StreamProtocol {
1318	let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
1319	let name = if let Some(fork_id) = fork_id {
1320		format!("/{genesis_hash_hex}/{fork_id}/kad")
1321	} else {
1322		format!("/{genesis_hash_hex}/kad")
1323	};
1324
1325	StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1326}
1327
#[cfg(test)]
mod tests {
	use super::{kademlia_protocol_name, legacy_kademlia_protocol_name, DiscoveryConfig};
	use crate::config::ProtocolId;
	use libp2p::{identity::Keypair, Multiaddr};
	use sp_core::hash::H256;

	// End-to-end check that a set of swarms running `DiscoveryBehaviour` eventually
	// discover each other through Kademlia, starting only from knowledge of the
	// first swarm's address.
	#[cfg(ignore_flaky_test)] // https://github.com/paritytech/polkadot-sdk/issues/48
	#[tokio::test]
	async fn discovery_working() {
		use super::DiscoveryOut;
		use futures::prelude::*;
		use libp2p::{
			core::{
				transport::{MemoryTransport, Transport},
				upgrade,
			},
			noise,
			swarm::{Swarm, SwarmEvent},
			yamux,
		};
		use std::{collections::HashSet, task::Poll, time::Duration};
		// Filled in while building the first swarm; every later swarm bootstraps off it.
		let mut first_swarm_peer_id_and_addr = None;

		let genesis_hash = H256::from_low_u64_be(1);
		let fork_id = Some("test-fork-id");
		let protocol_id = ProtocolId::from("dot");

		// Build swarms whose behaviour is `DiscoveryBehaviour`, each aware of
		// the first swarm via `with_permanent_addresses`.
		let mut swarms = (0..25)
			.map(|i| {
				let mut swarm = libp2p::SwarmBuilder::with_new_identity()
					.with_tokio()
					.with_other_transport(|keypair| {
						// In-memory transport so the test needs no real networking.
						MemoryTransport::new()
							.upgrade(upgrade::Version::V1)
							.authenticate(noise::Config::new(&keypair).unwrap())
							.multiplex(yamux::Config::default())
							.boxed()
					})
					.unwrap()
					.with_behaviour(|keypair| {
						let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
						config
							// `None` for the first swarm, which therefore starts with no
							// known peers.
							.with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
							.allow_private_ip(true)
							.allow_non_globals_in_dht(true)
							.discovery_limit(50)
							.with_kademlia(genesis_hash, fork_id, &protocol_id);

						config.finish()
					})
					.unwrap()
					.with_swarm_config(|config| {
						// This is taken care of by notification protocols in non-test environment
						config.with_idle_connection_timeout(Duration::from_secs(10))
					})
					.build();

				let listen_addr: Multiaddr =
					format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

				// Remember the first swarm's identity and address for all later swarms.
				if i == 0 {
					first_swarm_peer_id_and_addr =
						Some((*swarm.local_peer_id(), listen_addr.clone()))
				}

				swarm.listen_on(listen_addr.clone()).unwrap();
				(swarm, listen_addr)
			})
			.collect::<Vec<_>>();

		// Build a `Vec<HashSet<PeerId>>` with the list of nodes remaining to be discovered.
		let mut to_discover = (0..swarms.len())
			.map(|n| {
				(0..swarms.len())
					// Skip the first swarm as all other swarms already know it.
					.skip(1)
					.filter(|p| *p != n)
					.map(|p| *Swarm::local_peer_id(&swarms[p].0))
					.collect::<HashSet<_>>()
			})
			.collect::<Vec<_>>();

		// Drive all swarms until every one has discovered all the others.
		let fut = futures::future::poll_fn(move |cx| {
			'polling: loop {
				for swarm_n in 0..swarms.len() {
					match swarms[swarm_n].0.poll_next_unpin(cx) {
						Poll::Ready(Some(e)) => {
							match e {
								SwarmEvent::Behaviour(behavior) => {
									match behavior {
										DiscoveryOut::UnroutablePeer(other) |
										DiscoveryOut::Discovered(other) => {
											// Call `add_self_reported_address` to simulate identify
											// happening.
											let addr = swarms
												.iter()
												.find_map(|(s, a)| {
													if s.behaviour().local_peer_id == other {
														Some(a.clone())
													} else {
														None
													}
												})
												.unwrap();
											// Test both genesis hash-based and legacy
											// protocol names.
											let protocol_names = if swarm_n % 2 == 0 {
												vec![kademlia_protocol_name(genesis_hash, fork_id)]
											} else {
												vec![
													legacy_kademlia_protocol_name(&protocol_id),
													kademlia_protocol_name(genesis_hash, fork_id),
												]
											};
											swarms[swarm_n]
												.0
												.behaviour_mut()
												.add_self_reported_address(
													&other,
													protocol_names.as_slice(),
													addr,
												);

											to_discover[swarm_n].remove(&other);
										},
										DiscoveryOut::RandomKademliaStarted => {},
										DiscoveryOut::ClosestPeersFound(..) => {},
										// libp2p emits this event when it is not particularly
										// happy, but this doesn't break the discovery.
										DiscoveryOut::ClosestPeersNotFound(..) => {},
										e => {
											panic!("Unexpected event: {:?}", e)
										},
									}
								},
								// ignore non Behaviour events
								_ => {},
							}
							// Restart the scan from the first swarm whenever any swarm
							// made progress.
							continue 'polling;
						},
						_ => {},
					}
				}
				break;
			}

			// Resolve once every swarm's to-discover set is exhausted.
			if to_discover.iter().all(|l| l.is_empty()) {
				Poll::Ready(())
			} else {
				Poll::Pending
			}
		});

		fut.await
	}

	// Peers advertising only Kademlia protocol names we do not support must not be
	// inserted into our routing table; supported names (genesis-based, or legacy
	// alongside genesis-based) must be.
	#[test]
	fn discovery_ignores_peers_with_unknown_protocols() {
		let supported_genesis_hash = H256::from_low_u64_be(1);
		let unsupported_genesis_hash = H256::from_low_u64_be(2);
		let supported_protocol_id = ProtocolId::from("a");
		let unsupported_protocol_id = ProtocolId::from("b");

		let mut discovery = {
			let keypair = Keypair::generate_ed25519();
			let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
			config
				.allow_private_ip(true)
				.allow_non_globals_in_dht(true)
				.discovery_limit(50)
				.with_kademlia(supported_genesis_hash, None, &supported_protocol_id);
			config.finish()
		};

		// Deterministically derive a peer id from a fixed 32-byte ed25519 seed.
		let predictable_peer_id = |bytes: &[u8; 32]| {
			Keypair::ed25519_from_bytes(bytes.to_owned()).unwrap().public().to_peer_id()
		};

		let remote_peer_id = predictable_peer_id(b"00000000000000000000000000000001");
		let remote_addr: Multiaddr = "/memory/1".parse().unwrap();
		let another_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
		let another_addr: Multiaddr = "/memory/2".parse().unwrap();

		// Try adding remote peers with unsupported protocols.
		discovery.add_self_reported_address(
			&remote_peer_id,
			&[kademlia_protocol_name(unsupported_genesis_hash, None)],
			remote_addr.clone(),
		);
		discovery.add_self_reported_address(
			&another_peer_id,
			&[legacy_kademlia_protocol_name(&unsupported_protocol_id)],
			another_addr.clone(),
		);

		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert!(
				kademlia
					.kbucket(remote_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect peer with unsupported protocol not to be added."
			);
			assert!(
				kademlia
					.kbucket(another_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect peer with unsupported protocol not to be added."
			);
		}

		// Add remote peers with supported protocols.
		discovery.add_self_reported_address(
			&remote_peer_id,
			&[kademlia_protocol_name(supported_genesis_hash, None)],
			remote_addr.clone(),
		);
		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert!(
				!kademlia
					.kbucket(remote_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect peer with supported protocol to be added."
			);
		}

		// Same seed as `another_peer_id` above, so this derives the same peer id.
		let unsupported_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
		let unsupported_peer_addr: Multiaddr = "/memory/2".parse().unwrap();

		// Check the unsupported peer is not present before and after the call.
		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert!(
				kademlia
					.kbucket(unsupported_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect unsupported peer not to be added."
			);
		}
		// Note: legacy protocol is not supported without genesis hash and fork ID,
		// if the legacy is the only protocol supported, then the peer will not be added.
		discovery.add_self_reported_address(
			&unsupported_peer_id,
			&[legacy_kademlia_protocol_name(&supported_protocol_id)],
			unsupported_peer_addr.clone(),
		);
		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert!(
				kademlia
					.kbucket(unsupported_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect unsupported peer not to be added."
			);
		}

		// Supported legacy and genesis based protocols are allowed to be added.
		discovery.add_self_reported_address(
			&another_peer_id,
			&[
				legacy_kademlia_protocol_name(&supported_protocol_id),
				kademlia_protocol_name(supported_genesis_hash, None),
			],
			another_addr.clone(),
		);

		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			// Exactly two peers expected: `remote_peer_id` and `another_peer_id`.
			assert_eq!(
				2,
				kademlia.kbuckets().fold(0, |acc, bucket| acc + bucket.num_entries()),
				"Expect peers with supported protocol to be added."
			);
			assert!(
				!kademlia
					.kbucket(another_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect peer with supported protocol to be added."
			);
		}
	}
}