1use crate::{
50 config::{
51 ProtocolId, KADEMLIA_MAX_PROVIDER_KEYS, KADEMLIA_PROVIDER_RECORD_TTL,
52 KADEMLIA_PROVIDER_REPUBLISH_INTERVAL,
53 },
54 utils::LruHashSet,
55};
56
57use array_bytes::bytes2hex;
58use futures::prelude::*;
59use futures_timer::Delay;
60use ip_network::IpNetwork;
61use libp2p::{
62 core::{transport::PortUse, Endpoint, Multiaddr},
63 kad::{
64 self,
65 store::{MemoryStore, MemoryStoreConfig, RecordStore},
66 Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
67 Event, GetClosestPeersError, GetClosestPeersOk, GetProvidersError, GetProvidersOk,
68 GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record, RecordKey,
69 },
70 mdns::{self, tokio::Behaviour as TokioMdns},
71 multiaddr::Protocol,
72 swarm::{
73 behaviour::{
74 toggle::{Toggle, ToggleConnectionHandler},
75 DialFailure, ExternalAddrConfirmed, FromSwarm,
76 },
77 ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, StreamProtocol, THandler,
78 THandlerInEvent, THandlerOutEvent, ToSwarm,
79 },
80 PeerId,
81};
82use linked_hash_set::LinkedHashSet;
83use log::{debug, error, info, trace, warn};
84use sp_core::hexdisplay::HexDisplay;
85use std::{
86 cmp,
87 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
88 num::NonZeroUsize,
89 task::{Context, Poll},
90 time::{Duration, Instant},
91};
92
/// Log target used by every logging macro call in this file.
const LOG_TARGET: &str = "sub-libp2p::discovery";

/// Capacity of the LRU set that remembers confirmed external addresses of the local node.
const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32;

/// Kademlia replication factor that `DiscoveryConfig::new` applies unless it is overridden.
pub const DEFAULT_KADEMLIA_REPLICATION_FACTOR: usize = 20;

/// Number of successful responses a `GetRecord` query has to exceed before it is finished early.
const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4;

