1#![allow(clippy::needless_lifetimes)]
24
25mod test;
26
27use crate::K_VALUE;
28use crate::addresses::{Addresses, Remove};
29use crate::handler::{
30 KademliaHandlerProto,
31 KademliaHandlerConfig,
32 KademliaRequestId,
33 KademliaHandlerEvent,
34 KademliaHandlerIn
35};
36use crate::jobs::*;
37use crate::kbucket::{self, KBucketsTable, NodeStatus, KBucketRef, KeyBytes};
38use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
39use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState, WeightedPeer};
40use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
41use crate::contact::Contact;
42use fnv::{FnvHashMap, FnvHashSet};
43use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId, multiaddr, identity::PublicKey, identity::Keypair};
44use libp2p_swarm::{
45 DialPeerCondition,
46 NetworkBehaviour,
47 NetworkBehaviourAction,
48 NotifyHandler,
49 PollParameters,
50};
51use log::{info, debug, warn, LevelFilter};
52use smallvec::SmallVec;
53use std::{borrow::Cow, error, iter, time::Duration};
54use std::collections::{HashSet, VecDeque};
55use std::fmt;
56use std::num::NonZeroUsize;
57use std::task::{Context, Poll};
58use std::vec;
59use wasm_timer::Instant;
60use trust_graph::{Certificate};
61use derivative::Derivative;
62use crate::metrics::Metrics;
63
64pub use crate::query::QueryStats;
65
66type TrustGraph = trust_graph::TrustGraph<trust_graph::InMemoryStorage>;
67
68pub struct Kademlia<TStore> {
71 kbuckets: KBucketsTable<kbucket::Key<PeerId>, Contact>,
73
74 kbucket_inserts: KademliaBucketInserts,
76
77 protocol_config: KademliaProtocolConfig,
79
80 queries: QueryPool<QueryInner>,
82
83 connected_peers: FnvHashSet<PeerId>,
87
88 add_provider_job: Option<AddProviderJob>,
91
92 put_record_job: Option<PutRecordJob>,
95
96 record_ttl: Option<Duration>,
98
99 provider_record_ttl: Option<Duration>,
101
102 connection_idle_timeout: Duration,
104
105 queued_events: VecDeque<NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>>,
107
108 local_addrs: HashSet<Multiaddr>,
110
111 store: TStore,
113
114 pub trust: TrustGraph,
115 pub(super) metrics: Metrics,
116
117 }
121
122#[derive(Copy, Clone, Debug, PartialEq, Eq)]
126pub enum KademliaBucketInserts {
127 OnConnected,
135 Manual,
144}
145
146#[derive(Debug, Clone)]
150pub struct KademliaConfig {
151 kbucket_pending_timeout: Duration,
152 query_config: QueryConfig,
153 protocol_config: KademliaProtocolConfig,
154 record_ttl: Option<Duration>,
155 record_replication_interval: Option<Duration>,
156 record_publication_interval: Option<Duration>,
157 provider_record_ttl: Option<Duration>,
158 provider_publication_interval: Option<Duration>,
159 connection_idle_timeout: Duration,
160 kbucket_inserts: KademliaBucketInserts,
161}
162
163impl Default for KademliaConfig {
164 fn default() -> Self {
165 KademliaConfig {
166 kbucket_pending_timeout: Duration::from_secs(60),
167 query_config: QueryConfig::default(),
168 protocol_config: Default::default(),
169 record_ttl: Some(Duration::from_secs(36 * 60 * 60)),
170 record_replication_interval: Some(Duration::from_secs(60 * 60)),
171 record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
172 provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
173 provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
174 connection_idle_timeout: Duration::from_secs(10),
175 kbucket_inserts: KademliaBucketInserts::OnConnected,
176 }
177 }
178}
179
180impl KademliaConfig {
181 pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) -> &mut Self {
187 self.protocol_config.set_protocol_name(name);
188 self
189 }
190
191 pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
198 self.query_config.timeout = timeout;
199 self
200 }
201
202 pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
207 self.query_config.replication_factor = replication_factor;
208 self
209 }
210
211 pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
224 self.query_config.parallelism = parallelism;
225 self
226 }
227
228 pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
237 if enabled {
238 unimplemented!(
239 "TODO FIXME: disjoint paths are not working correctly with weighted \
240 and swamp buckets. Need to fix at least behaviour::test::put_record"
241 )
242 }
243 self.query_config.disjoint_query_paths = enabled;
244 self
245 }
246
247 pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
257 self.record_ttl = record_ttl;
258 self
259 }
260
261 pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
279 self.record_replication_interval = interval;
280 self
281 }
282
283 pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
297 self.record_publication_interval = interval;
298 self
299 }
300
301 pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
307 self.provider_record_ttl = ttl;
308 self
309 }
310
311 pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
319 self.provider_publication_interval = interval;
320 self
321 }
322
323 pub fn set_connection_idle_timeout(&mut self, duration: Duration) -> &mut Self {
325 self.connection_idle_timeout = duration;
326 self
327 }
328
329 pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
334 self.protocol_config.set_max_packet_size(size);
335 self
336 }
337
338 pub fn set_kbucket_inserts(&mut self, inserts: KademliaBucketInserts) -> &mut Self {
340 self.kbucket_inserts = inserts;
341 self
342 }
343}
344
345impl<TStore> Kademlia<TStore>
346where
347 for<'a> TStore: RecordStore<'a>
348{
349 pub fn new(kp: Keypair, id: PeerId, store: TStore, trust: TrustGraph) -> Self {
351 Self::with_config(kp, id, store, Default::default(), trust)
352 }
353
354 pub fn protocol_name(&self) -> &[u8] {
356 self.protocol_config.protocol_name()
357 }
358
359 pub fn with_config(kp: Keypair, id: PeerId, store: TStore, config: KademliaConfig, trust: TrustGraph) -> Self {
361 let local_key = kbucket::Key::from(id);
362
363 let put_record_job = config
364 .record_replication_interval
365 .or(config.record_publication_interval)
366 .map(|interval| PutRecordJob::new(
367 id,
368 interval,
369 config.record_publication_interval,
370 config.record_ttl,
371 ));
372
373 let add_provider_job = config
374 .provider_publication_interval
375 .map(AddProviderJob::new);
376
377 Kademlia {
378 store,
379 kbuckets: KBucketsTable::new(kp, local_key, config.kbucket_pending_timeout),
380 kbucket_inserts: config.kbucket_inserts,
381 protocol_config: config.protocol_config,
382 queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
383 queries: QueryPool::new(config.query_config),
384 connected_peers: Default::default(),
385 add_provider_job,
386 put_record_job,
387 record_ttl: config.record_ttl,
388 provider_record_ttl: config.provider_record_ttl,
389 connection_idle_timeout: config.connection_idle_timeout,
390 local_addrs: HashSet::new(),
391 trust,
392 metrics: Metrics::disabled(),
393 }
394 }
395
396 pub fn enable_metrics(&mut self, registry: &prometheus::Registry) {
398 self.metrics = Metrics::enabled(registry, self.kbuckets.local_key().preimage());
399 }
400
401 pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
403 self.queries.iter().filter_map(|query|
404 if !query.is_finished() {
405 Some(QueryRef { query })
406 } else {
407 None
408 })
409 }
410
411 pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
413 self.queries.iter_mut().filter_map(|query|
414 if !query.is_finished() {
415 Some(QueryMut { query })
416 } else {
417 None
418 })
419 }
420
421 pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
423 self.queries.get(id).and_then(|query|
424 if !query.is_finished() {
425 Some(QueryRef { query })
426 } else {
427 None
428 })
429 }
430
431 pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
433 self.queries.get_mut(id).and_then(|query|
434 if !query.is_finished() {
435 Some(QueryMut { query })
436 } else {
437 None
438 })
439 }
440
441 pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr, public_key: PublicKey) -> RoutingUpdate {
459 let key = kbucket::Key::from(*peer);
460 let result = match self.kbuckets.entry(&key) {
461 kbucket::Entry::Present(mut entry, _) => {
462 if entry.value().insert(address) {
463 self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
464 KademliaEvent::RoutingUpdated {
465 peer: *peer,
466 addresses: entry.value().clone().into(),
467 old_peer: None,
468 }
469 ))
470 }
471 RoutingUpdate::Success
472 }
473 kbucket::Entry::Pending(mut entry, _) => {
474 entry.value().insert(address);
475 RoutingUpdate::Pending
476 }
477 kbucket::Entry::Absent(entry) => {
478 debug!(
479 "Will insert newly connected node {} with key {}",
480 entry.key().clone().into_preimage(),
481 bs58::encode(entry.key().as_ref()).into_string()
482 );
483 let contact = Contact {
484 addresses: Addresses::new(address),
485 public_key
486 };
487 let status =
488 if self.connected_peers.contains(peer) {
489 NodeStatus::Connected
490 } else {
491 NodeStatus::Disconnected
492 };
493 let (status, events) = Self::insert_new_peer(entry, contact, status, &self.connected_peers, &self.trust);
494 events.into_iter().for_each(|e| self.queued_events.push_back(e));
495 status
496 },
497 kbucket::Entry::SelfEntry => RoutingUpdate::Failed,
498 };
499
500 self.print_bucket_table();
501 result
502 }
503
504 pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr)
515 -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Contact>>
516 {
517 let key = kbucket::Key::from(*peer);
518 match self.kbuckets.entry(&key) {
519 kbucket::Entry::Present(mut entry, _) => {
520 if entry.value().addresses.remove(address, Remove::Completely).is_err() {
521 Some(entry.remove()) } else {
523 None
524 }
525 }
526 kbucket::Entry::Pending(mut entry, _) => {
527 if entry.value().addresses.remove(address, Remove::Completely).is_err() {
528 Some(entry.remove()) } else {
530 None
531 }
532 }
533 kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => {
534 None
535 }
536 }
537 }
538
539 pub fn remove_peer(&mut self, peer: &PeerId)
544 -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Contact>>
545 {
546 let key = kbucket::Key::from(*peer);
547 match self.kbuckets.entry(&key) {
548 kbucket::Entry::Present(entry, _) => {
549 Some(entry.remove())
550 }
551 kbucket::Entry::Pending(entry, _) => {
552 Some(entry.remove())
553 }
554 kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => {
555 None
556 }
557 }
558 }
559
560 pub fn kbuckets(&mut self)
562 -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Contact>>
563 {
564 self.kbuckets.iter().filter(|b| !b.is_empty())
565 }
566
567 pub fn kbucket<K>(&mut self, key: K)
571 -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Contact>>
572 where
573 K: Into<kbucket::Key<K>> + Clone
574 {
575 self.kbuckets.bucket(&key.into())
576 }
577
578 pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
583 where
584 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone
585 {
586 let info = QueryInfo::GetClosestPeers { key: key.clone().into() };
587 let target: kbucket::Key<K> = key.into();
588 let peers = Self::closest_keys(&mut self.kbuckets, &target);
589 let inner = QueryInner::new(info);
590 self.queries.add_iter_closest(target.clone(), peers, inner)
591 }
592
593 pub fn local_closest_peers<'s, 'k : 's, K: 's>(&'s mut self, key: &'k kbucket::Key<K>)
595 -> impl Iterator<Item = WeightedPeer> + 's
596 where
597 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone {
598 Self::closest_keys(&mut self.kbuckets, key)
599 }
600
601 pub fn get_record(&mut self, key: &record::Key, quorum: Quorum) -> QueryId {
606 let quorum = quorum.eval(self.queries.config().replication_factor);
607 let mut records = Vec::with_capacity(quorum.get());
608
609 if let Some(record) = self.store.get(key) {
610 if record.is_expired(Instant::now()) {
611 self.store.remove(key);
612 self.metrics.record_removed();
613 } else {
614 records.push(PeerRecord{ peer: None, record: record.into_owned()});
615 }
616 }
617
618 let done = records.len() >= quorum.get();
619 let target = kbucket::Key::new(key.clone());
620 let info = QueryInfo::GetRecord { key: key.clone(), records, quorum, cache_at: None };
621 let peers = Self::closest_keys(&mut self.kbuckets, &target);
622 let inner = QueryInner::new(info);
623 let id = self.queries.add_iter_closest(target.clone(), peers, inner); if done {
627 self.queries.get_mut(&id).expect("by (*)").finish();
628 }
629
630 id
631 }
632
633 pub fn put_record(&mut self, mut record: Record, quorum: Quorum) -> Result<QueryId, store::Error> {
651 record.publisher = Some(*self.kbuckets.local_key().preimage());
652 self.store.put(record.clone())?;
653 self.metrics.store_put();
654 record.expires = record.expires.or_else(||
655 self.record_ttl.map(|ttl| Instant::now() + ttl));
656 let quorum = quorum.eval(self.queries.config().replication_factor);
657 let target = kbucket::Key::new(record.key.clone());
658
659 let peers = Self::closest_keys(&mut self.kbuckets, &target);
660 let context = PutRecordContext::Publish;
661 let info = QueryInfo::PutRecord {
662 context,
663 record,
664 quorum,
665 phase: PutRecordPhase::GetClosestPeers
666 };
667 let inner = QueryInner::new(info);
668 Ok(self.queries.add_iter_closest(target.clone(), peers, inner))
669 }
670
671 pub fn remove_record(&mut self, key: &record::Key) {
681 if let Some(r) = self.store.get(key) {
682 if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
683 self.store.remove(key);
684 self.metrics.record_removed();
685 }
686 }
687 }
688
689 pub fn store_mut(&mut self) -> &mut TStore {
691 &mut self.store
692 }
693
694 pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
714 let local_key = self.kbuckets.local_key().clone();
715 let info = QueryInfo::Bootstrap {
716 peer: *local_key.preimage(),
717 remaining: None
718 };
719 let peers = Self::closest_keys(&mut self.kbuckets, &local_key).collect::<Vec<_>>();
720 if peers.is_empty() {
721 Err(NoKnownPeers())
722 } else {
723 let inner = QueryInner::new(info);
724 Ok(self.queries.add_iter_closest(local_key, peers, inner))
725 }
726 }
727
728 pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
751 self.print_bucket_table();
752
753 let local_addrs = Vec::new();
757 let record = ProviderRecord::new(
759 key.clone(),
760 *self.kbuckets.local_key().preimage(),
761 local_addrs);
762 self.store.add_provider(record)?;
763 let target = kbucket::Key::new(key.clone());
764 debug!(
765 "start_providing for key {} ; kademlia key {}",
766 bs58::encode(target.preimage().as_ref()).into_string(), bs58::encode(target.as_ref()).into_string(), );
769 let provider_key = self.kbuckets.local_public_key();
770 let certificates = self.get_certificates(&provider_key);
771 let peers = Self::closest_keys(&mut self.kbuckets, &target);
772 let context = AddProviderContext::Publish;
773 let info = QueryInfo::AddProvider {
774 context,
775 key,
776 phase: AddProviderPhase::GetClosestPeers,
777 provider_key,
778 certificates
779 };
780 let inner = QueryInner::new(info);
781 let id = self.queries.add_iter_closest(target.clone(), peers, inner);
782 Ok(id)
783 }
784
785 pub fn stop_providing(&mut self, key: &record::Key) {
790 let target = kbucket::Key::new(key.clone());
791 debug!(
792 "stop_providing for key {} ; kademlia key {}",
793 bs58::encode(key.as_ref()).into_string(), bs58::encode(target.as_ref()).into_string(), );
796 self.store.remove_provider(key, self.kbuckets.local_key().preimage());
797 }
798
799 pub fn get_providers(&mut self, key: record::Key) -> QueryId {
804 self.print_bucket_table();
805 let info = QueryInfo::GetProviders {
806 key: key.clone(),
807 providers: HashSet::new(),
808 };
809 let target = kbucket::Key::new(key);
810 debug!(
811 "get_providers for key {} ; kademlia key {}",
812 bs58::encode(target.preimage().as_ref()).into_string(), bs58::encode(target.as_ref()).into_string(), );
815 let peers = Self::closest_keys(&mut self.kbuckets, &target);
816 let inner = QueryInner::new(info);
817 self.queries.add_iter_closest(target.clone(), peers, inner)
818 }
819
820 pub fn replicate_record(&mut self, key: record::Key) {
822 if let Some(rec) = self.store.get(&key).map(|r| r.into_owned()) {
823 self.start_put_record(rec, Quorum::All, PutRecordContext::Replicate);
824 if let Some(job) = self.put_record_job.as_mut() {
825 job.skip(key)
827 }
828 }
829 }
830
831 fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
833 where
834 I: Iterator<Item = &'a KadPeer> + Clone
835 {
836 let cur_time = trust_graph::current_time();
838 for peer in peers.clone() {
839 for cert in peer.certificates.iter() {
840 match self.trust.add(cert, cur_time) {
841 Ok(_) => log::trace!("{} added cert {:?} from {}", self.kbuckets.local_key().preimage(), cert, source),
842 Err(err) => log::info!("Unable to add certificate for peer {}: {}", peer.node_id, err),
843 }
844 }
845 }
846
847 let local_id = self.kbuckets.local_key().preimage();
848 let others_iter = peers.filter(|p| &p.node_id != local_id);
849 let trust = &self.trust;
850 if let Some(query) = self.queries.get_mut(query_id) {
851 log::trace!("Request to {:?} in query {:?} succeeded.", source, query_id);
852 for peer in others_iter.clone() {
853 log::trace!("Peer {:?} reported by {:?} in query {:?}.", peer, source, query_id);
854 query.inner.contacts.insert(peer.node_id, peer.clone().into());
855 }
856 query.on_success(source, others_iter.map(|kp| WeightedPeer {
857 peer_id: kp.node_id.clone().into(),
858 weight: get_weight(trust, &kp.public_key),
859 }))
860 }
861 }
862
863 fn find_closest<T: Clone>(&mut self, target: &kbucket::Key<T>, source: &PeerId) -> Vec<KadPeer> {
867 if target == self.kbuckets.local_key() {
868 Vec::new()
869 } else {
870 let mut peers: Vec<_> = self.kbuckets
871 .closest(target)
872 .filter(|e| e.node.key.preimage() != source)
873 .take(self.queries.config().replication_factor.get())
874 .map(KadPeer::from)
875 .collect();
876 peers.iter_mut().for_each(|mut peer|
877 peer.certificates = self.get_certificates(&peer.public_key)
878 );
879 peers
880 }
881 }
882
883 fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
885 let kbuckets = &mut self.kbuckets;
886 let connected = &mut self.connected_peers;
887 let local_addrs = &self.local_addrs;
888 let trust = &self.trust;
889 self.store.providers(key)
890 .into_iter()
891 .filter_map(move |p| {
892 let provider_id = if log::max_level() >= LevelFilter::Debug {
893 p.provider.to_string()
894 } else {
895 String::new()
896 };
897
898 let kad_peer = if &p.provider != source {
899 let node_id = p.provider;
900 let connection_ty = if connected.contains(&node_id) {
901 KadConnectionType::Connected
902 } else {
903 KadConnectionType::NotConnected
904 };
905
906 if &node_id == kbuckets.local_key().preimage() {
907 let self_key = kbuckets.local_public_key();
910 Some(KadPeer {
911 node_id,
912 connection_ty,
913 multiaddrs: local_addrs.iter().cloned().collect::<Vec<_>>(),
914 certificates: get_certificates(&trust, &self_key),
915 public_key: self_key,
916 })
917 } else {
918 let key = kbucket::Key::from(node_id);
919 kbuckets.entry(&key).view().map(|e| {
920 let contact = e.node.value;
921 let multiaddrs = if p.addresses.is_empty() {
922 contact.addresses.clone().into_vec()
925 } else {
926 p.addresses
927 };
928 let certificates = {
929 match node_id.as_public_key() {
930 Some(pk) =>
931 get_certificates(&trust, &pk),
932 None => {
933 log::warn!("Provider {} has a non-inlined public key: {:?}", node_id, key);
934 vec![]
935 }
936 }
937 };
938
939 KadPeer {
940 node_id,
941 multiaddrs,
942 public_key: contact.public_key.clone(),
943 connection_ty: match e.status {
944 NodeStatus::Connected => KadConnectionType::Connected,
945 NodeStatus::Disconnected => KadConnectionType::NotConnected
946 },
947 certificates
948 }
949 })
950 }
951 } else {
952 None
953 };
954 debug!(
955 "Local provider for {}: {}; source: {}; found? {}",
956 bs58::encode(key).into_string(),
957 provider_id,
958 source,
959 kad_peer.is_some()
960 );
961 kad_peer
962 })
963 .take(self.queries.config().replication_factor.get())
964 .collect::<Vec<_>>()
965 }
966
967 fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
969 let provider_key = self.kbuckets.local_public_key();
970 let certificates = self.get_certificates(&provider_key);
971 let info = QueryInfo::AddProvider {
972 context,
973 key: key.clone(),
974 phase: AddProviderPhase::GetClosestPeers,
975 provider_key,
976 certificates
977 };
978 let target = kbucket::Key::new(key);
979 let peers = Self::closest_keys(&mut self.kbuckets, &target);
980 let inner = QueryInner::new(info);
981 self.queries.add_iter_closest(target.clone(), peers, inner);
982 }
983
984 fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
986 let quorum = quorum.eval(self.queries.config().replication_factor);
987 let target = kbucket::Key::new(record.key.clone());
988 let peers = Self::closest_keys(&mut self.kbuckets, &target);
989 let info = QueryInfo::PutRecord {
990 record, quorum, context, phase: PutRecordPhase::GetClosestPeers
991 };
992 let inner = QueryInner::new(info);
993 self.queries.add_iter_closest(target.clone(), peers, inner);
994 }
995
996 fn connection_updated(&mut self, peer: PeerId, contact: Option<Contact>, new_status: NodeStatus) {
998 let key = kbucket::Key::from(peer);
999 match self.kbuckets.entry(&key) {
1000 kbucket::Entry::Present(mut entry, old_status) => {
1001 if let Some(contact) = contact {
1002 if *entry.value() != contact { *entry.value() = contact; self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
1005 KademliaEvent::RoutingUpdated {
1006 peer,
1007 addresses: entry.value().addresses.clone(),
1008 old_peer: None,
1009 }
1010 ))
1011 }
1012 }
1013 if old_status != new_status {
1014 entry.update(new_status);
1015 }
1016 },
1017
1018 kbucket::Entry::Pending(mut entry, old_status) => {
1019 if let Some(contact) = contact {
1020 *entry.value() = contact;
1021 }
1022 if old_status != new_status {
1023 entry.update(new_status);
1024 }
1025 },
1026
1027 kbucket::Entry::Absent(entry) => {
1028 if new_status != NodeStatus::Connected {
1030 return
1031 }
1032
1033 match (contact, self.kbucket_inserts) {
1034 (None, _) => {
1035 self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
1036 KademliaEvent::UnroutablePeer { peer }
1037 ));
1038 }
1039 (Some(c), KademliaBucketInserts::Manual) => {
1040 let address = c.addresses.iter().last().expect("addresses can't be empty here").clone();
1041 self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
1042 KademliaEvent::RoutablePeer { peer, address }
1043 ));
1044 }
1045 (Some(contact), KademliaBucketInserts::OnConnected) => {
1046 Self::insert_new_peer(entry, contact, new_status, &self.connected_peers, &self.trust)
1048 .1
1049 .into_iter()
1050 .for_each(|e|
1051 self.queued_events.push_back(e)
1052 );
1053 }
1054 }
1055 },
1056 _ => {}
1057 }
1058
1059 self.print_bucket_table();
1060 }
1061
1062 fn insert_new_peer(
1063 entry: kbucket::AbsentEntry<kbucket::Key<PeerId>, Contact>,
1064 contact: Contact,
1065 status: NodeStatus,
1066 connected_peers: &FnvHashSet<PeerId>,
1067 trust: &TrustGraph
1068 ) -> (RoutingUpdate, Vec<NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>>)
1069 {
1070 let addresses = contact.addresses.clone();
1071 let peer = entry.key().preimage().clone();
1072 let weight = get_weight(&trust, &contact.public_key);
1073 debug!(
1074 "Calculated weight for {} pk {}: {}",
1075 entry.key().preimage(),
1076 bs58::encode(contact.public_key.clone().into_protobuf_encoding()).into_string(),
1077 weight
1078 );
1079 let address = contact.addresses.iter().last().expect("addresses can't be empty here").clone();
1081 match entry.insert(contact, status, weight) {
1082 kbucket::InsertResult::Inserted => {
1083 (
1084 RoutingUpdate::Success,
1085 vec![
1086 NetworkBehaviourAction::GenerateEvent(
1087 KademliaEvent::RoutingUpdated {
1088 peer,
1089 addresses,
1090 old_peer: None,
1091 }
1092 )
1093 ]
1094 )
1095 },
1096 kbucket::InsertResult::Full => {
1097 debug!("Bucket full. Peer not added to routing table: {}", peer);
1098 (
1099 RoutingUpdate::Failed,
1100 vec![NetworkBehaviourAction::GenerateEvent(
1101 KademliaEvent::RoutablePeer { peer, address }
1102 )]
1103 )
1104 },
1105 kbucket::InsertResult::Pending { disconnected } => { debug_assert!(!connected_peers.contains(disconnected.preimage()));
1107 let address = addresses.first().clone();
1108 (
1109 RoutingUpdate::Pending,
1110 vec![
1111 NetworkBehaviourAction::GenerateEvent(
1113 KademliaEvent::PendingRoutablePeer { peer, address }
1114 ),
1115 NetworkBehaviourAction::DialPeer {
1116 peer_id: disconnected.into_preimage(),
1117 condition: DialPeerCondition::Disconnected
1118 },
1119 ]
1120 )
1121 },
1122 }
1123 }
1124
1125 fn query_finished(&mut self, q: Query<QueryInner>, params: &mut impl PollParameters)
1127 -> Option<KademliaEvent>
1128 {
1129 let query_id = q.id();
1130 let result = q.into_result();
1131
1132 log::trace!("Query {} ({:?}) finished", format!("{:#?}", result.inner.info).lines().take(1).next().unwrap(), query_id);
1133 match result.inner.info {
1134 QueryInfo::Bootstrap { peer, remaining } => {
1135 self.print_bucket_table();
1136 let local_key = self.kbuckets.local_key().clone();
1137 let mut remaining = remaining.unwrap_or_else(|| {
1138 debug_assert_eq!(&peer, local_key.preimage());
1139 self.kbuckets.iter()
1144 .skip_while(|b| b.is_empty())
1145 .skip(1) .map(|b| {
1147 let mut target = kbucket::Key::from(PeerId::random());
1161 for _ in 0 .. 16 {
1162 let d = local_key.distance(&target);
1163 if b.contains(&d) {
1164 break;
1165 }
1166 target = kbucket::Key::from(PeerId::random());
1167 }
1168 target
1169 }).collect::<Vec<_>>().into_iter()
1170 });
1171
1172 let num_remaining = remaining.len().saturating_sub(1) as u32;
1173
1174 if let Some(target) = remaining.next() {
1175 let info = QueryInfo::Bootstrap {
1176 peer: target.clone().into_preimage(),
1177 remaining: Some(remaining)
1178 };
1179 let peers = Self::closest_keys(&mut self.kbuckets, &target);
1180 let inner = QueryInner::new(info);
1181 self.queries.continue_iter_closest(query_id, target.clone(), peers, inner);
1182 }
1183
1184 Some(KademliaEvent::QueryResult {
1185 id: query_id,
1186 stats: result.stats,
1187 result: QueryResult::Bootstrap(Ok(BootstrapOk { peer, num_remaining }))
1188 })
1189 }
1190
1191 QueryInfo::GetClosestPeers { key, .. } => {
1192 Some(KademliaEvent::QueryResult {
1193 id: query_id,
1194 stats: result.stats,
1195 result: QueryResult::GetClosestPeers(Ok(
1196 GetClosestPeersOk { key, peers: result.peers.collect() }
1197 ))
1198 })
1199 }
1200
1201 QueryInfo::GetProviders { key, providers } => {
1202 Some(KademliaEvent::QueryResult {
1203 id: query_id,
1204 stats: result.stats,
1205 result: QueryResult::GetProviders(Ok(
1206 GetProvidersOk {
1207 key,
1208 providers,
1209 closest_peers: result.peers.collect()
1210 }
1211 ))
1212 })
1213 }
1214
1215 QueryInfo::AddProvider {
1216 context,
1217 key,
1218 phase: AddProviderPhase::GetClosestPeers,
1219 ..
1220 } => {
1221 let provider_id = *params.local_peer_id();
1222 let external_addresses = params.external_addresses().map(|r| r.addr).collect();
1223 let provider_key = self.kbuckets.local_public_key();
1224 let certificates = self.get_certificates(&provider_key);
1225 let inner = QueryInner::new(QueryInfo::AddProvider {
1226 context,
1227 key,
1228 phase: AddProviderPhase::AddProvider {
1229 provider_id,
1230 external_addresses,
1231 get_closest_peers_stats: result.stats
1232 },
1233 provider_key,
1234 certificates
1235 });
1236 let contacts = &result.inner.contacts;
1237 let trust = &self.trust;
1238 let peers = result.peers.into_iter().map(|peer_id| {
1239 let weight = contacts
1240 .get(&peer_id)
1241 .map(|c| get_weight(&trust, &c.public_key))
1242 .unwrap_or_default();
1243 WeightedPeer {
1244 peer_id: peer_id.into(),
1245 weight,
1246 }
1247 });
1248 self.queries.continue_fixed(query_id, peers, inner);
1249 None
1250 }
1251
1252 QueryInfo::AddProvider {
1253 context,
1254 key,
1255 phase: AddProviderPhase::AddProvider { get_closest_peers_stats, .. },
1256 ..
1257 } => {
1258 log::debug!("AddProvider finished {:?}!", context);
1259 match context {
1260 AddProviderContext::Publish => {
1261 Some(KademliaEvent::QueryResult {
1262 id: query_id,
1263 stats: get_closest_peers_stats.merge(result.stats),
1264 result: QueryResult::StartProviding(Ok(AddProviderOk { key }))
1265 })
1266 }
1267 AddProviderContext::Republish => {
1268 Some(KademliaEvent::QueryResult {
1269 id: query_id,
1270 stats: get_closest_peers_stats.merge(result.stats),
1271 result: QueryResult::RepublishProvider(Ok(AddProviderOk { key }))
1272 })
1273 }
1274 }
1275 }
1276
1277 QueryInfo::GetRecord { key, records, quorum, cache_at } => {
1278 let results = if records.len() >= quorum.get() { if let Some(cache_key) = cache_at {
1280 let record = records.first().expect("[not empty]").record.clone();
1283 let quorum = NonZeroUsize::new(1).expect("1 > 0");
1284 let context = PutRecordContext::Cache;
1285 let info = QueryInfo::PutRecord {
1286 context,
1287 record,
1288 quorum,
1289 phase: PutRecordPhase::PutRecord {
1290 success: vec![],
1291 get_closest_peers_stats: QueryStats::empty()
1292 }
1293 };
1294 let inner = QueryInner::new(info);
1295 let peer_id = cache_key.preimage();
1296 let trust = &self.trust;
1297 let weight =
1298 result.inner.contacts.get(peer_id)
1299 .map(|c| get_weight(&trust, &c.public_key))
1300 .unwrap_or_default();
1301 let peer = WeightedPeer {
1302 weight,
1303 peer_id: cache_key
1304 };
1305 self.queries.add_fixed(iter::once(peer), inner);
1306 }
1307 Ok(GetRecordOk { records })
1308 } else if records.is_empty() {
1309 Err(GetRecordError::NotFound {
1310 key,
1311 closest_peers: result.peers.collect()
1312 })
1313 } else {
1314 Err(GetRecordError::QuorumFailed { key, records, quorum })
1315 };
1316 Some(KademliaEvent::QueryResult {
1317 id: query_id,
1318 stats: result.stats,
1319 result: QueryResult::GetRecord(results)
1320 })
1321 }
1322
1323 QueryInfo::PutRecord {
1324 context,
1325 record,
1326 quorum,
1327 phase: PutRecordPhase::GetClosestPeers
1328 } => {
1329 let info = QueryInfo::PutRecord {
1330 context,
1331 record,
1332 quorum,
1333 phase: PutRecordPhase::PutRecord {
1334 success: vec![],
1335 get_closest_peers_stats: result.stats
1336 }
1337 };
1338 let inner = QueryInner::new(info);
1339 let contacts = &result.inner.contacts;
1340 let trust = &self.trust;
1341 let peers = result.peers.into_iter().map(|peer_id| {
1342 let weight =
1343 contacts
1344 .get(&peer_id)
1345 .map(|c| get_weight(&trust, &c.public_key))
1346 .unwrap_or_default();
1347
1348 WeightedPeer {
1349 peer_id: peer_id.into(),
1350 weight,
1351 }
1352 });
1353 self.queries.continue_fixed(query_id, peers, inner);
1354 None
1355 }
1356
1357 QueryInfo::PutRecord {
1358 context,
1359 record,
1360 quorum,
1361 phase: PutRecordPhase::PutRecord { success, get_closest_peers_stats }
1362 } => {
1363 let mk_result = |key: record::Key| {
1364 if success.len() >= quorum.get() {
1365 Ok(PutRecordOk { key })
1366 } else {
1367 Err(PutRecordError::QuorumFailed { key, quorum, success })
1368 }
1369 };
1370 match context {
1371 PutRecordContext::Publish =>
1372 Some(KademliaEvent::QueryResult {
1373 id: query_id,
1374 stats: get_closest_peers_stats.merge(result.stats),
1375 result: QueryResult::PutRecord(mk_result(record.key))
1376 }),
1377 PutRecordContext::Republish =>
1378 Some(KademliaEvent::QueryResult {
1379 id: query_id,
1380 stats: get_closest_peers_stats.merge(result.stats),
1381 result: QueryResult::RepublishRecord(mk_result(record.key))
1382 }),
1383 PutRecordContext::Replicate => {
1384 debug!("Record replicated: {:?}", record.key);
1385 None
1386 }
1387 PutRecordContext::Cache => {
1388 debug!("Record cached: {:?}", record.key);
1389 None
1390 }
1391 }
1392 }
1393 }
1394 }
1395
1396 fn query_timeout(&mut self, query: Query<QueryInner>) -> Option<KademliaEvent> {
1398 let query_id = query.id();
1399 log::trace!("Query {:?} timed out.", query_id);
1400 let result = query.into_result();
1401 match result.inner.info {
1402 QueryInfo::Bootstrap { peer, mut remaining } => {
1403 let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);
1404
1405 if let Some(mut remaining) = remaining.take() {
1406 if let Some(target) = remaining.next() {
1408 let info = QueryInfo::Bootstrap {
1409 peer: target.clone().into_preimage(),
1410 remaining: Some(remaining)
1411 };
1412 let peers = Self::closest_keys(&mut self.kbuckets, &target);
1413 let inner = QueryInner::new(info);
1414 self.queries.continue_iter_closest(query_id, target.clone(), peers, inner);
1415 }
1416 }
1417
1418 Some(KademliaEvent::QueryResult {
1419 id: query_id,
1420 stats: result.stats,
1421 result: QueryResult::Bootstrap(Err(
1422 BootstrapError::Timeout { peer, num_remaining }
1423 ))
1424 })
1425 }
1426
1427 QueryInfo::AddProvider { context, key, .. } =>
1428 Some(match context {
1429 AddProviderContext::Publish =>
1430 KademliaEvent::QueryResult {
1431 id: query_id,
1432 stats: result.stats,
1433 result: QueryResult::StartProviding(Err(
1434 AddProviderError::Timeout { key }
1435 ))
1436 },
1437 AddProviderContext::Republish =>
1438 KademliaEvent::QueryResult {
1439 id: query_id,
1440 stats: result.stats,
1441 result: QueryResult::RepublishProvider(Err(
1442 AddProviderError::Timeout { key }
1443 ))
1444 }
1445 }),
1446
1447 QueryInfo::GetClosestPeers { key } => {
1448 Some(KademliaEvent::QueryResult {
1449 id: query_id,
1450 stats: result.stats,
1451 result: QueryResult::GetClosestPeers(Err(
1452 GetClosestPeersError::Timeout {
1453 key,
1454 peers: result.peers.collect()
1455 }
1456 ))
1457 })
1458 },
1459
1460 QueryInfo::PutRecord { record, quorum, context, phase } => {
1461 let err = Err(PutRecordError::Timeout {
1462 key: record.key,
1463 quorum,
1464 success: match phase {
1465 PutRecordPhase::GetClosestPeers => vec![],
1466 PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
1467 }
1468 });
1469 match context {
1470 PutRecordContext::Publish =>
1471 Some(KademliaEvent::QueryResult {
1472 id: query_id,
1473 stats: result.stats,
1474 result: QueryResult::PutRecord(err)
1475 }),
1476 PutRecordContext::Republish =>
1477 Some(KademliaEvent::QueryResult {
1478 id: query_id,
1479 stats: result.stats,
1480 result: QueryResult::RepublishRecord(err)
1481 }),
1482 PutRecordContext::Replicate => match phase {
1483 PutRecordPhase::GetClosestPeers => {
1484 warn!("Locating closest peers for replication failed: {:?}", err);
1485 None
1486 }
1487 PutRecordPhase::PutRecord { .. } => {
1488 debug!("Replicating record failed: {:?}", err);
1489 None
1490 }
1491 }
1492 PutRecordContext::Cache => match phase {
1493 PutRecordPhase::GetClosestPeers => {
1494 unreachable!()
1498 }
1499 PutRecordPhase::PutRecord { .. } => {
1500 debug!("Caching record failed: {:?}", err);
1501 None
1502 }
1503 }
1504 }
1505 }
1506
1507 QueryInfo::GetRecord { key, records, quorum, .. } =>
1508 Some(KademliaEvent::QueryResult {
1509 id: query_id,
1510 stats: result.stats,
1511 result: QueryResult::GetRecord(Err(
1512 GetRecordError::Timeout { key, records, quorum },
1513 ))
1514 }),
1515
1516 QueryInfo::GetProviders { key, providers } =>
1517 Some(KademliaEvent::QueryResult {
1518 id: query_id,
1519 stats: result.stats,
1520 result: QueryResult::GetProviders(Err(
1521 GetProvidersError::Timeout {
1522 key,
1523 providers,
1524 closest_peers: result.peers.collect()
1525 }
1526 ))
1527 })
1528 }
1529 }
1530
1531 fn record_received(
1533 &mut self,
1534 source: PeerId,
1535 connection: ConnectionId,
1536 request_id: KademliaRequestId,
1537 mut record: Record
1538 ) {
1539 if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
1540 self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1544 peer_id: source,
1545 handler: NotifyHandler::One(connection),
1546 event: KademliaHandlerIn::PutRecordRes {
1547 key: record.key,
1548 value: record.value,
1549 request_id,
1550 },
1551 });
1552 return
1553 }
1554
1555 let now = Instant::now();
1556
1557 let target = kbucket::Key::new(record.key.clone());
1562 let num_between = self.kbuckets.count_nodes_between(&target);
1563 let k = self.queries.config().replication_factor.get();
1564 let num_beyond_k = (usize::max(k, num_between) - k) as u32;
1565 let expiration = self.record_ttl.map(|ttl| now + exp_decrease(ttl, num_beyond_k));
1566 record.expires = record.expires.or(expiration).min(expiration);
1569
1570 if let Some(job) = self.put_record_job.as_mut() {
1571 job.skip(record.key.clone())
1577 }
1578
1579 if !record.is_expired(now) {
1589 match self.store.put(record.clone()) {
1593 Ok(()) => {
1594 self.metrics.store_put();
1595 debug!("Record stored: {:?}; {} bytes", record.key, record.value.len());
1596 },
1597 Err(e) => {
1598 info!("Record not stored: {:?}", e);
1599 self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1600 peer_id: source,
1601 handler: NotifyHandler::One(connection),
1602 event: KademliaHandlerIn::Reset(request_id)
1603 });
1604
1605 return
1606 }
1607 }
1608 }
1609
1610 self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1618 peer_id: source,
1619 handler: NotifyHandler::One(connection),
1620 event: KademliaHandlerIn::PutRecordRes {
1621 key: record.key,
1622 value: record.value,
1623 request_id,
1624 },
1625 })
1626 }
1627
1628 fn closest_keys<'a, T>(table: &'a mut KBucketsTable<kbucket::Key<PeerId>, Contact>, target: &'a T)
1629 -> impl Iterator<Item = WeightedPeer> + 'a
1630 where
1631 T: Clone + AsRef<KeyBytes>,
1632 {
1633 table.closest(target).map(|e| WeightedPeer {
1634 peer_id: e.node.key,
1635 weight: e.node.weight
1637 })
1638 }
1639
1640 fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
1642 let cur_time = trust_graph::current_time();
1644 for cert in provider.certificates.iter() {
1645 self.trust.add(cert, cur_time).unwrap_or_else(|err| {
1646 log::warn!("unable to add certificate for peer {}: {}", provider.node_id, err);
1647 });
1648 }
1649
1650 if &provider.node_id != self.kbuckets.local_key().preimage() {
1651 let record = ProviderRecord {
1653 key,
1654 provider: provider.node_id,
1655 expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1656 addresses: provider.multiaddrs,
1657 };
1658 if let Err(e) = self.store.add_provider(record) {
1659 info!("Provider record not stored: {:?}", e);
1660 }
1661 }
1662 }
1663
1664 fn print_bucket_table(&mut self) {
1665 use log::trace as log;
1666 use log::Level::Trace as TargetLevel;
1667
1668 if log::max_level() < TargetLevel {
1669 return
1670 }
1671
1672 let mut size = 0;
1673 let buckets = self.kbuckets.iter().filter_map(|KBucketRef { index, bucket }| {
1674 use multiaddr::Protocol::{Ip4, Ip6, Tcp};
1675 let elems = bucket.iter().collect::<Vec<_>>();
1676 if elems.len() == 0 {
1677 return None
1678 } else {
1679 size += elems.len();
1680 }
1681
1682 let header = format!("Bucket {:?}, elements: {}", index.get(), elems.len());
1683 let elems = elems.into_iter().map(|(node, status)| {
1684 let status_s = match status {
1685 NodeStatus::Connected => "C",
1686 NodeStatus::Disconnected => "D"
1687 };
1688
1689 let address_s = node.value.addresses
1690 .iter()
1691 .next()
1692 .map(|ma|
1693 ma.iter().fold(String::new(), |acc, proto|
1694 match proto {
1695 Ip4(addr) => format!("{}", addr),
1696 Ip6(addr) => format!("{}", addr),
1697 Tcp(port) => format!("{}:{}", acc, port),
1698 _ => acc
1699 }
1700 )
1701 ).unwrap_or("NOADDR".to_string());
1702
1703 let address_plus = node.value.addresses
1704 .len()
1705 .checked_sub(1)
1706 .filter(|l| *l != 0)
1707 .map(|l| format!(" (+{})", l))
1708 .unwrap_or("".to_string());
1709
1710 let kademlia_key = bs58::encode(node.key.as_ref()).into_string();
1711 let len = kademlia_key.len();
1712 let kademlia_key = &kademlia_key[len - 10..];
1713
1714 let peer_id = node.key.preimage().to_string();
1715 let len = peer_id.len();
1716 let peer_id = &peer_id[len - 10..];
1717
1718 format!(
1719 "[bcktdbg]\t{} {} {} {}{} {}\n",
1720 status_s,
1721 node.weight,
1722 peer_id,
1723 address_s,
1724 address_plus,
1725 kademlia_key
1726 )
1727 }).collect::<String>();
1728
1729 Some(format!("[bcktdbg] {}\n{}\n", header, elems))
1730 }).collect::<String>();
1731
1732 self.metrics.report_routing_table_size(size);
1733
1734 if size == 0 {
1735 log!("[bcktdbg] Bucket table is empty.")
1736 } else {
1737 log!("\n{}", buckets);
1738 }
1739 }
1740
1741 fn get_certificates(&self, key: &PublicKey) -> Vec<Certificate> {
1742 get_certificates(&self.trust, key)
1743 }
1744
1745 fn get_weight(&self, key: &PublicKey) -> u32 {
1746 get_weight(&self.trust, key)
1747 }
1748}
1749
1750fn get_certificates(trust: &TrustGraph, key: &PublicKey) -> Vec<Certificate> {
1751 trust.get_all_certs(fluence_identity::PublicKey::from(key.clone()), &[]).unwrap_or_default()
1752}
1753
1754fn get_weight(trust: &TrustGraph, key: &PublicKey) -> u32 {
1755 trust.weight(fluence_identity::PublicKey::from(key.clone())).unwrap_or_default().unwrap_or_default()
1756}
1757
1758fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
1760 Duration::from_secs(ttl.as_secs().checked_shr(exp).unwrap_or(0))
1761}
1762
1763impl<TStore> NetworkBehaviour for Kademlia<TStore>
1764where
1765 for<'a> TStore: RecordStore<'a>,
1766 TStore: Send + 'static,
1767{
1768 type ProtocolsHandler = KademliaHandlerProto<QueryId>;
1769 type OutEvent = KademliaEvent;
1770
1771 fn new_handler(&mut self) -> Self::ProtocolsHandler {
1772 KademliaHandlerProto::new(KademliaHandlerConfig {
1773 protocol_config: self.protocol_config.clone(),
1774 allow_listening: true,
1775 idle_timeout: self.connection_idle_timeout,
1776 })
1777 }
1778
1779 fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
1780 let key = kbucket::Key::from(*peer_id);
1783 let mut peer_addrs =
1784 if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) {
1785 let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
1786 debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
1787 addrs
1788 } else {
1789 Vec::new()
1790 };
1791
1792 for query in self.queries.iter() {
1794 if let Some(addrs) = query.inner.contacts.get(peer_id) {
1795 peer_addrs.extend(addrs.iter().cloned())
1796 }
1797 }
1798
1799 peer_addrs
1800 }
1801
1802 fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {
1803 }
1808
1809 fn inject_connected(&mut self, peer: &PeerId) {
1810 for (peer_id, event) in self.queries.iter_mut().filter_map(|q|
1813 q.inner.pending_rpcs.iter()
1814 .position(|(p, _)| p == peer)
1815 .map(|p| q.inner.pending_rpcs.remove(p)))
1816 {
1817 self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1818 peer_id, event, handler: NotifyHandler::Any
1819 });
1820 }
1821
1822 self.connected_peers.insert(*peer);
1823 self.metrics.node_connected();
1824 }
1825
1826 fn inject_address_change(
1827 &mut self,
1828 peer: &PeerId,
1829 _: &ConnectionId,
1830 old: &ConnectedPoint,
1831 new: &ConnectedPoint
1832 ) {
1833 let (old, new) = (old.get_remote_address(), new.get_remote_address());
1834
1835 if let Some(contact) = self.kbuckets.entry(&kbucket::Key::from(*peer)).value() {
1837 if contact.addresses.replace(old, new) {
1838 debug!("Address '{}' replaced with '{}' for peer '{}'.", old, new, peer);
1839 } else {
1840 debug!(
1841 "Address '{}' not replaced with '{}' for peer '{}' as old address wasn't \
1842 present.",
1843 old, new, peer,
1844 );
1845 }
1846 } else {
1847 debug!(
1848 "Address '{}' not replaced with '{}' for peer '{}' as peer is not present in the \
1849 routing table.",
1850 old, new, peer,
1851 );
1852 }
1853
1854 for query in self.queries.iter_mut() {
1869 if let Some(contact) = query.inner.contacts.get_mut(peer) {
1870 for addr in contact.addresses.iter_mut() {
1871 if addr == old {
1872 *addr = new.clone();
1873 }
1874 }
1875 }
1876 }
1877 }
1878
1879 fn inject_addr_reach_failure(
1880 &mut self,
1881 peer_id: Option<&PeerId>,
1882 addr: &Multiaddr,
1883 err: &dyn error::Error
1884 ) {
1885 if let Some(peer_id) = peer_id {
1886 let key = kbucket::Key::from(*peer_id);
1887
1888 if let Some(contact) = self.kbuckets.entry(&key).value() {
1889 if contact.addresses.remove(addr, Remove::KeepLast).is_ok() {
1895 debug!("Address '{}' removed from peer '{}' due to error: {}.",
1896 addr, peer_id, err);
1897 } else {
1898 debug!("Last remaining address '{}' of peer '{}' is unreachable: {}.",
1908 addr, peer_id, err)
1909 }
1910 }
1911
1912 for query in self.queries.iter_mut() {
1913 if let Some(contact) = query.inner.contacts.get_mut(peer_id) {
1914 contact.addresses.remove(addr, Remove::Completely).unwrap();
1916 }
1917 }
1918 }
1919 }
1920
1921 fn inject_dial_failure(&mut self, peer_id: &PeerId) {
1922 for query in self.queries.iter_mut() {
1923 query.on_failure(peer_id);
1924 }
1925 }
1926
1927 fn inject_disconnected(&mut self, id: &PeerId) {
1928 for query in self.queries.iter_mut() {
1929 query.on_failure(id);
1930 }
1931 self.connection_updated(*id, None, NodeStatus::Disconnected);
1932 self.connected_peers.remove(id);
1933 }
1934
1935 fn inject_event(
1936 &mut self,
1937 source: PeerId,
1938 connection: ConnectionId,
1939 event: KademliaHandlerEvent<QueryId>
1940 ) {
1941 self.metrics.received(&event);
1942 match event {
1943 KademliaHandlerEvent::ProtocolConfirmed { endpoint } => {
1944 debug_assert!(self.connected_peers.contains(&source));
1945 let new_address = match endpoint {
1950 ConnectedPoint::Dialer { address } => Some(address),
1951 ConnectedPoint::Listener { .. } => None,
1952 };
1953
1954 let contact = self.queries
1955 .iter_mut()
1956 .find_map(|q| q.inner.contacts.get(&source))
1957 .cloned()
1958 .and_then(|mut c|
1959 new_address.as_ref().map(|addr| {
1960 c.insert(addr.clone());
1961 c
1962 }));
1963
1964 let contact = contact.or({
1965 let pk = source.as_public_key();
1966 let address = new_address.map(Addresses::new);
1967 address.zip(pk).map(|(addr, pk)| Contact::new(addr, pk))
1968 });
1969
1970 self.connection_updated(source, contact, NodeStatus::Connected);
1971 }
1972
1973 KademliaHandlerEvent::FindNodeReq { key, request_id } => {
1974 self.print_bucket_table();
1975 let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
1976 self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1977 peer_id: source,
1978 handler: NotifyHandler::One(connection),
1979 event: KademliaHandlerIn::FindNodeRes {
1980 closer_peers,
1981 request_id,
1982 },
1983 });
1984 }
1985
1986 KademliaHandlerEvent::FindNodeRes {
1987 closer_peers,
1988 user_data,
1989 } => {
1990 self.discovered(&user_data, &source, closer_peers.iter());
1991 }
1992
1993 KademliaHandlerEvent::GetProvidersReq { key, request_id } => {
1994 self.print_bucket_table();
1995 let provider_peers = self.provider_peers(&key, &source);
1996 debug!(
1997 "provider peers: {}",
1998 provider_peers.iter().map(|p| p.node_id.to_string() + ", ").collect::<String>()
1999 );
2000 let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
2001 self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
2002 peer_id: source,
2003 handler: NotifyHandler::One(connection),
2004 event: KademliaHandlerIn::GetProvidersRes {
2005 closer_peers,
2006 provider_peers,
2007 request_id,
2008 },
2009 });
2010 }
2011
2012 KademliaHandlerEvent::GetProvidersRes {
2013 closer_peers,
2014 provider_peers,
2015 user_data,
2016 } => {
2017 let peers = closer_peers.iter().chain(provider_peers.iter());
2018 self.discovered(&user_data, &source, peers);
2019 if let Some(query) = self.queries.get_mut(&user_data) {
2020 if let QueryInfo::GetProviders {
2021 providers, ..
2022 } = &mut query.inner.info {
2023 for peer in provider_peers {
2024 providers.insert(peer.node_id);
2025 }
2026 }
2027 }
2028 }
2029
2030 KademliaHandlerEvent::QueryError { user_data, error } => {
2031 log::debug!("Request to {:?} in query {:?} failed with {:?}",
2032 source, user_data, error);
2033 if let Some(query) = self.queries.get_mut(&user_data) {
2036 query.on_failure(&source)
2037 }
2038 }
2039
2040 KademliaHandlerEvent::AddProvider { key, provider } => {
2041 if provider.node_id != source {
2043 return
2044 }
2045
2046 self.provider_received(key, provider)
2047 }
2048
2049 KademliaHandlerEvent::GetRecord { key, request_id } => {
2050 let record = match self.store.get(&key) {
2052 Some(record) => {
2053 if record.is_expired(Instant::now()) {
2054 self.store.remove(&key);
2055 self.metrics.record_removed();
2056 None
2057 } else {
2058 Some(record.into_owned())
2059 }
2060 },
2061 None => None
2062 };
2063
2064 let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
2065
2066 self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
2067 peer_id: source,
2068 handler: NotifyHandler::One(connection),
2069 event: KademliaHandlerIn::GetRecordRes {
2070 record,
2071 closer_peers,
2072 request_id,
2073 },
2074 });
2075 }
2076
2077 KademliaHandlerEvent::GetRecordRes {
2078 record,
2079 closer_peers,
2080 user_data,
2081 } => {
2082 if let Some(query) = self.queries.get_mut(&user_data) {
2083 if let QueryInfo::GetRecord {
2084 key, records, quorum, cache_at
2085 } = &mut query.inner.info {
2086 if let Some(record) = record {
2087 records.push(PeerRecord{ peer: Some(source), record });
2088
2089 let quorum = quorum.get();
2090 if records.len() >= quorum {
2091 let peers = records.iter()
2094 .filter_map(|PeerRecord{ peer, .. }| peer.as_ref())
2095 .cloned()
2096 .collect::<Vec<_>>();
2097 let finished = query.try_finish(peers.iter());
2098 if !finished {
2099 debug!(
2100 "GetRecord query ({:?}) reached quorum ({}/{}) with \
2101 response from peer {} but could not yet finish.",
2102 user_data, peers.len(), quorum, source,
2103 );
2104 }
2105 }
2106 } else if quorum.get() == 1 {
2107 let source_key = kbucket::Key::from(source);
2112 if let Some(cache_key) = cache_at {
2113 let key = kbucket::Key::new(key.clone());
2114 if source_key.distance(&key) < cache_key.distance(&key) {
2115 *cache_at = Some(source_key)
2116 }
2117 } else {
2118 *cache_at = Some(source_key)
2119 }
2120 }
2121 }
2122 }
2123
2124 self.discovered(&user_data, &source, closer_peers.iter());
2125 }
2126
2127 KademliaHandlerEvent::PutRecord {
2128 record,
2129 request_id
2130 } => {
2131 self.record_received(source, connection, request_id, record);
2132 }
2133
2134 KademliaHandlerEvent::PutRecordRes {
2135 user_data, ..
2136 } => {
2137 if let Some(query) = self.queries.get_mut(&user_data) {
2138 query.on_success(&source, vec![]);
2139 if let QueryInfo::PutRecord {
2140 phase: PutRecordPhase::PutRecord { success, .. }, quorum, ..
2141 } = &mut query.inner.info {
2142 success.push(source);
2143
2144 let quorum = quorum.get();
2145 if success.len() >= quorum {
2146 let peers = success.clone();
2147 let finished = query.try_finish(peers.iter());
2148 if !finished {
2149 debug!(
2150 "PutRecord query ({:?}) reached quorum ({}/{}) with response \
2151 from peer {} but could not yet finish.",
2152 user_data, peers.len(), quorum, source,
2153 );
2154 }
2155 }
2156 }
2157 }
2158 }
2159 };
2160 }
2161
2162 fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
2163 self.local_addrs.insert(addr.clone());
2164 }
2165
2166 fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
2167 self.local_addrs.remove(addr);
2168 }
2169
2170 fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
2171 if self.local_addrs.len() < MAX_LOCAL_EXTERNAL_ADDRS {
2172 self.local_addrs.insert(addr.clone());
2173 }
2174 }
2175
2176 fn poll(&mut self, cx: &mut Context<'_>, parameters: &mut impl PollParameters) -> Poll<
2177 NetworkBehaviourAction<
2178 KademliaHandlerIn<QueryId>,
2179 Self::OutEvent,
2180 >,
2181 > {
2182 let now = Instant::now();
2183
2184 let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());
2186
2187 if let Some(mut job) = self.add_provider_job.take() {
2189 let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2190 for _ in 0 .. num {
2191 if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2192 self.start_add_provider(r.key, AddProviderContext::Republish)
2193 } else {
2194 break
2195 }
2196 }
2197 jobs_query_capacity -= num;
2198 self.add_provider_job = Some(job);
2199 }
2200
2201 if let Some(mut job) = self.put_record_job.take() {
2203 let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2204 for _ in 0 .. num {
2205 if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2206 let context = if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
2207 PutRecordContext::Republish
2208 } else {
2209 PutRecordContext::Replicate
2210 };
2211 self.start_put_record(r, Quorum::All, context)
2212 } else {
2213 break
2214 }
2215 }
2216 self.put_record_job = Some(job);
2217 }
2218
2219 loop {
2220 if let Some(event) = self.queued_events.pop_front() {
2222 self.metrics.polled_event(&event);
2223 return Poll::Ready(event);
2224 }
2225
2226 if let Some(entry) = self.kbuckets.take_applied_pending() {
2228 self.print_bucket_table();
2229 let kbucket::Node { key, value, .. } = entry.inserted;
2230 let event = KademliaEvent::RoutingUpdated {
2231 peer: key.into_preimage(),
2232 addresses: value.into(),
2233 old_peer: entry.evicted.map(|n| n.key.into_preimage())
2234 };
2235 let event = NetworkBehaviourAction::GenerateEvent(event);
2236 self.metrics.polled_event(&event);
2237 return Poll::Ready(event);
2238 }
2239
2240 loop {
2242 match self.queries.poll(now) {
2243 QueryPoolState::Finished(mut q) => {
2244 q.finish();
2245 if let Some(event) = self.query_finished(q, parameters) {
2246 let event = NetworkBehaviourAction::GenerateEvent(event);
2247 self.metrics.polled_event(&event);
2248 return Poll::Ready(event);
2249 }
2250 }
2251 QueryPoolState::Timeout(mut q) => {
2252 q.finish();
2253 if let Some(event) = self.query_timeout(q) {
2254 let event = NetworkBehaviourAction::GenerateEvent(event);
2255 self.metrics.polled_event(&event);
2256 return Poll::Ready(event);
2257 }
2258 }
2259 QueryPoolState::Waiting(Some((query, peer_id))) => {
2260 let event = query.inner.info.to_request(query.id());
2261 if let QueryInfo::AddProvider {
2267 phase: AddProviderPhase::AddProvider { .. },
2268 ..
2269 } = &query.inner.info {
2270 query.on_success(&peer_id, vec![])
2271 }
2272 if self.connected_peers.contains(&peer_id) {
2273 self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
2274 peer_id, event, handler: NotifyHandler::Any
2275 });
2276 } else if &peer_id != self.kbuckets.local_key().preimage() {
2277 query.inner.pending_rpcs.push((peer_id, event));
2278 self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
2279 peer_id, condition: DialPeerCondition::Disconnected
2280 });
2281 }
2282 }
2283 QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
2284 }
2285 }
2286
2287 if self.queued_events.is_empty() {
2291 return Poll::Pending
2292 }
2293 }
2294 }
2295}
2296
2297#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2301pub enum Quorum {
2302 One,
2303 Majority,
2304 All,
2305 N(NonZeroUsize)
2306}
2307
2308impl Quorum {
2309 fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
2311 match self {
2312 Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
2313 Quorum::Majority => NonZeroUsize::new(total.get() / 2 + 1).expect("n + 1 != 0"),
2314 Quorum::All => total,
2315 Quorum::N(n) => NonZeroUsize::min(total, *n)
2316 }
2317 }
2318}
2319
2320#[derive(Debug, Clone, PartialEq, Eq)]
2323pub struct PeerRecord {
2324 pub peer: Option<PeerId>,
2327 pub record: Record,
2328}
2329
2330#[derive(Derivative)]
2337#[derivative(Debug)]
2338pub enum KademliaEvent {
2339 QueryResult {
2341 id: QueryId,
2343 result: QueryResult,
2345 stats: QueryStats
2347 },
2348
2349 RoutingUpdated {
2352 peer: PeerId,
2354 addresses: Addresses,
2356 old_peer: Option<PeerId>,
2359 },
2360
2361 UnroutablePeer {
2366 peer: PeerId
2367 },
2368
2369 RoutablePeer {
2381 peer: PeerId,
2382 address: Multiaddr,
2383 },
2384
2385 PendingRoutablePeer {
2397 peer: PeerId,
2398 address: Multiaddr,
2399 }
2400}
2401
2402#[derive(Debug)]
2404pub enum QueryResult {
2405 Bootstrap(BootstrapResult),
2407
2408 GetClosestPeers(GetClosestPeersResult),
2410
2411 GetProviders(GetProvidersResult),
2413
2414 StartProviding(AddProviderResult),
2416
2417 RepublishProvider(AddProviderResult),
2419
2420 GetRecord(GetRecordResult),
2422
2423 PutRecord(PutRecordResult),
2425
2426 RepublishRecord(PutRecordResult),
2428}
2429
2430pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
2432
2433#[derive(Debug, Clone)]
2435pub struct GetRecordOk {
2436 pub records: Vec<PeerRecord>
2437}
2438
2439#[derive(Debug, Clone)]
2441pub enum GetRecordError {
2442 NotFound {
2443 key: record::Key,
2444 closest_peers: Vec<PeerId>
2445 },
2446 QuorumFailed {
2447 key: record::Key,
2448 records: Vec<PeerRecord>,
2449 quorum: NonZeroUsize
2450 },
2451 Timeout {
2452 key: record::Key,
2453 records: Vec<PeerRecord>,
2454 quorum: NonZeroUsize
2455 }
2456}
2457
2458impl GetRecordError {
2459 pub fn key(&self) -> &record::Key {
2461 match self {
2462 GetRecordError::QuorumFailed { key, .. } => key,
2463 GetRecordError::Timeout { key, .. } => key,
2464 GetRecordError::NotFound { key, .. } => key,
2465 }
2466 }
2467
2468 pub fn into_key(self) -> record::Key {
2471 match self {
2472 GetRecordError::QuorumFailed { key, .. } => key,
2473 GetRecordError::Timeout { key, .. } => key,
2474 GetRecordError::NotFound { key, .. } => key,
2475 }
2476 }
2477}
2478
2479pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;
2481
2482#[derive(Debug, Clone)]
2484pub struct PutRecordOk {
2485 pub key: record::Key
2486}
2487
2488#[derive(Debug)]
2490pub enum PutRecordError {
2491 QuorumFailed {
2492 key: record::Key,
2493 success: Vec<PeerId>,
2495 quorum: NonZeroUsize
2496 },
2497 Timeout {
2498 key: record::Key,
2499 success: Vec<PeerId>,
2501 quorum: NonZeroUsize
2502 },
2503}
2504
2505impl PutRecordError {
2506 pub fn key(&self) -> &record::Key {
2508 match self {
2509 PutRecordError::QuorumFailed { key, .. } => key,
2510 PutRecordError::Timeout { key, .. } => key,
2511 }
2512 }
2513
2514 pub fn into_key(self) -> record::Key {
2517 match self {
2518 PutRecordError::QuorumFailed { key, .. } => key,
2519 PutRecordError::Timeout { key, .. } => key,
2520 }
2521 }
2522}
2523
2524pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;
2526
2527#[derive(Debug, Clone)]
2529pub struct BootstrapOk {
2530 pub peer: PeerId,
2531 pub num_remaining: u32,
2532}
2533
2534#[derive(Debug, Clone)]
2536pub enum BootstrapError {
2537 Timeout {
2538 peer: PeerId,
2539 num_remaining: Option<u32>,
2540 }
2541}
2542
2543pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;
2545
2546#[derive(Debug, Clone)]
2548pub struct GetClosestPeersOk {
2549 pub key: Vec<u8>,
2550 pub peers: Vec<PeerId>
2551}
2552
2553#[derive(Debug, Clone)]
2555pub enum GetClosestPeersError {
2556 Timeout {
2557 key: Vec<u8>,
2558 peers: Vec<PeerId>
2559 }
2560}
2561
2562impl GetClosestPeersError {
2563 pub fn key(&self) -> &Vec<u8> {
2565 match self {
2566 GetClosestPeersError::Timeout { key, .. } => key,
2567 }
2568 }
2569
2570 pub fn into_key(self) -> Vec<u8> {
2573 match self {
2574 GetClosestPeersError::Timeout { key, .. } => key,
2575 }
2576 }
2577}
2578
2579pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;
2581
2582#[derive(Debug, Clone)]
2584pub struct GetProvidersOk {
2585 pub key: record::Key,
2586 pub providers: HashSet<PeerId>,
2587 pub closest_peers: Vec<PeerId>
2588}
2589
2590#[derive(Debug, Clone)]
2592pub enum GetProvidersError {
2593 Timeout {
2594 key: record::Key,
2595 providers: HashSet<PeerId>,
2596 closest_peers: Vec<PeerId>
2597 }
2598}
2599
2600impl GetProvidersError {
2601 pub fn key(&self) -> &record::Key {
2603 match self {
2604 GetProvidersError::Timeout { key, .. } => key,
2605 }
2606 }
2607
2608 pub fn into_key(self) -> record::Key {
2611 match self {
2612 GetProvidersError::Timeout { key, .. } => key,
2613 }
2614 }
2615}
2616
2617pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;
2619
2620#[derive(Debug, Clone)]
2622pub struct AddProviderOk {
2623 pub key: record::Key,
2624}
2625
2626#[derive(Debug)]
2628pub enum AddProviderError {
2629 Timeout {
2631 key: record::Key,
2632 },
2633}
2634
2635impl AddProviderError {
2636 pub fn key(&self) -> &record::Key {
2638 match self {
2639 AddProviderError::Timeout { key, .. } => key,
2640 }
2641 }
2642
2643 pub fn into_key(self) -> record::Key {
2645 match self {
2646 AddProviderError::Timeout { key, .. } => key,
2647 }
2648 }
2649}
2650
2651impl From<kbucket::EntryView<kbucket::Key<PeerId>, Contact>> for KadPeer {
2652 fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Contact>) -> KadPeer {
2653 let Contact { addresses, public_key } = e.node.value;
2654 KadPeer {
2655 public_key,
2656 node_id: e.node.key.into_preimage(),
2657 multiaddrs: addresses.into_vec(),
2658 connection_ty: match e.status {
2659 NodeStatus::Connected => KadConnectionType::Connected,
2660 NodeStatus::Disconnected => KadConnectionType::NotConnected
2661 },
2662 certificates: vec![]
2663 }
2664 }
2665}
2666
2667struct QueryInner {
2671 info: QueryInfo,
2673 contacts: FnvHashMap<PeerId, Contact>,
2675 pending_rpcs: SmallVec<[(PeerId, KademliaHandlerIn<QueryId>); K_VALUE.get()]>
2680}
2681
2682impl QueryInner {
2683 fn new(info: QueryInfo) -> Self {
2684 QueryInner {
2685 info,
2686 contacts: Default::default(),
2687 pending_rpcs: SmallVec::default()
2688 }
2689 }
2690}
2691
2692#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2694pub enum AddProviderContext {
2695 Publish,
2696 Republish,
2697}
2698
2699#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2701pub enum PutRecordContext {
2702 Publish,
2703 Republish,
2704 Replicate,
2705 Cache,
2706}
2707
2708#[derive(Debug, Clone)]
2710pub enum QueryInfo {
2711 Bootstrap {
2713 peer: PeerId,
2715 remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>
2722 },
2723
2724 GetClosestPeers { key: Vec<u8> },
2726
2727 GetProviders {
2729 key: record::Key,
2731 providers: HashSet<PeerId>,
2733 },
2734
2735 AddProvider {
2737 key: record::Key,
2739 phase: AddProviderPhase,
2741 context: AddProviderContext,
2743 provider_key: PublicKey,
2745 certificates: Vec<Certificate>,
2747 },
2748
2749 PutRecord {
2751 record: Record,
2752 quorum: NonZeroUsize,
2754 phase: PutRecordPhase,
2756 context: PutRecordContext,
2758 },
2759
2760 GetRecord {
2762 key: record::Key,
2764 records: Vec<PeerRecord>,
2767 quorum: NonZeroUsize,
2769 cache_at: Option<kbucket::Key<PeerId>>,
2774 },
2775}
2776
2777impl QueryInfo {
2778 fn to_request(&self, query_id: QueryId) -> KademliaHandlerIn<QueryId> {
2781 match &self {
2782 QueryInfo::Bootstrap { peer, .. } => KademliaHandlerIn::FindNodeReq {
2783 key: peer.to_bytes(),
2784 user_data: query_id,
2785 },
2786 QueryInfo::GetClosestPeers { key, .. } => KademliaHandlerIn::FindNodeReq {
2787 key: key.clone(),
2788 user_data: query_id,
2789 },
2790 QueryInfo::GetProviders { key, .. } => KademliaHandlerIn::GetProvidersReq {
2791 key: key.clone(),
2792 user_data: query_id,
2793 },
2794 QueryInfo::AddProvider { key, phase, provider_key, certificates, .. } => match phase {
2795 AddProviderPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq {
2796 key: key.to_vec(),
2797 user_data: query_id,
2798 },
2799 AddProviderPhase::AddProvider {
2800 provider_id,
2801 external_addresses,
2802 ..
2803 } => {
2804 KademliaHandlerIn::AddProvider {
2805 key: key.clone(),
2806 provider: crate::protocol::KadPeer {
2807 public_key: provider_key.clone(),
2808 node_id: *provider_id,
2809 multiaddrs: external_addresses.clone(),
2810 connection_ty: crate::protocol::KadConnectionType::Connected,
2811 certificates: certificates.clone(),
2812 }
2813 }
2814 }
2815 },
2816 QueryInfo::GetRecord { key, .. } => KademliaHandlerIn::GetRecord {
2817 key: key.clone(),
2818 user_data: query_id,
2819 },
2820 QueryInfo::PutRecord { record, phase, .. } => match phase {
2821 PutRecordPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq {
2822 key: record.key.to_vec(),
2823 user_data: query_id,
2824 },
2825 PutRecordPhase::PutRecord { .. } => KademliaHandlerIn::PutRecord {
2826 record: record.clone(),
2827 user_data: query_id
2828 }
2829 }
2830 }
2831 }
2832}
2833
2834#[derive(Debug, Clone)]
2836pub enum AddProviderPhase {
2837 GetClosestPeers,
2839
2840 AddProvider {
2843 provider_id: PeerId,
2845 external_addresses: Vec<Multiaddr>,
2847 get_closest_peers_stats: QueryStats,
2849 },
2850}
2851
2852#[derive(Debug, Clone, PartialEq, Eq)]
2854pub enum PutRecordPhase {
2855 GetClosestPeers,
2857
2858 PutRecord {
2860 success: Vec<PeerId>,
2862 get_closest_peers_stats: QueryStats,
2864 },
2865}
2866
2867pub struct QueryMut<'a> {
2869 query: &'a mut Query<QueryInner>,
2870}
2871
2872impl<'a> QueryMut<'a> {
2873 pub fn id(&self) -> QueryId {
2874 self.query.id()
2875 }
2876
2877 pub fn info(&self) -> &QueryInfo {
2879 &self.query.inner.info
2880 }
2881
2882 pub fn stats(&self) -> &QueryStats {
2887 self.query.stats()
2888 }
2889
2890 pub fn finish(&mut self) {
2893 self.query.finish()
2894 }
2895}
2896
2897pub struct QueryRef<'a> {
2899 query: &'a Query<QueryInner>,
2900}
2901
2902impl<'a> QueryRef<'a> {
2903 pub fn id(&self) -> QueryId {
2904 self.query.id()
2905 }
2906
2907 pub fn info(&self) -> &QueryInfo {
2909 &self.query.inner.info
2910 }
2911
2912 pub fn stats(&self) -> &QueryStats {
2917 self.query.stats()
2918 }
2919}
2920
2921#[derive(Debug, Clone)]
2923pub struct NoKnownPeers();
2924
2925impl fmt::Display for NoKnownPeers {
2926 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2927 write!(f, "No known peers.")
2928 }
2929}
2930
2931impl std::error::Error for NoKnownPeers {}
2932
2933pub enum RoutingUpdate {
2935 Success,
2938 Pending,
2944 Failed,
2950}
2951
2952const MAX_LOCAL_EXTERNAL_ADDRS: usize = 20;