1use crate::{config::ProtocolId, utils::LruHashSet};
50
51use array_bytes::bytes2hex;
52use futures::prelude::*;
53use futures_timer::Delay;
54use ip_network::IpNetwork;
55use libp2p::{
56 core::{transport::PortUse, Endpoint, Multiaddr},
57 kad::{
58 self,
59 store::{MemoryStore, RecordStore},
60 Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
61 Event, GetClosestPeersError, GetClosestPeersOk, GetProvidersError, GetProvidersOk,
62 GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record, RecordKey,
63 },
64 mdns::{self, tokio::Behaviour as TokioMdns},
65 multiaddr::Protocol,
66 swarm::{
67 behaviour::{
68 toggle::{Toggle, ToggleConnectionHandler},
69 DialFailure, ExternalAddrConfirmed, FromSwarm,
70 },
71 ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, StreamProtocol, THandler,
72 THandlerInEvent, THandlerOutEvent, ToSwarm,
73 },
74 PeerId,
75};
76use linked_hash_set::LinkedHashSet;
77use log::{debug, error, info, trace, warn};
78use sp_core::hexdisplay::HexDisplay;
79use std::{
80 cmp,
81 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
82 num::NonZeroUsize,
83 task::{Context, Poll},
84 time::{Duration, Instant},
85};
86
/// Logging target used by the log statements of the discovery mechanism.
const LOG_TARGET: &str = "sub-libp2p::discovery";

/// Capacity of the LRU set remembering the confirmed external addresses of the local node.
const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32;

/// Kademlia replication factor that [`DiscoveryConfig::new`] uses unless it is overridden.
pub const DEFAULT_KADEMLIA_REPLICATION_FACTOR: usize = 20;

/// Once a get-record query has reported more than this many successes, it is finished early.
const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4;