/// Timeout configured for Kademlia queries.
const KAD_QUERY_TIMEOUT: Duration = Duration::from_secs(300);
111
/// Builder holding every setting needed to construct a [`DiscoveryBehaviour`].
///
/// Create it with [`DiscoveryConfig::new`], adjust it through the setter methods and turn it
/// into a behaviour with [`DiscoveryConfig::finish`].
pub struct DiscoveryConfig {
    /// Peer ID of the local node.
    local_peer_id: PeerId,
    /// Addresses that are registered with Kademlia at construction and always offered for
    /// dialing the corresponding peer.
    permanent_addresses: Vec<(PeerId, Multiaddr)>,
    /// Whether periodic random Kademlia queries are scheduled.
    dht_random_walk: bool,
    /// Whether non-global IP addresses learned from Kademlia/mDNS may be used for dialing.
    allow_private_ip: bool,
    /// Whether self-reported non-global addresses may be inserted into the DHT.
    allow_non_globals_in_dht: bool,
    /// Discovery is only performed while the number of connections is below this value.
    discovery_only_if_under_num: u64,
    /// Whether the mDNS behaviour is created.
    enable_mdns: bool,
    /// Value passed to the Kademlia `disjoint_query_paths` setting.
    kademlia_disjoint_query_paths: bool,
    /// Kademlia protocol name; Kademlia is only enabled when this is `Some`.
    kademlia_protocol: Option<StreamProtocol>,
    /// Legacy Kademlia protocol name; stored by `with_kademlia` but ignored by `finish`.
    kademlia_legacy_protocol: Option<StreamProtocol>,
    /// Replication factor handed to the Kademlia configuration.
    kademlia_replication_factor: NonZeroUsize,
}
130
131impl DiscoveryConfig {
132 pub fn new(local_peer_id: PeerId) -> Self {
134 Self {
135 local_peer_id,
136 permanent_addresses: Vec::new(),
137 dht_random_walk: true,
138 allow_private_ip: true,
139 allow_non_globals_in_dht: false,
140 discovery_only_if_under_num: std::u64::MAX,
141 enable_mdns: false,
142 kademlia_disjoint_query_paths: false,
143 kademlia_protocol: None,
144 kademlia_legacy_protocol: None,
145 kademlia_replication_factor: NonZeroUsize::new(DEFAULT_KADEMLIA_REPLICATION_FACTOR)
146 .expect("value is a constant; constant is non-zero; qed."),
147 }
148 }
149
150 pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
152 self.discovery_only_if_under_num = limit;
153 self
154 }
155
156 pub fn with_permanent_addresses<I>(&mut self, permanent_addresses: I) -> &mut Self
158 where
159 I: IntoIterator<Item = (PeerId, Multiaddr)>,
160 {
161 self.permanent_addresses.extend(permanent_addresses);
162 self
163 }
164
165 pub fn with_dht_random_walk(&mut self, value: bool) -> &mut Self {
168 self.dht_random_walk = value;
169 self
170 }
171
172 pub fn allow_private_ip(&mut self, value: bool) -> &mut Self {
174 self.allow_private_ip = value;
175 self
176 }
177
178 pub fn allow_non_globals_in_dht(&mut self, value: bool) -> &mut Self {
180 self.allow_non_globals_in_dht = value;
181 self
182 }
183
184 pub fn with_mdns(&mut self, value: bool) -> &mut Self {
186 self.enable_mdns = value;
187 self
188 }
189
190 pub fn with_kademlia<Hash: AsRef<[u8]>>(
195 &mut self,
196 genesis_hash: Hash,
197 fork_id: Option<&str>,
198 protocol_id: &ProtocolId,
199 ) -> &mut Self {
200 self.kademlia_protocol = Some(kademlia_protocol_name(genesis_hash, fork_id));
201 self.kademlia_legacy_protocol = Some(legacy_kademlia_protocol_name(protocol_id));
202 self
203 }
204
205 pub fn use_kademlia_disjoint_query_paths(&mut self, value: bool) -> &mut Self {
208 self.kademlia_disjoint_query_paths = value;
209 self
210 }
211
212 pub fn with_kademlia_replication_factor(&mut self, value: NonZeroUsize) -> &mut Self {
214 self.kademlia_replication_factor = value;
215 self
216 }
217
218 pub fn finish(self) -> DiscoveryBehaviour {
220 let Self {
221 local_peer_id,
222 permanent_addresses,
223 dht_random_walk,
224 allow_private_ip,
225 allow_non_globals_in_dht,
226 discovery_only_if_under_num,
227 enable_mdns,
228 kademlia_disjoint_query_paths,
229 kademlia_protocol,
230 kademlia_legacy_protocol: _,
231 kademlia_replication_factor,
232 } = self;
233
234 let kademlia = if let Some(ref kademlia_protocol) = kademlia_protocol {
235 let mut config = KademliaConfig::new(kademlia_protocol.clone());
236
237 config.set_replication_factor(kademlia_replication_factor);
238
239 config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);
240
241 config.set_query_timeout(KAD_QUERY_TIMEOUT);
242
243 config.set_kbucket_inserts(BucketInserts::Manual);
247 config.disjoint_query_paths(kademlia_disjoint_query_paths);
248
249 config.set_provider_record_ttl(Some(KADEMLIA_PROVIDER_RECORD_TTL));
250 config.set_provider_publication_interval(Some(KADEMLIA_PROVIDER_REPUBLISH_INTERVAL));
251
252 let store = MemoryStore::with_config(
253 local_peer_id,
254 MemoryStoreConfig {
255 max_provided_keys: KADEMLIA_MAX_PROVIDER_KEYS,
256 ..Default::default()
257 },
258 );
259
260 let mut kad = Kademlia::with_config(local_peer_id, store, config);
261 kad.set_mode(Some(kad::Mode::Server));
262
263 for (peer_id, addr) in &permanent_addresses {
264 kad.add_address(peer_id, addr.clone());
265 }
266
267 Some(kad)
268 } else {
269 None
270 };
271
272 DiscoveryBehaviour {
273 permanent_addresses,
274 ephemeral_addresses: HashMap::new(),
275 kademlia: Toggle::from(kademlia),
276 next_kad_random_query: if dht_random_walk {
277 Some(Delay::new(Duration::new(0, 0)))
278 } else {
279 None
280 },
281 duration_to_next_kad: Duration::from_secs(1),
282 pending_events: VecDeque::new(),
283 local_peer_id,
284 num_connections: 0,
285 allow_private_ip,
286 discovery_only_if_under_num,
287 mdns: if enable_mdns {
288 match TokioMdns::new(mdns::Config::default(), local_peer_id) {
289 Ok(mdns) => Toggle::from(Some(mdns)),
290 Err(err) => {
291 warn!(target: LOG_TARGET, "Failed to initialize mDNS: {:?}", err);
292 Toggle::from(None)
293 },
294 }
295 } else {
296 Toggle::from(None)
297 },
298 allow_non_globals_in_dht,
299 known_external_addresses: LruHashSet::new(
300 NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
301 .expect("value is a constant; constant is non-zero; qed."),
302 ),
303 records_to_publish: Default::default(),
304 kademlia_protocol,
305 provider_keys_requested: HashMap::new(),
306 }
307 }
308}
309
/// Network behaviour combining Kademlia and mDNS to discover peers and their addresses.
pub struct DiscoveryBehaviour {
    /// Addresses supplied through the configuration; offered first when dialing a peer.
    permanent_addresses: Vec<(PeerId, Multiaddr)>,
    /// Addresses added through `add_known_address`; an address is dropped again when dialing
    /// it fails with a transport error.
    ephemeral_addresses: HashMap<PeerId, Vec<Multiaddr>>,
    /// Kademlia behaviour; disabled when no Kademlia protocol was configured.
    kademlia: Toggle<Kademlia<MemoryStore>>,
    /// mDNS behaviour; disabled when not enabled or when its initialization failed.
    mdns: Toggle<TokioMdns>,
    /// Timer that triggers the next random Kademlia query; `None` disables the random walk.
    next_kad_random_query: Option<Delay>,
    /// Delay used when re-arming `next_kad_random_query`; doubled after each firing, capped
    /// at 60 seconds.
    duration_to_next_kad: Duration,
    /// Events queued to be returned from `poll`.
    pending_events: VecDeque<DiscoveryOut>,
    /// Peer ID of the local node.
    local_peer_id: PeerId,
    /// Number of currently established connections, tracked from swarm events.
    num_connections: u64,
    /// If `false`, non-global IP addresses from Kademlia/mDNS are filtered out when dialing.
    allow_private_ip: bool,
    /// Random queries and mDNS discoveries are skipped once `num_connections` reaches this.
    discovery_only_if_under_num: u64,
    /// Whether self-reported non-global addresses may be added to Kademlia.
    allow_non_globals_in_dht: bool,
    /// Confirmed external addresses of the local node, used to log each new one only once.
    known_external_addresses: LruHashSet<Multiaddr>,
    /// Records found by in-flight `GetRecord` queries, keyed by query ID, waiting to be pushed
    /// to the cache candidates reported when the query finishes.
    records_to_publish: HashMap<QueryId, Record>,
    /// Kademlia protocol name; remote peers must support it for their self-reported addresses
    /// to be added to the DHT.
    kademlia_protocol: Option<StreamProtocol>,
    /// Keys of in-flight `GetProviders` queries, keyed by query ID.
    provider_keys_requested: HashMap<QueryId, RecordKey>,
}
357
358impl DiscoveryBehaviour {
359 pub fn known_peers(&mut self) -> HashSet<PeerId> {
361 let mut peers = HashSet::new();
362 if let Some(k) = self.kademlia.as_mut() {
363 for b in k.kbuckets() {
364 for e in b.iter() {
365 if !peers.contains(e.node.key.preimage()) {
366 peers.insert(*e.node.key.preimage());
367 }
368 }
369 }
370 }
371 peers
372 }
373
374 pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
380 let addrs_list = self.ephemeral_addresses.entry(peer_id).or_default();
381 if addrs_list.contains(&addr) {
382 return;
383 }
384
385 if let Some(k) = self.kademlia.as_mut() {
386 k.add_address(&peer_id, addr.clone());
387 }
388
389 self.pending_events.push_back(DiscoveryOut::Discovered(peer_id));
390 addrs_list.push(addr);
391 }
392
393 pub fn add_self_reported_address(
399 &mut self,
400 peer_id: &PeerId,
401 supported_protocols: &[StreamProtocol],
402 addr: Multiaddr,
403 ) {
404 if let Some(kademlia) = self.kademlia.as_mut() {
405 if !self.allow_non_globals_in_dht && !Self::can_add_to_dht(&addr) {
406 trace!(
407 target: LOG_TARGET,
408 "Ignoring self-reported non-global address {} from {}.", addr, peer_id
409 );
410 return;
411 }
412
413 if !supported_protocols.iter().any(|p| {
419 p == self
420 .kademlia_protocol
421 .as_ref()
422 .expect("kademlia protocol was checked above to be enabled; qed")
423 }) {
424 trace!(
425 target: LOG_TARGET,
426 "Ignoring self-reported address {} from {} as remote node is not part of the \
427 Kademlia DHT supported by the local node.", addr, peer_id,
428 );
429 return;
430 }
431
432 trace!(
433 target: LOG_TARGET,
434 "Adding self-reported address {} from {} to Kademlia DHT.",
435 addr, peer_id
436 );
437 kademlia.add_address(peer_id, addr.clone());
438 }
439 }
440
441 pub fn find_closest_peers(&mut self, target: PeerId) {
445 if let Some(k) = self.kademlia.as_mut() {
446 k.get_closest_peers(target);
447 }
448 }
449
450 pub fn get_value(&mut self, key: RecordKey) {
454 if let Some(k) = self.kademlia.as_mut() {
455 k.get_record(key.clone());
456 }
457 }
458
459 pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
464 if let Some(k) = self.kademlia.as_mut() {
465 if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) {
466 warn!(target: LOG_TARGET, "Libp2p => Failed to put record: {:?}", e);
467 self.pending_events
468 .push_back(DiscoveryOut::ValuePutFailed(key.clone(), Duration::from_secs(0)));
469 }
470 }
471 }
472
473 pub fn put_record_to(
477 &mut self,
478 record: Record,
479 peers: HashSet<sc_network_types::PeerId>,
480 update_local_storage: bool,
481 ) {
482 if let Some(kad) = self.kademlia.as_mut() {
483 if update_local_storage {
484 if let Err(_e) = kad.store_mut().put(record.clone()) {
485 warn!(target: LOG_TARGET, "Failed to update local starage");
486 }
487 }
488
489 if !peers.is_empty() {
490 kad.put_record_to(
491 record,
492 peers.into_iter().map(|peer_id| peer_id.into()),
493 Quorum::All,
494 );
495 }
496 }
497 }
498
499 pub fn start_providing(&mut self, key: RecordKey) {
501 if let Some(kad) = self.kademlia.as_mut() {
502 if let Err(e) = kad.start_providing(key.clone()) {
503 warn!(target: LOG_TARGET, "Libp2p => Failed to start providing {key:?}: {e}.");
504 self.pending_events
505 .push_back(DiscoveryOut::StartProvidingFailed(key, Duration::from_secs(0)));
506 }
507 }
508 }
509
510 pub fn stop_providing(&mut self, key: &RecordKey) {
512 if let Some(kad) = self.kademlia.as_mut() {
513 kad.stop_providing(key);
514 }
515 }
516
517 pub fn get_providers(&mut self, key: RecordKey) {
519 if let Some(kad) = self.kademlia.as_mut() {
520 let query_id = kad.get_providers(key.clone());
521 self.provider_keys_requested.insert(query_id, key);
522 }
523 }
524
525 pub fn store_record(
527 &mut self,
528 record_key: RecordKey,
529 record_value: Vec<u8>,
530 publisher: Option<PeerId>,
531 expires: Option<Instant>,
532 ) {
533 if let Some(k) = self.kademlia.as_mut() {
534 if let Err(err) = k.store_mut().put(Record {
535 key: record_key,
536 value: record_value,
537 publisher: publisher.map(|publisher| publisher.into()),
538 expires,
539 }) {
540 debug!(
541 target: LOG_TARGET,
542 "Failed to store record with key: {:?}",
543 err
544 );
545 }
546 }
547 }
548
549 pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
554 self.kademlia.as_mut().map(|kad| {
555 kad.kbuckets()
556 .map(|bucket| (bucket.range().0.ilog2().unwrap_or(0), bucket.iter().count()))
557 .collect()
558 })
559 }
560
561 pub fn num_kademlia_records(&mut self) -> Option<usize> {
563 self.kademlia.as_mut().map(|kad| kad.store_mut().records().count())
565 }
566
567 pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
569 self.kademlia
572 .as_mut()
573 .map(|kad| kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len()))
574 }
575
576 pub fn can_add_to_dht(addr: &Multiaddr) -> bool {
583 let ip = match addr.iter().next() {
584 Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
585 Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
586 Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) => {
587 return true
588 },
589 _ => return false,
590 };
591 ip.is_global()
592 }
593}
594
/// Event generated by the [`DiscoveryBehaviour`].
///
/// Where present, the trailing `Duration` is the duration reported by the query statistics,
/// or zero if none was available.
#[derive(Debug)]
pub enum DiscoveryOut {
    /// A peer was discovered: through `add_known_address`, a Kademlia routing-table update or
    /// mDNS.
    Discovered(PeerId),

