// soil_network/discovery.rs
1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Discovery mechanisms of Substrate.
8//!
9//! The `DiscoveryBehaviour` struct implements the `NetworkBehaviour` trait of libp2p and is
10//! responsible for discovering other nodes that are part of the network.
11//!
12//! Substrate uses the following mechanisms in order to discover nodes that are part of the network:
13//!
14//! - Bootstrap nodes. These are hard-coded node identities and addresses passed in the constructor
15//! of the `DiscoveryBehaviour`. You can also call `add_known_address` later to add an entry.
16//!
17//! - mDNS. Discovers nodes on the local network by broadcasting UDP packets.
18//!
19//! - Kademlia random walk. Once connected, we perform random Kademlia `FIND_NODE` requests on the
20//! configured Kademlia DHTs in order for nodes to propagate to us their view of the network. This
21//! is performed automatically by the `DiscoveryBehaviour`.
22//!
23//! Additionally, the `DiscoveryBehaviour` is also capable of storing and loading value in the
24//! configured DHTs.
25//!
26//! ## Usage
27//!
28//! The `DiscoveryBehaviour` generates events of type `DiscoveryOut`, most notably
29//! `DiscoveryOut::Discovered` that is generated whenever we discover a node.
30//! Only the identity of the node is returned. The node's addresses are stored within the
31//! `DiscoveryBehaviour` and can be queried through the `NetworkBehaviour` trait.
32//!
33//! **Important**: In order for the discovery mechanism to work properly, there needs to be an
34//! active mechanism that asks nodes for the addresses they are listening on. Whenever we learn
35//! of a node's address, you must call `add_self_reported_address`.
36
37use crate::{
38	config::{
39		ProtocolId, KADEMLIA_MAX_PROVIDER_KEYS, KADEMLIA_PROVIDER_RECORD_TTL,
40		KADEMLIA_PROVIDER_REPUBLISH_INTERVAL,
41	},
42	utils::LruHashSet,
43};
44
45use array_bytes::bytes2hex;
46use futures::prelude::*;
47use futures_timer::Delay;
48use ip_network::IpNetwork;
49use libp2p::{
50	core::{transport::PortUse, Endpoint, Multiaddr},
51	kad::{
52		self,
53		store::{MemoryStore, MemoryStoreConfig, RecordStore},
54		Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
55		Event, GetClosestPeersError, GetClosestPeersOk, GetProvidersError, GetProvidersOk,
56		GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record, RecordKey,
57	},
58	mdns::{self, tokio::Behaviour as TokioMdns},
59	multiaddr::Protocol,
60	swarm::{
61		behaviour::{
62			toggle::{Toggle, ToggleConnectionHandler},
63			DialFailure, ExternalAddrConfirmed, FromSwarm,
64		},
65		ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, StreamProtocol, THandler,
66		THandlerInEvent, THandlerOutEvent, ToSwarm,
67	},
68	PeerId,
69};
70use linked_hash_set::LinkedHashSet;
71use log::{debug, error, info, trace, warn};
72use std::{
73	cmp,
74	collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
75	num::NonZeroUsize,
76	task::{Context, Poll},
77	time::{Duration, Instant},
78};
79use subsoil::core::hexdisplay::HexDisplay;
80
81/// Logging target for the file.
82const LOG_TARGET: &str = "sub-libp2p::discovery";
83
84/// Maximum number of known external addresses that we will cache.
85/// This only affects whether we will log whenever we (re-)discover
86/// a given address.
87const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32;
88
89/// Default value for Kademlia replication factor which  determines to how many closest peers a
90/// record is replicated to.
91pub const DEFAULT_KADEMLIA_REPLICATION_FACTOR: usize = 20;
92
93/// The minimum number of peers we expect an answer before we terminate the request.
94const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4;
95
96/// Query timeout for Kademlia requests. We need to increase this for record/provider publishing
97/// to not timeout most of the time.
98const KAD_QUERY_TIMEOUT: Duration = Duration::from_secs(300);
99
/// `DiscoveryBehaviour` configuration.
///
/// Note: In order to discover nodes or load and store values via Kademlia one has to add
///       Kademlia protocol via [`DiscoveryConfig::with_kademlia`].
pub struct DiscoveryConfig {
	/// Identity of the local node.
	local_peer_id: PeerId,
	/// Hard-coded addresses (e.g. bootstrap or reserved nodes) that never expire.
	permanent_addresses: Vec<(PeerId, Multiaddr)>,
	/// Whether to periodically perform a random Kademlia walk to discover peers.
	dht_random_walk: bool,
	/// Whether private IPv4/IPv6 addresses may be reported.
	allow_private_ip: bool,
	/// Whether non-global addresses may be inserted into the DHT.
	allow_non_globals_in_dht: bool,
	/// Number of active connections above which discovery is paused.
	discovery_only_if_under_num: u64,
	/// Whether mDNS discovery on the local network is enabled.
	enable_mdns: bool,
	/// Whether iterative Kademlia queries use disjoint paths for resiliency.
	kademlia_disjoint_query_paths: bool,
	/// Chain-based (genesis hash and fork ID) Kademlia protocol name, if Kademlia is enabled.
	kademlia_protocol: Option<StreamProtocol>,
	/// Legacy `ProtocolId`-based Kademlia protocol name, if Kademlia is enabled.
	kademlia_legacy_protocol: Option<StreamProtocol>,
	/// Number of closest peers a record is replicated to.
	kademlia_replication_factor: NonZeroUsize,
}
118
119impl DiscoveryConfig {
120	/// Create a default configuration with the given public key.
121	pub fn new(local_peer_id: PeerId) -> Self {
122		Self {
123			local_peer_id,
124			permanent_addresses: Vec::new(),
125			dht_random_walk: true,
126			allow_private_ip: true,
127			allow_non_globals_in_dht: false,
128			discovery_only_if_under_num: std::u64::MAX,
129			enable_mdns: false,
130			kademlia_disjoint_query_paths: false,
131			kademlia_protocol: None,
132			kademlia_legacy_protocol: None,
133			kademlia_replication_factor: NonZeroUsize::new(DEFAULT_KADEMLIA_REPLICATION_FACTOR)
134				.expect("value is a constant; constant is non-zero; qed."),
135		}
136	}
137
138	/// Set the number of active connections at which we pause discovery.
139	pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
140		self.discovery_only_if_under_num = limit;
141		self
142	}
143
144	/// Set custom nodes which never expire, e.g. bootstrap or reserved nodes.
145	pub fn with_permanent_addresses<I>(&mut self, permanent_addresses: I) -> &mut Self
146	where
147		I: IntoIterator<Item = (PeerId, Multiaddr)>,
148	{
149		self.permanent_addresses.extend(permanent_addresses);
150		self
151	}
152
153	/// Whether the discovery behaviour should periodically perform a random
154	/// walk on the DHT to discover peers.
155	pub fn with_dht_random_walk(&mut self, value: bool) -> &mut Self {
156		self.dht_random_walk = value;
157		self
158	}
159
160	/// Should private IPv4/IPv6 addresses be reported?
161	pub fn allow_private_ip(&mut self, value: bool) -> &mut Self {
162		self.allow_private_ip = value;
163		self
164	}
165
166	/// Should non-global addresses be inserted to the DHT?
167	pub fn allow_non_globals_in_dht(&mut self, value: bool) -> &mut Self {
168		self.allow_non_globals_in_dht = value;
169		self
170	}
171
172	/// Should MDNS discovery be supported?
173	pub fn with_mdns(&mut self, value: bool) -> &mut Self {
174		self.enable_mdns = value;
175		self
176	}
177
178	/// Add discovery via Kademlia for the given protocol.
179	///
180	/// Currently accepts `protocol_id`. This should be removed once all the nodes
181	/// are upgraded to genesis hash- and fork ID-based Kademlia protocol name.
182	pub fn with_kademlia<Hash: AsRef<[u8]>>(
183		&mut self,
184		genesis_hash: Hash,
185		fork_id: Option<&str>,
186		protocol_id: &ProtocolId,
187	) -> &mut Self {
188		self.kademlia_protocol = Some(kademlia_protocol_name(genesis_hash, fork_id));
189		self.kademlia_legacy_protocol = Some(legacy_kademlia_protocol_name(protocol_id));
190		self
191	}
192
193	/// Require iterative Kademlia DHT queries to use disjoint paths for increased resiliency in the
194	/// presence of potentially adversarial nodes.
195	pub fn use_kademlia_disjoint_query_paths(&mut self, value: bool) -> &mut Self {
196		self.kademlia_disjoint_query_paths = value;
197		self
198	}
199
200	/// Sets Kademlia replication factor.
201	pub fn with_kademlia_replication_factor(&mut self, value: NonZeroUsize) -> &mut Self {
202		self.kademlia_replication_factor = value;
203		self
204	}
205
206	/// Create a `DiscoveryBehaviour` from this config.
207	pub fn finish(self) -> DiscoveryBehaviour {
208		let Self {
209			local_peer_id,
210			permanent_addresses,
211			dht_random_walk,
212			allow_private_ip,
213			allow_non_globals_in_dht,
214			discovery_only_if_under_num,
215			enable_mdns,
216			kademlia_disjoint_query_paths,
217			kademlia_protocol,
218			kademlia_legacy_protocol: _,
219			kademlia_replication_factor,
220		} = self;
221
222		let kademlia = if let Some(ref kademlia_protocol) = kademlia_protocol {
223			let mut config = KademliaConfig::new(kademlia_protocol.clone());
224
225			config.set_replication_factor(kademlia_replication_factor);
226
227			config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);
228
229			config.set_query_timeout(KAD_QUERY_TIMEOUT);
230
231			// By default Kademlia attempts to insert all peers into its routing table once a
232			// dialing attempt succeeds. In order to control which peer is added, disable the
233			// auto-insertion and instead add peers manually.
234			config.set_kbucket_inserts(BucketInserts::Manual);
235			config.disjoint_query_paths(kademlia_disjoint_query_paths);
236
237			config.set_provider_record_ttl(Some(KADEMLIA_PROVIDER_RECORD_TTL));
238			config.set_provider_publication_interval(Some(KADEMLIA_PROVIDER_REPUBLISH_INTERVAL));
239
240			let store = MemoryStore::with_config(
241				local_peer_id,
242				MemoryStoreConfig {
243					max_provided_keys: KADEMLIA_MAX_PROVIDER_KEYS,
244					..Default::default()
245				},
246			);
247
248			let mut kad = Kademlia::with_config(local_peer_id, store, config);
249			kad.set_mode(Some(kad::Mode::Server));
250
251			for (peer_id, addr) in &permanent_addresses {
252				kad.add_address(peer_id, addr.clone());
253			}
254
255			Some(kad)
256		} else {
257			None
258		};
259
260		DiscoveryBehaviour {
261			permanent_addresses,
262			ephemeral_addresses: HashMap::new(),
263			kademlia: Toggle::from(kademlia),
264			next_kad_random_query: if dht_random_walk {
265				Some(Delay::new(Duration::new(0, 0)))
266			} else {
267				None
268			},
269			duration_to_next_kad: Duration::from_secs(1),
270			pending_events: VecDeque::new(),
271			local_peer_id,
272			num_connections: 0,
273			allow_private_ip,
274			discovery_only_if_under_num,
275			mdns: if enable_mdns {
276				match TokioMdns::new(mdns::Config::default(), local_peer_id) {
277					Ok(mdns) => Toggle::from(Some(mdns)),
278					Err(err) => {
279						warn!(target: LOG_TARGET, "Failed to initialize mDNS: {:?}", err);
280						Toggle::from(None)
281					},
282				}
283			} else {
284				Toggle::from(None)
285			},
286			allow_non_globals_in_dht,
287			known_external_addresses: LruHashSet::new(
288				NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
289					.expect("value is a constant; constant is non-zero; qed."),
290			),
291			records_to_publish: Default::default(),
292			kademlia_protocol,
293			provider_keys_requested: HashMap::new(),
294		}
295	}
296}
297
/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
pub struct DiscoveryBehaviour {
	/// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and
	/// reserved nodes.
	permanent_addresses: Vec<(PeerId, Multiaddr)>,
	/// Same as `permanent_addresses`, except that addresses that fail to reach a peer are
	/// removed.
	ephemeral_addresses: HashMap<PeerId, Vec<Multiaddr>>,
	/// Kademlia requests and answers. Even though it's wrapped in `Toggle`, currently
	/// it's always enabled in `NetworkWorker::new()`.
	kademlia: Toggle<Kademlia<MemoryStore>>,
	/// Discovers nodes on the local network.
	mdns: Toggle<TokioMdns>,
	/// Stream that fires when we need to perform the next random Kademlia query. `None` if
	/// random walking is disabled.
	next_kad_random_query: Option<Delay>,
	/// After `next_kad_random_query` triggers, the next one triggers after this duration.
	duration_to_next_kad: Duration,
	/// Events to return in priority when polled.
	pending_events: VecDeque<DiscoveryOut>,
	/// Identity of our local node.
	local_peer_id: PeerId,
	/// Number of nodes we're currently connected to.
	num_connections: u64,
	/// If false, `addresses_of_peer` won't return any private IPv4/IPv6 address, except for the
	/// ones stored in `permanent_addresses` or `ephemeral_addresses`.
	allow_private_ip: bool,
	/// Number of active connections over which we interrupt the discovery process.
	discovery_only_if_under_num: u64,
	/// Should non-global addresses be added to the DHT?
	allow_non_globals_in_dht: bool,
	/// A cache of discovered external addresses. Only used for logging purposes.
	known_external_addresses: LruHashSet<Multiaddr>,
	/// Records to publish per QueryId.
	///
	/// After finishing a Kademlia query, libp2p will return us a list of the closest peers that
	/// did not return the record (in `FinishedWithNoAdditionalRecord`). We will then put the
	/// record to these peers.
	records_to_publish: HashMap<QueryId, Record>,
	/// The chain based kademlia protocol name (including genesis hash and fork id).
	///
	/// Remove when all nodes are upgraded to genesis hash and fork ID-based Kademlia:
	/// <https://github.com/paritytech/polkadot-sdk/issues/504>.
	kademlia_protocol: Option<StreamProtocol>,
	/// Provider keys requested with `GET_PROVIDERS` queries.
	provider_keys_requested: HashMap<QueryId, RecordKey>,
}
345
346impl DiscoveryBehaviour {
347	/// Returns the list of nodes that we know exist in the network.
348	pub fn known_peers(&mut self) -> HashSet<PeerId> {
349		let mut peers = HashSet::new();
350		if let Some(k) = self.kademlia.as_mut() {
351			for b in k.kbuckets() {
352				for e in b.iter() {
353					if !peers.contains(e.node.key.preimage()) {
354						peers.insert(*e.node.key.preimage());
355					}
356				}
357			}
358		}
359		peers
360	}
361
362	/// Adds a hard-coded address for the given peer, that never expires.
363	///
364	/// This adds an entry to the parameter that was passed to `new`.
365	///
366	/// If we didn't know this address before, also generates a `Discovered` event.
367	pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
368		let addrs_list = self.ephemeral_addresses.entry(peer_id).or_default();
369		if addrs_list.contains(&addr) {
370			return;
371		}
372
373		if let Some(k) = self.kademlia.as_mut() {
374			k.add_address(&peer_id, addr.clone());
375		}
376
377		self.pending_events.push_back(DiscoveryOut::Discovered(peer_id));
378		addrs_list.push(addr);
379	}
380
381	/// Add a self-reported address of a remote peer to the k-buckets of the DHT
382	/// if it has compatible `supported_protocols`.
383	///
384	/// **Note**: It is important that you call this method. The discovery mechanism will not
385	/// automatically add connecting peers to the Kademlia k-buckets.
386	pub fn add_self_reported_address(
387		&mut self,
388		peer_id: &PeerId,
389		supported_protocols: &[StreamProtocol],
390		addr: Multiaddr,
391	) {
392		if let Some(kademlia) = self.kademlia.as_mut() {
393			if !self.allow_non_globals_in_dht && !Self::can_add_to_dht(&addr) {
394				trace!(
395					target: LOG_TARGET,
396					"Ignoring self-reported non-global address {} from {}.", addr, peer_id
397				);
398				return;
399			}
400
401			// The supported protocols must include the chain-based Kademlia protocol.
402			//
403			// Extract the chain-based Kademlia protocol from `kademlia.protocol_name()`
404			// when all nodes are upgraded to genesis hash and fork ID-based Kademlia:
405			// https://github.com/paritytech/polkadot-sdk/issues/504.
406			if !supported_protocols.iter().any(|p| {
407				p == self
408					.kademlia_protocol
409					.as_ref()
410					.expect("kademlia protocol was checked above to be enabled; qed")
411			}) {
412				trace!(
413					target: LOG_TARGET,
414					"Ignoring self-reported address {} from {} as remote node is not part of the \
415					 Kademlia DHT supported by the local node.", addr, peer_id,
416				);
417				return;
418			}
419
420			trace!(
421				target: LOG_TARGET,
422				"Adding self-reported address {} from {} to Kademlia DHT.",
423				addr, peer_id
424			);
425			kademlia.add_address(peer_id, addr.clone());
426		}
427	}
428
429	/// Start finding the closest peers to the given `PeerId`.
430	///
431	/// A corresponding `ClosestPeersFound` or `ClosestPeersNotFound` event will later be generated.
432	pub fn find_closest_peers(&mut self, target: PeerId) {
433		if let Some(k) = self.kademlia.as_mut() {
434			k.get_closest_peers(target);
435		}
436	}
437
438	/// Start fetching a record from the DHT.
439	///
440	/// A corresponding `ValueFound` or `ValueNotFound` event will later be generated.
441	pub fn get_value(&mut self, key: RecordKey) {
442		if let Some(k) = self.kademlia.as_mut() {
443			k.get_record(key.clone());
444		}
445	}
446
447	/// Start putting a record into the DHT. Other nodes can later fetch that value with
448	/// `get_value`.
449	///
450	/// A corresponding `ValuePut` or `ValuePutFailed` event will later be generated.
451	pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
452		if let Some(k) = self.kademlia.as_mut() {
453			if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) {
454				warn!(target: LOG_TARGET, "Libp2p => Failed to put record: {:?}", e);
455				self.pending_events
456					.push_back(DiscoveryOut::ValuePutFailed(key.clone(), Duration::from_secs(0)));
457			}
458		}
459	}
460
461	/// Puts a record into the DHT on the provided `peers`
462	///
463	/// If `update_local_storage` is true, the local storage is update as well.
464	pub fn put_record_to(
465		&mut self,
466		record: Record,
467		peers: HashSet<crate::types::PeerId>,
468		update_local_storage: bool,
469	) {
470		if let Some(kad) = self.kademlia.as_mut() {
471			if update_local_storage {
472				if let Err(_e) = kad.store_mut().put(record.clone()) {
473					warn!(target: LOG_TARGET, "Failed to update local starage");
474				}
475			}
476
477			if !peers.is_empty() {
478				kad.put_record_to(
479					record,
480					peers.into_iter().map(|peer_id| peer_id.into()),
481					Quorum::All,
482				);
483			}
484		}
485	}
486
487	/// Register as a content provider on the DHT for `key`.
488	pub fn start_providing(&mut self, key: RecordKey) {
489		if let Some(kad) = self.kademlia.as_mut() {
490			if let Err(e) = kad.start_providing(key.clone()) {
491				warn!(target: LOG_TARGET, "Libp2p => Failed to start providing {key:?}: {e}.");
492				self.pending_events
493					.push_back(DiscoveryOut::StartProvidingFailed(key, Duration::from_secs(0)));
494			}
495		}
496	}
497
498	/// Deregister as a content provider on the DHT for `key`.
499	pub fn stop_providing(&mut self, key: &RecordKey) {
500		if let Some(kad) = self.kademlia.as_mut() {
501			kad.stop_providing(key);
502		}
503	}
504
505	/// Get content providers for `key` from the DHT.
506	pub fn get_providers(&mut self, key: RecordKey) {
507		if let Some(kad) = self.kademlia.as_mut() {
508			let query_id = kad.get_providers(key.clone());
509			self.provider_keys_requested.insert(query_id, key);
510		}
511	}
512
513	/// Store a record in the Kademlia record store.
514	pub fn store_record(
515		&mut self,
516		record_key: RecordKey,
517		record_value: Vec<u8>,
518		publisher: Option<PeerId>,
519		expires: Option<Instant>,
520	) {
521		if let Some(k) = self.kademlia.as_mut() {
522			if let Err(err) = k.store_mut().put(Record {
523				key: record_key,
524				value: record_value,
525				publisher: publisher.map(|publisher| publisher.into()),
526				expires,
527			}) {
528				debug!(
529					target: LOG_TARGET,
530					"Failed to store record with key: {:?}",
531					err
532				);
533			}
534		}
535	}
536
537	/// Returns the number of nodes in each Kademlia kbucket for each Kademlia instance.
538	///
539	/// Identifies Kademlia instances by their [`ProtocolId`] and kbuckets by the base 2 logarithm
540	/// of their lower bound.
541	pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
542		self.kademlia.as_mut().map(|kad| {
543			kad.kbuckets()
544				.map(|bucket| (bucket.range().0.ilog2().unwrap_or(0), bucket.iter().count()))
545				.collect()
546		})
547	}
548
549	/// Returns the number of records in the Kademlia record stores.
550	pub fn num_kademlia_records(&mut self) -> Option<usize> {
551		// Note that this code is ok only because we use a `MemoryStore`.
552		self.kademlia.as_mut().map(|kad| kad.store_mut().records().count())
553	}
554
555	/// Returns the total size in bytes of all the records in the Kademlia record stores.
556	pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
557		// Note that this code is ok only because we use a `MemoryStore`. If the records were
558		// for example stored on disk, this would load every single one of them every single time.
559		self.kademlia
560			.as_mut()
561			.map(|kad| kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len()))
562	}
563
564	/// Can the given `Multiaddr` be put into the DHT?
565	///
566	/// This test is successful only for global IP addresses and DNS names.
567	// NB: Currently all DNS names are allowed and no check for TLD suffixes is done
568	// because the set of valid domains is highly dynamic and would require frequent
569	// updates, for example by utilising publicsuffix.org or IANA.
570	pub fn can_add_to_dht(addr: &Multiaddr) -> bool {
571		let ip = match addr.iter().next() {
572			Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
573			Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
574			Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) => {
575				return true
576			},
577			_ => return false,
578		};
579		ip.is_global()
580	}
581}
582
/// Event generated by the `DiscoveryBehaviour`.
#[derive(Debug)]
pub enum DiscoveryOut {
	/// We discovered a peer and currently have its addresses stored either in the routing
	/// table or in the ephemeral addresses list, so a connection can be established.
	Discovered(PeerId),

