// sc_network/discovery.rs
1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Discovery mechanisms of Substrate.
20//!
21//! The `DiscoveryBehaviour` struct implements the `NetworkBehaviour` trait of libp2p and is
22//! responsible for discovering other nodes that are part of the network.
23//!
24//! Substrate uses the following mechanisms in order to discover nodes that are part of the network:
25//!
26//! - Bootstrap nodes. These are hard-coded node identities and addresses passed in the constructor
27//! of the `DiscoveryBehaviour`. You can also call `add_known_address` later to add an entry.
28//!
29//! - mDNS. Discovers nodes on the local network by broadcasting UDP packets.
30//!
31//! - Kademlia random walk. Once connected, we perform random Kademlia `FIND_NODE` requests on the
32//! configured Kademlia DHTs in order for nodes to propagate to us their view of the network. This
33//! is performed automatically by the `DiscoveryBehaviour`.
34//!
35//! Additionally, the `DiscoveryBehaviour` is also capable of storing and loading value in the
36//! configured DHTs.
37//!
38//! ## Usage
39//!
40//! The `DiscoveryBehaviour` generates events of type `DiscoveryOut`, most notably
41//! `DiscoveryOut::Discovered` that is generated whenever we discover a node.
42//! Only the identity of the node is returned. The node's addresses are stored within the
43//! `DiscoveryBehaviour` and can be queried through the `NetworkBehaviour` trait.
44//!
45//! **Important**: In order for the discovery mechanism to work properly, there needs to be an
46//! active mechanism that asks nodes for the addresses they are listening on. Whenever we learn
47//! of a node's address, you must call `add_self_reported_address`.
48
49use crate::{config::ProtocolId, utils::LruHashSet};
50
51use array_bytes::bytes2hex;
52use futures::prelude::*;
53use futures_timer::Delay;
54use ip_network::IpNetwork;
55use libp2p::{
56	core::{transport::PortUse, Endpoint, Multiaddr},
57	kad::{
58		self,
59		store::{MemoryStore, RecordStore},
60		Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
61		Event, GetClosestPeersError, GetClosestPeersOk, GetProvidersError, GetProvidersOk,
62		GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record, RecordKey,
63	},
64	mdns::{self, tokio::Behaviour as TokioMdns},
65	multiaddr::Protocol,
66	swarm::{
67		behaviour::{
68			toggle::{Toggle, ToggleConnectionHandler},
69			DialFailure, ExternalAddrConfirmed, FromSwarm,
70		},
71		ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, StreamProtocol, THandler,
72		THandlerInEvent, THandlerOutEvent, ToSwarm,
73	},
74	PeerId,
75};
76use linked_hash_set::LinkedHashSet;
77use log::{debug, error, info, trace, warn};
78use sp_core::hexdisplay::HexDisplay;
79use std::{
80	cmp,
81	collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
82	num::NonZeroUsize,
83	task::{Context, Poll},
84	time::{Duration, Instant},
85};
86
/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p::discovery";

/// Maximum number of known external addresses that we will cache.
/// This only affects whether we will log whenever we (re-)discover
/// a given address.
const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32;

/// Default value for the Kademlia replication factor, which determines to how many closest
/// peers a record is replicated.
pub const DEFAULT_KADEMLIA_REPLICATION_FACTOR: usize = 20;

/// The minimum number of peers from which we expect an answer before we terminate a
/// `GET_RECORD` request.
const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4;