    /// Kademlia reported the peer as unroutable.
    UnroutablePeer(PeerId),

    /// A closest-peers query for the given target finished and yielded the listed peers with
    /// their addresses.
    ClosestPeersFound(PeerId, Vec<(PeerId, Vec<Multiaddr>)>, Duration),

    /// A closest-peers query for the given target finished without yielding any peer.
    ClosestPeersNotFound(PeerId, Duration),

    /// A `GetRecord` query found the given record.
    ValueFound(PeerRecord, Duration),

    /// A remote peer asked the local node to store a record: key, value, optional publisher
    /// and optional expiration time.
    PutRecordRequest(
        RecordKey,
        Vec<u8>,
        Option<sc_network_types::PeerId>,
        Option<std::time::Instant>,
    ),

    /// A `GetRecord` query for the given key failed.
    ValueNotFound(RecordKey, Duration),

    /// The record with the given key was successfully put.
    ValuePut(RecordKey, Duration),

    /// Putting the record with the given key failed.
    ValuePutFailed(RecordKey, Duration),

    /// The local node started providing the given key.
    StartedProviding(RecordKey, Duration),

    /// Starting to provide the given key failed.
    StartProvidingFailed(RecordKey, Duration),

    /// A `GetProviders` query found the listed providers for the given key.
    ProvidersFound(RecordKey, HashSet<PeerId>, Duration),

    /// A `GetProviders` query for the given key finished with no additional providers.
    NoMoreProviders(RecordKey, Duration),

    /// A `GetProviders` query for the given key timed out.
    ProvidersNotFound(RecordKey, Duration),