	/// A peer connected to this node for whom no listen address is known.
	///
	/// In order for the peer to be added to the Kademlia routing table, a known
	/// listen address must be added via
	/// [`DiscoveryBehaviour::add_self_reported_address`], e.g. obtained through
	/// the `identify` protocol.
	UnroutablePeer(PeerId),

	/// `FIND_NODE` query yielded closest peers with their addresses. This event also delivers
	/// a partial result in case the query timed out, because it can contain the target peer's
	/// address.
	ClosestPeersFound(PeerId, Vec<(PeerId, Vec<Multiaddr>)>, Duration),

	/// The closest peers to the target `PeerId` have not been found.
	ClosestPeersNotFound(PeerId, Duration),

	/// The DHT yielded results for the record request.
	///
	/// Returning the result grouped in (key, value) pairs as well as the request duration.
	ValueFound(PeerRecord, Duration),

	/// The DHT received a put record request.
	PutRecordRequest(RecordKey, Vec<u8>, Option<crate::types::PeerId>, Option<std::time::Instant>),

	/// The record requested was not found in the DHT.
	///
	/// Returning the corresponding key as well as the request duration.
	ValueNotFound(RecordKey, Duration),

	/// The record with a given key was successfully inserted into the DHT.
	///
	/// Returning the corresponding key as well as the request duration.
	ValuePut(RecordKey, Duration),