/// Query timeout passed to the Kademlia configuration.
const KAD_QUERY_TIMEOUT: Duration = Duration::from_secs(300);
105
/// Builder-style configuration from which a [`DiscoveryBehaviour`] is created.
///
/// Create it with [`DiscoveryConfig::new`], adjust it through the setter methods and turn it
/// into the behaviour with [`DiscoveryConfig::finish`].
pub struct DiscoveryConfig {
    /// Peer ID of the local node.
    local_peer_id: PeerId,
    /// Known `(peer, address)` pairs; they are added to Kademlia by `finish` and offered as
    /// dial addresses for the respective peer.
    permanent_addresses: Vec<(PeerId, Multiaddr)>,
    /// Whether periodic random Kademlia queries are scheduled.
    dht_random_walk: bool,
    /// When `false`, non-global IP addresses reported by Kademlia or mDNS are removed from
    /// the list of dial addresses.
    allow_private_ip: bool,
    /// When `false`, self-reported addresses that fail `can_add_to_dht` are not added to
    /// Kademlia.
    allow_non_globals_in_dht: bool,
    /// Random Kademlia queries and mDNS discoveries are skipped once the number of
    /// connections reaches this value.
    discovery_only_if_under_num: u64,
    /// Whether the mDNS behaviour is created.
    enable_mdns: bool,
    /// Forwarded to `KademliaConfig::disjoint_query_paths`.
    kademlia_disjoint_query_paths: bool,
    /// Kademlia protocol name derived from the genesis hash and fork ID; Kademlia stays
    /// disabled while this is `None`.
    kademlia_protocol: Option<StreamProtocol>,
    /// Kademlia protocol name derived from the protocol ID; `finish` discards it.
    kademlia_legacy_protocol: Option<StreamProtocol>,
    /// Replication factor handed to the Kademlia configuration.
    kademlia_replication_factor: NonZeroUsize,
}
124
125impl DiscoveryConfig {
126 pub fn new(local_peer_id: PeerId) -> Self {
128 Self {
129 local_peer_id,
130 permanent_addresses: Vec::new(),
131 dht_random_walk: true,
132 allow_private_ip: true,
133 allow_non_globals_in_dht: false,
134 discovery_only_if_under_num: std::u64::MAX,
135 enable_mdns: false,
136 kademlia_disjoint_query_paths: false,
137 kademlia_protocol: None,
138 kademlia_legacy_protocol: None,
139 kademlia_replication_factor: NonZeroUsize::new(DEFAULT_KADEMLIA_REPLICATION_FACTOR)
140 .expect("value is a constant; constant is non-zero; qed."),
141 }
142 }
143
144 pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
146 self.discovery_only_if_under_num = limit;
147 self
148 }
149
150 pub fn with_permanent_addresses<I>(&mut self, permanent_addresses: I) -> &mut Self
152 where
153 I: IntoIterator<Item = (PeerId, Multiaddr)>,
154 {
155 self.permanent_addresses.extend(permanent_addresses);
156 self
157 }
158
159 pub fn with_dht_random_walk(&mut self, value: bool) -> &mut Self {
162 self.dht_random_walk = value;
163 self
164 }
165
166 pub fn allow_private_ip(&mut self, value: bool) -> &mut Self {
168 self.allow_private_ip = value;
169 self
170 }
171
172 pub fn allow_non_globals_in_dht(&mut self, value: bool) -> &mut Self {
174 self.allow_non_globals_in_dht = value;
175 self
176 }
177
178 pub fn with_mdns(&mut self, value: bool) -> &mut Self {
180 self.enable_mdns = value;
181 self
182 }
183
184 pub fn with_kademlia<Hash: AsRef<[u8]>>(
189 &mut self,
190 genesis_hash: Hash,
191 fork_id: Option<&str>,
192 protocol_id: &ProtocolId,
193 ) -> &mut Self {
194 self.kademlia_protocol = Some(kademlia_protocol_name(genesis_hash, fork_id));
195 self.kademlia_legacy_protocol = Some(legacy_kademlia_protocol_name(protocol_id));
196 self
197 }
198
199 pub fn use_kademlia_disjoint_query_paths(&mut self, value: bool) -> &mut Self {
202 self.kademlia_disjoint_query_paths = value;
203 self
204 }
205
206 pub fn with_kademlia_replication_factor(&mut self, value: NonZeroUsize) -> &mut Self {
208 self.kademlia_replication_factor = value;
209 self
210 }
211
212 pub fn finish(self) -> DiscoveryBehaviour {
214 let Self {
215 local_peer_id,
216 permanent_addresses,
217 dht_random_walk,
218 allow_private_ip,
219 allow_non_globals_in_dht,
220 discovery_only_if_under_num,
221 enable_mdns,
222 kademlia_disjoint_query_paths,
223 kademlia_protocol,
224 kademlia_legacy_protocol: _,
225 kademlia_replication_factor,
226 } = self;
227
228 let kademlia = if let Some(ref kademlia_protocol) = kademlia_protocol {
229 let mut config = KademliaConfig::new(kademlia_protocol.clone());
230
231 config.set_replication_factor(kademlia_replication_factor);
232
233 config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);
234
235 config.set_query_timeout(KAD_QUERY_TIMEOUT);
236
237 config.set_kbucket_inserts(BucketInserts::Manual);
241 config.disjoint_query_paths(kademlia_disjoint_query_paths);
242 let store = MemoryStore::new(local_peer_id);
243 let mut kad = Kademlia::with_config(local_peer_id, store, config);
244 kad.set_mode(Some(kad::Mode::Server));
245
246 for (peer_id, addr) in &permanent_addresses {
247 kad.add_address(peer_id, addr.clone());
248 }
249
250 Some(kad)
251 } else {
252 None
253 };
254
255 DiscoveryBehaviour {
256 permanent_addresses,
257 ephemeral_addresses: HashMap::new(),
258 kademlia: Toggle::from(kademlia),
259 next_kad_random_query: if dht_random_walk {
260 Some(Delay::new(Duration::new(0, 0)))
261 } else {
262 None
263 },
264 duration_to_next_kad: Duration::from_secs(1),
265 pending_events: VecDeque::new(),
266 local_peer_id,
267 num_connections: 0,
268 allow_private_ip,
269 discovery_only_if_under_num,
270 mdns: if enable_mdns {
271 match TokioMdns::new(mdns::Config::default(), local_peer_id) {
272 Ok(mdns) => Toggle::from(Some(mdns)),
273 Err(err) => {
274 warn!(target: LOG_TARGET, "Failed to initialize mDNS: {:?}", err);
275 Toggle::from(None)
276 },
277 }
278 } else {
279 Toggle::from(None)
280 },
281 allow_non_globals_in_dht,
282 known_external_addresses: LruHashSet::new(
283 NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
284 .expect("value is a constant; constant is non-zero; qed."),
285 ),
286 records_to_publish: Default::default(),
287 kademlia_protocol,
288 provider_keys_requested: HashMap::new(),
289 }
290 }
291}
292
/// Network behaviour that combines Kademlia and mDNS to discover peers and their addresses.
///
/// Built by [`DiscoveryConfig::finish`].
pub struct DiscoveryBehaviour {
    /// Configured `(peer, address)` pairs; offered first when dialing the respective peer.
    permanent_addresses: Vec<(PeerId, Multiaddr)>,
    /// Addresses registered through `add_known_address`; an address is dropped again when a
    /// dial to it fails with a transport error.
    ephemeral_addresses: HashMap<PeerId, Vec<Multiaddr>>,
    /// Kademlia behaviour; disabled when no Kademlia protocol was configured.
    kademlia: Toggle<Kademlia<MemoryStore>>,
    /// mDNS behaviour; disabled unless enabled in the configuration and successfully created.
    mdns: Toggle<TokioMdns>,
    /// Timer that fires when the next random Kademlia query should start; `None` when random
    /// walks are disabled.
    next_kad_random_query: Option<Delay>,
    /// Delay used for the next random query; doubled after every firing, up to 60 seconds.
    duration_to_next_kad: Duration,
    /// Events queued for emission by `poll`.
    pending_events: VecDeque<DiscoveryOut>,
    /// Peer ID of the local node.
    local_peer_id: PeerId,
    /// Number of currently established connections.
    num_connections: u64,
    /// When `false`, non-global IP addresses reported by Kademlia or mDNS are not returned as
    /// dial addresses.
    allow_private_ip: bool,
    /// Random queries and mDNS discoveries are skipped once `num_connections` reaches this.
    discovery_only_if_under_num: u64,
    /// When `false`, self-reported non-global addresses are not added to Kademlia.
    allow_non_globals_in_dht: bool,
    /// Confirmed external addresses of the local node that were already logged.
    known_external_addresses: LruHashSet<Multiaddr>,
    /// Records found by in-flight get-record queries, kept so they can be pushed to the
    /// cache candidates once the query finishes.
    records_to_publish: HashMap<QueryId, Record>,
    /// Kademlia protocol name of the local node, used to filter self-reported addresses.
    kademlia_protocol: Option<StreamProtocol>,
    /// Keys of in-flight get-providers queries, indexed by query ID.
    provider_keys_requested: HashMap<QueryId, RecordKey>,
}
340
341impl DiscoveryBehaviour {
342 pub fn known_peers(&mut self) -> HashSet<PeerId> {
344 let mut peers = HashSet::new();
345 if let Some(k) = self.kademlia.as_mut() {
346 for b in k.kbuckets() {
347 for e in b.iter() {
348 if !peers.contains(e.node.key.preimage()) {
349 peers.insert(*e.node.key.preimage());
350 }
351 }
352 }
353 }
354 peers
355 }
356
357 pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
363 let addrs_list = self.ephemeral_addresses.entry(peer_id).or_default();
364 if addrs_list.contains(&addr) {
365 return
366 }
367
368 if let Some(k) = self.kademlia.as_mut() {
369 k.add_address(&peer_id, addr.clone());
370 }
371
372 self.pending_events.push_back(DiscoveryOut::Discovered(peer_id));
373 addrs_list.push(addr);
374 }
375
376 pub fn add_self_reported_address(
382 &mut self,
383 peer_id: &PeerId,
384 supported_protocols: &[StreamProtocol],
385 addr: Multiaddr,
386 ) {
387 if let Some(kademlia) = self.kademlia.as_mut() {
388 if !self.allow_non_globals_in_dht && !Self::can_add_to_dht(&addr) {
389 trace!(
390 target: LOG_TARGET,
391 "Ignoring self-reported non-global address {} from {}.", addr, peer_id
392 );
393 return
394 }
395
396 if !supported_protocols.iter().any(|p| {
402 p == self
403 .kademlia_protocol
404 .as_ref()
405 .expect("kademlia protocol was checked above to be enabled; qed")
406 }) {
407 trace!(
408 target: LOG_TARGET,
409 "Ignoring self-reported address {} from {} as remote node is not part of the \
410 Kademlia DHT supported by the local node.", addr, peer_id,
411 );
412 return
413 }
414
415 trace!(
416 target: LOG_TARGET,
417 "Adding self-reported address {} from {} to Kademlia DHT.",
418 addr, peer_id
419 );
420 kademlia.add_address(peer_id, addr.clone());
421 }
422 }
423
424 pub fn find_closest_peers(&mut self, target: PeerId) {
428 if let Some(k) = self.kademlia.as_mut() {
429 k.get_closest_peers(target);
430 }
431 }
432
433 pub fn get_value(&mut self, key: RecordKey) {
437 if let Some(k) = self.kademlia.as_mut() {
438 k.get_record(key.clone());
439 }
440 }
441
442 pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
447 if let Some(k) = self.kademlia.as_mut() {
448 if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) {
449 warn!(target: LOG_TARGET, "Libp2p => Failed to put record: {:?}", e);
450 self.pending_events
451 .push_back(DiscoveryOut::ValuePutFailed(key.clone(), Duration::from_secs(0)));
452 }
453 }
454 }
455
456 pub fn put_record_to(
460 &mut self,
461 record: Record,
462 peers: HashSet<sc_network_types::PeerId>,
463 update_local_storage: bool,
464 ) {
465 if let Some(kad) = self.kademlia.as_mut() {
466 if update_local_storage {
467 if let Err(_e) = kad.store_mut().put(record.clone()) {
468 warn!(target: LOG_TARGET, "Failed to update local starage");
469 }
470 }
471
472 if !peers.is_empty() {
473 kad.put_record_to(
474 record,
475 peers.into_iter().map(|peer_id| peer_id.into()),
476 Quorum::All,
477 );
478 }
479 }
480 }
481
482 pub fn start_providing(&mut self, key: RecordKey) {
484 if let Some(kad) = self.kademlia.as_mut() {
485 if let Err(e) = kad.start_providing(key.clone()) {
486 warn!(target: LOG_TARGET, "Libp2p => Failed to start providing {key:?}: {e}.");
487 self.pending_events
488 .push_back(DiscoveryOut::StartProvidingFailed(key, Duration::from_secs(0)));
489 }
490 }
491 }
492
493 pub fn stop_providing(&mut self, key: &RecordKey) {
495 if let Some(kad) = self.kademlia.as_mut() {
496 kad.stop_providing(key);
497 }
498 }
499
500 pub fn get_providers(&mut self, key: RecordKey) {
502 if let Some(kad) = self.kademlia.as_mut() {
503 let query_id = kad.get_providers(key.clone());
504 self.provider_keys_requested.insert(query_id, key);
505 }
506 }
507
508 pub fn store_record(
510 &mut self,
511 record_key: RecordKey,
512 record_value: Vec<u8>,
513 publisher: Option<PeerId>,
514 expires: Option<Instant>,
515 ) {
516 if let Some(k) = self.kademlia.as_mut() {
517 if let Err(err) = k.store_mut().put(Record {
518 key: record_key,
519 value: record_value,
520 publisher: publisher.map(|publisher| publisher.into()),
521 expires,
522 }) {
523 debug!(
524 target: LOG_TARGET,
525 "Failed to store record with key: {:?}",
526 err
527 );
528 }
529 }
530 }
531
532 pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
537 self.kademlia.as_mut().map(|kad| {
538 kad.kbuckets()
539 .map(|bucket| (bucket.range().0.ilog2().unwrap_or(0), bucket.iter().count()))
540 .collect()
541 })
542 }
543
544 pub fn num_kademlia_records(&mut self) -> Option<usize> {
546 self.kademlia.as_mut().map(|kad| kad.store_mut().records().count())
548 }
549
550 pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
552 self.kademlia
555 .as_mut()
556 .map(|kad| kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len()))
557 }
558
559 pub fn can_add_to_dht(addr: &Multiaddr) -> bool {
566 let ip = match addr.iter().next() {
567 Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
568 Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
569 Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) =>
570 return true,
571 _ => return false,
572 };
573 ip.is_global()
574 }
575}
576
/// Event generated by the [`DiscoveryBehaviour`].
///
/// Where a variant carries a `Duration`, it is the duration reported by the statistics of
/// the finished query (zero when unavailable), or zero when the operation failed before a
/// query was started.
#[derive(Debug)]
pub enum DiscoveryOut {
    /// A peer was discovered: its routing table entry was updated, a new address was
    /// registered for it, or mDNS reported it.
    Discovered(PeerId),