    /// A random Kademlia query has been started.
    RandomKademliaStarted,
}
666
667impl NetworkBehaviour for DiscoveryBehaviour {
/// Connection handler of the (toggleable) Kademlia behaviour; mDNS contributes no handler
/// here.
type ConnectionHandler =
    ToggleConnectionHandler<<Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler>;
/// Events emitted to the swarm by this behaviour.
type ToSwarm = DiscoveryOut;
671
/// Delegates the creation of the handler for an established inbound connection to the
/// Kademlia behaviour.
fn handle_established_inbound_connection(
    &mut self,
    connection_id: ConnectionId,
    peer: PeerId,
    local_addr: &Multiaddr,
    remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
    self.kademlia.handle_established_inbound_connection(
        connection_id,
        peer,
        local_addr,
        remote_addr,
    )
}
686
/// Delegates the creation of the handler for an established outbound connection to the
/// Kademlia behaviour.
fn handle_established_outbound_connection(
    &mut self,
    connection_id: ConnectionId,
    peer: PeerId,
    addr: &Multiaddr,
    role_override: Endpoint,
    port_use: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
    self.kademlia.handle_established_outbound_connection(
        connection_id,
        peer,
        addr,
        role_override,
        port_use,
    )
}
703
/// Delegates the decision about a pending inbound connection to the Kademlia behaviour.
fn handle_pending_inbound_connection(
    &mut self,
    connection_id: ConnectionId,
    local_addr: &Multiaddr,
    remote_addr: &Multiaddr,
) -> Result<(), ConnectionDenied> {
    self.kademlia
        .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
}
713
714 fn handle_pending_outbound_connection(
715 &mut self,
716 connection_id: ConnectionId,
717 maybe_peer: Option<PeerId>,
718 addresses: &[Multiaddr],
719 effective_role: Endpoint,
720 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
721 let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };
722
723 let mut list: LinkedHashSet<_> = self
728 .permanent_addresses
729 .iter()
730 .filter_map(|(p, a)| (*p == peer_id).then(|| a.clone()))
731 .collect();
732
733 if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
734 ephemeral_addresses.iter().for_each(|address| {
735 list.insert_if_absent(address.clone());
736 });
737 }
738
739 {
740 let mut list_to_filter = self.kademlia.handle_pending_outbound_connection(
741 connection_id,
742 maybe_peer,
743 addresses,
744 effective_role,
745 )?;
746
747 list_to_filter.extend(self.mdns.handle_pending_outbound_connection(
748 connection_id,
749 maybe_peer,
750 addresses,
751 effective_role,
752 )?);
753
754 if !self.allow_private_ip {
755 list_to_filter.retain(|addr| match addr.iter().next() {
756 Some(Protocol::Ip4(addr)) if !IpNetwork::from(addr).is_global() => false,
757 Some(Protocol::Ip6(addr)) if !IpNetwork::from(addr).is_global() => false,
758 _ => true,
759 });
760 }
761
762 list_to_filter.into_iter().for_each(|address| {
763 list.insert_if_absent(address);
764 });
765 }
766
767 trace!(target: LOG_TARGET, "Addresses of {:?}: {:?}", peer_id, list);
768
769 Ok(list.into_iter().collect())
770 }
771
772 fn on_swarm_event(&mut self, event: FromSwarm) {
773 match event {
774 FromSwarm::ConnectionEstablished(e) => {
775 self.num_connections += 1;
776 self.kademlia.on_swarm_event(FromSwarm::ConnectionEstablished(e));
777 },
778 FromSwarm::ConnectionClosed(e) => {
779 self.num_connections -= 1;
780 self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
781 },
782 FromSwarm::DialFailure(e @ DialFailure { peer_id, error, .. }) => {
783 if let Some(peer_id) = peer_id {
784 if let DialError::Transport(errors) = error {
785 if let Entry::Occupied(mut entry) = self.ephemeral_addresses.entry(peer_id)
786 {
787 for (addr, _error) in errors {
788 entry.get_mut().retain(|a| a != addr);
789 }
790 if entry.get().is_empty() {
791 entry.remove();
792 }
793 }
794 }
795 }
796
797 self.kademlia.on_swarm_event(FromSwarm::DialFailure(e));
798 },
799 FromSwarm::ListenerClosed(e) => {
800 self.kademlia.on_swarm_event(FromSwarm::ListenerClosed(e));
801 },
802 FromSwarm::ListenFailure(e) => {
803 self.kademlia.on_swarm_event(FromSwarm::ListenFailure(e));
804 },
805 FromSwarm::ListenerError(e) => {
806 self.kademlia.on_swarm_event(FromSwarm::ListenerError(e));
807 },
808 FromSwarm::ExternalAddrExpired(e) => {
809 self.kademlia.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
813 },
814 FromSwarm::NewListener(e) => {
815 self.kademlia.on_swarm_event(FromSwarm::NewListener(e));
816 },
817 FromSwarm::ExpiredListenAddr(e) => {
818 self.kademlia.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
819 },
820 FromSwarm::NewExternalAddrCandidate(e) => {
821 self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
822 },
823 FromSwarm::AddressChange(e) => {
824 self.kademlia.on_swarm_event(FromSwarm::AddressChange(e));
825 },
826 FromSwarm::NewListenAddr(e) => {
827 self.kademlia.on_swarm_event(FromSwarm::NewListenAddr(e));
828 self.mdns.on_swarm_event(FromSwarm::NewListenAddr(e));
829 },
830 FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => {
831 let mut address = addr.clone();
832
833 if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
834 if peer_id != self.local_peer_id {
835 warn!(
836 target: LOG_TARGET,
837 "🔍 Discovered external address for a peer that is not us: {addr}",
838 );
839 return;
841 }
842 } else {
843 address.push(Protocol::P2p(self.local_peer_id));
844 }
845
846 if Self::can_add_to_dht(&address) {
847 if self.known_external_addresses.insert(address.clone()) {
850 info!(
851 target: LOG_TARGET,
852 "🔍 Discovered new external address for our node: {address}",
853 );
854 }
855 }
856
857 self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
858 },
859 FromSwarm::NewExternalAddrOfPeer(e) => {
860 self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
861 self.mdns.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
862 },
863 event => {
864 debug!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
865 self.kademlia.on_swarm_event(event);
866 self.mdns.on_swarm_event(event);
867 },
868 }
869 }
870
/// Forwards a connection handler event to the Kademlia behaviour.
fn on_connection_handler_event(
    &mut self,
    peer_id: PeerId,
    connection_id: ConnectionId,
    event: THandlerOutEvent<Self>,
) {
    self.kademlia.on_connection_handler_event(peer_id, connection_id, event);
}
879
/// Drives the behaviour and returns the next event for the swarm.
///
/// The work is done in this order:
/// 1. return a queued event, if any;
/// 2. fire due random Kademlia queries and re-arm the timer with a doubled delay (capped at
///    60 seconds);
/// 3. drain Kademlia, translating its events into [`DiscoveryOut`] events;
/// 4. drain mDNS, turning discovered peers into `Discovered` events.
///
/// Returns `Poll::Pending` when none of the above produced an event.
fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
    // Events queued by earlier calls take precedence over everything else.
    if let Some(ev) = self.pending_events.pop_front() {
        return Poll::Ready(ToSwarm::GenerateEvent(ev));
    }

    // Random-walk queries; only relevant when Kademlia is enabled and the walk is configured.
    if let Some(kademlia) = self.kademlia.as_mut() {
        if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() {
            while next_kad_random_query.poll_unpin(cx).is_ready() {
                let actually_started =
                    if self.num_connections < self.discovery_only_if_under_num {
                        let random_peer_id = PeerId::random();
                        debug!(
                            target: LOG_TARGET,
                            "Libp2p <= Starting random Kademlia request for {:?}",
                            random_peer_id,
                        );
                        kademlia.get_closest_peers(random_peer_id);
                        true
                    } else {
                        debug!(
                            target: LOG_TARGET,
                            "Kademlia paused due to high number of connections ({})",
                            self.num_connections
                        );
                        false
                    };

                // Re-arm the timer; the loop condition polls the new timer again.
                *next_kad_random_query = Delay::new(self.duration_to_next_kad);
                // Back off exponentially, up to one query per minute.
                self.duration_to_next_kad =
                    cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));

                if actually_started {
                    let ev = DiscoveryOut::RandomKademliaStarted;
                    return Poll::Ready(ToSwarm::GenerateEvent(ev));
                }
            }
        }
    }

    // Drain Kademlia. Arms that produce no `DiscoveryOut` event fall through to the next
    // loop iteration.
    while let Poll::Ready(ev) = self.kademlia.poll(cx) {
        match ev {
            ToSwarm::GenerateEvent(ev) => match ev {
                KademliaEvent::RoutingUpdated { peer, .. } => {
                    let ev = DiscoveryOut::Discovered(peer);
                    return Poll::Ready(ToSwarm::GenerateEvent(ev));
                },
                KademliaEvent::UnroutablePeer { peer, .. } => {
                    let ev = DiscoveryOut::UnroutablePeer(peer);
                    return Poll::Ready(ToSwarm::GenerateEvent(ev));
                },
                KademliaEvent::RoutablePeer { .. } => {
                    // Intentionally ignored.
                },
                KademliaEvent::PendingRoutablePeer { .. } => {
                    // Intentionally ignored.
                },
                KademliaEvent::InboundRequest { request } => match request {
                    // Only inbound put-record requests that carry a record are surfaced.
                    libp2p::kad::InboundRequest::PutRecord { record: Some(record), .. } => {
                        return Poll::Ready(ToSwarm::GenerateEvent(
                            DiscoveryOut::PutRecordRequest(
                                record.key,
                                record.value,
                                record.publisher.map(Into::into),
                                record.expires,
                            ),
                        ))
                    },
                    _ => {},
                },
                KademliaEvent::OutboundQueryProgressed {
                    result: QueryResult::GetClosestPeers(res),
                    stats,
                    ..
                } => {
                    // A timed-out query may still carry peers; both outcomes are reported
                    // the same way.
                    let (key, peers, timeout) = match res {
                        Ok(GetClosestPeersOk { key, peers }) => (key, peers, false),
                        Err(GetClosestPeersError::Timeout { key, peers }) => (key, peers, true),
                    };

                    let target = match PeerId::from_bytes(&key.clone()) {
                        Ok(peer_id) => peer_id,
                        Err(_) => {
                            warn!(
                                target: LOG_TARGET,
                                "Libp2p => FIND_NODE query finished for target that is not \
                                 a peer ID: {:?}",
                                HexDisplay::from(&key),
                            );
                            continue;
                        },
                    };

                    if timeout {
                        debug!(
                            target: LOG_TARGET,
                            "Libp2p => Query for target {target:?} timed out and yielded {} peers",
                            peers.len(),
                        );
                    } else {
                        debug!(
                            target: LOG_TARGET,
                            "Libp2p => Query for target {target:?} yielded {} peers",
                            peers.len(),
                        );
                    }

                    let ev = if peers.is_empty() {
                        DiscoveryOut::ClosestPeersNotFound(
                            target,
                            stats.duration().unwrap_or_default(),
                        )
                    } else {
                        DiscoveryOut::ClosestPeersFound(
                            target,
                            peers.into_iter().map(|p| (p.peer_id, p.addrs)).collect(),
                            stats.duration().unwrap_or_default(),
                        )
                    };

                    return Poll::Ready(ToSwarm::GenerateEvent(ev));
                },
                KademliaEvent::OutboundQueryProgressed {
                    result: QueryResult::GetRecord(res),
                    stats,
                    id,
                    ..
                } => {
                    let ev = match res {
                        Ok(GetRecordOk::FoundRecord(r)) => {
                            debug!(
                                target: LOG_TARGET,
                                "Libp2p => Found record ({:?}) with value: {:?} id {:?} stats {:?}",
                                r.record.key,
                                r.record.value,
                                id,
                                stats,
                            );

                            // Enough peers answered: finish the query instead of letting
                            // it continue.
                            if stats.num_successes() > GET_RECORD_REDUNDANCY_FACTOR {
                                if let Some(kad) = self.kademlia.as_mut() {
                                    if let Some(mut query) = kad.query_mut(&id) {
                                        query.finish();
                                    }
                                }
                            }

                            // Remember the record so it can be sent to the cache candidates
                            // once the query finishes.
                            // NOTE(review): the entry is only removed on
                            // `FinishedWithNoAdditionalRecord`; confirm that a query ending
                            // in an error cannot leave an entry behind.
                            self.records_to_publish.insert(id, r.record.clone());

                            DiscoveryOut::ValueFound(r, stats.duration().unwrap_or_default())
                        },
                        Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
                            cache_candidates,
                        }) => {
                            debug!(
                                target: LOG_TARGET,
                                "Libp2p => Finished with no-additional-record {:?} stats {:?} took {:?} ms",
                                id,
                                stats,
                                stats.duration().map(|val| val.as_millis())
                            );
                            if let Some(record) = self.records_to_publish.remove(&id) {
                                if cache_candidates.is_empty() {
                                    continue;
                                }

                                // Push the found record to the reported cache candidates.
                                if let Some(kad) = self.kademlia.as_mut() {
                                    kad.put_record_to(
                                        record,
                                        cache_candidates.into_iter().map(|v| v.1),
                                        Quorum::One,
                                    );
                                }
                            }

                            continue;
                        },
                        // "Not found" is logged at trace level, other errors at debug level;
                        // both are reported as `ValueNotFound`.
                        Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
                            trace!(
                                target: LOG_TARGET,
                                "Libp2p => Failed to get record: {:?}",
                                e,
                            );
                            DiscoveryOut::ValueNotFound(
                                e.into_key(),
                                stats.duration().unwrap_or_default(),
                            )
                        },
                        Err(e) => {
                            debug!(
                                target: LOG_TARGET,
                                "Libp2p => Failed to get record: {:?}",
                                e,
                            );
                            DiscoveryOut::ValueNotFound(
                                e.into_key(),
                                stats.duration().unwrap_or_default(),
                            )
                        },
                    };
                    return Poll::Ready(ToSwarm::GenerateEvent(ev));
                },
                KademliaEvent::OutboundQueryProgressed {
                    result: QueryResult::GetProviders(res),
                    stats,
                    id,
                    ..
                } => {
                    let ev = match res {
                        Ok(GetProvidersOk::FoundProviders { key, providers }) => {
                            debug!(
                                target: LOG_TARGET,
                                "Libp2p => Found providers {:?} for key {:?}, id {:?}, stats {:?}",
                                providers,
                                key,
                                id,
                                stats,
                            );

                            DiscoveryOut::ProvidersFound(
                                key,
                                providers,
                                stats.duration().unwrap_or_default(),
                            )
                        },
                        Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
                            closest_peers: _,
                        }) => {
                            debug!(
                                target: LOG_TARGET,
                                "Libp2p => Finished with no additional providers {:?}, stats {:?}, took {:?} ms",
                                id,
                                stats,
                                stats.duration().map(|val| val.as_millis())
                            );

                            // This result carries no key, so it is looked up by query ID.
                            if let Some(key) = self.provider_keys_requested.remove(&id) {
                                DiscoveryOut::NoMoreProviders(
                                    key,
                                    stats.duration().unwrap_or_default(),
                                )
                            } else {
                                error!(
                                    target: LOG_TARGET,
                                    "No key found for `GET_PROVIDERS` query {id:?}. This is a bug.",
                                );
                                continue;
                            }
                        },
                        Err(GetProvidersError::Timeout { key, closest_peers: _ }) => {
                            debug!(
                                target: LOG_TARGET,
                                "Libp2p => Failed to get providers for {key:?} due to timeout.",
                            );

                            self.provider_keys_requested.remove(&id);

                            DiscoveryOut::ProvidersNotFound(
                                key,
                                stats.duration().unwrap_or_default(),
                            )
                        },
                    };
                    return Poll::Ready(ToSwarm::GenerateEvent(ev));
                },
                KademliaEvent::OutboundQueryProgressed {
                    result: QueryResult::PutRecord(res),
                    stats,
                    ..
                } => {
                    let ev = match res {
                        Ok(ok) => {
                            trace!(
                                target: LOG_TARGET,
                                "Libp2p => Put record for key: {:?}",
                                ok.key,
                            );
                            DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_default())
                        },
                        Err(e) => {
                            debug!(
                                target: LOG_TARGET,
                                "Libp2p => Failed to put record for key {:?}: {:?}",
                                e.key(),
                                e,
                            );
                            DiscoveryOut::ValuePutFailed(
                                e.into_key(),
                                stats.duration().unwrap_or_default(),
                            )
                        },
                    };
                    return Poll::Ready(ToSwarm::GenerateEvent(ev));
                },
                // Republishing results are only logged.
                KademliaEvent::OutboundQueryProgressed {
                    result: QueryResult::RepublishRecord(res),
                    ..
                } => match res {
                    Ok(ok) => debug!(
                        target: LOG_TARGET,
                        "Libp2p => Record republished: {:?}",
                        ok.key,
                    ),
                    Err(e) => debug!(
                        target: LOG_TARGET,
                        "Libp2p => Republishing of record {:?} failed with: {:?}",
                        e.key(), e,
                    ),
                },
                KademliaEvent::OutboundQueryProgressed {
                    result: QueryResult::StartProviding(res),
                    stats,
                    ..
                } => {
                    let ev = match res {
                        Ok(ok) => {
                            trace!(
                                target: LOG_TARGET,
                                "Libp2p => Started providing key {:?}",
                                ok.key,
                            );
                            DiscoveryOut::StartedProviding(
                                ok.key,
                                stats.duration().unwrap_or_default(),
                            )
                        },
                        Err(e) => {
                            debug!(
                                target: LOG_TARGET,
                                "Libp2p => Failed to start providing key {:?}: {:?}",
                                e.key(),
                                e,
                            );
                            DiscoveryOut::StartProvidingFailed(
                                e.into_key(),
                                stats.duration().unwrap_or_default(),
                            )
                        },
                    };
                    return Poll::Ready(ToSwarm::GenerateEvent(ev));
                },
                // Bootstrap progress is only logged.
                KademliaEvent::OutboundQueryProgressed {
                    result: QueryResult::Bootstrap(res),
                    ..
                } => match res {
                    Ok(ok) => debug!(
                        target: LOG_TARGET,
                        "Libp2p => DHT bootstrap progressed: {ok:?}",
                    ),
                    Err(e) => warn!(
                        target: LOG_TARGET,
                        "Libp2p => DHT bootstrap error: {e:?}",
                    ),
                },
                // Any other query result is unexpected here and only logged.
                KademliaEvent::OutboundQueryProgressed { result: e, .. } => {
                    warn!(target: LOG_TARGET, "Libp2p => Unhandled Kademlia event: {:?}", e)
                },
                Event::ModeChanged { new_mode } => {
                    debug!(target: LOG_TARGET, "Libp2p => Kademlia mode changed: {new_mode}")
                },
            },
            ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }),
            // Every remaining command is passed through; only the event type is adapted.
            event => {
                return Poll::Ready(event.map_out(|_| {
                    unreachable!("`GenerateEvent` is handled in a branch above; qed")
                }));
            },
        }
    }

    // Drain mDNS.
    while let Poll::Ready(ev) = self.mdns.poll(cx) {
        match ev {
            ToSwarm::GenerateEvent(event) => match event {
                mdns::Event::Discovered(list) => {
                    // Discoveries are dropped while the connection limit is reached.
                    if self.num_connections >= self.discovery_only_if_under_num {
                        continue;
                    }

                    // Queue one event per discovered entry and return the first right away.
                    self.pending_events.extend(
                        list.into_iter().map(|(peer_id, _)| DiscoveryOut::Discovered(peer_id)),
                    );
                    if let Some(ev) = self.pending_events.pop_front() {
                        return Poll::Ready(ToSwarm::GenerateEvent(ev));
                    }
                },
                mdns::Event::Expired(_) => {},
            },
            ToSwarm::Dial { .. } => {
                unreachable!("mDNS never dials!");
            },
            // The handler event type has no values, so this arm needs no body.
            ToSwarm::NotifyHandler { event, .. } => match event {},
            event => {
                return Poll::Ready(
                    event
                        .map_in(|_| {
                            unreachable!("`NotifyHandler` is handled in a branch above; qed")
                        })
                        .map_out(|_| {
                            unreachable!("`GenerateEvent` is handled in a branch above; qed")
                        }),
                );
            },
        }
    }

    Poll::Pending
}
1305}
1306
1307fn legacy_kademlia_protocol_name(id: &ProtocolId) -> StreamProtocol {
1309 let name = format!("/{}/kad", id.as_ref());
1310 StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1311}
1312
1313fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
1315 genesis_hash: Hash,
1316 fork_id: Option<&str>,
1317) -> StreamProtocol {
1318 let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
1319 let name = if let Some(fork_id) = fork_id {
1320 format!("/{genesis_hash_hex}/{fork_id}/kad")
1321 } else {
1322 format!("/{genesis_hash_hex}/kad")
1323 };
1324
1325 StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1326}
1327
#[cfg(test)]
mod tests {
    use super::{kademlia_protocol_name, legacy_kademlia_protocol_name, DiscoveryConfig};
    use crate::config::ProtocolId;
    use libp2p::{identity::Keypair, Multiaddr};
    use sp_core::hash::H256;