	/// Inserting a value into the DHT failed.
	///
	/// Returning the corresponding key as well as the request duration.
	ValuePutFailed(RecordKey, Duration),

	/// The content provider for a given key was successfully published.
	StartedProviding(RecordKey, Duration),

	/// Starting providing a key failed.
	StartProvidingFailed(RecordKey, Duration),

	/// The DHT yielded results for the providers request.
	ProvidersFound(RecordKey, HashSet<PeerId>, Duration),

	/// The DHT yielded no more providers for the key (`GET_PROVIDERS` query finished).
	NoMoreProviders(RecordKey, Duration),

	/// Providers for the requested key were not found in the DHT.
	ProvidersNotFound(RecordKey, Duration),

	/// Started a random Kademlia query.
	///
	/// Only happens if [`DiscoveryConfig::with_dht_random_walk`] has been configured to `true`.
	RandomKademliaStarted,
}
649
650impl NetworkBehaviour for DiscoveryBehaviour {
651	type ConnectionHandler =
652		ToggleConnectionHandler<<Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler>;
653	type ToSwarm = DiscoveryOut;
654
655	fn handle_established_inbound_connection(
656		&mut self,
657		connection_id: ConnectionId,
658		peer: PeerId,
659		local_addr: &Multiaddr,
660		remote_addr: &Multiaddr,
661	) -> Result<THandler<Self>, ConnectionDenied> {
662		self.kademlia.handle_established_inbound_connection(
663			connection_id,
664			peer,
665			local_addr,
666			remote_addr,
667		)
668	}
669
670	fn handle_established_outbound_connection(
671		&mut self,
672		connection_id: ConnectionId,
673		peer: PeerId,
674		addr: &Multiaddr,
675		role_override: Endpoint,
676		port_use: PortUse,
677	) -> Result<THandler<Self>, ConnectionDenied> {
678		self.kademlia.handle_established_outbound_connection(
679			connection_id,
680			peer,
681			addr,
682			role_override,
683			port_use,
684		)
685	}
686
	fn handle_pending_inbound_connection(
		&mut self,
		connection_id: ConnectionId,
		local_addr: &Multiaddr,
		remote_addr: &Multiaddr,
	) -> Result<(), ConnectionDenied> {
		// Let Kademlia decide whether the pending inbound connection is acceptable.
		self.kademlia
			.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
	}
696
697	fn handle_pending_outbound_connection(
698		&mut self,
699		connection_id: ConnectionId,
700		maybe_peer: Option<PeerId>,
701		addresses: &[Multiaddr],
702		effective_role: Endpoint,
703	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
704		let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };
705
706		// Collect addresses into [`LinkedHashSet`] to eliminate duplicate entries preserving the
707		// order of addresses. Give priority to `permanent_addresses` (used with reserved nodes) and
708		// `ephemeral_addresses` (used for addresses discovered from other sources, like authority
709		// discovery DHT records).
710		let mut list: LinkedHashSet<_> = self
711			.permanent_addresses
712			.iter()
713			.filter_map(|(p, a)| (*p == peer_id).then(|| a.clone()))
714			.collect();
715
716		if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
717			ephemeral_addresses.iter().for_each(|address| {
718				list.insert_if_absent(address.clone());
719			});
720		}
721
722		{
723			let mut list_to_filter = self.kademlia.handle_pending_outbound_connection(
724				connection_id,
725				maybe_peer,
726				addresses,
727				effective_role,
728			)?;
729
730			list_to_filter.extend(self.mdns.handle_pending_outbound_connection(
731				connection_id,
732				maybe_peer,
733				addresses,
734				effective_role,
735			)?);
736
737			if !self.allow_private_ip {
738				list_to_filter.retain(|addr| match addr.iter().next() {
739					Some(Protocol::Ip4(addr)) if !IpNetwork::from(addr).is_global() => false,
740					Some(Protocol::Ip6(addr)) if !IpNetwork::from(addr).is_global() => false,
741					_ => true,
742				});
743			}
744
745			list_to_filter.into_iter().for_each(|address| {
746				list.insert_if_absent(address);
747			});
748		}
749
750		trace!(target: LOG_TARGET, "Addresses of {:?}: {:?}", peer_id, list);
751
752		Ok(list.into_iter().collect())
753	}
754
	fn on_swarm_event(&mut self, event: FromSwarm) {
		// Track the connection count for discovery throttling and forward every event to the
		// inner behaviours. mDNS only cares about listen addresses and external peer
		// addresses; everything else goes to Kademlia alone.
		match event {
			FromSwarm::ConnectionEstablished(e) => {
				self.num_connections += 1;
				self.kademlia.on_swarm_event(FromSwarm::ConnectionEstablished(e));
			},
			FromSwarm::ConnectionClosed(e) => {
				self.num_connections -= 1;
				self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
			},
			FromSwarm::DialFailure(e @ DialFailure { peer_id, error, .. }) => {
				// Drop ephemeral addresses that failed at the transport level, and the whole
				// entry once no address remains.
				if let Some(peer_id) = peer_id {
					if let DialError::Transport(errors) = error {
						if let Entry::Occupied(mut entry) = self.ephemeral_addresses.entry(peer_id)
						{
							for (addr, _error) in errors {
								entry.get_mut().retain(|a| a != addr);
							}
							if entry.get().is_empty() {
								entry.remove();
							}
						}
					}
				}

				self.kademlia.on_swarm_event(FromSwarm::DialFailure(e));
			},
			FromSwarm::ListenerClosed(e) => {
				self.kademlia.on_swarm_event(FromSwarm::ListenerClosed(e));
			},
			FromSwarm::ListenFailure(e) => {
				self.kademlia.on_swarm_event(FromSwarm::ListenFailure(e));
			},
			FromSwarm::ListenerError(e) => {
				self.kademlia.on_swarm_event(FromSwarm::ListenerError(e));
			},
			FromSwarm::ExternalAddrExpired(e) => {
				// We intentionally don't remove the element from `known_external_addresses` in
				// order to not print the log line again.

				self.kademlia.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
			},
			FromSwarm::NewListener(e) => {
				self.kademlia.on_swarm_event(FromSwarm::NewListener(e));
			},
			FromSwarm::ExpiredListenAddr(e) => {
				self.kademlia.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
			},
			FromSwarm::NewExternalAddrCandidate(e) => {
				self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
			},
			FromSwarm::AddressChange(e) => {
				self.kademlia.on_swarm_event(FromSwarm::AddressChange(e));
			},
			FromSwarm::NewListenAddr(e) => {
				self.kademlia.on_swarm_event(FromSwarm::NewListenAddr(e));
				self.mdns.on_swarm_event(FromSwarm::NewListenAddr(e));
			},
			FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => {
				let mut address = addr.clone();

				// Sanity-check the trailing `/p2p/…` component: a confirmed external address
				// carrying someone else's peer id must not reach Kademlia.
				if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
					if peer_id != self.local_peer_id {
						warn!(
							target: LOG_TARGET,
							"🔍 Discovered external address for a peer that is not us: {addr}",
						);
						// Ensure this address is not propagated to kademlia.
						return;
					}
				} else {
					// No `/p2p/…` suffix present; append our own peer id for the log/cache.
					address.push(Protocol::P2p(self.local_peer_id));
				}

				if Self::can_add_to_dht(&address) {
					// NOTE: we might re-discover the same address multiple times
					// in which case we just want to refrain from logging.
					if self.known_external_addresses.insert(address.clone()) {
						info!(
						  target: LOG_TARGET,
						  "🔍 Discovered new external address for our node: {address}",
						);
					}
				}

				self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
			},
			FromSwarm::NewExternalAddrOfPeer(e) => {
				self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
				self.mdns.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
			},
			event => {
				// Forward unanticipated event variants to both behaviours so nothing is lost
				// when libp2p adds new `FromSwarm` variants.
				debug!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
				self.kademlia.on_swarm_event(event);
				self.mdns.on_swarm_event(event);
			},
		}
	}