/// Query timeout for Kademlia requests. We need to increase this for record/provider publishing
/// to not timeout most of the time.
const KAD_QUERY_TIMEOUT: Duration = Duration::from_secs(300);
105
/// `DiscoveryBehaviour` configuration.
///
/// Note: In order to discover nodes or load and store values via Kademlia one has to add
///       Kademlia protocol via [`DiscoveryConfig::with_kademlia`].
pub struct DiscoveryConfig {
	/// Identity of the local node.
	local_peer_id: PeerId,
	/// Hard-coded addresses (e.g. bootstrap or reserved nodes) that never expire.
	permanent_addresses: Vec<(PeerId, Multiaddr)>,
	/// Whether to periodically perform random Kademlia `FIND_NODE` walks.
	dht_random_walk: bool,
	/// Whether private IPv4/IPv6 addresses may be reported.
	allow_private_ip: bool,
	/// Whether non-global addresses may be inserted into the DHT.
	allow_non_globals_in_dht: bool,
	/// Number of active connections at or above which discovery is paused.
	discovery_only_if_under_num: u64,
	/// Whether mDNS discovery on the local network is enabled.
	enable_mdns: bool,
	/// Whether iterative Kademlia queries should use disjoint paths.
	kademlia_disjoint_query_paths: bool,
	/// Genesis-hash/fork-id based Kademlia protocol name, if Kademlia is enabled.
	kademlia_protocol: Option<StreamProtocol>,
	/// Legacy `ProtocolId`-based Kademlia protocol name, if Kademlia is enabled.
	kademlia_legacy_protocol: Option<StreamProtocol>,
	/// Kademlia replication factor (to how many closest peers a record is replicated).
	kademlia_replication_factor: NonZeroUsize,
}
124
125impl DiscoveryConfig {
126	/// Create a default configuration with the given public key.
127	pub fn new(local_peer_id: PeerId) -> Self {
128		Self {
129			local_peer_id,
130			permanent_addresses: Vec::new(),
131			dht_random_walk: true,
132			allow_private_ip: true,
133			allow_non_globals_in_dht: false,
134			discovery_only_if_under_num: std::u64::MAX,
135			enable_mdns: false,
136			kademlia_disjoint_query_paths: false,
137			kademlia_protocol: None,
138			kademlia_legacy_protocol: None,
139			kademlia_replication_factor: NonZeroUsize::new(DEFAULT_KADEMLIA_REPLICATION_FACTOR)
140				.expect("value is a constant; constant is non-zero; qed."),
141		}
142	}
143
144	/// Set the number of active connections at which we pause discovery.
145	pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
146		self.discovery_only_if_under_num = limit;
147		self
148	}
149
150	/// Set custom nodes which never expire, e.g. bootstrap or reserved nodes.
151	pub fn with_permanent_addresses<I>(&mut self, permanent_addresses: I) -> &mut Self
152	where
153		I: IntoIterator<Item = (PeerId, Multiaddr)>,
154	{
155		self.permanent_addresses.extend(permanent_addresses);
156		self
157	}
158
159	/// Whether the discovery behaviour should periodically perform a random
160	/// walk on the DHT to discover peers.
161	pub fn with_dht_random_walk(&mut self, value: bool) -> &mut Self {
162		self.dht_random_walk = value;
163		self
164	}
165
166	/// Should private IPv4/IPv6 addresses be reported?
167	pub fn allow_private_ip(&mut self, value: bool) -> &mut Self {
168		self.allow_private_ip = value;
169		self
170	}
171
172	/// Should non-global addresses be inserted to the DHT?
173	pub fn allow_non_globals_in_dht(&mut self, value: bool) -> &mut Self {
174		self.allow_non_globals_in_dht = value;
175		self
176	}
177
178	/// Should MDNS discovery be supported?
179	pub fn with_mdns(&mut self, value: bool) -> &mut Self {
180		self.enable_mdns = value;
181		self
182	}
183
184	/// Add discovery via Kademlia for the given protocol.
185	///
186	/// Currently accepts `protocol_id`. This should be removed once all the nodes
187	/// are upgraded to genesis hash- and fork ID-based Kademlia protocol name.
188	pub fn with_kademlia<Hash: AsRef<[u8]>>(
189		&mut self,
190		genesis_hash: Hash,
191		fork_id: Option<&str>,
192		protocol_id: &ProtocolId,
193	) -> &mut Self {
194		self.kademlia_protocol = Some(kademlia_protocol_name(genesis_hash, fork_id));
195		self.kademlia_legacy_protocol = Some(legacy_kademlia_protocol_name(protocol_id));
196		self
197	}
198
199	/// Require iterative Kademlia DHT queries to use disjoint paths for increased resiliency in the
200	/// presence of potentially adversarial nodes.
201	pub fn use_kademlia_disjoint_query_paths(&mut self, value: bool) -> &mut Self {
202		self.kademlia_disjoint_query_paths = value;
203		self
204	}
205
206	/// Sets Kademlia replication factor.
207	pub fn with_kademlia_replication_factor(&mut self, value: NonZeroUsize) -> &mut Self {
208		self.kademlia_replication_factor = value;
209		self
210	}
211
212	/// Create a `DiscoveryBehaviour` from this config.
213	pub fn finish(self) -> DiscoveryBehaviour {
214		let Self {
215			local_peer_id,
216			permanent_addresses,
217			dht_random_walk,
218			allow_private_ip,
219			allow_non_globals_in_dht,
220			discovery_only_if_under_num,
221			enable_mdns,
222			kademlia_disjoint_query_paths,
223			kademlia_protocol,
224			kademlia_legacy_protocol: _,
225			kademlia_replication_factor,
226		} = self;
227
228		let kademlia = if let Some(ref kademlia_protocol) = kademlia_protocol {
229			let mut config = KademliaConfig::new(kademlia_protocol.clone());
230
231			config.set_replication_factor(kademlia_replication_factor);
232
233			config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);
234
235			config.set_query_timeout(KAD_QUERY_TIMEOUT);
236
237			// By default Kademlia attempts to insert all peers into its routing table once a
238			// dialing attempt succeeds. In order to control which peer is added, disable the
239			// auto-insertion and instead add peers manually.
240			config.set_kbucket_inserts(BucketInserts::Manual);
241			config.disjoint_query_paths(kademlia_disjoint_query_paths);
242			let store = MemoryStore::new(local_peer_id);
243			let mut kad = Kademlia::with_config(local_peer_id, store, config);
244			kad.set_mode(Some(kad::Mode::Server));
245
246			for (peer_id, addr) in &permanent_addresses {
247				kad.add_address(peer_id, addr.clone());
248			}
249
250			Some(kad)
251		} else {
252			None
253		};
254
255		DiscoveryBehaviour {
256			permanent_addresses,
257			ephemeral_addresses: HashMap::new(),
258			kademlia: Toggle::from(kademlia),
259			next_kad_random_query: if dht_random_walk {
260				Some(Delay::new(Duration::new(0, 0)))
261			} else {
262				None
263			},
264			duration_to_next_kad: Duration::from_secs(1),
265			pending_events: VecDeque::new(),
266			local_peer_id,
267			num_connections: 0,
268			allow_private_ip,
269			discovery_only_if_under_num,
270			mdns: if enable_mdns {
271				match TokioMdns::new(mdns::Config::default(), local_peer_id) {
272					Ok(mdns) => Toggle::from(Some(mdns)),
273					Err(err) => {
274						warn!(target: LOG_TARGET, "Failed to initialize mDNS: {:?}", err);
275						Toggle::from(None)
276					},
277				}
278			} else {
279				Toggle::from(None)
280			},
281			allow_non_globals_in_dht,
282			known_external_addresses: LruHashSet::new(
283				NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
284					.expect("value is a constant; constant is non-zero; qed."),
285			),
286			records_to_publish: Default::default(),
287			kademlia_protocol,
288			provider_keys_requested: HashMap::new(),
289		}
290	}
291}
292
/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
pub struct DiscoveryBehaviour {
	/// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and
	/// reserved nodes.
	permanent_addresses: Vec<(PeerId, Multiaddr)>,
	/// Same as `permanent_addresses`, except that addresses that fail to reach a peer are
	/// removed.
	ephemeral_addresses: HashMap<PeerId, Vec<Multiaddr>>,
	/// Kademlia requests and answers. Even though it's wrapped in `Toggle`, currently
	/// it's always enabled in `NetworkWorker::new()`.
	kademlia: Toggle<Kademlia<MemoryStore>>,
	/// Discovers nodes on the local network.
	mdns: Toggle<TokioMdns>,
	/// Stream that fires when we need to perform the next random Kademlia query. `None` if
	/// random walking is disabled.
	next_kad_random_query: Option<Delay>,
	/// After `next_kad_random_query` triggers, the next one triggers after this duration.
	duration_to_next_kad: Duration,
	/// Events to return in priority when polled.
	pending_events: VecDeque<DiscoveryOut>,
	/// Identity of our local node.
	local_peer_id: PeerId,
	/// Number of nodes we're currently connected to.
	num_connections: u64,
	/// If false, `addresses_of_peer` won't return any private IPv4/IPv6 address, except for the
	/// ones stored in `permanent_addresses` or `ephemeral_addresses`.
	allow_private_ip: bool,
	/// Number of active connections over which we interrupt the discovery process.
	discovery_only_if_under_num: u64,
	/// Should non-global addresses be added to the DHT?
	allow_non_globals_in_dht: bool,
	/// A cache of discovered external addresses. Only used for logging purposes.
	known_external_addresses: LruHashSet<Multiaddr>,
	/// Records to publish per QueryId.
	///
	/// After finishing a Kademlia query, libp2p will return us a list of the closest peers that
	/// did not return the record (in `FinishedWithNoAdditionalRecord`). We will then put the
	/// record to these peers.
	records_to_publish: HashMap<QueryId, Record>,
	/// The chain based kademlia protocol name (including genesis hash and fork id).
	///
	/// Remove when all nodes are upgraded to genesis hash and fork ID-based Kademlia:
	/// <https://github.com/paritytech/polkadot-sdk/issues/504>.
	kademlia_protocol: Option<StreamProtocol>,
	/// Provider keys requested with `GET_PROVIDERS` queries.
	provider_keys_requested: HashMap<QueryId, RecordKey>,
}
340
341impl DiscoveryBehaviour {
342	/// Returns the list of nodes that we know exist in the network.
343	pub fn known_peers(&mut self) -> HashSet<PeerId> {
344		let mut peers = HashSet::new();
345		if let Some(k) = self.kademlia.as_mut() {
346			for b in k.kbuckets() {
347				for e in b.iter() {
348					if !peers.contains(e.node.key.preimage()) {
349						peers.insert(*e.node.key.preimage());
350					}
351				}
352			}
353		}
354		peers
355	}
356
357	/// Adds a hard-coded address for the given peer, that never expires.
358	///
359	/// This adds an entry to the parameter that was passed to `new`.
360	///
361	/// If we didn't know this address before, also generates a `Discovered` event.
362	pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
363		let addrs_list = self.ephemeral_addresses.entry(peer_id).or_default();
364		if addrs_list.contains(&addr) {
365			return
366		}
367
368		if let Some(k) = self.kademlia.as_mut() {
369			k.add_address(&peer_id, addr.clone());
370		}
371
372		self.pending_events.push_back(DiscoveryOut::Discovered(peer_id));
373		addrs_list.push(addr);
374	}
375
376	/// Add a self-reported address of a remote peer to the k-buckets of the DHT
377	/// if it has compatible `supported_protocols`.
378	///
379	/// **Note**: It is important that you call this method. The discovery mechanism will not
380	/// automatically add connecting peers to the Kademlia k-buckets.
381	pub fn add_self_reported_address(
382		&mut self,
383		peer_id: &PeerId,
384		supported_protocols: &[StreamProtocol],
385		addr: Multiaddr,
386	) {
387		if let Some(kademlia) = self.kademlia.as_mut() {
388			if !self.allow_non_globals_in_dht && !Self::can_add_to_dht(&addr) {
389				trace!(
390					target: LOG_TARGET,
391					"Ignoring self-reported non-global address {} from {}.", addr, peer_id
392				);
393				return
394			}
395
396			// The supported protocols must include the chain-based Kademlia protocol.
397			//
398			// Extract the chain-based Kademlia protocol from `kademlia.protocol_name()`
399			// when all nodes are upgraded to genesis hash and fork ID-based Kademlia:
400			// https://github.com/paritytech/polkadot-sdk/issues/504.
401			if !supported_protocols.iter().any(|p| {
402				p == self
403					.kademlia_protocol
404					.as_ref()
405					.expect("kademlia protocol was checked above to be enabled; qed")
406			}) {
407				trace!(
408					target: LOG_TARGET,
409					"Ignoring self-reported address {} from {} as remote node is not part of the \
410					 Kademlia DHT supported by the local node.", addr, peer_id,
411				);
412				return
413			}
414
415			trace!(
416				target: LOG_TARGET,
417				"Adding self-reported address {} from {} to Kademlia DHT.",
418				addr, peer_id
419			);
420			kademlia.add_address(peer_id, addr.clone());
421		}
422	}
423
424	/// Start finding the closest peers to the given `PeerId`.
425	///
426	/// A corresponding `ClosestPeersFound` or `ClosestPeersNotFound` event will later be generated.
427	pub fn find_closest_peers(&mut self, target: PeerId) {
428		if let Some(k) = self.kademlia.as_mut() {
429			k.get_closest_peers(target);
430		}
431	}
432
433	/// Start fetching a record from the DHT.
434	///
435	/// A corresponding `ValueFound` or `ValueNotFound` event will later be generated.
436	pub fn get_value(&mut self, key: RecordKey) {
437		if let Some(k) = self.kademlia.as_mut() {
438			k.get_record(key.clone());
439		}
440	}
441
442	/// Start putting a record into the DHT. Other nodes can later fetch that value with
443	/// `get_value`.
444	///
445	/// A corresponding `ValuePut` or `ValuePutFailed` event will later be generated.
446	pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
447		if let Some(k) = self.kademlia.as_mut() {
448			if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) {
449				warn!(target: LOG_TARGET, "Libp2p => Failed to put record: {:?}", e);
450				self.pending_events
451					.push_back(DiscoveryOut::ValuePutFailed(key.clone(), Duration::from_secs(0)));
452			}
453		}
454	}
455
456	/// Puts a record into the DHT on the provided `peers`
457	///
458	/// If `update_local_storage` is true, the local storage is update as well.
459	pub fn put_record_to(
460		&mut self,
461		record: Record,
462		peers: HashSet<sc_network_types::PeerId>,
463		update_local_storage: bool,
464	) {
465		if let Some(kad) = self.kademlia.as_mut() {
466			if update_local_storage {
467				if let Err(_e) = kad.store_mut().put(record.clone()) {
468					warn!(target: LOG_TARGET, "Failed to update local starage");
469				}
470			}
471
472			if !peers.is_empty() {
473				kad.put_record_to(
474					record,
475					peers.into_iter().map(|peer_id| peer_id.into()),
476					Quorum::All,
477				);
478			}
479		}
480	}
481
482	/// Register as a content provider on the DHT for `key`.
483	pub fn start_providing(&mut self, key: RecordKey) {
484		if let Some(kad) = self.kademlia.as_mut() {
485			if let Err(e) = kad.start_providing(key.clone()) {
486				warn!(target: LOG_TARGET, "Libp2p => Failed to start providing {key:?}: {e}.");
487				self.pending_events
488					.push_back(DiscoveryOut::StartProvidingFailed(key, Duration::from_secs(0)));
489			}
490		}
491	}
492
493	/// Deregister as a content provider on the DHT for `key`.
494	pub fn stop_providing(&mut self, key: &RecordKey) {
495		if let Some(kad) = self.kademlia.as_mut() {
496			kad.stop_providing(key);
497		}
498	}
499
500	/// Get content providers for `key` from the DHT.
501	pub fn get_providers(&mut self, key: RecordKey) {
502		if let Some(kad) = self.kademlia.as_mut() {
503			let query_id = kad.get_providers(key.clone());
504			self.provider_keys_requested.insert(query_id, key);
505		}
506	}
507
508	/// Store a record in the Kademlia record store.
509	pub fn store_record(
510		&mut self,
511		record_key: RecordKey,
512		record_value: Vec<u8>,
513		publisher: Option<PeerId>,
514		expires: Option<Instant>,
515	) {
516		if let Some(k) = self.kademlia.as_mut() {
517			if let Err(err) = k.store_mut().put(Record {
518				key: record_key,
519				value: record_value,
520				publisher: publisher.map(|publisher| publisher.into()),
521				expires,
522			}) {
523				debug!(
524					target: LOG_TARGET,
525					"Failed to store record with key: {:?}",
526					err
527				);
528			}
529		}
530	}
531
532	/// Returns the number of nodes in each Kademlia kbucket for each Kademlia instance.
533	///
534	/// Identifies Kademlia instances by their [`ProtocolId`] and kbuckets by the base 2 logarithm
535	/// of their lower bound.
536	pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
537		self.kademlia.as_mut().map(|kad| {
538			kad.kbuckets()
539				.map(|bucket| (bucket.range().0.ilog2().unwrap_or(0), bucket.iter().count()))
540				.collect()
541		})
542	}
543
544	/// Returns the number of records in the Kademlia record stores.
545	pub fn num_kademlia_records(&mut self) -> Option<usize> {
546		// Note that this code is ok only because we use a `MemoryStore`.
547		self.kademlia.as_mut().map(|kad| kad.store_mut().records().count())
548	}
549
550	/// Returns the total size in bytes of all the records in the Kademlia record stores.
551	pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
552		// Note that this code is ok only because we use a `MemoryStore`. If the records were
553		// for example stored on disk, this would load every single one of them every single time.
554		self.kademlia
555			.as_mut()
556			.map(|kad| kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len()))
557	}
558
559	/// Can the given `Multiaddr` be put into the DHT?
560	///
561	/// This test is successful only for global IP addresses and DNS names.
562	// NB: Currently all DNS names are allowed and no check for TLD suffixes is done
563	// because the set of valid domains is highly dynamic and would require frequent
564	// updates, for example by utilising publicsuffix.org or IANA.
565	pub fn can_add_to_dht(addr: &Multiaddr) -> bool {
566		let ip = match addr.iter().next() {
567			Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
568			Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
569			Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) =>
570				return true,
571			_ => return false,
572		};
573		ip.is_global()
574	}
575}
576
/// Event generated by the `DiscoveryBehaviour`.
#[derive(Debug)]
pub enum DiscoveryOut {
	/// We discovered a peer and currently have its addresses stored either in the routing
	/// table or in the ephemeral addresses list, so a connection can be established.
	Discovered(PeerId),