    // End-to-end discovery test: spins up 25 swarms on the in-memory transport and waits
    // until every swarm has been told about every other swarm (except the first one, which
    // is handed to all later swarms up front as a permanent address).
    //
    // The test is only compiled when the `ignore_flaky_test` cfg is set.
    // NOTE(review): presumably that cfg is never set in regular builds, which effectively
    // disables this test because it is flaky — confirm.
    #[cfg(ignore_flaky_test)] #[tokio::test]
    async fn discovery_working() {
        use super::DiscoveryOut;
        use futures::prelude::*;
        use libp2p::{
            core::{
                transport::{MemoryTransport, Transport},
                upgrade,
            },
            noise,
            swarm::{Swarm, SwarmEvent},
            yamux,
        };
        use std::{collections::HashSet, task::Poll, time::Duration};
        // Peer id and listen address of swarm 0. It is still `None` while swarm 0 itself
        // is being built and is filled in before the remaining swarms are created.
        let mut first_swarm_peer_id_and_addr = None;

        let genesis_hash = H256::from_low_u64_be(1);
        let fork_id = Some("test-fork-id");
        let protocol_id = ProtocolId::from("dot");

        // Build the swarms, each paired with the address it listens on.
        let mut swarms = (0..25)
            .map(|i| {
                let mut swarm = libp2p::SwarmBuilder::with_new_identity()
                    .with_tokio()
                    .with_other_transport(|keypair| {
                        MemoryTransport::new()
                            .upgrade(upgrade::Version::V1)
                            .authenticate(noise::Config::new(&keypair).unwrap())
                            .multiplex(yamux::Config::default())
                            .boxed()
                    })
                    .unwrap()
                    .with_behaviour(|keypair| {
                        let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
                        // Every swarm after the first gets swarm 0 as a permanent address;
                        // for swarm 0 the option is `None`, so nothing is added.
                        config
                            .with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
                            .allow_private_ip(true)
                            .allow_non_globals_in_dht(true)
                            .discovery_limit(50)
                            .with_kademlia(genesis_hash, fork_id, &protocol_id);

                        config.finish()
                    })
                    .unwrap()
                    .with_swarm_config(|config| {
                        config.with_idle_connection_timeout(Duration::from_secs(10))
                    })
                    .build();

                // Each swarm listens on a randomly chosen in-memory address.
                let listen_addr: Multiaddr =
                    format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

                if i == 0 {
                    first_swarm_peer_id_and_addr =
                        Some((*swarm.local_peer_id(), listen_addr.clone()))
                }

                swarm.listen_on(listen_addr.clone()).unwrap();
                (swarm, listen_addr)
            })
            .collect::<Vec<_>>();

        // For each swarm `n`, the set of peer ids it still has to discover: every swarm
        // other than itself and other than swarm 0.
        let mut to_discover = (0..swarms.len())
            .map(|n| {
                (0..swarms.len())
                    // Swarm 0 is skipped because the other swarms are configured with it
                    // as a permanent address.
                    .skip(1)
                    .filter(|p| *p != n)
                    .map(|p| *Swarm::local_peer_id(&swarms[p].0))
                    .collect::<HashSet<_>>()
            })
            .collect::<Vec<_>>();

        let fut = futures::future::poll_fn(move |cx| {
            // Poll every swarm in turn. Whenever any swarm yields an event, restart the
            // pass from the first swarm; leave the loop only after a full pass in which
            // no swarm produced an event.
            'polling: loop {
                for swarm_n in 0..swarms.len() {
                    match swarms[swarm_n].0.poll_next_unpin(cx) {
                        Poll::Ready(Some(e)) => {
                            match e {
                                SwarmEvent::Behaviour(behavior) => {
                                    match behavior {
                                        DiscoveryOut::UnroutablePeer(other) |
                                        DiscoveryOut::Discovered(other) => {
                                            // Look up the listen address of the reported
                                            // peer among the local swarms and feed it back
                                            // via `add_self_reported_address`.
                                            // NOTE(review): presumably this stands in for
                                            // the identify protocol — confirm.
                                            let addr = swarms
                                                .iter()
                                                .find_map(|(s, a)| {
                                                    if s.behaviour().local_peer_id == other {
                                                        Some(a.clone())
                                                    } else {
                                                        None
                                                    }
                                                })
                                                .unwrap();
                                            // Exercise both naming schemes: even-indexed
                                            // swarms report only the genesis-hash-based
                                            // name, odd-indexed ones report the legacy
                                            // name as well.
                                            let protocol_names = if swarm_n % 2 == 0 {
                                                vec![kademlia_protocol_name(genesis_hash, fork_id)]
                                            } else {
                                                vec![
                                                    legacy_kademlia_protocol_name(&protocol_id),
                                                    kademlia_protocol_name(genesis_hash, fork_id),
                                                ]
                                            };
                                            swarms[swarm_n]
                                                .0
                                                .behaviour_mut()
                                                .add_self_reported_address(
                                                    &other,
                                                    protocol_names.as_slice(),
                                                    addr,
                                                );

                                            to_discover[swarm_n].remove(&other);
                                        },
                                        // These events are expected and need no handling.
                                        DiscoveryOut::RandomKademliaStarted => {},
                                        DiscoveryOut::ClosestPeersFound(..) => {},
                                        DiscoveryOut::ClosestPeersNotFound(..) => {},
                                        // Any other discovery event fails the test.
                                        e => {
                                            panic!("Unexpected event: {:?}", e)
                                        },
                                    }
                                },
                                // Non-behaviour swarm events are ignored.
                                _ => {},
                            }
                            continue 'polling;
                        },
                        _ => {},
                    }
                }
                break;
            }

            // The future completes once every swarm has discovered all of its targets.
            if to_discover.iter().all(|l| l.is_empty()) {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        });

        fut.await
    }