853
	fn on_connection_handler_event(
		&mut self,
		peer_id: PeerId,
		connection_id: ConnectionId,
		event: THandlerOutEvent<Self>,
	) {
		// The `ConnectionHandler` type of this behaviour wraps Kademlia's handler, so all
		// handler events are forwarded straight to Kademlia.
		self.kademlia.on_connection_handler_event(peer_id, connection_id, event);
	}
862
863	fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
864		// Immediately process the content of `discovered`.
865		if let Some(ev) = self.pending_events.pop_front() {
866			return Poll::Ready(ToSwarm::GenerateEvent(ev));
867		}
868
869		// Poll the stream that fires when we need to start a random Kademlia query.
870		if let Some(kademlia) = self.kademlia.as_mut() {
871			if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() {
872				while next_kad_random_query.poll_unpin(cx).is_ready() {
873					let actually_started =
874						if self.num_connections < self.discovery_only_if_under_num {
875							let random_peer_id = PeerId::random();
876							debug!(
877								target: LOG_TARGET,
878								"Libp2p <= Starting random Kademlia request for {:?}",
879								random_peer_id,
880							);
881							kademlia.get_closest_peers(random_peer_id);
882							true
883						} else {
884							debug!(
885								target: LOG_TARGET,
886								"Kademlia paused due to high number of connections ({})",
887								self.num_connections
888							);
889							false
890						};
891
892					// Schedule the next random query with exponentially increasing delay,
893					// capped at 60 seconds.
894					*next_kad_random_query = Delay::new(self.duration_to_next_kad);
895					self.duration_to_next_kad =
896						cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));
897
898					if actually_started {
899						let ev = DiscoveryOut::RandomKademliaStarted;
900						return Poll::Ready(ToSwarm::GenerateEvent(ev));
901					}
902				}
903			}
904		}
905
906		while let Poll::Ready(ev) = self.kademlia.poll(cx) {
907			match ev {
908				ToSwarm::GenerateEvent(ev) => match ev {
909					KademliaEvent::RoutingUpdated { peer, .. } => {
910						let ev = DiscoveryOut::Discovered(peer);
911						return Poll::Ready(ToSwarm::GenerateEvent(ev));
912					},
913					KademliaEvent::UnroutablePeer { peer, .. } => {
914						let ev = DiscoveryOut::UnroutablePeer(peer);
915						return Poll::Ready(ToSwarm::GenerateEvent(ev));
916					},
917					KademliaEvent::RoutablePeer { .. } => {
918						// Generate nothing, because the address was not added to the routing table,
919						// so we will not be able to connect to the peer.
920					},
921					KademliaEvent::PendingRoutablePeer { .. } => {
922						// We are not interested in this event at the moment.
923					},
924					KademliaEvent::InboundRequest { request } => match request {
925						libp2p::kad::InboundRequest::PutRecord { record: Some(record), .. } => {
926							return Poll::Ready(ToSwarm::GenerateEvent(
927								DiscoveryOut::PutRecordRequest(
928									record.key,
929									record.value,
930									record.publisher.map(Into::into),
931									record.expires,
932								),
933							))
934						},
935						_ => {},
936					},
937					KademliaEvent::OutboundQueryProgressed {
938						result: QueryResult::GetClosestPeers(res),
939						stats,
940						..
941					} => {
942						let (key, peers, timeout) = match res {
943							Ok(GetClosestPeersOk { key, peers }) => (key, peers, false),
944							Err(GetClosestPeersError::Timeout { key, peers }) => (key, peers, true),
945						};
946
947						let target = match PeerId::from_bytes(&key.clone()) {
948							Ok(peer_id) => peer_id,
949							Err(_) => {
950								warn!(
951									target: LOG_TARGET,
952									"Libp2p => FIND_NODE query finished for target that is not \
953									 a peer ID: {:?}",
954									HexDisplay::from(&key),
955								);
956								continue;
957							},
958						};
959
960						if timeout {
961							debug!(
962								target: LOG_TARGET,
963								"Libp2p => Query for target {target:?} timed out and yielded {} peers",
964								peers.len(),
965							);
966						} else {
967							debug!(
968								target: LOG_TARGET,
969								"Libp2p => Query for target {target:?} yielded {} peers",
970								peers.len(),
971							);
972						}
973
974						let ev = if peers.is_empty() {
975							DiscoveryOut::ClosestPeersNotFound(
976								target,
977								stats.duration().unwrap_or_default(),
978							)
979						} else {
980							DiscoveryOut::ClosestPeersFound(
981								target,
982								peers.into_iter().map(|p| (p.peer_id, p.addrs)).collect(),
983								stats.duration().unwrap_or_default(),
984							)
985						};
986
987						return Poll::Ready(ToSwarm::GenerateEvent(ev));
988					},
989					KademliaEvent::OutboundQueryProgressed {
990						result: QueryResult::GetRecord(res),
991						stats,
992						id,
993						..
994					} => {
995						let ev = match res {
996							Ok(GetRecordOk::FoundRecord(r)) => {
997								debug!(
998									target: LOG_TARGET,
999									"Libp2p => Found record ({:?}) with value: {:?} id {:?} stats {:?}",
1000									r.record.key,
1001									r.record.value,
1002									id,
1003									stats,
1004								);
1005
1006								// Let's directly finish the query if we are above 4.
1007								// This number is small enough to make sure we don't
1008								// unnecessarily flood the network with queries, but high
1009								// enough to make sure we also touch peers which might have
1010								// old record, so that we can update them once we notice
1011								// they have old records.
1012								if stats.num_successes() > GET_RECORD_REDUNDANCY_FACTOR {
1013									if let Some(kad) = self.kademlia.as_mut() {
1014										if let Some(mut query) = kad.query_mut(&id) {
1015											query.finish();
1016										}
1017									}
1018								}
1019
1020								// Will be removed below when we receive
1021								// `FinishedWithNoAdditionalRecord`.
1022								self.records_to_publish.insert(id, r.record.clone());
1023
1024								DiscoveryOut::ValueFound(r, stats.duration().unwrap_or_default())
1025							},
1026							Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
1027								cache_candidates,
1028							}) => {
1029								debug!(
1030									target: LOG_TARGET,
1031									"Libp2p => Finished with no-additional-record {:?} stats {:?} took {:?} ms",
1032									id,
1033									stats,
1034									stats.duration().map(|val| val.as_millis())
1035								);
1036								// We always need to remove the record to not leak any data!
1037								if let Some(record) = self.records_to_publish.remove(&id) {
1038									if cache_candidates.is_empty() {
1039										continue;
1040									}
1041
1042									// Put the record to the `cache_candidates` that are nearest to
1043									// the record key from our point of view of the network.
1044									if let Some(kad) = self.kademlia.as_mut() {
1045										kad.put_record_to(
1046											record,
1047											cache_candidates.into_iter().map(|v| v.1),
1048											Quorum::One,
1049										);
1050									}
1051								}
1052
1053								continue;
1054							},
1055							Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
1056								trace!(
1057									target: LOG_TARGET,
1058									"Libp2p => Failed to get record: {:?}",
1059									e,
1060								);
1061								DiscoveryOut::ValueNotFound(
1062									e.into_key(),
1063									stats.duration().unwrap_or_default(),
1064								)
1065							},
1066							Err(e) => {
1067								debug!(
1068									target: LOG_TARGET,
1069									"Libp2p => Failed to get record: {:?}",
1070									e,
1071								);
1072								DiscoveryOut::ValueNotFound(
1073									e.into_key(),
1074									stats.duration().unwrap_or_default(),
1075								)
1076							},
1077						};
1078						return Poll::Ready(ToSwarm::GenerateEvent(ev));
1079					},
1080					KademliaEvent::OutboundQueryProgressed {
1081						result: QueryResult::GetProviders(res),
1082						stats,
1083						id,
1084						..
1085					} => {
1086						let ev = match res {
1087							Ok(GetProvidersOk::FoundProviders { key, providers }) => {
1088								debug!(
1089									target: LOG_TARGET,
1090									"Libp2p => Found providers {:?} for key {:?}, id {:?}, stats {:?}",
1091									providers,
1092									key,
1093									id,
1094									stats,
1095								);
1096
1097								DiscoveryOut::ProvidersFound(
1098									key,
1099									providers,
1100									stats.duration().unwrap_or_default(),
1101								)
1102							},
1103							Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
1104								closest_peers: _,
1105							}) => {
1106								debug!(
1107									target: LOG_TARGET,
1108									"Libp2p => Finished with no additional providers {:?}, stats {:?}, took {:?} ms",
1109									id,
1110									stats,
1111									stats.duration().map(|val| val.as_millis())
1112								);
1113
1114								if let Some(key) = self.provider_keys_requested.remove(&id) {
1115									DiscoveryOut::NoMoreProviders(
1116										key,
1117										stats.duration().unwrap_or_default(),
1118									)
1119								} else {
1120									error!(
1121										target: LOG_TARGET,
1122										"No key found for `GET_PROVIDERS` query {id:?}. This is a bug.",
1123									);
1124									continue;
1125								}
1126							},
1127							Err(GetProvidersError::Timeout { key, closest_peers: _ }) => {
1128								debug!(
1129									target: LOG_TARGET,
1130									"Libp2p => Failed to get providers for {key:?} due to timeout.",
1131								);
1132
1133								self.provider_keys_requested.remove(&id);
1134
1135								DiscoveryOut::ProvidersNotFound(
1136									key,
1137									stats.duration().unwrap_or_default(),
1138								)
1139							},
1140						};
1141						return Poll::Ready(ToSwarm::GenerateEvent(ev));
1142					},
1143					KademliaEvent::OutboundQueryProgressed {
1144						result: QueryResult::PutRecord(res),
1145						stats,
1146						..
1147					} => {
1148						let ev = match res {
1149							Ok(ok) => {
1150								trace!(
1151									target: LOG_TARGET,
1152									"Libp2p => Put record for key: {:?}",
1153									ok.key,
1154								);
1155								DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_default())
1156							},
1157							Err(e) => {
1158								debug!(
1159									target: LOG_TARGET,
1160									"Libp2p => Failed to put record for key {:?}: {:?}",
1161									e.key(),
1162									e,
1163								);
1164								DiscoveryOut::ValuePutFailed(
1165									e.into_key(),
1166									stats.duration().unwrap_or_default(),
1167								)
1168							},
1169						};
1170						return Poll::Ready(ToSwarm::GenerateEvent(ev));
1171					},
1172					KademliaEvent::OutboundQueryProgressed {
1173						result: QueryResult::RepublishRecord(res),
1174						..
1175					} => match res {
1176						Ok(ok) => debug!(
1177							target: LOG_TARGET,
1178							"Libp2p => Record republished: {:?}",
1179							ok.key,
1180						),
1181						Err(e) => debug!(
1182							target: LOG_TARGET,
1183							"Libp2p => Republishing of record {:?} failed with: {:?}",
1184							e.key(), e,
1185						),
1186					},
1187					KademliaEvent::OutboundQueryProgressed {
1188						result: QueryResult::StartProviding(res),
1189						stats,
1190						..
1191					} => {
1192						let ev = match res {
1193							Ok(ok) => {
1194								trace!(
1195									target: LOG_TARGET,
1196									"Libp2p => Started providing key {:?}",
1197									ok.key,
1198								);
1199								DiscoveryOut::StartedProviding(
1200									ok.key,
1201									stats.duration().unwrap_or_default(),
1202								)
1203							},
1204							Err(e) => {
1205								debug!(
1206									target: LOG_TARGET,
1207									"Libp2p => Failed to start providing key {:?}: {:?}",
1208									e.key(),
1209									e,
1210								);
1211								DiscoveryOut::StartProvidingFailed(
1212									e.into_key(),
1213									stats.duration().unwrap_or_default(),
1214								)
1215							},
1216						};
1217						return Poll::Ready(ToSwarm::GenerateEvent(ev));
1218					},
1219					KademliaEvent::OutboundQueryProgressed {
1220						result: QueryResult::Bootstrap(res),
1221						..
1222					} => match res {
1223						Ok(ok) => debug!(
1224							target: LOG_TARGET,
1225							"Libp2p => DHT bootstrap progressed: {ok:?}",
1226						),
1227						Err(e) => warn!(
1228							target: LOG_TARGET,
1229							"Libp2p => DHT bootstrap error: {e:?}",
1230						),
1231					},
1232					// We never start any other type of query.
1233					KademliaEvent::OutboundQueryProgressed { result: e, .. } => {
1234						warn!(target: LOG_TARGET, "Libp2p => Unhandled Kademlia event: {:?}", e)
1235					},
1236					Event::ModeChanged { new_mode } => {
1237						debug!(target: LOG_TARGET, "Libp2p => Kademlia mode changed: {new_mode}")
1238					},
1239				},
1240				ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }),
1241				event => {
1242					return Poll::Ready(event.map_out(|_| {
1243						unreachable!("`GenerateEvent` is handled in a branch above; qed")
1244					}));
1245				},
1246			}
1247		}
1248
1249		// Poll mDNS.
1250		while let Poll::Ready(ev) = self.mdns.poll(cx) {
1251			match ev {
1252				ToSwarm::GenerateEvent(event) => match event {
1253					mdns::Event::Discovered(list) => {
1254						if self.num_connections >= self.discovery_only_if_under_num {
1255							continue;
1256						}
1257
1258						self.pending_events.extend(
1259							list.into_iter().map(|(peer_id, _)| DiscoveryOut::Discovered(peer_id)),
1260						);
1261						if let Some(ev) = self.pending_events.pop_front() {
1262							return Poll::Ready(ToSwarm::GenerateEvent(ev));
1263						}
1264					},
1265					mdns::Event::Expired(_) => {},
1266				},
1267				ToSwarm::Dial { .. } => {
1268					unreachable!("mDNS never dials!");
1269				},
1270				// `event` is an enum with no variant
1271				ToSwarm::NotifyHandler { event, .. } => match event {},
1272				event => {
1273					return Poll::Ready(
1274						event
1275							.map_in(|_| {
1276								unreachable!("`NotifyHandler` is handled in a branch above; qed")
1277							})
1278							.map_out(|_| {
1279								unreachable!("`GenerateEvent` is handled in a branch above; qed")
1280							}),
1281					);
1282				},
1283			}
1284		}
1285
1286		Poll::Pending
1287	}
1288}
1289
1290/// Legacy (fallback) Kademlia protocol name based on `protocol_id`.
1291fn legacy_kademlia_protocol_name(id: &ProtocolId) -> StreamProtocol {
1292	let name = format!("/{}/kad", id.as_ref());
1293	StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1294}
1295
1296/// Kademlia protocol name based on `genesis_hash` and `fork_id`.
1297fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
1298	genesis_hash: Hash,
1299	fork_id: Option<&str>,
1300) -> StreamProtocol {
1301	let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
1302	let name = if let Some(fork_id) = fork_id {
1303		format!("/{genesis_hash_hex}/{fork_id}/kad")
1304	} else {
1305		format!("/{genesis_hash_hex}/kad")
1306	};
1307
1308	StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1309}
1310
#[cfg(test)]
mod tests {
	use super::{kademlia_protocol_name, legacy_kademlia_protocol_name, DiscoveryConfig};
	use crate::config::ProtocolId;
	use libp2p::{identity::Keypair, Multiaddr};
	use subsoil::core::hash::H256;