	/// A peer connected to this node for whom no listen address is known.
	///
	/// In order for the peer to be added to the Kademlia routing table, a known
	/// listen address must be added via
	/// [`DiscoveryBehaviour::add_self_reported_address`], e.g. obtained through
	/// the `identify` protocol.
	UnroutablePeer(PeerId),

	/// `FIND_NODE` query yielded closest peers with their addresses. This event also delivers
	/// a partial result in case the query timed out, because it can contain the target peer's
	/// address.
	ClosestPeersFound(PeerId, Vec<(PeerId, Vec<Multiaddr>)>, Duration),

	/// The closest peers to the target `PeerId` have not been found.
	ClosestPeersNotFound(PeerId, Duration),

	/// The DHT yielded results for the record request.
	///
	/// Returning the result grouped in (key, value) pairs as well as the request duration.
	ValueFound(PeerRecord, Duration),

	/// The DHT received a put record request.
	PutRecordRequest(
		RecordKey,
		Vec<u8>,
		Option<sc_network_types::PeerId>,
		Option<std::time::Instant>,
	),

	/// The record requested was not found in the DHT.
	///
	/// Returning the corresponding key as well as the request duration.
	ValueNotFound(RecordKey, Duration),

	/// The record with a given key was successfully inserted into the DHT.
	///
	/// Returning the corresponding key as well as the request duration.
	ValuePut(RecordKey, Duration),