    /// Kademlia reported a peer as unroutable.
    UnroutablePeer(PeerId),

    /// A closest-peers query for the given target yielded the listed peers together with
    /// their addresses.
    ClosestPeersFound(PeerId, Vec<(PeerId, Vec<Multiaddr>)>, Duration),

    /// A closest-peers query for the given target yielded no peers.
    ClosestPeersNotFound(PeerId, Duration),

    /// A get-record query found a record.
    ValueFound(PeerRecord, Duration),

    /// A remote sent a put-record request carrying a record: key, value, publisher and
    /// expiration time.
    PutRecordRequest(
        RecordKey,
        Vec<u8>,
        Option<sc_network_types::PeerId>,
        Option<std::time::Instant>,
    ),

    /// A get-record query for the given key failed.
    ValueNotFound(RecordKey, Duration),

    /// The record with the given key was successfully put.
    ValuePut(RecordKey, Duration),

    /// Putting the record with the given key failed.
    ValuePutFailed(RecordKey, Duration),

    /// The local node started providing the given key.
    StartedProviding(RecordKey, Duration),

    /// Starting to provide the given key failed.
    StartProvidingFailed(RecordKey, Duration),

    /// A get-providers query found the listed providers for the given key.
    ProvidersFound(RecordKey, HashSet<PeerId>, Duration),

