1use crate::{
38 config::{
39 ProtocolId, KADEMLIA_MAX_PROVIDER_KEYS, KADEMLIA_PROVIDER_RECORD_TTL,
40 KADEMLIA_PROVIDER_REPUBLISH_INTERVAL,
41 },
42 utils::LruHashSet,
43};
44
45use array_bytes::bytes2hex;
46use futures::prelude::*;
47use futures_timer::Delay;
48use ip_network::IpNetwork;
49use libp2p::{
50 core::{transport::PortUse, Endpoint, Multiaddr},
51 kad::{
52 self,
53 store::{MemoryStore, MemoryStoreConfig, RecordStore},
54 Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
55 Event, GetClosestPeersError, GetClosestPeersOk, GetProvidersError, GetProvidersOk,
56 GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record, RecordKey,
57 },
58 mdns::{self, tokio::Behaviour as TokioMdns},
59 multiaddr::Protocol,
60 swarm::{
61 behaviour::{
62 toggle::{Toggle, ToggleConnectionHandler},
63 DialFailure, ExternalAddrConfirmed, FromSwarm,
64 },
65 ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, StreamProtocol, THandler,
66 THandlerInEvent, THandlerOutEvent, ToSwarm,
67 },
68 PeerId,
69};
70use linked_hash_set::LinkedHashSet;
71use log::{debug, error, info, trace, warn};
72use std::{
73 cmp,
74 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
75 num::NonZeroUsize,
76 task::{Context, Poll},
77 time::{Duration, Instant},
78};
79use subsoil::core::hexdisplay::HexDisplay;
80
81const LOG_TARGET: &str = "sub-libp2p::discovery";
83
84const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32;
88
89pub const DEFAULT_KADEMLIA_REPLICATION_FACTOR: usize = 20;
92
93const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4;
95
96const KAD_QUERY_TIMEOUT: Duration = Duration::from_secs(300);
99
100pub struct DiscoveryConfig {
106 local_peer_id: PeerId,
107 permanent_addresses: Vec<(PeerId, Multiaddr)>,
108 dht_random_walk: bool,
109 allow_private_ip: bool,
110 allow_non_globals_in_dht: bool,
111 discovery_only_if_under_num: u64,
112 enable_mdns: bool,
113 kademlia_disjoint_query_paths: bool,
114 kademlia_protocol: Option<StreamProtocol>,
115 kademlia_legacy_protocol: Option<StreamProtocol>,
116 kademlia_replication_factor: NonZeroUsize,
117}
118
119impl DiscoveryConfig {
120 pub fn new(local_peer_id: PeerId) -> Self {
122 Self {
123 local_peer_id,
124 permanent_addresses: Vec::new(),
125 dht_random_walk: true,
126 allow_private_ip: true,
127 allow_non_globals_in_dht: false,
128 discovery_only_if_under_num: std::u64::MAX,
129 enable_mdns: false,
130 kademlia_disjoint_query_paths: false,
131 kademlia_protocol: None,
132 kademlia_legacy_protocol: None,
133 kademlia_replication_factor: NonZeroUsize::new(DEFAULT_KADEMLIA_REPLICATION_FACTOR)
134 .expect("value is a constant; constant is non-zero; qed."),
135 }
136 }
137
138 pub fn discovery_limit(&mut self, limit: u64) -> &mut Self {
140 self.discovery_only_if_under_num = limit;
141 self
142 }
143
144 pub fn with_permanent_addresses<I>(&mut self, permanent_addresses: I) -> &mut Self
146 where
147 I: IntoIterator<Item = (PeerId, Multiaddr)>,
148 {
149 self.permanent_addresses.extend(permanent_addresses);
150 self
151 }
152
153 pub fn with_dht_random_walk(&mut self, value: bool) -> &mut Self {
156 self.dht_random_walk = value;
157 self
158 }
159
160 pub fn allow_private_ip(&mut self, value: bool) -> &mut Self {
162 self.allow_private_ip = value;
163 self
164 }
165
166 pub fn allow_non_globals_in_dht(&mut self, value: bool) -> &mut Self {
168 self.allow_non_globals_in_dht = value;
169 self
170 }
171
172 pub fn with_mdns(&mut self, value: bool) -> &mut Self {
174 self.enable_mdns = value;
175 self
176 }
177
178 pub fn with_kademlia<Hash: AsRef<[u8]>>(
183 &mut self,
184 genesis_hash: Hash,
185 fork_id: Option<&str>,
186 protocol_id: &ProtocolId,
187 ) -> &mut Self {
188 self.kademlia_protocol = Some(kademlia_protocol_name(genesis_hash, fork_id));
189 self.kademlia_legacy_protocol = Some(legacy_kademlia_protocol_name(protocol_id));
190 self
191 }
192
193 pub fn use_kademlia_disjoint_query_paths(&mut self, value: bool) -> &mut Self {
196 self.kademlia_disjoint_query_paths = value;
197 self
198 }
199
200 pub fn with_kademlia_replication_factor(&mut self, value: NonZeroUsize) -> &mut Self {
202 self.kademlia_replication_factor = value;
203 self
204 }
205
206 pub fn finish(self) -> DiscoveryBehaviour {
208 let Self {
209 local_peer_id,
210 permanent_addresses,
211 dht_random_walk,
212 allow_private_ip,
213 allow_non_globals_in_dht,
214 discovery_only_if_under_num,
215 enable_mdns,
216 kademlia_disjoint_query_paths,
217 kademlia_protocol,
218 kademlia_legacy_protocol: _,
219 kademlia_replication_factor,
220 } = self;
221
222 let kademlia = if let Some(ref kademlia_protocol) = kademlia_protocol {
223 let mut config = KademliaConfig::new(kademlia_protocol.clone());
224
225 config.set_replication_factor(kademlia_replication_factor);
226
227 config.set_record_filtering(libp2p::kad::StoreInserts::FilterBoth);
228
229 config.set_query_timeout(KAD_QUERY_TIMEOUT);
230
231 config.set_kbucket_inserts(BucketInserts::Manual);
235 config.disjoint_query_paths(kademlia_disjoint_query_paths);
236
237 config.set_provider_record_ttl(Some(KADEMLIA_PROVIDER_RECORD_TTL));
238 config.set_provider_publication_interval(Some(KADEMLIA_PROVIDER_REPUBLISH_INTERVAL));
239
240 let store = MemoryStore::with_config(
241 local_peer_id,
242 MemoryStoreConfig {
243 max_provided_keys: KADEMLIA_MAX_PROVIDER_KEYS,
244 ..Default::default()
245 },
246 );
247
248 let mut kad = Kademlia::with_config(local_peer_id, store, config);
249 kad.set_mode(Some(kad::Mode::Server));
250
251 for (peer_id, addr) in &permanent_addresses {
252 kad.add_address(peer_id, addr.clone());
253 }
254
255 Some(kad)
256 } else {
257 None
258 };
259
260 DiscoveryBehaviour {
261 permanent_addresses,
262 ephemeral_addresses: HashMap::new(),
263 kademlia: Toggle::from(kademlia),
264 next_kad_random_query: if dht_random_walk {
265 Some(Delay::new(Duration::new(0, 0)))
266 } else {
267 None
268 },
269 duration_to_next_kad: Duration::from_secs(1),
270 pending_events: VecDeque::new(),
271 local_peer_id,
272 num_connections: 0,
273 allow_private_ip,
274 discovery_only_if_under_num,
275 mdns: if enable_mdns {
276 match TokioMdns::new(mdns::Config::default(), local_peer_id) {
277 Ok(mdns) => Toggle::from(Some(mdns)),
278 Err(err) => {
279 warn!(target: LOG_TARGET, "Failed to initialize mDNS: {:?}", err);
280 Toggle::from(None)
281 },
282 }
283 } else {
284 Toggle::from(None)
285 },
286 allow_non_globals_in_dht,
287 known_external_addresses: LruHashSet::new(
288 NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
289 .expect("value is a constant; constant is non-zero; qed."),
290 ),
291 records_to_publish: Default::default(),
292 kademlia_protocol,
293 provider_keys_requested: HashMap::new(),
294 }
295 }
296}
297
298pub struct DiscoveryBehaviour {
300 permanent_addresses: Vec<(PeerId, Multiaddr)>,
303 ephemeral_addresses: HashMap<PeerId, Vec<Multiaddr>>,
306 kademlia: Toggle<Kademlia<MemoryStore>>,
309 mdns: Toggle<TokioMdns>,
311 next_kad_random_query: Option<Delay>,
314 duration_to_next_kad: Duration,
316 pending_events: VecDeque<DiscoveryOut>,
318 local_peer_id: PeerId,
320 num_connections: u64,
322 allow_private_ip: bool,
325 discovery_only_if_under_num: u64,
327 allow_non_globals_in_dht: bool,
329 known_external_addresses: LruHashSet<Multiaddr>,
331 records_to_publish: HashMap<QueryId, Record>,
337 kademlia_protocol: Option<StreamProtocol>,
342 provider_keys_requested: HashMap<QueryId, RecordKey>,
344}
345
346impl DiscoveryBehaviour {
347 pub fn known_peers(&mut self) -> HashSet<PeerId> {
349 let mut peers = HashSet::new();
350 if let Some(k) = self.kademlia.as_mut() {
351 for b in k.kbuckets() {
352 for e in b.iter() {
353 if !peers.contains(e.node.key.preimage()) {
354 peers.insert(*e.node.key.preimage());
355 }
356 }
357 }
358 }
359 peers
360 }
361
362 pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
368 let addrs_list = self.ephemeral_addresses.entry(peer_id).or_default();
369 if addrs_list.contains(&addr) {
370 return;
371 }
372
373 if let Some(k) = self.kademlia.as_mut() {
374 k.add_address(&peer_id, addr.clone());
375 }
376
377 self.pending_events.push_back(DiscoveryOut::Discovered(peer_id));
378 addrs_list.push(addr);
379 }
380
381 pub fn add_self_reported_address(
387 &mut self,
388 peer_id: &PeerId,
389 supported_protocols: &[StreamProtocol],
390 addr: Multiaddr,
391 ) {
392 if let Some(kademlia) = self.kademlia.as_mut() {
393 if !self.allow_non_globals_in_dht && !Self::can_add_to_dht(&addr) {
394 trace!(
395 target: LOG_TARGET,
396 "Ignoring self-reported non-global address {} from {}.", addr, peer_id
397 );
398 return;
399 }
400
401 if !supported_protocols.iter().any(|p| {
407 p == self
408 .kademlia_protocol
409 .as_ref()
410 .expect("kademlia protocol was checked above to be enabled; qed")
411 }) {
412 trace!(
413 target: LOG_TARGET,
414 "Ignoring self-reported address {} from {} as remote node is not part of the \
415 Kademlia DHT supported by the local node.", addr, peer_id,
416 );
417 return;
418 }
419
420 trace!(
421 target: LOG_TARGET,
422 "Adding self-reported address {} from {} to Kademlia DHT.",
423 addr, peer_id
424 );
425 kademlia.add_address(peer_id, addr.clone());
426 }
427 }
428
429 pub fn find_closest_peers(&mut self, target: PeerId) {
433 if let Some(k) = self.kademlia.as_mut() {
434 k.get_closest_peers(target);
435 }
436 }
437
438 pub fn get_value(&mut self, key: RecordKey) {
442 if let Some(k) = self.kademlia.as_mut() {
443 k.get_record(key.clone());
444 }
445 }
446
447 pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
452 if let Some(k) = self.kademlia.as_mut() {
453 if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) {
454 warn!(target: LOG_TARGET, "Libp2p => Failed to put record: {:?}", e);
455 self.pending_events
456 .push_back(DiscoveryOut::ValuePutFailed(key.clone(), Duration::from_secs(0)));
457 }
458 }
459 }
460
461 pub fn put_record_to(
465 &mut self,
466 record: Record,
467 peers: HashSet<crate::types::PeerId>,
468 update_local_storage: bool,
469 ) {
470 if let Some(kad) = self.kademlia.as_mut() {
471 if update_local_storage {
472 if let Err(_e) = kad.store_mut().put(record.clone()) {
473 warn!(target: LOG_TARGET, "Failed to update local starage");
474 }
475 }
476
477 if !peers.is_empty() {
478 kad.put_record_to(
479 record,
480 peers.into_iter().map(|peer_id| peer_id.into()),
481 Quorum::All,
482 );
483 }
484 }
485 }
486
487 pub fn start_providing(&mut self, key: RecordKey) {
489 if let Some(kad) = self.kademlia.as_mut() {
490 if let Err(e) = kad.start_providing(key.clone()) {
491 warn!(target: LOG_TARGET, "Libp2p => Failed to start providing {key:?}: {e}.");
492 self.pending_events
493 .push_back(DiscoveryOut::StartProvidingFailed(key, Duration::from_secs(0)));
494 }
495 }
496 }
497
498 pub fn stop_providing(&mut self, key: &RecordKey) {
500 if let Some(kad) = self.kademlia.as_mut() {
501 kad.stop_providing(key);
502 }
503 }
504
505 pub fn get_providers(&mut self, key: RecordKey) {
507 if let Some(kad) = self.kademlia.as_mut() {
508 let query_id = kad.get_providers(key.clone());
509 self.provider_keys_requested.insert(query_id, key);
510 }
511 }
512
513 pub fn store_record(
515 &mut self,
516 record_key: RecordKey,
517 record_value: Vec<u8>,
518 publisher: Option<PeerId>,
519 expires: Option<Instant>,
520 ) {
521 if let Some(k) = self.kademlia.as_mut() {
522 if let Err(err) = k.store_mut().put(Record {
523 key: record_key,
524 value: record_value,
525 publisher: publisher.map(|publisher| publisher.into()),
526 expires,
527 }) {
528 debug!(
529 target: LOG_TARGET,
530 "Failed to store record with key: {:?}",
531 err
532 );
533 }
534 }
535 }
536
537 pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
542 self.kademlia.as_mut().map(|kad| {
543 kad.kbuckets()
544 .map(|bucket| (bucket.range().0.ilog2().unwrap_or(0), bucket.iter().count()))
545 .collect()
546 })
547 }
548
549 pub fn num_kademlia_records(&mut self) -> Option<usize> {
551 self.kademlia.as_mut().map(|kad| kad.store_mut().records().count())
553 }
554
555 pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
557 self.kademlia
560 .as_mut()
561 .map(|kad| kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len()))
562 }
563
564 pub fn can_add_to_dht(addr: &Multiaddr) -> bool {
571 let ip = match addr.iter().next() {
572 Some(Protocol::Ip4(ip)) => IpNetwork::from(ip),
573 Some(Protocol::Ip6(ip)) => IpNetwork::from(ip),
574 Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) => {
575 return true
576 },
577 _ => return false,
578 };
579 ip.is_global()
580 }
581}
582
583#[derive(Debug)]
585pub enum DiscoveryOut {
586 Discovered(PeerId),
589
590 UnroutablePeer(PeerId),
597
598 ClosestPeersFound(PeerId, Vec<(PeerId, Vec<Multiaddr>)>, Duration),
602
603 ClosestPeersNotFound(PeerId, Duration),
605
606 ValueFound(PeerRecord, Duration),
610
611 PutRecordRequest(RecordKey, Vec<u8>, Option<crate::types::PeerId>, Option<std::time::Instant>),
613
614 ValueNotFound(RecordKey, Duration),
618
619 ValuePut(RecordKey, Duration),
623
624 ValuePutFailed(RecordKey, Duration),
628
629 StartedProviding(RecordKey, Duration),
631
632 StartProvidingFailed(RecordKey, Duration),
634
635 ProvidersFound(RecordKey, HashSet<PeerId>, Duration),
637
638 NoMoreProviders(RecordKey, Duration),
640
641 ProvidersNotFound(RecordKey, Duration),
643
644 RandomKademliaStarted,
648}
649
650impl NetworkBehaviour for DiscoveryBehaviour {
651 type ConnectionHandler =
652 ToggleConnectionHandler<<Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler>;
653 type ToSwarm = DiscoveryOut;
654
655 fn handle_established_inbound_connection(
656 &mut self,
657 connection_id: ConnectionId,
658 peer: PeerId,
659 local_addr: &Multiaddr,
660 remote_addr: &Multiaddr,
661 ) -> Result<THandler<Self>, ConnectionDenied> {
662 self.kademlia.handle_established_inbound_connection(
663 connection_id,
664 peer,
665 local_addr,
666 remote_addr,
667 )
668 }
669
670 fn handle_established_outbound_connection(
671 &mut self,
672 connection_id: ConnectionId,
673 peer: PeerId,
674 addr: &Multiaddr,
675 role_override: Endpoint,
676 port_use: PortUse,
677 ) -> Result<THandler<Self>, ConnectionDenied> {
678 self.kademlia.handle_established_outbound_connection(
679 connection_id,
680 peer,
681 addr,
682 role_override,
683 port_use,
684 )
685 }
686
687 fn handle_pending_inbound_connection(
688 &mut self,
689 connection_id: ConnectionId,
690 local_addr: &Multiaddr,
691 remote_addr: &Multiaddr,
692 ) -> Result<(), ConnectionDenied> {
693 self.kademlia
694 .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
695 }
696
697 fn handle_pending_outbound_connection(
698 &mut self,
699 connection_id: ConnectionId,
700 maybe_peer: Option<PeerId>,
701 addresses: &[Multiaddr],
702 effective_role: Endpoint,
703 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
704 let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };
705
706 let mut list: LinkedHashSet<_> = self
711 .permanent_addresses
712 .iter()
713 .filter_map(|(p, a)| (*p == peer_id).then(|| a.clone()))
714 .collect();
715
716 if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
717 ephemeral_addresses.iter().for_each(|address| {
718 list.insert_if_absent(address.clone());
719 });
720 }
721
722 {
723 let mut list_to_filter = self.kademlia.handle_pending_outbound_connection(
724 connection_id,
725 maybe_peer,
726 addresses,
727 effective_role,
728 )?;
729
730 list_to_filter.extend(self.mdns.handle_pending_outbound_connection(
731 connection_id,
732 maybe_peer,
733 addresses,
734 effective_role,
735 )?);
736
737 if !self.allow_private_ip {
738 list_to_filter.retain(|addr| match addr.iter().next() {
739 Some(Protocol::Ip4(addr)) if !IpNetwork::from(addr).is_global() => false,
740 Some(Protocol::Ip6(addr)) if !IpNetwork::from(addr).is_global() => false,
741 _ => true,
742 });
743 }
744
745 list_to_filter.into_iter().for_each(|address| {
746 list.insert_if_absent(address);
747 });
748 }
749
750 trace!(target: LOG_TARGET, "Addresses of {:?}: {:?}", peer_id, list);
751
752 Ok(list.into_iter().collect())
753 }
754
755 fn on_swarm_event(&mut self, event: FromSwarm) {
756 match event {
757 FromSwarm::ConnectionEstablished(e) => {
758 self.num_connections += 1;
759 self.kademlia.on_swarm_event(FromSwarm::ConnectionEstablished(e));
760 },
761 FromSwarm::ConnectionClosed(e) => {
762 self.num_connections -= 1;
763 self.kademlia.on_swarm_event(FromSwarm::ConnectionClosed(e));
764 },
765 FromSwarm::DialFailure(e @ DialFailure { peer_id, error, .. }) => {
766 if let Some(peer_id) = peer_id {
767 if let DialError::Transport(errors) = error {
768 if let Entry::Occupied(mut entry) = self.ephemeral_addresses.entry(peer_id)
769 {
770 for (addr, _error) in errors {
771 entry.get_mut().retain(|a| a != addr);
772 }
773 if entry.get().is_empty() {
774 entry.remove();
775 }
776 }
777 }
778 }
779
780 self.kademlia.on_swarm_event(FromSwarm::DialFailure(e));
781 },
782 FromSwarm::ListenerClosed(e) => {
783 self.kademlia.on_swarm_event(FromSwarm::ListenerClosed(e));
784 },
785 FromSwarm::ListenFailure(e) => {
786 self.kademlia.on_swarm_event(FromSwarm::ListenFailure(e));
787 },
788 FromSwarm::ListenerError(e) => {
789 self.kademlia.on_swarm_event(FromSwarm::ListenerError(e));
790 },
791 FromSwarm::ExternalAddrExpired(e) => {
792 self.kademlia.on_swarm_event(FromSwarm::ExternalAddrExpired(e));
796 },
797 FromSwarm::NewListener(e) => {
798 self.kademlia.on_swarm_event(FromSwarm::NewListener(e));
799 },
800 FromSwarm::ExpiredListenAddr(e) => {
801 self.kademlia.on_swarm_event(FromSwarm::ExpiredListenAddr(e));
802 },
803 FromSwarm::NewExternalAddrCandidate(e) => {
804 self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e));
805 },
806 FromSwarm::AddressChange(e) => {
807 self.kademlia.on_swarm_event(FromSwarm::AddressChange(e));
808 },
809 FromSwarm::NewListenAddr(e) => {
810 self.kademlia.on_swarm_event(FromSwarm::NewListenAddr(e));
811 self.mdns.on_swarm_event(FromSwarm::NewListenAddr(e));
812 },
813 FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => {
814 let mut address = addr.clone();
815
816 if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
817 if peer_id != self.local_peer_id {
818 warn!(
819 target: LOG_TARGET,
820 "🔍 Discovered external address for a peer that is not us: {addr}",
821 );
822 return;
824 }
825 } else {
826 address.push(Protocol::P2p(self.local_peer_id));
827 }
828
829 if Self::can_add_to_dht(&address) {
830 if self.known_external_addresses.insert(address.clone()) {
833 info!(
834 target: LOG_TARGET,
835 "🔍 Discovered new external address for our node: {address}",
836 );
837 }
838 }
839
840 self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e));
841 },
842 FromSwarm::NewExternalAddrOfPeer(e) => {
843 self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
844 self.mdns.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(e));
845 },
846 event => {
847 debug!(target: LOG_TARGET, "New unknown `FromSwarm` libp2p event: {event:?}");
848 self.kademlia.on_swarm_event(event);
849 self.mdns.on_swarm_event(event);
850 },
851 }
852 }
853
854 fn on_connection_handler_event(
855 &mut self,
856 peer_id: PeerId,
857 connection_id: ConnectionId,
858 event: THandlerOutEvent<Self>,
859 ) {
860 self.kademlia.on_connection_handler_event(peer_id, connection_id, event);
861 }
862
863 fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
864 if let Some(ev) = self.pending_events.pop_front() {
866 return Poll::Ready(ToSwarm::GenerateEvent(ev));
867 }
868
869 if let Some(kademlia) = self.kademlia.as_mut() {
871 if let Some(next_kad_random_query) = self.next_kad_random_query.as_mut() {
872 while next_kad_random_query.poll_unpin(cx).is_ready() {
873 let actually_started =
874 if self.num_connections < self.discovery_only_if_under_num {
875 let random_peer_id = PeerId::random();
876 debug!(
877 target: LOG_TARGET,
878 "Libp2p <= Starting random Kademlia request for {:?}",
879 random_peer_id,
880 );
881 kademlia.get_closest_peers(random_peer_id);
882 true
883 } else {
884 debug!(
885 target: LOG_TARGET,
886 "Kademlia paused due to high number of connections ({})",
887 self.num_connections
888 );
889 false
890 };
891
892 *next_kad_random_query = Delay::new(self.duration_to_next_kad);
895 self.duration_to_next_kad =
896 cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));
897
898 if actually_started {
899 let ev = DiscoveryOut::RandomKademliaStarted;
900 return Poll::Ready(ToSwarm::GenerateEvent(ev));
901 }
902 }
903 }
904 }
905
906 while let Poll::Ready(ev) = self.kademlia.poll(cx) {
907 match ev {
908 ToSwarm::GenerateEvent(ev) => match ev {
909 KademliaEvent::RoutingUpdated { peer, .. } => {
910 let ev = DiscoveryOut::Discovered(peer);
911 return Poll::Ready(ToSwarm::GenerateEvent(ev));
912 },
913 KademliaEvent::UnroutablePeer { peer, .. } => {
914 let ev = DiscoveryOut::UnroutablePeer(peer);
915 return Poll::Ready(ToSwarm::GenerateEvent(ev));
916 },
917 KademliaEvent::RoutablePeer { .. } => {
918 },
921 KademliaEvent::PendingRoutablePeer { .. } => {
922 },
924 KademliaEvent::InboundRequest { request } => match request {
925 libp2p::kad::InboundRequest::PutRecord { record: Some(record), .. } => {
926 return Poll::Ready(ToSwarm::GenerateEvent(
927 DiscoveryOut::PutRecordRequest(
928 record.key,
929 record.value,
930 record.publisher.map(Into::into),
931 record.expires,
932 ),
933 ))
934 },
935 _ => {},
936 },
937 KademliaEvent::OutboundQueryProgressed {
938 result: QueryResult::GetClosestPeers(res),
939 stats,
940 ..
941 } => {
942 let (key, peers, timeout) = match res {
943 Ok(GetClosestPeersOk { key, peers }) => (key, peers, false),
944 Err(GetClosestPeersError::Timeout { key, peers }) => (key, peers, true),
945 };
946
947 let target = match PeerId::from_bytes(&key.clone()) {
948 Ok(peer_id) => peer_id,
949 Err(_) => {
950 warn!(
951 target: LOG_TARGET,
952 "Libp2p => FIND_NODE query finished for target that is not \
953 a peer ID: {:?}",
954 HexDisplay::from(&key),
955 );
956 continue;
957 },
958 };
959
960 if timeout {
961 debug!(
962 target: LOG_TARGET,
963 "Libp2p => Query for target {target:?} timed out and yielded {} peers",
964 peers.len(),
965 );
966 } else {
967 debug!(
968 target: LOG_TARGET,
969 "Libp2p => Query for target {target:?} yielded {} peers",
970 peers.len(),
971 );
972 }
973
974 let ev = if peers.is_empty() {
975 DiscoveryOut::ClosestPeersNotFound(
976 target,
977 stats.duration().unwrap_or_default(),
978 )
979 } else {
980 DiscoveryOut::ClosestPeersFound(
981 target,
982 peers.into_iter().map(|p| (p.peer_id, p.addrs)).collect(),
983 stats.duration().unwrap_or_default(),
984 )
985 };
986
987 return Poll::Ready(ToSwarm::GenerateEvent(ev));
988 },
989 KademliaEvent::OutboundQueryProgressed {
990 result: QueryResult::GetRecord(res),
991 stats,
992 id,
993 ..
994 } => {
995 let ev = match res {
996 Ok(GetRecordOk::FoundRecord(r)) => {
997 debug!(
998 target: LOG_TARGET,
999 "Libp2p => Found record ({:?}) with value: {:?} id {:?} stats {:?}",
1000 r.record.key,
1001 r.record.value,
1002 id,
1003 stats,
1004 );
1005
1006 if stats.num_successes() > GET_RECORD_REDUNDANCY_FACTOR {
1013 if let Some(kad) = self.kademlia.as_mut() {
1014 if let Some(mut query) = kad.query_mut(&id) {
1015 query.finish();
1016 }
1017 }
1018 }
1019
1020 self.records_to_publish.insert(id, r.record.clone());
1023
1024 DiscoveryOut::ValueFound(r, stats.duration().unwrap_or_default())
1025 },
1026 Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
1027 cache_candidates,
1028 }) => {
1029 debug!(
1030 target: LOG_TARGET,
1031 "Libp2p => Finished with no-additional-record {:?} stats {:?} took {:?} ms",
1032 id,
1033 stats,
1034 stats.duration().map(|val| val.as_millis())
1035 );
1036 if let Some(record) = self.records_to_publish.remove(&id) {
1038 if cache_candidates.is_empty() {
1039 continue;
1040 }
1041
1042 if let Some(kad) = self.kademlia.as_mut() {
1045 kad.put_record_to(
1046 record,
1047 cache_candidates.into_iter().map(|v| v.1),
1048 Quorum::One,
1049 );
1050 }
1051 }
1052
1053 continue;
1054 },
1055 Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
1056 trace!(
1057 target: LOG_TARGET,
1058 "Libp2p => Failed to get record: {:?}",
1059 e,
1060 );
1061 DiscoveryOut::ValueNotFound(
1062 e.into_key(),
1063 stats.duration().unwrap_or_default(),
1064 )
1065 },
1066 Err(e) => {
1067 debug!(
1068 target: LOG_TARGET,
1069 "Libp2p => Failed to get record: {:?}",
1070 e,
1071 );
1072 DiscoveryOut::ValueNotFound(
1073 e.into_key(),
1074 stats.duration().unwrap_or_default(),
1075 )
1076 },
1077 };
1078 return Poll::Ready(ToSwarm::GenerateEvent(ev));
1079 },
1080 KademliaEvent::OutboundQueryProgressed {
1081 result: QueryResult::GetProviders(res),
1082 stats,
1083 id,
1084 ..
1085 } => {
1086 let ev = match res {
1087 Ok(GetProvidersOk::FoundProviders { key, providers }) => {
1088 debug!(
1089 target: LOG_TARGET,
1090 "Libp2p => Found providers {:?} for key {:?}, id {:?}, stats {:?}",
1091 providers,
1092 key,
1093 id,
1094 stats,
1095 );
1096
1097 DiscoveryOut::ProvidersFound(
1098 key,
1099 providers,
1100 stats.duration().unwrap_or_default(),
1101 )
1102 },
1103 Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
1104 closest_peers: _,
1105 }) => {
1106 debug!(
1107 target: LOG_TARGET,
1108 "Libp2p => Finished with no additional providers {:?}, stats {:?}, took {:?} ms",
1109 id,
1110 stats,
1111 stats.duration().map(|val| val.as_millis())
1112 );
1113
1114 if let Some(key) = self.provider_keys_requested.remove(&id) {
1115 DiscoveryOut::NoMoreProviders(
1116 key,
1117 stats.duration().unwrap_or_default(),
1118 )
1119 } else {
1120 error!(
1121 target: LOG_TARGET,
1122 "No key found for `GET_PROVIDERS` query {id:?}. This is a bug.",
1123 );
1124 continue;
1125 }
1126 },
1127 Err(GetProvidersError::Timeout { key, closest_peers: _ }) => {
1128 debug!(
1129 target: LOG_TARGET,
1130 "Libp2p => Failed to get providers for {key:?} due to timeout.",
1131 );
1132
1133 self.provider_keys_requested.remove(&id);
1134
1135 DiscoveryOut::ProvidersNotFound(
1136 key,
1137 stats.duration().unwrap_or_default(),
1138 )
1139 },
1140 };
1141 return Poll::Ready(ToSwarm::GenerateEvent(ev));
1142 },
1143 KademliaEvent::OutboundQueryProgressed {
1144 result: QueryResult::PutRecord(res),
1145 stats,
1146 ..
1147 } => {
1148 let ev = match res {
1149 Ok(ok) => {
1150 trace!(
1151 target: LOG_TARGET,
1152 "Libp2p => Put record for key: {:?}",
1153 ok.key,
1154 );
1155 DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_default())
1156 },
1157 Err(e) => {
1158 debug!(
1159 target: LOG_TARGET,
1160 "Libp2p => Failed to put record for key {:?}: {:?}",
1161 e.key(),
1162 e,
1163 );
1164 DiscoveryOut::ValuePutFailed(
1165 e.into_key(),
1166 stats.duration().unwrap_or_default(),
1167 )
1168 },
1169 };
1170 return Poll::Ready(ToSwarm::GenerateEvent(ev));
1171 },
1172 KademliaEvent::OutboundQueryProgressed {
1173 result: QueryResult::RepublishRecord(res),
1174 ..
1175 } => match res {
1176 Ok(ok) => debug!(
1177 target: LOG_TARGET,
1178 "Libp2p => Record republished: {:?}",
1179 ok.key,
1180 ),
1181 Err(e) => debug!(
1182 target: LOG_TARGET,
1183 "Libp2p => Republishing of record {:?} failed with: {:?}",
1184 e.key(), e,
1185 ),
1186 },
1187 KademliaEvent::OutboundQueryProgressed {
1188 result: QueryResult::StartProviding(res),
1189 stats,
1190 ..
1191 } => {
1192 let ev = match res {
1193 Ok(ok) => {
1194 trace!(
1195 target: LOG_TARGET,
1196 "Libp2p => Started providing key {:?}",
1197 ok.key,
1198 );
1199 DiscoveryOut::StartedProviding(
1200 ok.key,
1201 stats.duration().unwrap_or_default(),
1202 )
1203 },
1204 Err(e) => {
1205 debug!(
1206 target: LOG_TARGET,
1207 "Libp2p => Failed to start providing key {:?}: {:?}",
1208 e.key(),
1209 e,
1210 );
1211 DiscoveryOut::StartProvidingFailed(
1212 e.into_key(),
1213 stats.duration().unwrap_or_default(),
1214 )
1215 },
1216 };
1217 return Poll::Ready(ToSwarm::GenerateEvent(ev));
1218 },
1219 KademliaEvent::OutboundQueryProgressed {
1220 result: QueryResult::Bootstrap(res),
1221 ..
1222 } => match res {
1223 Ok(ok) => debug!(
1224 target: LOG_TARGET,
1225 "Libp2p => DHT bootstrap progressed: {ok:?}",
1226 ),
1227 Err(e) => warn!(
1228 target: LOG_TARGET,
1229 "Libp2p => DHT bootstrap error: {e:?}",
1230 ),
1231 },
1232 KademliaEvent::OutboundQueryProgressed { result: e, .. } => {
1234 warn!(target: LOG_TARGET, "Libp2p => Unhandled Kademlia event: {:?}", e)
1235 },
1236 Event::ModeChanged { new_mode } => {
1237 debug!(target: LOG_TARGET, "Libp2p => Kademlia mode changed: {new_mode}")
1238 },
1239 },
1240 ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }),
1241 event => {
1242 return Poll::Ready(event.map_out(|_| {
1243 unreachable!("`GenerateEvent` is handled in a branch above; qed")
1244 }));
1245 },
1246 }
1247 }
1248
1249 while let Poll::Ready(ev) = self.mdns.poll(cx) {
1251 match ev {
1252 ToSwarm::GenerateEvent(event) => match event {
1253 mdns::Event::Discovered(list) => {
1254 if self.num_connections >= self.discovery_only_if_under_num {
1255 continue;
1256 }
1257
1258 self.pending_events.extend(
1259 list.into_iter().map(|(peer_id, _)| DiscoveryOut::Discovered(peer_id)),
1260 );
1261 if let Some(ev) = self.pending_events.pop_front() {
1262 return Poll::Ready(ToSwarm::GenerateEvent(ev));
1263 }
1264 },
1265 mdns::Event::Expired(_) => {},
1266 },
1267 ToSwarm::Dial { .. } => {
1268 unreachable!("mDNS never dials!");
1269 },
1270 ToSwarm::NotifyHandler { event, .. } => match event {},
1272 event => {
1273 return Poll::Ready(
1274 event
1275 .map_in(|_| {
1276 unreachable!("`NotifyHandler` is handled in a branch above; qed")
1277 })
1278 .map_out(|_| {
1279 unreachable!("`GenerateEvent` is handled in a branch above; qed")
1280 }),
1281 );
1282 },
1283 }
1284 }
1285
1286 Poll::Pending
1287 }
1288}
1289
1290fn legacy_kademlia_protocol_name(id: &ProtocolId) -> StreamProtocol {
1292 let name = format!("/{}/kad", id.as_ref());
1293 StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1294}
1295
1296fn kademlia_protocol_name<Hash: AsRef<[u8]>>(
1298 genesis_hash: Hash,
1299 fork_id: Option<&str>,
1300) -> StreamProtocol {
1301 let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref());
1302 let name = if let Some(fork_id) = fork_id {
1303 format!("/{genesis_hash_hex}/{fork_id}/kad")
1304 } else {
1305 format!("/{genesis_hash_hex}/kad")
1306 };
1307
1308 StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed")
1309}
1310
1311#[cfg(test)]
1312mod tests {
1313 use super::{kademlia_protocol_name, legacy_kademlia_protocol_name, DiscoveryConfig};
1314 use crate::config::ProtocolId;
1315 use libp2p::{identity::Keypair, Multiaddr};
1316 use subsoil::core::hash::H256;
1317
1318 #[cfg(ignore_flaky_test)] #[tokio::test]
1320 async fn discovery_working() {
1321 use super::DiscoveryOut;
1322 use futures::prelude::*;
1323 use libp2p::{
1324 core::{
1325 transport::{MemoryTransport, Transport},
1326 upgrade,
1327 },
1328 noise,
1329 swarm::{Swarm, SwarmEvent},
1330 yamux,
1331 };
1332 use std::{collections::HashSet, task::Poll, time::Duration};
1333 let mut first_swarm_peer_id_and_addr = None;
1334
1335 let genesis_hash = H256::from_low_u64_be(1);
1336 let fork_id = Some("test-fork-id");
1337 let protocol_id = ProtocolId::from("dot");
1338
1339 let mut swarms = (0..25)
1342 .map(|i| {
1343 let mut swarm = libp2p::SwarmBuilder::with_new_identity()
1344 .with_tokio()
1345 .with_other_transport(|keypair| {
1346 MemoryTransport::new()
1347 .upgrade(upgrade::Version::V1)
1348 .authenticate(noise::Config::new(&keypair).unwrap())
1349 .multiplex(yamux::Config::default())
1350 .boxed()
1351 })
1352 .unwrap()
1353 .with_behaviour(|keypair| {
1354 let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
1355 config
1356 .with_permanent_addresses(first_swarm_peer_id_and_addr.clone())
1357 .allow_private_ip(true)
1358 .allow_non_globals_in_dht(true)
1359 .discovery_limit(50)
1360 .with_kademlia(genesis_hash, fork_id, &protocol_id);
1361
1362 config.finish()
1363 })
1364 .unwrap()
1365 .with_swarm_config(|config| {
1366 config.with_idle_connection_timeout(Duration::from_secs(10))
1368 })
1369 .build();
1370
1371 let listen_addr: Multiaddr =
1372 format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1373
1374 if i == 0 {
1375 first_swarm_peer_id_and_addr =
1376 Some((*swarm.local_peer_id(), listen_addr.clone()))
1377 }
1378
1379 swarm.listen_on(listen_addr.clone()).unwrap();
1380 (swarm, listen_addr)
1381 })
1382 .collect::<Vec<_>>();
1383
1384 let mut to_discover = (0..swarms.len())
1386 .map(|n| {
1387 (0..swarms.len())
1388 .skip(1)
1390 .filter(|p| *p != n)
1391 .map(|p| *Swarm::local_peer_id(&swarms[p].0))
1392 .collect::<HashSet<_>>()
1393 })
1394 .collect::<Vec<_>>();
1395
1396 let fut = futures::future::poll_fn(move |cx| {
1397 'polling: loop {
1398 for swarm_n in 0..swarms.len() {
1399 match swarms[swarm_n].0.poll_next_unpin(cx) {
1400 Poll::Ready(Some(e)) => {
1401 match e {
1402 SwarmEvent::Behaviour(behavior) => {
1403 match behavior {
1404 DiscoveryOut::UnroutablePeer(other)
1405 | DiscoveryOut::Discovered(other) => {
1406 let addr = swarms
1409 .iter()
1410 .find_map(|(s, a)| {
1411 if s.behaviour().local_peer_id == other {
1412 Some(a.clone())
1413 } else {
1414 None
1415 }
1416 })
1417 .unwrap();
1418 let protocol_names = if swarm_n % 2 == 0 {
1421 vec![kademlia_protocol_name(genesis_hash, fork_id)]
1422 } else {
1423 vec![
1424 legacy_kademlia_protocol_name(&protocol_id),
1425 kademlia_protocol_name(genesis_hash, fork_id),
1426 ]
1427 };
1428 swarms[swarm_n]
1429 .0
1430 .behaviour_mut()
1431 .add_self_reported_address(
1432 &other,
1433 protocol_names.as_slice(),
1434 addr,
1435 );
1436
1437 to_discover[swarm_n].remove(&other);
1438 },
1439 DiscoveryOut::RandomKademliaStarted => {},
1440 DiscoveryOut::ClosestPeersFound(..) => {},
1441 DiscoveryOut::ClosestPeersNotFound(..) => {},
1444 e => {
1445 panic!("Unexpected event: {:?}", e)
1446 },
1447 }
1448 },
1449 _ => {},
1451 }
1452 continue 'polling;
1453 },
1454 _ => {},
1455 }
1456 }
1457 break;
1458 }
1459
1460 if to_discover.iter().all(|l| l.is_empty()) {
1461 Poll::Ready(())
1462 } else {
1463 Poll::Pending
1464 }
1465 });
1466
1467 fut.await
1468 }
1469
1470 #[test]
1471 fn discovery_ignores_peers_with_unknown_protocols() {
1472 let supported_genesis_hash = H256::from_low_u64_be(1);
1473 let unsupported_genesis_hash = H256::from_low_u64_be(2);
1474 let supported_protocol_id = ProtocolId::from("a");
1475 let unsupported_protocol_id = ProtocolId::from("b");
1476
1477 let mut discovery = {
1478 let keypair = Keypair::generate_ed25519();
1479 let mut config = DiscoveryConfig::new(keypair.public().to_peer_id());
1480 config
1481 .allow_private_ip(true)
1482 .allow_non_globals_in_dht(true)
1483 .discovery_limit(50)
1484 .with_kademlia(supported_genesis_hash, None, &supported_protocol_id);
1485 config.finish()
1486 };
1487
1488 let predictable_peer_id = |bytes: &[u8; 32]| {
1489 Keypair::ed25519_from_bytes(bytes.to_owned()).unwrap().public().to_peer_id()
1490 };
1491
1492 let remote_peer_id = predictable_peer_id(b"00000000000000000000000000000001");
1493 let remote_addr: Multiaddr = "/memory/1".parse().unwrap();
1494 let another_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
1495 let another_addr: Multiaddr = "/memory/2".parse().unwrap();
1496
1497 discovery.add_self_reported_address(
1499 &remote_peer_id,
1500 &[kademlia_protocol_name(unsupported_genesis_hash, None)],
1501 remote_addr.clone(),
1502 );
1503 discovery.add_self_reported_address(
1504 &another_peer_id,
1505 &[legacy_kademlia_protocol_name(&unsupported_protocol_id)],
1506 another_addr.clone(),
1507 );
1508
1509 {
1510 let kademlia = discovery.kademlia.as_mut().unwrap();
1511 assert!(
1512 kademlia
1513 .kbucket(remote_peer_id)
1514 .expect("Remote peer id not to be equal to local peer id.")
1515 .is_empty(),
1516 "Expect peer with unsupported protocol not to be added."
1517 );
1518 assert!(
1519 kademlia
1520 .kbucket(another_peer_id)
1521 .expect("Remote peer id not to be equal to local peer id.")
1522 .is_empty(),
1523 "Expect peer with unsupported protocol not to be added."
1524 );
1525 }
1526
1527 discovery.add_self_reported_address(
1529 &remote_peer_id,
1530 &[kademlia_protocol_name(supported_genesis_hash, None)],
1531 remote_addr.clone(),
1532 );
1533 {
1534 let kademlia = discovery.kademlia.as_mut().unwrap();
1535 assert!(
1536 !kademlia
1537 .kbucket(remote_peer_id)
1538 .expect("Remote peer id not to be equal to local peer id.")
1539 .is_empty(),
1540 "Expect peer with supported protocol to be added."
1541 );
1542 }
1543
1544 let unsupported_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
1545 let unsupported_peer_addr: Multiaddr = "/memory/2".parse().unwrap();
1546
1547 {
1549 let kademlia = discovery.kademlia.as_mut().unwrap();
1550 assert!(
1551 kademlia
1552 .kbucket(unsupported_peer_id)
1553 .expect("Remote peer id not to be equal to local peer id.")
1554 .is_empty(),
1555 "Expect unsupported peer not to be added."
1556 );
1557 }
1558 discovery.add_self_reported_address(
1561 &unsupported_peer_id,
1562 &[legacy_kademlia_protocol_name(&supported_protocol_id)],
1563 unsupported_peer_addr.clone(),
1564 );
1565 {
1566 let kademlia = discovery.kademlia.as_mut().unwrap();
1567 assert!(
1568 kademlia
1569 .kbucket(unsupported_peer_id)
1570 .expect("Remote peer id not to be equal to local peer id.")
1571 .is_empty(),
1572 "Expect unsupported peer not to be added."
1573 );
1574 }
1575
1576 discovery.add_self_reported_address(
1578 &another_peer_id,
1579 &[
1580 legacy_kademlia_protocol_name(&supported_protocol_id),
1581 kademlia_protocol_name(supported_genesis_hash, None),
1582 ],
1583 another_addr.clone(),
1584 );
1585
1586 {
1587 let kademlia = discovery.kademlia.as_mut().unwrap();
1588 assert_eq!(
1589 2,
1590 kademlia.kbuckets().fold(0, |acc, bucket| acc + bucket.num_entries()),
1591 "Expect peers with supported protocol to be added."
1592 );
1593 assert!(
1594 !kademlia
1595 .kbucket(another_peer_id)
1596 .expect("Remote peer id not to be equal to local peer id.")
1597 .is_empty(),
1598 "Expect peer with supported protocol to be added."
1599 );
1600 }
1601 }
1602}