    // Checks that `add_self_reported_address` only inserts a peer into the Kademlia
    // routing table when the peer reports the supported genesis-hash-based protocol name.
    #[test]
    fn discovery_ignores_peers_with_unknown_protocols() {
        let supported_genesis_hash = H256::from_low_u64_be(1);
        let unsupported_genesis_hash = H256::from_low_u64_be(2);
        let supported_protocol_id = ProtocolId::from("a");
        let unsupported_protocol_id = ProtocolId::from("b");

        // Discovery behaviour configured with the supported genesis hash and protocol id.
        let mut discovery = {
            let keypair = Keypair::generate_ed25519();
            let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
            config
                .allow_private_ip(true)
                .allow_non_globals_in_dht(true)
                .discovery_limit(50)
                .with_kademlia(supported_genesis_hash, None, &supported_protocol_id);
            config.finish()
        };

        // Derives a peer id from fixed secret-key bytes so that the ids are reproducible.
        let predictable_peer_id = |bytes: &[u8; 32]| {
            Keypair::ed25519_from_bytes(bytes.to_owned()).unwrap().public().to_peer_id()
        };

        let remote_peer_id = predictable_peer_id(b"00000000000000000000000000000001");
        let remote_addr: Multiaddr = "/memory/1".parse().unwrap();
        let another_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
        let another_addr: Multiaddr = "/memory/2".parse().unwrap();

        // Try adding remote peers that only report unsupported protocols.
        discovery.add_self_reported_address(
            &remote_peer_id,
            &[kademlia_protocol_name(unsupported_genesis_hash, None)],
            remote_addr.clone(),
        );
        discovery.add_self_reported_address(
            &another_peer_id,
            &[legacy_kademlia_protocol_name(&unsupported_protocol_id)],
            another_addr.clone(),
        );

        // Neither peer may have ended up in its k-bucket.
        {
            let kademlia = discovery.kademlia.as_mut().unwrap();
            assert!(
                kademlia
                    .kbucket(remote_peer_id)
                    .expect("Remote peer id not to be equal to local peer id.")
                    .is_empty(),
                "Expect peer with unsupported protocol not to be added."
            );
            assert!(
                kademlia
                    .kbucket(another_peer_id)
                    .expect("Remote peer id not to be equal to local peer id.")
                    .is_empty(),
                "Expect peer with unsupported protocol not to be added."
            );
        }

        // Add a remote peer that reports the supported genesis-hash-based protocol.
        discovery.add_self_reported_address(
            &remote_peer_id,
            &[kademlia_protocol_name(supported_genesis_hash, None)],
            remote_addr.clone(),
        );
        {
            let kademlia = discovery.kademlia.as_mut().unwrap();
            assert!(
                !kademlia
                    .kbucket(remote_peer_id)
                    .expect("Remote peer id not to be equal to local peer id.")
                    .is_empty(),
                "Expect peer with supported protocol to be added."
            );
        }

        // Built from the same key bytes and address string as `another_peer_id` /
        // `another_addr` above, so this refers to the same peer.
        let unsupported_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
        let unsupported_peer_addr: Multiaddr = "/memory/2".parse().unwrap();

        // Check the peer is absent both before and after reporting only the legacy name.
        {
            let kademlia = discovery.kademlia.as_mut().unwrap();
            assert!(
                kademlia
                    .kbucket(unsupported_peer_id)
                    .expect("Remote peer id not to be equal to local peer id.")
                    .is_empty(),
                "Expect unsupported peer not to be added."
            );
        }
        // A peer that reports only the legacy protocol name, even for the supported
        // protocol id, is expected not to be added.
        discovery.add_self_reported_address(
            &unsupported_peer_id,
            &[legacy_kademlia_protocol_name(&supported_protocol_id)],
            unsupported_peer_addr.clone(),
        );
        {
            let kademlia = discovery.kademlia.as_mut().unwrap();
            assert!(
                kademlia
                    .kbucket(unsupported_peer_id)
                    .expect("Remote peer id not to be equal to local peer id.")
                    .is_empty(),
                "Expect unsupported peer not to be added."
            );
        }

        // A peer reporting both the legacy and the genesis-hash-based name is accepted.
        discovery.add_self_reported_address(
            &another_peer_id,
            &[
                legacy_kademlia_protocol_name(&supported_protocol_id),
                kademlia_protocol_name(supported_genesis_hash, None),
            ],
            another_addr.clone(),
        );

        {
            let kademlia = discovery.kademlia.as_mut().unwrap();
            // Exactly two entries are expected across all buckets: `remote_peer_id` and
            // `another_peer_id`.
            assert_eq!(
                2,
                kademlia.kbuckets().fold(0, |acc, bucket| acc + bucket.num_entries()),
                "Expect peers with supported protocol to be added."
            );
            assert!(
                !kademlia
                    .kbucket(another_peer_id)
                    .expect("Remote peer id not to be equal to local peer id.")
                    .is_empty(),
                "Expect peer with supported protocol to be added."
            );
        }
    }
}