    /// A get-providers query for the given key finished without additional providers.
    NoMoreProviders(RecordKey, Duration),

    /// A get-providers query for the given key timed out.
    ProvidersNotFound(RecordKey, Duration),

    /// A random Kademlia query was started.
    RandomKademliaStarted,
}
648
649impl NetworkBehaviour for DiscoveryBehaviour {
// The connection handler is the one of Kademlia, wrapped so that it can be disabled.
type ConnectionHandler =
    ToggleConnectionHandler<<Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler>;
// Events reported to the swarm.
type ToSwarm = DiscoveryOut;
653
/// Forwards the newly established inbound connection to Kademlia, which provides the
/// connection handler.
fn handle_established_inbound_connection(
    &mut self,
    connection_id: ConnectionId,
    peer: PeerId,
    local_addr: &Multiaddr,
    remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
    self.kademlia.handle_established_inbound_connection(
        connection_id,
        peer,
        local_addr,
        remote_addr,
    )
}
668
/// Forwards the newly established outbound connection to Kademlia, which provides the
/// connection handler.
fn handle_established_outbound_connection(
    &mut self,
    connection_id: ConnectionId,
    peer: PeerId,
    addr: &Multiaddr,
    role_override: Endpoint,
    port_use: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
    self.kademlia.handle_established_outbound_connection(
        connection_id,
        peer,
        addr,
        role_override,
        port_use,
    )
}
685
/// Lets Kademlia decide whether the pending inbound connection is accepted.
fn handle_pending_inbound_connection(
    &mut self,
    connection_id: ConnectionId,
    local_addr: &Multiaddr,
    remote_addr: &Multiaddr,
) -> Result<(), ConnectionDenied> {
    self.kademlia
        .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
}
695
696 fn handle_pending_outbound_connection(
697 &mut self,
698 connection_id: ConnectionId,
699 maybe_peer: Option<PeerId>,
700 addresses: &[Multiaddr],
701 effective_role: Endpoint,
702 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
703 let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };
704
705 let mut list: LinkedHashSet<_> = self
710 .permanent_addresses
711 .iter()
712 .filter_map(|(p, a)| (*p == peer_id).then(|| a.clone()))
713 .collect();
714
715 if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
716 ephemeral_addresses.iter().for_each(|address| {
717 list.insert_if_absent(address.clone());
718 });
719 }
720
721 {
722 let mut list_to_filter = self.kademlia.handle_pending_outbound_connection(
723 connection_id,
724 maybe_peer,
725 addresses,
726 effective_role,
727 )?;
728
729 list_to_filter.extend(self.mdns.handle_pending_outbound_connection(
730 connection_id,
731 maybe_peer,
732 addresses,
733 effective_role,
734 )?);
735
736 if !self.allow_private_ip {
737 list_to_filter.retain(|addr| match addr.iter().next() {
738 Some(Protocol::Ip4(addr)) if !IpNetwork::from(addr).is_global() => false,
739 Some(Protocol::Ip6(addr)) if !IpNetwork::from(addr).is_global() => false,
740 _ => true,
741 });
742 }
743
744 list_to_filter.into_iter().for_each(|address| {
745 list.insert_if_absent(address);
746 });
747 }
748
749 trace!(target: LOG_TARGET, "Addresses of {:?}: {:?}", peer_id, list);
750
751 Ok(list.into_iter().collect())
752 }
753
754 fn on_swarm_event(&mut self, event: FromSwarm) {
755 match event {
756 FromSwarm::ConnectionEstablished(e) => {
757 self.num_connections += 1;
758 self.kademlia.on_swarm_event(FromSwarm::ConnectionEstablished(e));
759 },
760 FromSwarm::ConnectionClosed(e) => {
761 self.num_connections -= 1;
762 self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
763 },
764 FromSwarm::DialFailure(e @ DialFailure { peer_id, error, .. }) => {
765 if let Some(peer_id) = peer_id {
766 if let DialError::Transport(errors) = error {
767 if let Entry::Occupied(mut entry) = self.ephemeral_addresses.entry(peer_id)
768 {
769 for (addr, _error) in errors {
770 entry.get_mut().retain(|a| a != addr);
771 }
772 if entry.get().is_empty() {
773 entry.remove();
774 }
775 }
776 }
777 }
778
779 self.kademlia.on_swarm_event(FromSwarm::DialFailure(e));
780 },
781 FromSwarm::ListenerClosed(e) => {
782 self.kademlia.on_swarm_event(FromSwarm::ListenerClosed(e));
783 },
784 FromSwarm::ListenFailure(e) => {
785 self.kademlia.on_swarm_event(FromSwarm::ListenFailure(e));
786 },
787 FromSwarm::ListenerError(e) => {
788 self.kademlia.on_swarm_event(FromSwarm::ListenerError(e));
789 },
790 FromSwarm::ExternalAddrExpired(e) => {
791 self.kademlia.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
795 },
796 FromSwarm::NewListener(e) => {
797 self.kademlia.on_swarm_event(FromSwarm::NewListener(e));
798 },
799 FromSwarm::ExpiredListenAddr(e) => {
800 self.kademlia.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
801 },
802 FromSwarm::NewExternalAddrCandidate(e) => {
803 self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
804 },
805 FromSwarm::AddressChange(e) => {
806 self.kademlia.on_swarm_event(FromSwarm::AddressChange(e));
807 },
808 FromSwarm::NewListenAddr(e) => {
809 self.kademlia.on_swarm_event(FromSwarm::NewListenAddr(e));
810 self.mdns.on_swarm_event(FromSwarm::NewListenAddr(e));
811 },
812 FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => {
813 let mut address = addr.clone();
814
815 if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
816 if peer_id != self.local_peer_id {
817 warn!(
818 target: LOG_TARGET,
819 "🔍 Discovered external address for a peer that is not us: {addr}",
820 );
821 return
823 }
824 } else {
825 address.push(Protocol::P2p(self.local_peer_id));
826 }
827
828 if Self::can_add_to_dht(&address) {
829 if self.known_external_addresses.insert(address.clone()) {
832 info!(
833 target: LOG_TARGET,
834 "🔍 Discovered new external address for our node: {address}",
835 );
836 }
837 }
838
839 self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
840 },
841 FromSwarm::NewExternalAddrOfPeer(e) => {
842 self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
843 self.mdns.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
844 },
845 event => {
846 debug!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
847 self.kademlia.on_swarm_event(event);
848 self.mdns.on_swarm_event(event);
849 },
850 }
851 }
852
/// Forwards an event of a connection handler to Kademlia, the only inner behaviour that
/// provides a handler.
fn on_connection_handler_event(
    &mut self,
    peer_id: PeerId,
    connection_id: ConnectionId,
    event: THandlerOutEvent<Self>,
) {
    self.kademlia.on_connection_handler_event(peer_id, connection_id, event);
}
861
862 fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
863 if let Some(ev) = self.pending_events.pop_front() {
865 return Poll::Ready(ToSwarm::GenerateEvent(ev))
866 }
867
868 if let Some(kademlia) = self.kademlia.as_mut() {
870 if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() {
871 while next_kad_random_query.poll_unpin(cx).is_ready() {
872 let actually_started =
873 if self.num_connections < self.discovery_only_if_under_num {
874 let random_peer_id = PeerId::random();
875 debug!(
876 target: LOG_TARGET,
877 "Libp2p <= Starting random Kademlia request for {:?}",
878 random_peer_id,
879 );
880 kademlia.get_closest_peers(random_peer_id);
881 true
882 } else {
883 debug!(
884 target: LOG_TARGET,
885 "Kademlia paused due to high number of connections ({})",
886 self.num_connections
887 );
888 false
889 };
890
891 *next_kad_random_query = Delay::new(self.duration_to_next_kad);
894 self.duration_to_next_kad =
895 cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));
896
897 if actually_started {
898 let ev = DiscoveryOut::RandomKademliaStarted;
899 return Poll::Ready(ToSwarm::GenerateEvent(ev))
900 }
901 }
902 }
903 }
904
905 while let Poll::Ready(ev) = self.kademlia.poll(cx) {
906 match ev {
907 ToSwarm::GenerateEvent(ev) => match ev {
908 KademliaEvent::RoutingUpdated { peer, .. } => {
909 let ev = DiscoveryOut::Discovered(peer);
910 return Poll::Ready(ToSwarm::GenerateEvent(ev))
911 },
912 KademliaEvent::UnroutablePeer { peer, .. } => {
913 let ev = DiscoveryOut::UnroutablePeer(peer);
914 return Poll::Ready(ToSwarm::GenerateEvent(ev))
915 },
916 KademliaEvent::RoutablePeer { .. } => {
917 },
920 KademliaEvent::PendingRoutablePeer { .. } => {
921 },
923 KademliaEvent::InboundRequest { request } => match request {
924 libp2p::kad::InboundRequest::PutRecord { record: Some(record), .. } =>
925 return Poll::Ready(ToSwarm::GenerateEvent(
926 DiscoveryOut::PutRecordRequest(
927 record.key,
928 record.value,
929 record.publisher.map(Into::into),
930 record.expires,
931 ),
932 )),
933 _ => {},
934 },
935 KademliaEvent::OutboundQueryProgressed {
936 result: QueryResult::GetClosestPeers(res),
937 stats,
938 ..
939 } => {
940 let (key, peers, timeout) = match res {
941 Ok(GetClosestPeersOk { key, peers }) => (key, peers, false),
942 Err(GetClosestPeersError::Timeout { key, peers }) => (key, peers, true),
943 };
944
945 let target = match PeerId::from_bytes(&key.clone()) {
946 Ok(peer_id) => peer_id,
947 Err(_) => {
948 warn!(
949 target: LOG_TARGET,
950 "Libp2p => FIND_NODE query finished for target that is not \
951 a peer ID: {:?}",
952 HexDisplay::from(&key),
953 );
954 continue
955 },
956 };
957
958 if timeout {
959 debug!(
960 target: LOG_TARGET,
961 "Libp2p => Query for target {target:?} timed out and yielded {} peers",
962 peers.len(),
963 );
964 } else {
965 debug!(
966 target: LOG_TARGET,
967 "Libp2p => Query for target {target:?} yielded {} peers",
968 peers.len(),
969 );
970 }
971
972 let ev = if peers.is_empty() {
973 DiscoveryOut::ClosestPeersNotFound(
974 target,
975 stats.duration().unwrap_or_default(),
976 )
977 } else {
978 DiscoveryOut::ClosestPeersFound(
979 target,
980 peers.into_iter().map(|p| (p.peer_id, p.addrs)).collect(),
981 stats.duration().unwrap_or_default(),
982 )
983 };
984
985 return Poll::Ready(ToSwarm::GenerateEvent(ev))
986 },
987 KademliaEvent::OutboundQueryProgressed {
988 result: QueryResult::GetRecord(res),
989 stats,
990 id,
991 ..
992 } => {
993 let ev = match res {
994 Ok(GetRecordOk::FoundRecord(r)) => {
995 debug!(
996 target: LOG_TARGET,
997 "Libp2p => Found record ({:?}) with value: {:?} id {:?} stats {:?}",
998 r.record.key,
999 r.record.value,
1000 id,
1001 stats,
1002 );
1003
1004 if stats.num_successes() > GET_RECORD_REDUNDANCY_FACTOR {
1011 if let Some(kad) = self.kademlia.as_mut() {
1012 if let Some(mut query) = kad.query_mut(&id) {
1013 query.finish();
1014 }
1015 }
1016 }
1017
1018 self.records_to_publish.insert(id, r.record.clone());
1021
1022 DiscoveryOut::ValueFound(r, stats.duration().unwrap_or_default())
1023 },
1024 Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
1025 cache_candidates,
1026 }) => {
1027 debug!(
1028 target: LOG_TARGET,
1029 "Libp2p => Finished with no-additional-record {:?} stats {:?} took {:?} ms",
1030 id,
1031 stats,
1032 stats.duration().map(|val| val.as_millis())
1033 );
1034 if let Some(record) = self.records_to_publish.remove(&id) {
1036 if cache_candidates.is_empty() {
1037 continue
1038 }
1039
1040 if let Some(kad) = self.kademlia.as_mut() {
1043 kad.put_record_to(
1044 record,
1045 cache_candidates.into_iter().map(|v| v.1),
1046 Quorum::One,
1047 );
1048 }
1049 }
1050
1051 continue
1052 },
1053 Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
1054 trace!(
1055 target: LOG_TARGET,
1056 "Libp2p => Failed to get record: {:?}",
1057 e,
1058 );
1059 DiscoveryOut::ValueNotFound(
1060 e.into_key(),
1061 stats.duration().unwrap_or_default(),
1062 )
1063 },
1064 Err(e) => {
1065 debug!(
1066 target: LOG_TARGET,
1067 "Libp2p => Failed to get record: {:?}",
1068 e,
1069 );
1070 DiscoveryOut::ValueNotFound(
1071 e.into_key(),
1072 stats.duration().unwrap_or_default(),
1073 )
1074 },
1075 };
1076 return Poll::Ready(ToSwarm::GenerateEvent(ev))
1077 },
1078 KademliaEvent::OutboundQueryProgressed {
1079 result: QueryResult::GetProviders(res),
1080 stats,
1081 id,
1082 ..
1083 } => {
1084 let ev = match res {
1085 Ok(GetProvidersOk::FoundProviders { key, providers }) => {
1086 debug!(
1087 target: LOG_TARGET,
1088 "Libp2p => Found providers {:?} for key {:?}, id {:?}, stats {:?}",
1089 providers,
1090 key,
1091 id,
1092 stats,
1093 );
1094
1095 DiscoveryOut::ProvidersFound(
1096 key,
1097 providers,
1098 stats.duration().unwrap_or_default(),
1099 )
1100 },
1101 Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
1102 closest_peers: _,
1103 }) => {
1104 debug!(
1105 target: LOG_TARGET,
1106 "Libp2p => Finished with no additional providers {:?}, stats {:?}, took {:?} ms",
1107 id,
1108 stats,
1109 stats.duration().map(|val| val.as_millis())
1110 );
1111
1112 if let Some(key) = self.provider_keys_requested.remove(&id) {
1113 DiscoveryOut::NoMoreProviders(
1114 key,
1115 stats.duration().unwrap_or_default(),
1116 )
1117 } else {
1118 error!(
1119 target: LOG_TARGET,
1120 "No key found for `GET_PROVIDERS` query {id:?}. This is a bug.",
1121 );
1122 continue
1123 }
1124 },
1125 Err(GetProvidersError::Timeout { key, closest_peers: _ }) => {
1126 debug!(
1127 target: LOG_TARGET,
1128 "Libp2p => Failed to get providers for {key:?} due to timeout.",
1129 );
1130
1131 self.provider_keys_requested.remove(&id);
1132
1133 DiscoveryOut::ProvidersNotFound(
1134 key,
1135 stats.duration().unwrap_or_default(),
1136 )
1137 },
1138 };
1139 return Poll::Ready(ToSwarm::GenerateEvent(ev))
1140 },
1141 KademliaEvent::OutboundQueryProgressed {
1142 result: QueryResult::PutRecord(res),
1143 stats,
1144 ..
1145 } => {
1146 let ev = match res {
1147 Ok(ok) => {
1148 trace!(
1149 target: LOG_TARGET,
1150 "Libp2p => Put record for key: {:?}",
1151 ok.key,
1152 );
1153 DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_default())
1154 },
1155 Err(e) => {
1156 debug!(
1157 target: LOG_TARGET,
1158 "Libp2p => Failed to put record for key {:?}: {:?}",
1159 e.key(),
1160 e,
1161 );
1162 DiscoveryOut::ValuePutFailed(
1163 e.into_key(),
1164 stats.duration().unwrap_or_default(),
1165 )
1166 },
1167 };
1168 return Poll::Ready(ToSwarm::GenerateEvent(ev))
1169 },
1170 KademliaEvent::OutboundQueryProgressed {
1171 result: QueryResult::RepublishRecord(res),
1172 ..
1173 } => match res {
1174 Ok(ok) => debug!(
1175 target: LOG_TARGET,
1176 "Libp2p => Record republished: {:?}",
1177 ok.key,
1178 ),
1179 Err(e) => debug!(
1180 target: LOG_TARGET,
1181 "Libp2p => Republishing of record {:?} failed with: {:?}",
1182 e.key(), e,
1183 ),
1184 },
1185 KademliaEvent::OutboundQueryProgressed {
1186 result: QueryResult::StartProviding(res),
1187 stats,
1188 ..
1189 } => {
1190 let ev = match res {
1191 Ok(ok) => {
1192 trace!(
1193 target: LOG_TARGET,
1194 "Libp2p => Started providing key {:?}",
1195 ok.key,
1196 );
1197 DiscoveryOut::StartedProviding(
1198 ok.key,
1199 stats.duration().unwrap_or_default(),
1200 )
1201 },
1202 Err(e) => {
1203 debug!(
1204 target: LOG_TARGET,
1205 "Libp2p => Failed to start providing key {:?}: {:?}",
1206 e.key(),
1207 e,
1208 );
1209 DiscoveryOut::StartProvidingFailed(
1210 e.into_key(),
1211 stats.duration().unwrap_or_default(),
1212 )
1213 },
1214 };
1215 return Poll::Ready(ToSwarm::GenerateEvent(ev))
1216 },
1217 KademliaEvent::OutboundQueryProgressed {
1218 result: QueryResult::Bootstrap(res),
1219 ..
1220 } => match res {
1221 Ok(ok) => debug!(
1222 target: LOG_TARGET,
1223 "Libp2p => DHT bootstrap progressed: {ok:?}",
1224 ),
1225 Err(e) => warn!(
1226 target: LOG_TARGET,
1227 "Libp2p => DHT bootstrap error: {e:?}",
1228 ),
1229 },
1230 KademliaEvent::OutboundQueryProgressed { result: e, .. } => {
1232 warn!(target: LOG_TARGET, "Libp2p => Unhandled Kademlia event: {:?}", e)
1233 },
1234 Event::ModeChanged { new_mode } => {
1235 debug!(target: LOG_TARGET, "Libp2p => Kademlia mode changed: {new_mode}")
1236 },
1237 },
1238 ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }),
1239 event => {
1240 return Poll::Ready(event.map_out(|_| {
1241 unreachable!("`GenerateEvent` is handled in a branch above; qed")
1242 }));
1243 },
1244 }
1245 }
1246
1247 while let Poll::Ready(ev) = self.mdns.poll(cx) {
1249 match ev {
1250 ToSwarm::GenerateEvent(event) => match event {
1251 mdns::Event::Discovered(list) => {
1252 if self.num_connections >= self.discovery_only_if_under_num {
1253 continue
1254 }
1255
1256 self.pending_events.extend(
1257 list.into_iter().map(|(peer_id, _)| DiscoveryOut::Discovered(peer_id)),
1258 );
1259 if let Some(ev) = self.pending_events.pop_front() {
1260 return Poll::Ready(ToSwarm::GenerateEvent(ev))
1261 }
1262 },
1263 mdns::Event::Expired(_) => {},
1264 },
1265 ToSwarm::Dial { .. } => {
1266 unreachable!("mDNS never dials!");
1267 },
1268 ToSwarm::NotifyHandler { event, .. } => match event {},
1270 event => {
1271 return Poll::Ready(
1272 event
1273 .map_in(|_| {
1274 unreachable!("`NotifyHandler` is handled in a branch above; qed")
1275 })
1276 .map_out(|_| {
1277 unreachable!("`GenerateEvent` is handled in a branch above; qed")
1278 }),
1279 );
1280 },
1281 }
1282 }
1283
1284 Poll::Pending
1285 }
1286}
1287
1288fn legacy_kademlia_protocol_name(id: &ProtocolId) -> StreamProtocol {
1290 let name = format!("/{}/kad", id.as_ref());
1291 StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1292}
1293
1294fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
1296 genesis_hash: Hash,
1297 fork_id: Option<&str>,
1298) -> StreamProtocol {
1299 let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
1300 let name = if let Some(fork_id) = fork_id {
1301 format!("/{genesis_hash_hex}/{fork_id}/kad")
1302 } else {
1303 format!("/{genesis_hash_hex}/kad")
1304 };
1305
1306 StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1307}
1308
1309#[cfg(test)]
1310mod tests {
1311 use super::{kademlia_protocol_name, legacy_kademlia_protocol_name, DiscoveryConfig};
1312 use crate::config::ProtocolId;
1313 use libp2p::{identity::Keypair, Multiaddr};
1314 use sp_core::hash::H256;
1315
// Checks that a set of swarms, of which only the first one is known to the others from the
// start, discover each other.
//
// The test is only compiled when the `ignore_flaky_test` cfg is set.
#[cfg(ignore_flaky_test)]
#[tokio::test]
async fn discovery_working() {
    use super::DiscoveryOut;
    use futures::prelude::*;
    use libp2p::{
        core::{
            transport::{MemoryTransport, Transport},
            upgrade,
        },
        noise,
        swarm::{Swarm, SwarmEvent},
        yamux,
    };
    use std::{collections::HashSet, task::Poll, time::Duration};
    // Peer ID and address of the first swarm; `None` until that swarm has been created.
    let mut first_swarm_peer_id_and_addr = None;

    let genesis_hash = H256::from_low_u64_be(1);
    let fork_id = Some("test-fork-id");
    let protocol_id = ProtocolId::from("dot");

    // Build 25 swarms on the in-memory transport. Every swarm created after the first one
    // gets the first swarm as its only permanent address.
    let mut swarms = (0..25)
        .map(|i| {
            let mut swarm = libp2p::SwarmBuilder::with_new_identity()
                .with_tokio()
                .with_other_transport(|keypair| {
                    MemoryTransport::new()
                        .upgrade(upgrade::Version::V1)
                        .authenticate(noise::Config::new(&keypair).unwrap())
                        .multiplex(yamux::Config::default())
                        .boxed()
                })
                .unwrap()
                .with_behaviour(|keypair| {
                    let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
                    config
                        .with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
                        .allow_private_ip(true)
                        .allow_non_globals_in_dht(true)
                        .discovery_limit(50)
                        .with_kademlia(genesis_hash, fork_id, &protocol_id);

                    config.finish()
                })
                .unwrap()
                .with_swarm_config(|config| {
                    config.with_idle_connection_timeout(Duration::from_secs(10))
                })
                .build();

            let listen_addr: Multiaddr =
                format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

            if i == 0 {
                first_swarm_peer_id_and_addr =
                    Some((*swarm.local_peer_id(), listen_addr.clone()))
            }

            swarm.listen_on(listen_addr.clone()).unwrap();
            (swarm, listen_addr)
        })
        .collect::<Vec<_>>();

    // For every swarm, the set of peers it still has to discover.
    let mut to_discover = (0..swarms.len())
        .map(|n| {
            (0..swarms.len())
                // The first swarm is left out of every set.
                .skip(1)
                // A swarm does not have to discover itself.
                .filter(|p| *p != n)
                .map(|p| *Swarm::local_peer_id(&swarms[p].0))
                .collect::<HashSet<_>>()
        })
        .collect::<Vec<_>>();

    let fut = futures::future::poll_fn(move |cx| {
        // Poll the swarms in order and start over from the first one after every event;
        // the loop is only left once a full pass produced no event.
        'polling: loop {
            for swarm_n in 0..swarms.len() {
                match swarms[swarm_n].0.poll_next_unpin(cx) {
                    Poll::Ready(Some(e)) => {
                        match e {
                            SwarmEvent::Behaviour(behavior) => {
                                match behavior {
                                    DiscoveryOut::UnroutablePeer(other) |
                                    DiscoveryOut::Discovered(other) => {
                                        // Look up the listen address of the reported peer.
                                        let addr = swarms
                                            .iter()
                                            .find_map(|(s, a)| {
                                                if s.behaviour().local_peer_id == other {
                                                    Some(a.clone())
                                                } else {
                                                    None
                                                }
                                            })
                                            .unwrap();
                                        // Alternate between reporting only the Kademlia
                                        // protocol name and reporting it together with the
                                        // legacy name.
                                        let protocol_names = if swarm_n % 2 == 0 {
                                            vec![kademlia_protocol_name(genesis_hash, fork_id)]
                                        } else {
                                            vec![
                                                legacy_kademlia_protocol_name(&protocol_id),
                                                kademlia_protocol_name(genesis_hash, fork_id),
                                            ]
                                        };
                                        swarms[swarm_n]
                                            .0
                                            .behaviour_mut()
                                            .add_self_reported_address(
                                                &other,
                                                protocol_names.as_slice(),
                                                addr,
                                            );

                                        to_discover[swarm_n].remove(&other);
                                    },
                                    DiscoveryOut::RandomKademliaStarted => {},
                                    DiscoveryOut::ClosestPeersFound(..) => {},
                                    DiscoveryOut::ClosestPeersNotFound(..) => {},
                                    // Any other behaviour event fails the test.
                                    e => {
                                        panic!("Unexpected event: {:?}", e)
                                    },
                                }
                            },
                            // Swarm events not coming from the behaviour are ignored.
                            _ => {},
                        }
                        continue 'polling
                    },
                    _ => {},
                }
            }
            break
        }

        // The test is finished once every swarm has discovered all of its peers.
        if to_discover.iter().all(|l| l.is_empty()) {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    });

    fut.await
}
1467
1468 #[test]
1469 fn discovery_ignores_peers_with_unknown_protocols() {
1470 let supported_genesis_hash = H256::from_low_u64_be(1);
1471 let unsupported_genesis_hash = H256::from_low_u64_be(2);
1472 let supported_protocol_id = ProtocolId::from("a");
1473 let unsupported_protocol_id = ProtocolId::from("b");
1474
1475 let mut discovery = {
1476 let keypair = Keypair::generate_ed25519();
1477 let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
1478 config
1479 .allow_private_ip(true)
1480 .allow_non_globals_in_dht(true)
1481 .discovery_limit(50)
1482 .with_kademlia(supported_genesis_hash, None, &supported_protocol_id);
1483 config.finish()
1484 };
1485
1486 let predictable_peer_id = |bytes: &[u8; 32]| {
1487 Keypair::ed25519_from_bytes(bytes.to_owned()).unwrap().public().to_peer_id()
1488 };
1489
1490 let remote_peer_id = predictable_peer_id(b"00000000000000000000000000000001");
1491 let remote_addr: Multiaddr = "/memory/1".parse().unwrap();
1492 let another_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
1493 let another_addr: Multiaddr = "/memory/2".parse().unwrap();
1494
1495 discovery.add_self_reported_address(
1497 &remote_peer_id,
1498 &[kademlia_protocol_name(unsupported_genesis_hash, None)],
1499 remote_addr.clone(),
1500 );
1501 discovery.add_self_reported_address(
1502 &another_peer_id,
1503 &[legacy_kademlia_protocol_name(&unsupported_protocol_id)],
1504 another_addr.clone(),
1505 );
1506
1507 {
1508 let kademlia = discovery.kademlia.as_mut().unwrap();
1509 assert!(
1510 kademlia
1511 .kbucket(remote_peer_id)
1512 .expect("Remote peer id not to be equal to local peer id.")
1513 .is_empty(),
1514 "Expect peer with unsupported protocol not to be added."
1515 );
1516 assert!(
1517 kademlia
1518 .kbucket(another_peer_id)
1519 .expect("Remote peer id not to be equal to local peer id.")
1520 .is_empty(),
1521 "Expect peer with unsupported protocol not to be added."
1522 );
1523 }
1524
1525 discovery.add_self_reported_address(
1527 &remote_peer_id,
1528 &[kademlia_protocol_name(supported_genesis_hash, None)],
1529 remote_addr.clone(),
1530 );
1531 {
1532 let kademlia = discovery.kademlia.as_mut().unwrap();
1533 assert!(
1534 !kademlia
1535 .kbucket(remote_peer_id)
1536 .expect("Remote peer id not to be equal to local peer id.")
1537 .is_empty(),
1538 "Expect peer with supported protocol to be added."
1539 );
1540 }
1541
1542 let unsupported_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
1543 let unsupported_peer_addr: Multiaddr = "/memory/2".parse().unwrap();
1544
1545 {
1547 let kademlia = discovery.kademlia.as_mut().unwrap();
1548 assert!(
1549 kademlia
1550 .kbucket(unsupported_peer_id)
1551 .expect("Remote peer id not to be equal to local peer id.")
1552 .is_empty(),
1553 "Expect unsupported peer not to be added."
1554 );
1555 }
1556 discovery.add_self_reported_address(
1559 &unsupported_peer_id,
1560 &[legacy_kademlia_protocol_name(&supported_protocol_id)],
1561 unsupported_peer_addr.clone(),
1562 );
1563 {
1564 let kademlia = discovery.kademlia.as_mut().unwrap();
1565 assert!(
1566 kademlia
1567 .kbucket(unsupported_peer_id)
1568 .expect("Remote peer id not to be equal to local peer id.")
1569 .is_empty(),
1570 "Expect unsupported peer not to be added."
1571 );
1572 }
1573
1574 discovery.add_self_reported_address(
1576 &another_peer_id,
1577 &[
1578 legacy_kademlia_protocol_name(&supported_protocol_id),
1579 kademlia_protocol_name(supported_genesis_hash, None),
1580 ],
1581 another_addr.clone(),
1582 );
1583
1584 {
1585 let kademlia = discovery.kademlia.as_mut().unwrap();
1586 assert_eq!(
1587 2,
1588 kademlia.kbuckets().fold(0, |acc, bucket| acc + bucket.num_entries()),
1589 "Expect peers with supported protocol to be added."
1590 );
1591 assert!(
1592 !kademlia
1593 .kbucket(another_peer_id)
1594 .expect("Remote peer id not to be equal to local peer id.")
1595 .is_empty(),
1596 "Expect peer with supported protocol to be added."
1597 );
1598 }
1599 }
1600}