	// End-to-end check: a set of in-memory swarms, all bootstrapped with the first
	// swarm's address, must eventually discover every other swarm via Kademlia.
	#[cfg(ignore_flaky_test)] // https://github.com/paritytech/polkadot-sdk/issues/48
	#[tokio::test]
	async fn discovery_working() {
		use super::DiscoveryOut;
		use futures::prelude::*;
		use libp2p::{
			core::{
				transport::{MemoryTransport, Transport},
				upgrade,
			},
			noise,
			swarm::{Swarm, SwarmEvent},
			yamux,
		};
		use std::{collections::HashSet, task::Poll, time::Duration};
		let mut first_swarm_peer_id_and_addr = None;

		let genesis_hash = H256::from_low_u64_be(1);
		let fork_id = Some("test-fork-id");
		let protocol_id = ProtocolId::from("dot");

		// Build swarms whose behaviour is `DiscoveryBehaviour`, each aware of
		// the first swarm via `with_permanent_addresses`.
		let mut swarms = (0..25)
			.map(|i| {
				let mut swarm = libp2p::SwarmBuilder::with_new_identity()
					.with_tokio()
					.with_other_transport(|keypair| {
						MemoryTransport::new()
							.upgrade(upgrade::Version::V1)
							.authenticate(noise::Config::new(&keypair).unwrap())
							.multiplex(yamux::Config::default())
							.boxed()
					})
					.unwrap()
					.with_behaviour(|keypair| {
						let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
						config
							.with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
							.allow_private_ip(true)
							.allow_non_globals_in_dht(true)
							.discovery_limit(50)
							.with_kademlia(genesis_hash, fork_id, &protocol_id);

						config.finish()
					})
					.unwrap()
					.with_swarm_config(|config| {
						// This is taken care of by notification protocols in non-test environment
						config.with_idle_connection_timeout(Duration::from_secs(10))
					})
					.build();

				let listen_addr: Multiaddr =
					format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

				// The first swarm (i == 0) becomes the bootstrap node for everyone else;
				// its identity is captured here for use by later iterations of the closure.
				if i == 0 {
					first_swarm_peer_id_and_addr =
						Some((*swarm.local_peer_id(), listen_addr.clone()))
				}

				swarm.listen_on(listen_addr.clone()).unwrap();
				(swarm, listen_addr)
			})
			.collect::<Vec<_>>();

		// Build a `Vec<HashSet<PeerId>>` with the list of nodes remaining to be discovered.
		let mut to_discover = (0..swarms.len())
			.map(|n| {
				(0..swarms.len())
					// Skip the first swarm as all other swarms already know it.
					.skip(1)
					.filter(|p| *p != n)
					.map(|p| *Swarm::local_peer_id(&swarms[p].0))
					.collect::<HashSet<_>>()
			})
			.collect::<Vec<_>>();

		// Manual round-robin poll loop over all swarms; completes once every swarm
		// has discovered every other swarm.
		let fut = futures::future::poll_fn(move |cx| {
			'polling: loop {
				for swarm_n in 0..swarms.len() {
					match swarms[swarm_n].0.poll_next_unpin(cx) {
						Poll::Ready(Some(e)) => {
							match e {
								SwarmEvent::Behaviour(behavior) => {
									match behavior {
										DiscoveryOut::UnroutablePeer(other)
										| DiscoveryOut::Discovered(other) => {
											// Call `add_self_reported_address` to simulate identify
											// happening.
											let addr = swarms
												.iter()
												.find_map(|(s, a)| {
													if s.behaviour().local_peer_id == other {
														Some(a.clone())
													} else {
														None
													}
												})
												.unwrap();
											// Test both genesis hash-based and legacy
											// protocol names.
											let protocol_names = if swarm_n % 2 == 0 {
												vec![kademlia_protocol_name(genesis_hash, fork_id)]
											} else {
												vec![
													legacy_kademlia_protocol_name(&protocol_id),
													kademlia_protocol_name(genesis_hash, fork_id),
												]
											};
											swarms[swarm_n]
												.0
												.behaviour_mut()
												.add_self_reported_address(
													&other,
													protocol_names.as_slice(),
													addr,
												);

											to_discover[swarm_n].remove(&other);
										},
										DiscoveryOut::RandomKademliaStarted => {},
										DiscoveryOut::ClosestPeersFound(..) => {},
										// libp2p emits this event when it is not particularly
										// happy, but this doesn't break the discovery.
										DiscoveryOut::ClosestPeersNotFound(..) => {},
										e => {
											panic!("Unexpected event: {:?}", e)
										},
									}
								},
								// ignore non Behaviour events
								_ => {},
							}
							// An event was consumed; restart from the first swarm so that
							// progress on any swarm keeps the loop going.
							continue 'polling;
						},
						_ => {},
					}
				}
				break;
			}

			if to_discover.iter().all(|l| l.is_empty()) {
				Poll::Ready(())
			} else {
				Poll::Pending
			}
		});