	/// Inserting a value into the DHT failed.
	///
	/// Returning the corresponding key as well as the request duration.
	ValuePutFailed(RecordKey, Duration),

	/// The content provider for a given key was successfully published.
	StartedProviding(RecordKey, Duration),

	/// Starting providing a key failed.
	StartProvidingFailed(RecordKey, Duration),

	/// The DHT yielded results for the providers request.
	ProvidersFound(RecordKey, HashSet<PeerId>, Duration),

	/// The DHT yielded no more providers for the key (`GET_PROVIDERS` query finished).
	NoMoreProviders(RecordKey, Duration),

	/// Providers for the requested key were not found in the DHT.
	ProvidersNotFound(RecordKey, Duration),

	/// Started a random Kademlia query.
	///
	/// Only happens if [`DiscoveryConfig::with_dht_random_walk`] has been configured to `true`.
	RandomKademliaStarted,
}
648
649impl NetworkBehaviour for DiscoveryBehaviour {
650	type ConnectionHandler =
651		ToggleConnectionHandler<<Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler>;
652	type ToSwarm = DiscoveryOut;
653
654	fn handle_established_inbound_connection(
655		&mut self,
656		connection_id: ConnectionId,
657		peer: PeerId,
658		local_addr: &Multiaddr,
659		remote_addr: &Multiaddr,
660	) -> Result<THandler<Self>, ConnectionDenied> {
661		self.kademlia.handle_established_inbound_connection(
662			connection_id,
663			peer,
664			local_addr,
665			remote_addr,
666		)
667	}
668
669	fn handle_established_outbound_connection(
670		&mut self,
671		connection_id: ConnectionId,
672		peer: PeerId,
673		addr: &Multiaddr,
674		role_override: Endpoint,
675		port_use: PortUse,
676	) -> Result<THandler<Self>, ConnectionDenied> {
677		self.kademlia.handle_established_outbound_connection(
678			connection_id,
679			peer,
680			addr,
681			role_override,
682			port_use,
683		)
684	}
685
686	fn handle_pending_inbound_connection(
687		&mut self,
688		connection_id: ConnectionId,
689		local_addr: &Multiaddr,
690		remote_addr: &Multiaddr,
691	) -> Result<(), ConnectionDenied> {
692		self.kademlia
693			.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
694	}
695
696	fn handle_pending_outbound_connection(
697		&mut self,
698		connection_id: ConnectionId,
699		maybe_peer: Option<PeerId>,
700		addresses: &[Multiaddr],
701		effective_role: Endpoint,
702	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
703		let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };
704
705		// Collect addresses into [`LinkedHashSet`] to eliminate duplicate entries preserving the
706		// order of addresses. Give priority to `permanent_addresses` (used with reserved nodes) and
707		// `ephemeral_addresses` (used for addresses discovered from other sources, like authority
708		// discovery DHT records).
709		let mut list: LinkedHashSet<_> = self
710			.permanent_addresses
711			.iter()
712			.filter_map(|(p, a)| (*p == peer_id).then(|| a.clone()))
713			.collect();
714
715		if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
716			ephemeral_addresses.iter().for_each(|address| {
717				list.insert_if_absent(address.clone());
718			});
719		}
720
721		{
722			let mut list_to_filter = self.kademlia.handle_pending_outbound_connection(
723				connection_id,
724				maybe_peer,
725				addresses,
726				effective_role,
727			)?;
728
729			list_to_filter.extend(self.mdns.handle_pending_outbound_connection(
730				connection_id,
731				maybe_peer,
732				addresses,
733				effective_role,
734			)?);
735
736			if !self.allow_private_ip {
737				list_to_filter.retain(|addr| match addr.iter().next() {
738					Some(Protocol::Ip4(addr)) if !IpNetwork::from(addr).is_global() => false,
739					Some(Protocol::Ip6(addr)) if !IpNetwork::from(addr).is_global() => false,
740					_ => true,
741				});
742			}
743
744			list_to_filter.into_iter().for_each(|address| {
745				list.insert_if_absent(address);
746			});
747		}
748
749		trace!(target: LOG_TARGET, "Addresses of {:?}: {:?}", peer_id, list);
750
751		Ok(list.into_iter().collect())
752	}
753
754	fn on_swarm_event(&mut self, event: FromSwarm) {
755		match event {
756			FromSwarm::ConnectionEstablished(e) => {
757				self.num_connections += 1;
758				self.kademlia.on_swarm_event(FromSwarm::ConnectionEstablished(e));
759			},
760			FromSwarm::ConnectionClosed(e) => {
761				self.num_connections -= 1;
762				self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
763			},
764			FromSwarm::DialFailure(e @ DialFailure { peer_id, error, .. }) => {
765				if let Some(peer_id) = peer_id {
766					if let DialError::Transport(errors) = error {
767						if let Entry::Occupied(mut entry) = self.ephemeral_addresses.entry(peer_id)
768						{
769							for (addr, _error) in errors {
770								entry.get_mut().retain(|a| a != addr);
771							}
772							if entry.get().is_empty() {
773								entry.remove();
774							}
775						}
776					}
777				}
778
779				self.kademlia.on_swarm_event(FromSwarm::DialFailure(e));
780			},
781			FromSwarm::ListenerClosed(e) => {
782				self.kademlia.on_swarm_event(FromSwarm::ListenerClosed(e));
783			},
784			FromSwarm::ListenFailure(e) => {
785				self.kademlia.on_swarm_event(FromSwarm::ListenFailure(e));
786			},
787			FromSwarm::ListenerError(e) => {
788				self.kademlia.on_swarm_event(FromSwarm::ListenerError(e));
789			},
790			FromSwarm::ExternalAddrExpired(e) => {
791				// We intentionally don't remove the element from `known_external_addresses` in
792				// order to not print the log line again.
793
794				self.kademlia.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
795			},
796			FromSwarm::NewListener(e) => {
797				self.kademlia.on_swarm_event(FromSwarm::NewListener(e));
798			},
799			FromSwarm::ExpiredListenAddr(e) => {
800				self.kademlia.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
801			},
802			FromSwarm::NewExternalAddrCandidate(e) => {
803				self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
804			},
805			FromSwarm::AddressChange(e) => {
806				self.kademlia.on_swarm_event(FromSwarm::AddressChange(e));
807			},
808			FromSwarm::NewListenAddr(e) => {
809				self.kademlia.on_swarm_event(FromSwarm::NewListenAddr(e));
810				self.mdns.on_swarm_event(FromSwarm::NewListenAddr(e));
811			},
812			FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => {
813				let mut address = addr.clone();
814
815				if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
816					if peer_id != self.local_peer_id {
817						warn!(
818							target: LOG_TARGET,
819							"🔍 Discovered external address for a peer that is not us: {addr}",
820						);
821						// Ensure this address is not propagated to kademlia.
822						return
823					}
824				} else {
825					address.push(Protocol::P2p(self.local_peer_id));
826				}
827
828				if Self::can_add_to_dht(&address) {
829					// NOTE: we might re-discover the same address multiple times
830					// in which case we just want to refrain from logging.
831					if self.known_external_addresses.insert(address.clone()) {
832						info!(
833						  target: LOG_TARGET,
834						  "🔍 Discovered new external address for our node: {address}",
835						);
836					}
837				}
838
839				self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
840			},
841			FromSwarm::NewExternalAddrOfPeer(e) => {
842				self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
843				self.mdns.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
844			},
845			event => {
846				debug!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
847				self.kademlia.on_swarm_event(event);
848				self.mdns.on_swarm_event(event);
849			},
850		}
851	}
852
853	fn on_connection_handler_event(
854		&mut self,
855		peer_id: PeerId,
856		connection_id: ConnectionId,
857		event: THandlerOutEvent<Self>,
858	) {
859		self.kademlia.on_connection_handler_event(peer_id, connection_id, event);
860	}
861
	// Drives the discovery behaviour: drains `pending_events`, schedules the
	// periodic Kademlia random walk (with exponential backoff capped at 60s),
	// then drains the Kademlia and mDNS sub-behaviours, translating their
	// events into `DiscoveryOut`.
	fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
		// Immediately process the content of `discovered`.
		if let Some(ev) = self.pending_events.pop_front() {
			return Poll::Ready(ToSwarm::GenerateEvent(ev))
		}

		// Poll the stream that fires when we need to start a random Kademlia query.
		if let Some(kademlia) = self.kademlia.as_mut() {
			if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() {
				while next_kad_random_query.poll_unpin(cx).is_ready() {
					// The walk is skipped (but still rescheduled) once the
					// connection count reaches `discovery_only_if_under_num`.
					let actually_started =
						if self.num_connections < self.discovery_only_if_under_num {
							let random_peer_id = PeerId::random();
							debug!(
								target: LOG_TARGET,
								"Libp2p <= Starting random Kademlia request for {:?}",
								random_peer_id,
							);
							kademlia.get_closest_peers(random_peer_id);
							true
						} else {
							debug!(
								target: LOG_TARGET,
								"Kademlia paused due to high number of connections ({})",
								self.num_connections
							);
							false
						};

					// Schedule the next random query with exponentially increasing delay,
					// capped at 60 seconds.
					*next_kad_random_query = Delay::new(self.duration_to_next_kad);
					self.duration_to_next_kad =
						cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));

					if actually_started {
						let ev = DiscoveryOut::RandomKademliaStarted;
						return Poll::Ready(ToSwarm::GenerateEvent(ev))
					}
				}
			}
		}

		// Drain Kademlia events, mapping query results to `DiscoveryOut` events.
		while let Poll::Ready(ev) = self.kademlia.poll(cx) {
			match ev {
				ToSwarm::GenerateEvent(ev) => match ev {
					KademliaEvent::RoutingUpdated { peer, .. } => {
						// Peer entered the routing table: report it as discovered.
						let ev = DiscoveryOut::Discovered(peer);
						return Poll::Ready(ToSwarm::GenerateEvent(ev))
					},
					KademliaEvent::UnroutablePeer { peer, .. } => {
						let ev = DiscoveryOut::UnroutablePeer(peer);
						return Poll::Ready(ToSwarm::GenerateEvent(ev))
					},
					KademliaEvent::RoutablePeer { .. } => {
						// Generate nothing, because the address was not added to the routing table,
						// so we will not be able to connect to the peer.
					},
					KademliaEvent::PendingRoutablePeer { .. } => {
						// We are not interested in this event at the moment.
					},
					KademliaEvent::InboundRequest { request } => match request {
						// Only surface inbound `PUT_VALUE` requests that carry a record;
						// all other inbound requests are handled internally by Kademlia.
						libp2p::kad::InboundRequest::PutRecord { record: Some(record), .. } =>
							return Poll::Ready(ToSwarm::GenerateEvent(
								DiscoveryOut::PutRecordRequest(
									record.key,
									record.value,
									record.publisher.map(Into::into),
									record.expires,
								),
							)),
						_ => {},
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::GetClosestPeers(res),
						stats,
						..
					} => {
						// A timed-out query may still have yielded a partial peer set.
						let (key, peers, timeout) = match res {
							Ok(GetClosestPeersOk { key, peers }) => (key, peers, false),
							Err(GetClosestPeersError::Timeout { key, peers }) => (key, peers, true),
						};

						let target = match PeerId::from_bytes(&key.clone()) {
							Ok(peer_id) => peer_id,
							Err(_) => {
								warn!(
									target: LOG_TARGET,
									"Libp2p => FIND_NODE query finished for target that is not \
									 a peer ID: {:?}",
									HexDisplay::from(&key),
								);
								continue
							},
						};

						if timeout {
							debug!(
								target: LOG_TARGET,
								"Libp2p => Query for target {target:?} timed out and yielded {} peers",
								peers.len(),
							);
						} else {
							debug!(
								target: LOG_TARGET,
								"Libp2p => Query for target {target:?} yielded {} peers",
								peers.len(),
							);
						}

						let ev = if peers.is_empty() {
							DiscoveryOut::ClosestPeersNotFound(
								target,
								stats.duration().unwrap_or_default(),
							)
						} else {
							DiscoveryOut::ClosestPeersFound(
								target,
								peers.into_iter().map(|p| (p.peer_id, p.addrs)).collect(),
								stats.duration().unwrap_or_default(),
							)
						};

						return Poll::Ready(ToSwarm::GenerateEvent(ev))
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::GetRecord(res),
						stats,
						id,
						..
					} => {
						let ev = match res {
							Ok(GetRecordOk::FoundRecord(r)) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Found record ({:?}) with value: {:?} id {:?} stats {:?}",
									r.record.key,
									r.record.value,
									id,
									stats,
								);

								// Let's directly finish the query if we are above 4.
								// This number is small enough to make sure we don't
								// unnecessarily flood the network with queries, but high
								// enough to make sure we also touch peers which might have
								// old record, so that we can update them once we notice
								// they have old records.
								if stats.num_successes() > GET_RECORD_REDUNDANCY_FACTOR {
									if let Some(kad) = self.kademlia.as_mut() {
										if let Some(mut query) = kad.query_mut(&id) {
											query.finish();
										}
									}
								}

								// Will be removed below when we receive
								// `FinishedWithNoAdditionalRecord`.
								self.records_to_publish.insert(id, r.record.clone());

								DiscoveryOut::ValueFound(r, stats.duration().unwrap_or_default())
							},
							Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
								cache_candidates,
							}) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Finished with no-additional-record {:?} stats {:?} took {:?} ms",
									id,
									stats,
									stats.duration().map(|val| val.as_millis())
								);
								// We always need to remove the record to not leak any data!
								if let Some(record) = self.records_to_publish.remove(&id) {
									if cache_candidates.is_empty() {
										continue
									}

									// Put the record to the `cache_candidates` that are nearest to
									// the record key from our point of view of the network.
									if let Some(kad) = self.kademlia.as_mut() {
										kad.put_record_to(
											record,
											cache_candidates.into_iter().map(|v| v.1),
											Quorum::One,
										);
									}
								}

								continue
							},
							Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
								trace!(
									target: LOG_TARGET,
									"Libp2p => Failed to get record: {:?}",
									e,
								);
								DiscoveryOut::ValueNotFound(
									e.into_key(),
									stats.duration().unwrap_or_default(),
								)
							},
							Err(e) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Failed to get record: {:?}",
									e,
								);
								DiscoveryOut::ValueNotFound(
									e.into_key(),
									stats.duration().unwrap_or_default(),
								)
							},
						};
						return Poll::Ready(ToSwarm::GenerateEvent(ev))
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::GetProviders(res),
						stats,
						id,
						..
					} => {
						let ev = match res {
							Ok(GetProvidersOk::FoundProviders { key, providers }) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Found providers {:?} for key {:?}, id {:?}, stats {:?}",
									providers,
									key,
									id,
									stats,
								);

								DiscoveryOut::ProvidersFound(
									key,
									providers,
									stats.duration().unwrap_or_default(),
								)
							},
							Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
								closest_peers: _,
							}) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Finished with no additional providers {:?}, stats {:?}, took {:?} ms",
									id,
									stats,
									stats.duration().map(|val| val.as_millis())
								);

								// The key was stashed when the `GET_PROVIDERS` query was
								// started; missing here indicates broken bookkeeping.
								if let Some(key) = self.provider_keys_requested.remove(&id) {
									DiscoveryOut::NoMoreProviders(
										key,
										stats.duration().unwrap_or_default(),
									)
								} else {
									error!(
										target: LOG_TARGET,
										"No key found for `GET_PROVIDERS` query {id:?}. This is a bug.",
									);
									continue
								}
							},
							Err(GetProvidersError::Timeout { key, closest_peers: _ }) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Failed to get providers for {key:?} due to timeout.",
								);

								self.provider_keys_requested.remove(&id);

								DiscoveryOut::ProvidersNotFound(
									key,
									stats.duration().unwrap_or_default(),
								)
							},
						};
						return Poll::Ready(ToSwarm::GenerateEvent(ev))
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::PutRecord(res),
						stats,
						..
					} => {
						let ev = match res {
							Ok(ok) => {
								trace!(
									target: LOG_TARGET,
									"Libp2p => Put record for key: {:?}",
									ok.key,
								);
								DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_default())
							},
							Err(e) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Failed to put record for key {:?}: {:?}",
									e.key(),
									e,
								);
								DiscoveryOut::ValuePutFailed(
									e.into_key(),
									stats.duration().unwrap_or_default(),
								)
							},
						};
						return Poll::Ready(ToSwarm::GenerateEvent(ev))
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::RepublishRecord(res),
						..
					} => match res {
						// Republishing outcomes are only logged, not surfaced as events.
						Ok(ok) => debug!(
							target: LOG_TARGET,
							"Libp2p => Record republished: {:?}",
							ok.key,
						),
						Err(e) => debug!(
							target: LOG_TARGET,
							"Libp2p => Republishing of record {:?} failed with: {:?}",
							e.key(), e,
						),
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::StartProviding(res),
						stats,
						..
					} => {
						let ev = match res {
							Ok(ok) => {
								trace!(
									target: LOG_TARGET,
									"Libp2p => Started providing key {:?}",
									ok.key,
								);
								DiscoveryOut::StartedProviding(
									ok.key,
									stats.duration().unwrap_or_default(),
								)
							},
							Err(e) => {
								debug!(
									target: LOG_TARGET,
									"Libp2p => Failed to start providing key {:?}: {:?}",
									e.key(),
									e,
								);
								DiscoveryOut::StartProvidingFailed(
									e.into_key(),
									stats.duration().unwrap_or_default(),
								)
							},
						};
						return Poll::Ready(ToSwarm::GenerateEvent(ev))
					},
					KademliaEvent::OutboundQueryProgressed {
						result: QueryResult::Bootstrap(res),
						..
					} => match res {
						// Bootstrap progress is only logged, not surfaced as events.
						Ok(ok) => debug!(
							target: LOG_TARGET,
							"Libp2p => DHT bootstrap progressed: {ok:?}",
						),
						Err(e) => warn!(
							target: LOG_TARGET,
							"Libp2p => DHT bootstrap error: {e:?}",
						),
					},
					// We never start any other type of query.
					KademliaEvent::OutboundQueryProgressed { result: e, .. } => {
						warn!(target: LOG_TARGET, "Libp2p => Unhandled Kademlia event: {:?}", e)
					},
					Event::ModeChanged { new_mode } => {
						debug!(target: LOG_TARGET, "Libp2p => Kademlia mode changed: {new_mode}")
					},
				},
				// Dial requests are passed through to the swarm unchanged.
				ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }),
				event => {
					return Poll::Ready(event.map_out(|_| {
						unreachable!("`GenerateEvent` is handled in a branch above; qed")
					}));
				},
			}
		}

		// Poll mDNS.
		while let Poll::Ready(ev) = self.mdns.poll(cx) {
			match ev {
				ToSwarm::GenerateEvent(event) => match event {
					mdns::Event::Discovered(list) => {
						// Like the random walk above, mDNS discoveries are dropped
						// once the connection count reaches the configured limit.
						if self.num_connections >= self.discovery_only_if_under_num {
							continue
						}

						self.pending_events.extend(
							list.into_iter().map(|(peer_id, _)| DiscoveryOut::Discovered(peer_id)),
						);
						if let Some(ev) = self.pending_events.pop_front() {
							return Poll::Ready(ToSwarm::GenerateEvent(ev))
						}
					},
					mdns::Event::Expired(_) => {},
				},
				ToSwarm::Dial { .. } => {
					unreachable!("mDNS never dials!");
				},
				// `event` is an enum with no variant
				ToSwarm::NotifyHandler { event, .. } => match event {},
				event => {
					return Poll::Ready(
						event
							.map_in(|_| {
								unreachable!("`NotifyHandler` is handled in a branch above; qed")
							})
							.map_out(|_| {
								unreachable!("`GenerateEvent` is handled in a branch above; qed")
							}),
					);
				},
			}
		}

		// No sub-behaviour produced anything; wait until the waker fires again.
		Poll::Pending
	}
