1use crate::{config::ProtocolId, utils::LruHashSet};
50
51use array_bytes::bytes2hex;
52use futures::prelude::*;
53use futures_timer::Delay;
54use ip_network::IpNetwork;
55use libp2p::{
56 core::{transport::PortUse, Endpoint, Multiaddr},
57 kad::{
58 self,
59 store::{MemoryStore, RecordStore},
60 Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
61 Event, GetClosestPeersError, GetProvidersError, GetProvidersOk, GetRecordOk, PeerRecord,
62 QueryId, QueryResult, Quorum, Record, RecordKey,
63 },
64 mdns::{self, tokio::Behaviour as TokioMdns},
65 multiaddr::Protocol,
66 swarm::{
67 behaviour::{
68 toggle::{Toggle, ToggleConnectionHandler},
69 DialFailure, ExternalAddrConfirmed, FromSwarm,
70 },
71 ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, StreamProtocol, THandler,
72 THandlerInEvent, THandlerOutEvent, ToSwarm,
73 },
74 PeerId,
75};
76use linked_hash_set::LinkedHashSet;
77use log::{debug, info, trace, warn};
78use sp_core::hexdisplay::HexDisplay;
79use std::{
80 cmp,
81 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
82 num::NonZeroUsize,
83 task::{Context, Poll},
84 time::{Duration, Instant},
85};
86
87const LOG_TARGET: &str = "sub-libp2p::discovery";
89
90const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32;
94
95pub const DEFAULT_KADEMLIA_REPLICATION_FACTOR: usize = 20;
98
99const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4;
101
102pub struct DiscoveryConfig {
108 local_peer_id: PeerId,
109 permanent_addresses: Vec<(PeerId, Multiaddr)>,
110 dht_random_walk: bool,
111 allow_private_ip: bool,
112 allow_non_globals_in_dht: bool,
113 discovery_only_if_under_num: u64,
114 enable_mdns: bool,
115 kademlia_disjoint_query_paths: bool,
116 kademlia_protocol: Option<StreamProtocol>,
117 kademlia_legacy_protocol: Option<StreamProtocol>,
118 kademlia_replication_factor: NonZeroUsize,
119}
120
121impl DiscoveryConfig {
122 pub fn new(local_peer_id: PeerId) -> Self {
124 Self {
125 local_peer_id,
126 permanent_addresses: Vec::new(),
127 dht_random_walk: true,
128 allow_private_ip: true,
129 allow_non_globals_in_dht: false,
130 discovery_only_if_under_num: std::u64::MAX,
131 enable_mdns: false,
132 kademlia_disjoint_query_paths: false,
133 kademlia_protocol: None,
134 kademlia_legacy_protocol: None,
135 kademlia_replication_factor: NonZeroUsize::new(DEFAULT_KADEMLIA_REPLICATION_FACTOR)
136 .expect("value is a constant; constant is non-zero; qed."),
137 }
138 }
139
140 pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
142 self.discovery_only_if_under_num = limit;
143 self
144 }
145
146 pub fn with_permanent_addresses<I>(&mut self, permanent_addresses: I) -> &mut Self
148 where
149 I: IntoIterator<Item = (PeerId, Multiaddr)>,
150 {
151 self.permanent_addresses.extend(permanent_addresses);
152 self
153 }
154
155 pub fn with_dht_random_walk(&mut self, value: bool) -> &mut Self {
158 self.dht_random_walk = value;
159 self
160 }
161
162 pub fn allow_private_ip(&mut self, value: bool) -> &mut Self {
164 self.allow_private_ip = value;
165 self
166 }
167
168 pub fn allow_non_globals_in_dht(&mut self, value: bool) -> &mut Self {
170 self.allow_non_globals_in_dht = value;
171 self
172 }
173
174 pub fn with_mdns(&mut self, value: bool) -> &mut Self {
176 self.enable_mdns = value;
177 self
178 }
179
180 pub fn with_kademlia<Hash: AsRef<[u8]>>(
185 &mut self,
186 genesis_hash: Hash,
187 fork_id: Option<&str>,
188 protocol_id: &ProtocolId,
189 ) -> &mut Self {
190 self.kademlia_protocol = Some(kademlia_protocol_name(genesis_hash, fork_id));
191 self.kademlia_legacy_protocol = Some(legacy_kademlia_protocol_name(protocol_id));
192 self
193 }
194
195 pub fn use_kademlia_disjoint_query_paths(&mut self, value: bool) -> &mut Self {
198 self.kademlia_disjoint_query_paths = value;
199 self
200 }
201
202 pub fn with_kademlia_replication_factor(&mut self, value: NonZeroUsize) -> &mut Self {
204 self.kademlia_replication_factor = value;
205 self
206 }
207
208 pub fn finish(self) -> DiscoveryBehaviour {
210 let Self {
211 local_peer_id,
212 permanent_addresses,
213 dht_random_walk,
214 allow_private_ip,
215 allow_non_globals_in_dht,
216 discovery_only_if_under_num,
217 enable_mdns,
218 kademlia_disjoint_query_paths,
219 kademlia_protocol,
220 kademlia_legacy_protocol: _,
221 kademlia_replication_factor,
222 } = self;
223
224 let kademlia = if let Some(ref kademlia_protocol) = kademlia_protocol {
225 let mut config = KademliaConfig::new(kademlia_protocol.clone());
226
227 config.set_replication_factor(kademlia_replication_factor);
228
229 config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);
230
231 config.set_kbucket_inserts(BucketInserts::Manual);
235 config.disjoint_query_paths(kademlia_disjoint_query_paths);
236 let store = MemoryStore::new(local_peer_id);
237 let mut kad = Kademlia::with_config(local_peer_id, store, config);
238 kad.set_mode(Some(kad::Mode::Server));
239
240 for (peer_id, addr) in &permanent_addresses {
241 kad.add_address(peer_id, addr.clone());
242 }
243
244 Some(kad)
245 } else {
246 None
247 };
248
249 DiscoveryBehaviour {
250 permanent_addresses,
251 ephemeral_addresses: HashMap::new(),
252 kademlia: Toggle::from(kademlia),
253 next_kad_random_query: if dht_random_walk {
254 Some(Delay::new(Duration::new(0, 0)))
255 } else {
256 None
257 },
258 duration_to_next_kad: Duration::from_secs(1),
259 pending_events: VecDeque::new(),
260 local_peer_id,
261 num_connections: 0,
262 allow_private_ip,
263 discovery_only_if_under_num,
264 mdns: if enable_mdns {
265 match TokioMdns::new(mdns::Config::default(), local_peer_id) {
266 Ok(mdns) => Toggle::from(Some(mdns)),
267 Err(err) => {
268 warn!(target: LOG_TARGET, "Failed to initialize mDNS: {:?}", err);
269 Toggle::from(None)
270 },
271 }
272 } else {
273 Toggle::from(None)
274 },
275 allow_non_globals_in_dht,
276 known_external_addresses: LruHashSet::new(
277 NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
278 .expect("value is a constant; constant is non-zero; qed."),
279 ),
280 records_to_publish: Default::default(),
281 kademlia_protocol,
282 }
283 }
284}
285
286pub struct DiscoveryBehaviour {
288 permanent_addresses: Vec<(PeerId, Multiaddr)>,
291 ephemeral_addresses: HashMap<PeerId, Vec<Multiaddr>>,
294 kademlia: Toggle<Kademlia<MemoryStore>>,
297 mdns: Toggle<TokioMdns>,
299 next_kad_random_query: Option<Delay>,
302 duration_to_next_kad: Duration,
304 pending_events: VecDeque<DiscoveryOut>,
306 local_peer_id: PeerId,
308 num_connections: u64,
310 allow_private_ip: bool,
313 discovery_only_if_under_num: u64,
315 allow_non_globals_in_dht: bool,
317 known_external_addresses: LruHashSet<Multiaddr>,
319 records_to_publish: HashMap<QueryId, Record>,
325 kademlia_protocol: Option<StreamProtocol>,
330}
331
332impl DiscoveryBehaviour {
333 pub fn known_peers(&mut self) -> HashSet<PeerId> {
335 let mut peers = HashSet::new();
336 if let Some(k) = self.kademlia.as_mut() {
337 for b in k.kbuckets() {
338 for e in b.iter() {
339 if !peers.contains(e.node.key.preimage()) {
340 peers.insert(*e.node.key.preimage());
341 }
342 }
343 }
344 }
345 peers
346 }
347
348 pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
354 let addrs_list = self.ephemeral_addresses.entry(peer_id).or_default();
355 if addrs_list.contains(&addr) {
356 return
357 }
358
359 if let Some(k) = self.kademlia.as_mut() {
360 k.add_address(&peer_id, addr.clone());
361 }
362
363 self.pending_events.push_back(DiscoveryOut::Discovered(peer_id));
364 addrs_list.push(addr);
365 }
366
367 pub fn add_self_reported_address(
373 &mut self,
374 peer_id: &PeerId,
375 supported_protocols: &[StreamProtocol],
376 addr: Multiaddr,
377 ) {
378 if let Some(kademlia) = self.kademlia.as_mut() {
379 if !self.allow_non_globals_in_dht && !Self::can_add_to_dht(&addr) {
380 trace!(
381 target: LOG_TARGET,
382 "Ignoring self-reported non-global address {} from {}.", addr, peer_id
383 );
384 return
385 }
386
387 if !supported_protocols.iter().any(|p| {
393 p == self
394 .kademlia_protocol
395 .as_ref()
396 .expect("kademlia protocol was checked above to be enabled; qed")
397 }) {
398 trace!(
399 target: LOG_TARGET,
400 "Ignoring self-reported address {} from {} as remote node is not part of the \
401 Kademlia DHT supported by the local node.", addr, peer_id,
402 );
403 return
404 }
405
406 trace!(
407 target: LOG_TARGET,
408 "Adding self-reported address {} from {} to Kademlia DHT.",
409 addr, peer_id
410 );
411 kademlia.add_address(peer_id, addr.clone());
412 }
413 }
414
415 pub fn get_value(&mut self, key: RecordKey) {
419 if let Some(k) = self.kademlia.as_mut() {
420 k.get_record(key.clone());
421 }
422 }
423
424 pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
429 if let Some(k) = self.kademlia.as_mut() {
430 if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) {
431 warn!(target: LOG_TARGET, "Libp2p => Failed to put record: {:?}", e);
432 self.pending_events
433 .push_back(DiscoveryOut::ValuePutFailed(key.clone(), Duration::from_secs(0)));
434 }
435 }
436 }
437
438 pub fn put_record_to(
442 &mut self,
443 record: Record,
444 peers: HashSet<sc_network_types::PeerId>,
445 update_local_storage: bool,
446 ) {
447 if let Some(kad) = self.kademlia.as_mut() {
448 if update_local_storage {
449 if let Err(_e) = kad.store_mut().put(record.clone()) {
450 warn!(target: LOG_TARGET, "Failed to update local starage");
451 }
452 }
453
454 if !peers.is_empty() {
455 kad.put_record_to(
456 record,
457 peers.into_iter().map(|peer_id| peer_id.into()),
458 Quorum::All,
459 );
460 }
461 }
462 }
463
464 pub fn start_providing(&mut self, key: RecordKey) {
466 if let Some(kad) = self.kademlia.as_mut() {
467 if let Err(e) = kad.start_providing(key.clone()) {
468 warn!(target: LOG_TARGET, "Libp2p => Failed to start providing {key:?}: {e}.");
469 self.pending_events.push_back(DiscoveryOut::StartProvidingFailed(key));
470 }
471 }
472 }
473
474 pub fn stop_providing(&mut self, key: &RecordKey) {
476 if let Some(kad) = self.kademlia.as_mut() {
477 kad.stop_providing(key);
478 }
479 }
480
481 pub fn get_providers(&mut self, key: RecordKey) {
483 if let Some(kad) = self.kademlia.as_mut() {
484 kad.get_providers(key);
485 }
486 }
487
488 pub fn store_record(
490 &mut self,
491 record_key: RecordKey,
492 record_value: Vec<u8>,
493 publisher: Option<PeerId>,
494 expires: Option<Instant>,
495 ) {
496 if let Some(k) = self.kademlia.as_mut() {
497 if let Err(err) = k.store_mut().put(Record {
498 key: record_key,
499 value: record_value,
500 publisher: publisher.map(|publisher| publisher.into()),
501 expires,
502 }) {
503 debug!(
504 target: LOG_TARGET,
505 "Failed to store record with key: {:?}",
506 err
507 );
508 }
509 }
510 }
511
512 pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
517 self.kademlia.as_mut().map(|kad| {
518 kad.kbuckets()
519 .map(|bucket| (bucket.range().0.ilog2().unwrap_or(0), bucket.iter().count()))
520 .collect()
521 })
522 }
523
524 pub fn num_kademlia_records(&mut self) -> Option<usize> {
526 self.kademlia.as_mut().map(|kad| kad.store_mut().records().count())
528 }
529
530 pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
532 self.kademlia
535 .as_mut()
536 .map(|kad| kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len()))
537 }
538
539 pub fn can_add_to_dht(addr: &Multiaddr) -> bool {
546 let ip = match addr.iter().next() {
547 Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
548 Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
549 Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) =>
550 return true,
551 _ => return false,
552 };
553 ip.is_global()
554 }
555}
556
557#[derive(Debug)]
559pub enum DiscoveryOut {
560 Discovered(PeerId),
566
567 UnroutablePeer(PeerId),
574
575 ValueFound(PeerRecord, Duration),
579
580 PutRecordRequest(
582 RecordKey,
583 Vec<u8>,
584 Option<sc_network_types::PeerId>,
585 Option<std::time::Instant>,
586 ),
587
588 ValueNotFound(RecordKey, Duration),
592
593 ValuePut(RecordKey, Duration),
597
598 ValuePutFailed(RecordKey, Duration),
602
603 StartProvidingFailed(RecordKey),
605
606 ProvidersFound(RecordKey, HashSet<PeerId>, Duration),
608
609 ProvidersNotFound(RecordKey, Duration),
611
612 RandomKademliaStarted,
616}
617
618impl NetworkBehaviour for DiscoveryBehaviour {
619 type ConnectionHandler =
620 ToggleConnectionHandler<<Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler>;
621 type ToSwarm = DiscoveryOut;
622
623 fn handle_established_inbound_connection(
624 &mut self,
625 connection_id: ConnectionId,
626 peer: PeerId,
627 local_addr: &Multiaddr,
628 remote_addr: &Multiaddr,
629 ) -> Result<THandler<Self>, ConnectionDenied> {
630 self.kademlia.handle_established_inbound_connection(
631 connection_id,
632 peer,
633 local_addr,
634 remote_addr,
635 )
636 }
637
638 fn handle_established_outbound_connection(
639 &mut self,
640 connection_id: ConnectionId,
641 peer: PeerId,
642 addr: &Multiaddr,
643 role_override: Endpoint,
644 port_use: PortUse,
645 ) -> Result<THandler<Self>, ConnectionDenied> {
646 self.kademlia.handle_established_outbound_connection(
647 connection_id,
648 peer,
649 addr,
650 role_override,
651 port_use,
652 )
653 }
654
655 fn handle_pending_inbound_connection(
656 &mut self,
657 connection_id: ConnectionId,
658 local_addr: &Multiaddr,
659 remote_addr: &Multiaddr,
660 ) -> Result<(), ConnectionDenied> {
661 self.kademlia
662 .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
663 }
664
665 fn handle_pending_outbound_connection(
666 &mut self,
667 connection_id: ConnectionId,
668 maybe_peer: Option<PeerId>,
669 addresses: &[Multiaddr],
670 effective_role: Endpoint,
671 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
672 let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };
673
674 let mut list: LinkedHashSet<_> = self
679 .permanent_addresses
680 .iter()
681 .filter_map(|(p, a)| (*p == peer_id).then(|| a.clone()))
682 .collect();
683
684 if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
685 ephemeral_addresses.iter().for_each(|address| {
686 list.insert_if_absent(address.clone());
687 });
688 }
689
690 {
691 let mut list_to_filter = self.kademlia.handle_pending_outbound_connection(
692 connection_id,
693 maybe_peer,
694 addresses,
695 effective_role,
696 )?;
697
698 list_to_filter.extend(self.mdns.handle_pending_outbound_connection(
699 connection_id,
700 maybe_peer,
701 addresses,
702 effective_role,
703 )?);
704
705 if !self.allow_private_ip {
706 list_to_filter.retain(|addr| match addr.iter().next() {
707 Some(Protocol::Ip4(addr)) if !IpNetwork::from(addr).is_global() => false,
708 Some(Protocol::Ip6(addr)) if !IpNetwork::from(addr).is_global() => false,
709 _ => true,
710 });
711 }
712
713 list_to_filter.into_iter().for_each(|address| {
714 list.insert_if_absent(address);
715 });
716 }
717
718 trace!(target: LOG_TARGET, "Addresses of {:?}: {:?}", peer_id, list);
719
720 Ok(list.into_iter().collect())
721 }
722
723 fn on_swarm_event(&mut self, event: FromSwarm) {
724 match event {
725 FromSwarm::ConnectionEstablished(e) => {
726 self.num_connections += 1;
727 self.kademlia.on_swarm_event(FromSwarm::ConnectionEstablished(e));
728 },
729 FromSwarm::ConnectionClosed(e) => {
730 self.num_connections -= 1;
731 self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
732 },
733 FromSwarm::DialFailure(e @ DialFailure { peer_id, error, .. }) => {
734 if let Some(peer_id) = peer_id {
735 if let DialError::Transport(errors) = error {
736 if let Entry::Occupied(mut entry) = self.ephemeral_addresses.entry(peer_id)
737 {
738 for (addr, _error) in errors {
739 entry.get_mut().retain(|a| a != addr);
740 }
741 if entry.get().is_empty() {
742 entry.remove();
743 }
744 }
745 }
746 }
747
748 self.kademlia.on_swarm_event(FromSwarm::DialFailure(e));
749 },
750 FromSwarm::ListenerClosed(e) => {
751 self.kademlia.on_swarm_event(FromSwarm::ListenerClosed(e));
752 },
753 FromSwarm::ListenFailure(e) => {
754 self.kademlia.on_swarm_event(FromSwarm::ListenFailure(e));
755 },
756 FromSwarm::ListenerError(e) => {
757 self.kademlia.on_swarm_event(FromSwarm::ListenerError(e));
758 },
759 FromSwarm::ExternalAddrExpired(e) => {
760 self.kademlia.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
764 },
765 FromSwarm::NewListener(e) => {
766 self.kademlia.on_swarm_event(FromSwarm::NewListener(e));
767 },
768 FromSwarm::ExpiredListenAddr(e) => {
769 self.kademlia.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
770 },
771 FromSwarm::NewExternalAddrCandidate(e) => {
772 self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
773 },
774 FromSwarm::AddressChange(e) => {
775 self.kademlia.on_swarm_event(FromSwarm::AddressChange(e));
776 },
777 FromSwarm::NewListenAddr(e) => {
778 self.kademlia.on_swarm_event(FromSwarm::NewListenAddr(e));
779 self.mdns.on_swarm_event(FromSwarm::NewListenAddr(e));
780 },
781 FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => {
782 let mut address = addr.clone();
783
784 if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
785 if peer_id != self.local_peer_id {
786 warn!(
787 target: LOG_TARGET,
788 "🔍 Discovered external address for a peer that is not us: {addr}",
789 );
790 return
792 }
793 } else {
794 address.push(Protocol::P2p(self.local_peer_id));
795 }
796
797 if Self::can_add_to_dht(&address) {
798 if self.known_external_addresses.insert(address.clone()) {
801 info!(
802 target: LOG_TARGET,
803 "🔍 Discovered new external address for our node: {address}",
804 );
805 }
806 }
807
808 self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
809 },
810 event => {
811 debug!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
812 self.kademlia.on_swarm_event(event);
813 },
814 }
815 }
816
817 fn on_connection_handler_event(
818 &mut self,
819 peer_id: PeerId,
820 connection_id: ConnectionId,
821 event: THandlerOutEvent<Self>,
822 ) {
823 self.kademlia.on_connection_handler_event(peer_id, connection_id, event);
824 }
825
826 fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
827 if let Some(ev) = self.pending_events.pop_front() {
829 return Poll::Ready(ToSwarm::GenerateEvent(ev))
830 }
831
832 if let Some(kademlia) = self.kademlia.as_mut() {
834 if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() {
835 while next_kad_random_query.poll_unpin(cx).is_ready() {
836 let actually_started =
837 if self.num_connections < self.discovery_only_if_under_num {
838 let random_peer_id = PeerId::random();
839 debug!(
840 target: LOG_TARGET,
841 "Libp2p <= Starting random Kademlia request for {:?}",
842 random_peer_id,
843 );
844 kademlia.get_closest_peers(random_peer_id);
845 true
846 } else {
847 debug!(
848 target: LOG_TARGET,
849 "Kademlia paused due to high number of connections ({})",
850 self.num_connections
851 );
852 false
853 };
854
855 *next_kad_random_query = Delay::new(self.duration_to_next_kad);
858 self.duration_to_next_kad =
859 cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));
860
861 if actually_started {
862 let ev = DiscoveryOut::RandomKademliaStarted;
863 return Poll::Ready(ToSwarm::GenerateEvent(ev))
864 }
865 }
866 }
867 }
868
869 while let Poll::Ready(ev) = self.kademlia.poll(cx) {
870 match ev {
871 ToSwarm::GenerateEvent(ev) => match ev {
872 KademliaEvent::RoutingUpdated { peer, .. } => {
873 let ev = DiscoveryOut::Discovered(peer);
874 return Poll::Ready(ToSwarm::GenerateEvent(ev))
875 },
876 KademliaEvent::UnroutablePeer { peer, .. } => {
877 let ev = DiscoveryOut::UnroutablePeer(peer);
878 return Poll::Ready(ToSwarm::GenerateEvent(ev))
879 },
880 KademliaEvent::RoutablePeer { peer, .. } => {
881 let ev = DiscoveryOut::Discovered(peer);
882 return Poll::Ready(ToSwarm::GenerateEvent(ev))
883 },
884 KademliaEvent::PendingRoutablePeer { .. } => {
885 },
887 KademliaEvent::InboundRequest { request } => match request {
888 libp2p::kad::InboundRequest::PutRecord { record: Some(record), .. } =>
889 return Poll::Ready(ToSwarm::GenerateEvent(
890 DiscoveryOut::PutRecordRequest(
891 record.key,
892 record.value,
893 record.publisher.map(Into::into),
894 record.expires,
895 ),
896 )),
897 _ => {},
898 },
899 KademliaEvent::OutboundQueryProgressed {
900 result: QueryResult::GetClosestPeers(res),
901 ..
902 } => match res {
903 Err(GetClosestPeersError::Timeout { key, peers }) => {
904 debug!(
905 target: LOG_TARGET,
906 "Libp2p => Query for {:?} timed out with {} results",
907 HexDisplay::from(&key), peers.len(),
908 );
909 },
910 Ok(ok) => {
911 trace!(
912 target: LOG_TARGET,
913 "Libp2p => Query for {:?} yielded {:?} results",
914 HexDisplay::from(&ok.key), ok.peers.len(),
915 );
916 if ok.peers.is_empty() && self.num_connections != 0 {
917 debug!(
918 target: LOG_TARGET,
919 "Libp2p => Random Kademlia query has yielded empty results",
920 );
921 }
922 },
923 },
924 KademliaEvent::OutboundQueryProgressed {
925 result: QueryResult::GetRecord(res),
926 stats,
927 id,
928 ..
929 } => {
930 let ev = match res {
931 Ok(GetRecordOk::FoundRecord(r)) => {
932 debug!(
933 target: LOG_TARGET,
934 "Libp2p => Found record ({:?}) with value: {:?} id {:?} stats {:?}",
935 r.record.key,
936 r.record.value,
937 id,
938 stats,
939 );
940
941 if stats.num_successes() > GET_RECORD_REDUNDANCY_FACTOR {
948 if let Some(kad) = self.kademlia.as_mut() {
949 if let Some(mut query) = kad.query_mut(&id) {
950 query.finish();
951 }
952 }
953 }
954
955 self.records_to_publish.insert(id, r.record.clone());
958
959 DiscoveryOut::ValueFound(r, stats.duration().unwrap_or_default())
960 },
961 Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
962 cache_candidates,
963 }) => {
964 debug!(
965 target: LOG_TARGET,
966 "Libp2p => Finished with no-additional-record {:?} stats {:?} took {:?} ms",
967 id,
968 stats,
969 stats.duration().map(|val| val.as_millis())
970 );
971 if let Some(record) = self.records_to_publish.remove(&id) {
973 if cache_candidates.is_empty() {
974 continue
975 }
976
977 if let Some(kad) = self.kademlia.as_mut() {
980 kad.put_record_to(
981 record,
982 cache_candidates.into_iter().map(|v| v.1),
983 Quorum::One,
984 );
985 }
986 }
987
988 continue
989 },
990 Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
991 trace!(
992 target: LOG_TARGET,
993 "Libp2p => Failed to get record: {:?}",
994 e,
995 );
996 DiscoveryOut::ValueNotFound(
997 e.into_key(),
998 stats.duration().unwrap_or_default(),
999 )
1000 },
1001 Err(e) => {
1002 debug!(
1003 target: LOG_TARGET,
1004 "Libp2p => Failed to get record: {:?}",
1005 e,
1006 );
1007 DiscoveryOut::ValueNotFound(
1008 e.into_key(),
1009 stats.duration().unwrap_or_default(),
1010 )
1011 },
1012 };
1013 return Poll::Ready(ToSwarm::GenerateEvent(ev))
1014 },
1015 KademliaEvent::OutboundQueryProgressed {
1016 result: QueryResult::GetProviders(res),
1017 stats,
1018 id,
1019 ..
1020 } => {
1021 let ev = match res {
1022 Ok(GetProvidersOk::FoundProviders { key, providers }) => {
1023 debug!(
1024 target: LOG_TARGET,
1025 "Libp2p => Found providers {:?} for key {:?}, id {:?}, stats {:?}",
1026 providers,
1027 key,
1028 id,
1029 stats,
1030 );
1031
1032 DiscoveryOut::ProvidersFound(
1033 key,
1034 providers,
1035 stats.duration().unwrap_or_default(),
1036 )
1037 },
1038 Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
1039 closest_peers: _,
1040 }) => {
1041 debug!(
1042 target: LOG_TARGET,
1043 "Libp2p => Finished with no additional providers {:?}, stats {:?}, took {:?} ms",
1044 id,
1045 stats,
1046 stats.duration().map(|val| val.as_millis())
1047 );
1048
1049 continue
1050 },
1051 Err(GetProvidersError::Timeout { key, closest_peers: _ }) => {
1052 debug!(
1053 target: LOG_TARGET,
1054 "Libp2p => Failed to get providers for {key:?} due to timeout.",
1055 );
1056
1057 DiscoveryOut::ProvidersNotFound(
1058 key,
1059 stats.duration().unwrap_or_default(),
1060 )
1061 },
1062 };
1063 return Poll::Ready(ToSwarm::GenerateEvent(ev))
1064 },
1065 KademliaEvent::OutboundQueryProgressed {
1066 result: QueryResult::PutRecord(res),
1067 stats,
1068 ..
1069 } => {
1070 let ev = match res {
1071 Ok(ok) =>
1072 DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_default()),
1073 Err(e) => {
1074 debug!(
1075 target: LOG_TARGET,
1076 "Libp2p => Failed to put record: {:?}",
1077 e,
1078 );
1079 DiscoveryOut::ValuePutFailed(
1080 e.into_key(),
1081 stats.duration().unwrap_or_default(),
1082 )
1083 },
1084 };
1085 return Poll::Ready(ToSwarm::GenerateEvent(ev))
1086 },
1087 KademliaEvent::OutboundQueryProgressed {
1088 result: QueryResult::RepublishRecord(res),
1089 ..
1090 } => match res {
1091 Ok(ok) => debug!(
1092 target: LOG_TARGET,
1093 "Libp2p => Record republished: {:?}",
1094 ok.key,
1095 ),
1096 Err(e) => debug!(
1097 target: LOG_TARGET,
1098 "Libp2p => Republishing of record {:?} failed with: {:?}",
1099 e.key(), e,
1100 ),
1101 },
1102 KademliaEvent::OutboundQueryProgressed {
1103 result: QueryResult::Bootstrap(res),
1104 ..
1105 } => match res {
1106 Ok(ok) => debug!(
1107 target: LOG_TARGET,
1108 "Libp2p => DHT bootstrap progressed: {ok:?}",
1109 ),
1110 Err(e) => warn!(
1111 target: LOG_TARGET,
1112 "Libp2p => DHT bootstrap error: {e:?}",
1113 ),
1114 },
1115 KademliaEvent::OutboundQueryProgressed { result: e, .. } => {
1117 warn!(target: LOG_TARGET, "Libp2p => Unhandled Kademlia event: {:?}", e)
1118 },
1119 Event::ModeChanged { new_mode } => {
1120 debug!(target: LOG_TARGET, "Libp2p => Kademlia mode changed: {new_mode}")
1121 },
1122 },
1123 ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }),
1124 event => {
1125 return Poll::Ready(event.map_out(|_| {
1126 unreachable!("`GenerateEvent` is handled in a branch above; qed")
1127 }));
1128 },
1129 }
1130 }
1131
1132 while let Poll::Ready(ev) = self.mdns.poll(cx) {
1134 match ev {
1135 ToSwarm::GenerateEvent(event) => match event {
1136 mdns::Event::Discovered(list) => {
1137 if self.num_connections >= self.discovery_only_if_under_num {
1138 continue
1139 }
1140
1141 self.pending_events.extend(
1142 list.into_iter().map(|(peer_id, _)| DiscoveryOut::Discovered(peer_id)),
1143 );
1144 if let Some(ev) = self.pending_events.pop_front() {
1145 return Poll::Ready(ToSwarm::GenerateEvent(ev))
1146 }
1147 },
1148 mdns::Event::Expired(_) => {},
1149 },
1150 ToSwarm::Dial { .. } => {
1151 unreachable!("mDNS never dials!");
1152 },
1153 ToSwarm::NotifyHandler { event, .. } => match event {},
1155 event => {
1156 return Poll::Ready(
1157 event
1158 .map_in(|_| {
1159 unreachable!("`NotifyHandler` is handled in a branch above; qed")
1160 })
1161 .map_out(|_| {
1162 unreachable!("`GenerateEvent` is handled in a branch above; qed")
1163 }),
1164 );
1165 },
1166 }
1167 }
1168
1169 Poll::Pending
1170 }
1171}
1172
1173fn legacy_kademlia_protocol_name(id: &ProtocolId) -> StreamProtocol {
1175 let name = format!("/{}/kad", id.as_ref());
1176 StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1177}
1178
1179fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
1181 genesis_hash: Hash,
1182 fork_id: Option<&str>,
1183) -> StreamProtocol {
1184 let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
1185 let name = if let Some(fork_id) = fork_id {
1186 format!("/{genesis_hash_hex}/{fork_id}/kad")
1187 } else {
1188 format!("/{genesis_hash_hex}/kad")
1189 };
1190
1191 StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1192}
1193
1194#[cfg(test)]
1195mod tests {
1196 use super::{
1197 kademlia_protocol_name, legacy_kademlia_protocol_name, DiscoveryConfig, DiscoveryOut,
1198 };
1199 use crate::config::ProtocolId;
1200 use futures::prelude::*;
1201 use libp2p::{
1202 core::{
1203 transport::{MemoryTransport, Transport},
1204 upgrade,
1205 },
1206 identity::Keypair,
1207 noise,
1208 swarm::{Swarm, SwarmEvent},
1209 yamux, Multiaddr,
1210 };
1211 use sp_core::hash::H256;
1212 use std::{collections::HashSet, task::Poll, time::Duration};
1213
1214 #[tokio::test]
1215 async fn discovery_working() {
1216 let mut first_swarm_peer_id_and_addr = None;
1217
1218 let genesis_hash = H256::from_low_u64_be(1);
1219 let fork_id = Some("test-fork-id");
1220 let protocol_id = ProtocolId::from("dot");
1221
1222 let mut swarms = (0..25)
1225 .map(|i| {
1226 let mut swarm = libp2p::SwarmBuilder::with_new_identity()
1227 .with_tokio()
1228 .with_other_transport(|keypair| {
1229 MemoryTransport::new()
1230 .upgrade(upgrade::Version::V1)
1231 .authenticate(noise::Config::new(&keypair).unwrap())
1232 .multiplex(yamux::Config::default())
1233 .boxed()
1234 })
1235 .unwrap()
1236 .with_behaviour(|keypair| {
1237 let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
1238 config
1239 .with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
1240 .allow_private_ip(true)
1241 .allow_non_globals_in_dht(true)
1242 .discovery_limit(50)
1243 .with_kademlia(genesis_hash, fork_id, &protocol_id);
1244
1245 config.finish()
1246 })
1247 .unwrap()
1248 .with_swarm_config(|config| {
1249 config.with_idle_connection_timeout(Duration::from_secs(10))
1251 })
1252 .build();
1253
1254 let listen_addr: Multiaddr =
1255 format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1256
1257 if i == 0 {
1258 first_swarm_peer_id_and_addr =
1259 Some((*swarm.local_peer_id(), listen_addr.clone()))
1260 }
1261
1262 swarm.listen_on(listen_addr.clone()).unwrap();
1263 (swarm, listen_addr)
1264 })
1265 .collect::<Vec<_>>();
1266
1267 let mut to_discover = (0..swarms.len())
1269 .map(|n| {
1270 (0..swarms.len())
1271 .skip(1)
1273 .filter(|p| *p != n)
1274 .map(|p| *Swarm::local_peer_id(&swarms[p].0))
1275 .collect::<HashSet<_>>()
1276 })
1277 .collect::<Vec<_>>();
1278
1279 let fut = futures::future::poll_fn(move |cx| {
1280 'polling: loop {
1281 for swarm_n in 0..swarms.len() {
1282 match swarms[swarm_n].0.poll_next_unpin(cx) {
1283 Poll::Ready(Some(e)) => {
1284 match e {
1285 SwarmEvent::Behaviour(behavior) => {
1286 match behavior {
1287 DiscoveryOut::UnroutablePeer(other) |
1288 DiscoveryOut::Discovered(other) => {
1289 let addr = swarms
1292 .iter()
1293 .find_map(|(s, a)| {
1294 if s.behaviour().local_peer_id == other {
1295 Some(a.clone())
1296 } else {
1297 None
1298 }
1299 })
1300 .unwrap();
1301 let protocol_names = if swarm_n % 2 == 0 {
1304 vec![kademlia_protocol_name(genesis_hash, fork_id)]
1305 } else {
1306 vec![
1307 legacy_kademlia_protocol_name(&protocol_id),
1308 kademlia_protocol_name(genesis_hash, fork_id),
1309 ]
1310 };
1311 swarms[swarm_n]
1312 .0
1313 .behaviour_mut()
1314 .add_self_reported_address(
1315 &other,
1316 protocol_names.as_slice(),
1317 addr,
1318 );
1319
1320 to_discover[swarm_n].remove(&other);
1321 },
1322 DiscoveryOut::RandomKademliaStarted => {},
1323 e => {
1324 panic!("Unexpected event: {:?}", e)
1325 },
1326 }
1327 },
1328 _ => {},
1330 }
1331 continue 'polling
1332 },
1333 _ => {},
1334 }
1335 }
1336 break
1337 }
1338
1339 if to_discover.iter().all(|l| l.is_empty()) {
1340 Poll::Ready(())
1341 } else {
1342 Poll::Pending
1343 }
1344 });
1345
1346 fut.await
1347 }
1348
1349 #[test]
1350 fn discovery_ignores_peers_with_unknown_protocols() {
1351 let supported_genesis_hash = H256::from_low_u64_be(1);
1352 let unsupported_genesis_hash = H256::from_low_u64_be(2);
1353 let supported_protocol_id = ProtocolId::from("a");
1354 let unsupported_protocol_id = ProtocolId::from("b");
1355
1356 let mut discovery = {
1357 let keypair = Keypair::generate_ed25519();
1358 let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
1359 config
1360 .allow_private_ip(true)
1361 .allow_non_globals_in_dht(true)
1362 .discovery_limit(50)
1363 .with_kademlia(supported_genesis_hash, None, &supported_protocol_id);
1364 config.finish()
1365 };
1366
1367 let predictable_peer_id = |bytes: &[u8; 32]| {
1368 Keypair::ed25519_from_bytes(bytes.to_owned()).unwrap().public().to_peer_id()
1369 };
1370
1371 let remote_peer_id = predictable_peer_id(b"00000000000000000000000000000001");
1372 let remote_addr: Multiaddr = "/memory/1".parse().unwrap();
1373 let another_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
1374 let another_addr: Multiaddr = "/memory/2".parse().unwrap();
1375
1376 discovery.add_self_reported_address(
1378 &remote_peer_id,
1379 &[kademlia_protocol_name(unsupported_genesis_hash, None)],
1380 remote_addr.clone(),
1381 );
1382 discovery.add_self_reported_address(
1383 &another_peer_id,
1384 &[legacy_kademlia_protocol_name(&unsupported_protocol_id)],
1385 another_addr.clone(),
1386 );
1387
1388 {
1389 let kademlia = discovery.kademlia.as_mut().unwrap();
1390 assert!(
1391 kademlia
1392 .kbucket(remote_peer_id)
1393 .expect("Remote peer id not to be equal to local peer id.")
1394 .is_empty(),
1395 "Expect peer with unsupported protocol not to be added."
1396 );
1397 assert!(
1398 kademlia
1399 .kbucket(another_peer_id)
1400 .expect("Remote peer id not to be equal to local peer id.")
1401 .is_empty(),
1402 "Expect peer with unsupported protocol not to be added."
1403 );
1404 }
1405
1406 discovery.add_self_reported_address(
1408 &remote_peer_id,
1409 &[kademlia_protocol_name(supported_genesis_hash, None)],
1410 remote_addr.clone(),
1411 );
1412 {
1413 let kademlia = discovery.kademlia.as_mut().unwrap();
1414 assert!(
1415 !kademlia
1416 .kbucket(remote_peer_id)
1417 .expect("Remote peer id not to be equal to local peer id.")
1418 .is_empty(),
1419 "Expect peer with supported protocol to be added."
1420 );
1421 }
1422
1423 let unsupported_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
1424 let unsupported_peer_addr: Multiaddr = "/memory/2".parse().unwrap();
1425
1426 {
1428 let kademlia = discovery.kademlia.as_mut().unwrap();
1429 assert!(
1430 kademlia
1431 .kbucket(unsupported_peer_id)
1432 .expect("Remote peer id not to be equal to local peer id.")
1433 .is_empty(),
1434 "Expect unsupported peer not to be added."
1435 );
1436 }
1437 discovery.add_self_reported_address(
1440 &unsupported_peer_id,
1441 &[legacy_kademlia_protocol_name(&supported_protocol_id)],
1442 unsupported_peer_addr.clone(),
1443 );
1444 {
1445 let kademlia = discovery.kademlia.as_mut().unwrap();
1446 assert!(
1447 kademlia
1448 .kbucket(unsupported_peer_id)
1449 .expect("Remote peer id not to be equal to local peer id.")
1450 .is_empty(),
1451 "Expect unsupported peer not to be added."
1452 );
1453 }
1454
1455 discovery.add_self_reported_address(
1457 &another_peer_id,
1458 &[
1459 legacy_kademlia_protocol_name(&supported_protocol_id),
1460 kademlia_protocol_name(supported_genesis_hash, None),
1461 ],
1462 another_addr.clone(),
1463 );
1464
1465 {
1466 let kademlia = discovery.kademlia.as_mut().unwrap();
1467 assert_eq!(
1468 2,
1469 kademlia.kbuckets().fold(0, |acc, bucket| acc + bucket.num_entries()),
1470 "Expect peers with supported protocol to be added."
1471 );
1472 assert!(
1473 !kademlia
1474 .kbucket(another_peer_id)
1475 .expect("Remote peer id not to be equal to local peer id.")
1476 .is_empty(),
1477 "Expect peer with supported protocol to be added."
1478 );
1479 }
1480 }
1481}