		fut.await
	}

	// Verifies that `add_self_reported_address` only inserts peers into the Kademlia
	// routing table when they advertise a protocol name we support.
	#[test]
	fn discovery_ignores_peers_with_unknown_protocols() {
		let supported_genesis_hash = H256::from_low_u64_be(1);
		let unsupported_genesis_hash = H256::from_low_u64_be(2);
		let supported_protocol_id = ProtocolId::from("a");
		let unsupported_protocol_id = ProtocolId::from("b");

		let mut discovery = {
			let keypair = Keypair::generate_ed25519();
			let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
			config
				.allow_private_ip(true)
				.allow_non_globals_in_dht(true)
				.discovery_limit(50)
				.with_kademlia(supported_genesis_hash, None, &supported_protocol_id);
			config.finish()
		};

		// Deterministic peer ids so the test is reproducible.
		let predictable_peer_id = |bytes: &[u8; 32]| {
			Keypair::ed25519_from_bytes(bytes.to_owned()).unwrap().public().to_peer_id()
		};

		let remote_peer_id = predictable_peer_id(b"00000000000000000000000000000001");
		let remote_addr: Multiaddr = "/memory/1".parse().unwrap();
		let another_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
		let another_addr: Multiaddr = "/memory/2".parse().unwrap();

		// Try adding remote peers with unsupported protocols.
		discovery.add_self_reported_address(
			&remote_peer_id,
			&[kademlia_protocol_name(unsupported_genesis_hash, None)],
			remote_addr.clone(),
		);
		discovery.add_self_reported_address(
			&another_peer_id,
			&[legacy_kademlia_protocol_name(&unsupported_protocol_id)],
			another_addr.clone(),
		);

		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert!(
				kademlia
					.kbucket(remote_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect peer with unsupported protocol not to be added."
			);
			assert!(
				kademlia
					.kbucket(another_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect peer with unsupported protocol not to be added."
			);
		}

		// Add remote peers with supported protocols.
		discovery.add_self_reported_address(
			&remote_peer_id,
			&[kademlia_protocol_name(supported_genesis_hash, None)],
			remote_addr.clone(),
		);
		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert!(
				!kademlia
					.kbucket(remote_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect peer with supported protocol to be added."
			);
		}

		// NOTE(review): these bytes are the same as `another_peer_id` above, so this is
		// the same peer — it must still be absent at this point since its earlier
		// report used an unsupported protocol. Presumably intentional reuse; confirm.
		let unsupported_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
		let unsupported_peer_addr: Multiaddr = "/memory/2".parse().unwrap();

		// Check the unsupported peer is not present before and after the call.
		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert!(
				kademlia
					.kbucket(unsupported_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect unsupported peer not to be added."
			);
		}
		// Note: legacy protocol is not supported without genesis hash and fork ID,
		// if the legacy is the only protocol supported, then the peer will not be added.
		discovery.add_self_reported_address(
			&unsupported_peer_id,
			&[legacy_kademlia_protocol_name(&supported_protocol_id)],
			unsupported_peer_addr.clone(),
		);
		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert!(
				kademlia
					.kbucket(unsupported_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect unsupported peer not to be added."
			);
		}

		// Supported legacy and genesis based protocols are allowed to be added.
		discovery.add_self_reported_address(
			&another_peer_id,
			&[
				legacy_kademlia_protocol_name(&supported_protocol_id),
				kademlia_protocol_name(supported_genesis_hash, None),
			],
			another_addr.clone(),
		);

		{
			let kademlia = discovery.kademlia.as_mut().unwrap();
			assert_eq!(
				2,
				kademlia.kbuckets().fold(0, |acc, bucket| acc + bucket.num_entries()),
				"Expect peers with supported protocol to be added."
			);
			assert!(
				!kademlia
					.kbucket(another_peer_id)
					.expect("Remote peer id not to be equal to local peer id.")
					.is_empty(),
				"Expect peer with supported protocol to be added."
			);
		}
	}
}