1286}
1287
1288/// Legacy (fallback) Kademlia protocol name based on `protocol_id`.
1289fn legacy_kademlia_protocol_name(id: &ProtocolId) -> StreamProtocol {
1290	let name = format!("/{}/kad", id.as_ref());
1291	StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1292}
1293
1294/// Kademlia protocol name based on `genesis_hash` and `fork_id`.
1295fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
1296	genesis_hash: Hash,
1297	fork_id: Option<&str>,
1298) -> StreamProtocol {
1299	let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
1300	let name = if let Some(fork_id) = fork_id {
1301		format!("/{genesis_hash_hex}/{fork_id}/kad")
1302	} else {
1303		format!("/{genesis_hash_hex}/kad")
1304	};
1305
1306	StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1307}
1308
#[cfg(test)]
mod tests {
	use super::{kademlia_protocol_name, legacy_kademlia_protocol_name, DiscoveryConfig};
	use crate::config::ProtocolId;
	use libp2p::{identity::Keypair, Multiaddr};
	use sp_core::hash::H256;

	// End-to-end check that 25 in-memory swarms, all seeded only with the first
	// swarm's address, eventually discover each other through the Kademlia
	// random walk. Disabled by default because it is flaky (see linked issue).
	#[cfg(ignore_flaky_test)] // https://github.com/paritytech/polkadot-sdk/issues/48
	#[tokio::test]
	async fn discovery_working() {
		use super::DiscoveryOut;
		use futures::prelude::*;
		use libp2p::{
			core::{
				transport::{MemoryTransport, Transport},
				upgrade,
			},
			noise,
			swarm::{Swarm, SwarmEvent},
			yamux,
		};
		use std::{collections::HashSet, task::Poll, time::Duration};
		let mut first_swarm_peer_id_and_addr = None;

		let genesis_hash = H256::from_low_u64_be(1);
		let fork_id = Some("test-fork-id");
		let protocol_id = ProtocolId::from("dot");

		// Build swarms whose behaviour is `DiscoveryBehaviour`, each aware of
		// the first swarm via `with_permanent_addresses`.
		let mut swarms = (0..25)
			.map(|i| {
				let mut swarm = libp2p::SwarmBuilder::with_new_identity()
					.with_tokio()
					.with_other_transport(|keypair| {
						MemoryTransport::new()
							.upgrade(upgrade::Version::V1)
							.authenticate(noise::Config::new(&keypair).unwrap())
							.multiplex(yamux::Config::default())
							.boxed()
					})
					.unwrap()
					.with_behaviour(|keypair| {
						let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
						config
							.with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
							.allow_private_ip(true)
							.allow_non_globals_in_dht(true)
							.discovery_limit(50)
							.with_kademlia(genesis_hash, fork_id, &protocol_id);

						config.finish()
					})
					.unwrap()
					.with_swarm_config(|config| {
						// This is taken care of by notification protocols in non-test environment
						config.with_idle_connection_timeout(Duration::from_secs(10))
					})
					.build();

				let listen_addr: Multiaddr =
					format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

				// The first swarm built becomes the bootstrap node for all others.
				if i == 0 {
					first_swarm_peer_id_and_addr =
						Some((*swarm.local_peer_id(), listen_addr.clone()))
				}

				swarm.listen_on(listen_addr.clone()).unwrap();
				(swarm, listen_addr)
			})
			.collect::<Vec<_>>();

		// Build a `Vec<HashSet<PeerId>>` with the list of nodes remaining to be discovered.
		let mut to_discover = (0..swarms.len())
			.map(|n| {
				(0..swarms.len())
					// Skip the first swarm as all other swarms already know it.
					.skip(1)
					.filter(|p| *p != n)
					.map(|p| *Swarm::local_peer_id(&swarms[p].0))
					.collect::<HashSet<_>>()
			})
			.collect::<Vec<_>>();

		// Poll every swarm round-robin until each one has discovered all peers.
		let fut = futures::future::poll_fn(move |cx| {
			'polling: loop {
				for swarm_n in 0..swarms.len() {
					match swarms[swarm_n].0.poll_next_unpin(cx) {
						Poll::Ready(Some(e)) => {
							match e {
								SwarmEvent::Behaviour(behavior) => {
									match behavior {
										DiscoveryOut::UnroutablePeer(other) |
										DiscoveryOut::Discovered(other) => {
											// Call `add_self_reported_address` to simulate identify
											// happening.
											let addr = swarms
												.iter()
												.find_map(|(s, a)| {
													if s.behaviour().local_peer_id == other {
														Some(a.clone())
													} else {
														None
													}
												})
												.unwrap();
											// Test both genesis hash-based and legacy
											// protocol names.
											let protocol_names = if swarm_n % 2 == 0 {
												vec![kademlia_protocol_name(genesis_hash, fork_id)]
											} else {
												vec![
													legacy_kademlia_protocol_name(&protocol_id),
													kademlia_protocol_name(genesis_hash, fork_id),
												]
											};
											swarms[swarm_n]
												.0
												.behaviour_mut()
												.add_self_reported_address(
													&other,
													protocol_names.as_slice(),
													addr,
												);

											to_discover[swarm_n].remove(&other);
										},
										DiscoveryOut::RandomKademliaStarted => {},
										DiscoveryOut::ClosestPeersFound(..) => {},
										// libp2p emits this event when it is not particularly
										// happy, but this doesn't break the discovery.
										DiscoveryOut::ClosestPeersNotFound(..) => {},
										e => {
											panic!("Unexpected event: {:?}", e)
										},
									}
								},
								// ignore non Behaviour events
								_ => {},
							}
							continue 'polling
						},
						_ => {},
					}
				}
				break
			}

			if to_discover.iter().all(|l| l.is_empty()) {
				Poll::Ready(())
			} else {
				Poll::Pending
			}
		});

		fut.await
	}

	// Checks that `add_self_reported_address` only inserts a peer into the
	// Kademlia routing table when it reports a protocol name we support
	// (genesis-hash-based, or legacy alongside the genesis-hash-based one).
	#[test]
	fn discovery_ignores_peers_with_unknown_protocols() {
		let supported_genesis_hash = H256::from_low_u64_be(1);
		let unsupported_genesis_hash = H256::from_low_u64_be(2);
		let supported_protocol_id = ProtocolId::from("a");
		let unsupported_protocol_id = ProtocolId::from("b");

		let mut discovery = {
			let keypair = Keypair::generate_ed25519();
			let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
			config
				.allow_private_ip(true)
				.allow_non_globals_in_dht(true)
				.discovery_limit(50)
				.with_kademlia(supported_genesis_hash, None, &supported_protocol_id);
			config.finish()
		};

		// Deterministic peer ids so failures are reproducible.
		let predictable_peer_id = |bytes: &[u8; 32]| {
			Keypair::ed25519_from_bytes(bytes.to_owned()).unwrap().public().to_peer_id()
		};

		let remote_peer_id = predictable_peer_id(b"00000000000000000000000000000001");
		let remote_addr: Multiaddr = "/memory/1".parse().unwrap();
		let another_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
		let another_addr: Multiaddr = "/memory/2".parse().unwrap();

		// Try adding remote peers with unsupported protocols.
		discovery.add_self_reported_address(
			&remote_peer_id,
			&[kademlia_protocol_name(unsupported_genesis_hash, None)],
			remote_addr.clone(),
		);
		discovery.add_self_reported_address(
			&another_peer_id,
			&[legacy_kademlia_protocol_name(&unsupported_protocol_id)],
			another_addr.clone(),
		);

		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert!(
				kademlia
					.kbucket(remote_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect peer with unsupported protocol not to be added."
			);
			assert!(
				kademlia
					.kbucket(another_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect peer with unsupported protocol not to be added."
			);
		}

		// Add remote peers with supported protocols.
		discovery.add_self_reported_address(
			&remote_peer_id,
			&[kademlia_protocol_name(supported_genesis_hash, None)],
			remote_addr.clone(),
		);
		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert!(
				!kademlia
					.kbucket(remote_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect peer with supported protocol to be added."
			);
		}

		// NOTE(review): this reuses the same key seed and address as
		// `another_peer_id` above, so `unsupported_peer_id == another_peer_id`.
		// The assertions below still hold (the peer has not been added yet at
		// this point), but presumably a distinct third peer was intended —
		// TODO confirm.
		let unsupported_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
		let unsupported_peer_addr: Multiaddr = "/memory/2".parse().unwrap();

		// Check the unsupported peer is not present before and after the call.
		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert!(
				kademlia
					.kbucket(unsupported_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect unsupported peer not to be added."
			);
		}
		// Note: legacy protocol is not supported without genesis hash and fork ID,
		// if the legacy is the only protocol supported, then the peer will not be added.
		discovery.add_self_reported_address(
			&unsupported_peer_id,
			&[legacy_kademlia_protocol_name(&supported_protocol_id)],
			unsupported_peer_addr.clone(),
		);
		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert!(
				kademlia
					.kbucket(unsupported_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect unsupported peer not to be added."
			);
		}

		// Supported legacy and genesis based protocols are allowed to be added.
		discovery.add_self_reported_address(
			&another_peer_id,
			&[
				legacy_kademlia_protocol_name(&supported_protocol_id),
				kademlia_protocol_name(supported_genesis_hash, None),
			],
			another_addr.clone(),
		);

		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert_eq!(
				2,
				kademlia.kbuckets().fold(0, |acc, bucket| acc + bucket.num_entries()),
				"Expect peers with supported protocol to be added."
			);
			assert!(
				!kademlia
					.kbucket(another_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect peer with supported protocol to be added."
			);
		}
	}
}