cs_mwc_libp2p_kad/behaviour.rs
1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the `Kademlia` network behaviour.
22
23mod test;
24
25use crate::K_VALUE;
26use crate::addresses::Addresses;
27use crate::handler::{
28 KademliaHandlerProto,
29 KademliaHandlerConfig,
30 KademliaRequestId,
31 KademliaHandlerEvent,
32 KademliaHandlerIn
33};
34use crate::jobs::*;
35use crate::kbucket::{self, KBucketsTable, NodeStatus};
36use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
37use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState};
38use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
39use fnv::{FnvHashMap, FnvHashSet};
40use mwc_libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId};
41use mwc_libp2p_swarm::{
42 DialPeerCondition,
43 NetworkBehaviour,
44 NetworkBehaviourAction,
45 NotifyHandler,
46 PollParameters,
47};
48use log::{info, debug, warn};
49use smallvec::SmallVec;
50use std::{borrow::Cow, error, iter, time::Duration};
51use std::collections::{HashSet, VecDeque};
52use std::fmt;
53use std::num::NonZeroUsize;
54use std::task::{Context, Poll};
55use std::vec;
56use wasm_timer::Instant;
57
58pub use crate::query::QueryStats;
59
60/// `Kademlia` is a `NetworkBehaviour` that implements the libp2p
61/// Kademlia protocol.
62pub struct Kademlia<TStore> {
63 /// The Kademlia routing table.
64 kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,
65
66 /// The k-bucket insertion strategy.
67 kbucket_inserts: KademliaBucketInserts,
68
69 /// Configuration of the wire protocol.
70 protocol_config: KademliaProtocolConfig,
71
72 /// The currently active (i.e. in-progress) queries.
73 queries: QueryPool<QueryInner>,
74
75 /// The currently connected peers.
76 ///
77 /// This is a superset of the connected peers currently in the routing table.
78 connected_peers: FnvHashSet<PeerId>,
79
80 /// Periodic job for re-publication of provider records for keys
81 /// provided by the local node.
82 add_provider_job: Option<AddProviderJob>,
83
84 /// Periodic job for (re-)replication and (re-)publishing of
85 /// regular (value-)records.
86 put_record_job: Option<PutRecordJob>,
87
88 /// The TTL of regular (value-)records.
89 record_ttl: Option<Duration>,
90
91 /// The TTL of provider records.
92 provider_record_ttl: Option<Duration>,
93
94 /// How long to keep connections alive when they're idle.
95 connection_idle_timeout: Duration,
96
97 /// Queued events to return when the behaviour is being polled.
98 queued_events: VecDeque<NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>>,
99
100 /// The currently known addresses of the local node.
101 local_addrs: HashSet<Multiaddr>,
102
103 /// The record storage.
104 store: TStore,
105}
106
/// Strategies for inserting peers and their addresses into the k-buckets of
/// the Kademlia routing table.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum KademliaBucketInserts {
    /// A peer that is not yet in the routing table is inserted when a
    /// connection to it is established as the result of a dialing attempt,
    /// provided the corresponding k-bucket has a free slot.
    ///
    /// If the k-bucket is full but still has a free pending slot, the peer
    /// may be inserted at a later time, once an unresponsive disconnected
    /// peer is evicted from the bucket.
    OnConnected,
    /// Peers and addresses only enter the routing table through explicit
    /// calls to [`Kademlia::add_address`].
    ///
    /// > **Note**: Existing routing table entries are still updated as
    /// > peers connect and disconnect (i.e. the order of the entries as
    /// > well as their network addresses), even though new peers can only
    /// > be added via [`Kademlia::add_address`].
    Manual,
}
130
131/// The configuration for the `Kademlia` behaviour.
132///
133/// The configuration is consumed by [`Kademlia::new`].
134#[derive(Debug, Clone)]
135pub struct KademliaConfig {
136 kbucket_pending_timeout: Duration,
137 query_config: QueryConfig,
138 protocol_config: KademliaProtocolConfig,
139 record_ttl: Option<Duration>,
140 record_replication_interval: Option<Duration>,
141 record_publication_interval: Option<Duration>,
142 provider_record_ttl: Option<Duration>,
143 provider_publication_interval: Option<Duration>,
144 connection_idle_timeout: Duration,
145 kbucket_inserts: KademliaBucketInserts,
146}
147
148impl Default for KademliaConfig {
149 fn default() -> Self {
150 KademliaConfig {
151 kbucket_pending_timeout: Duration::from_secs(60),
152 query_config: QueryConfig::default(),
153 protocol_config: Default::default(),
154 record_ttl: Some(Duration::from_secs(36 * 60 * 60)),
155 record_replication_interval: Some(Duration::from_secs(60 * 60)),
156 record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
157 provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
158 provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
159 connection_idle_timeout: Duration::from_secs(10),
160 kbucket_inserts: KademliaBucketInserts::OnConnected,
161 }
162 }
163}
164
165impl KademliaConfig {
166 /// Sets a custom protocol name.
167 ///
168 /// Kademlia nodes only communicate with other nodes using the same protocol
169 /// name. Using a custom name therefore allows to segregate the DHT from
170 /// others, if that is desired.
171 pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) -> &mut Self {
172 self.protocol_config.set_protocol_name(name);
173 self
174 }
175
176 /// Sets the timeout for a single query.
177 ///
178 /// > **Note**: A single query usually comprises at least as many requests
179 /// > as the replication factor, i.e. this is not a request timeout.
180 ///
181 /// The default is 60 seconds.
182 pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
183 self.query_config.timeout = timeout;
184 self
185 }
186
187 /// Sets the replication factor to use.
188 ///
189 /// The replication factor determines to how many closest peers
190 /// a record is replicated. The default is [`K_VALUE`].
191 pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
192 self.query_config.replication_factor = replication_factor;
193 self
194 }
195
196 /// Sets the allowed level of parallelism for iterative queries.
197 ///
198 /// The `α` parameter in the Kademlia paper. The maximum number of peers
199 /// that an iterative query is allowed to wait for in parallel while
200 /// iterating towards the closest nodes to a target. Defaults to
201 /// `ALPHA_VALUE`.
202 ///
203 /// This only controls the level of parallelism of an iterative query, not
204 /// the level of parallelism of a query to a fixed set of peers.
205 ///
206 /// When used with [`KademliaConfig::disjoint_query_paths`] it equals
207 /// the amount of disjoint paths used.
208 pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
209 self.query_config.parallelism = parallelism;
210 self
211 }
212
213 /// Require iterative queries to use disjoint paths for increased resiliency
214 /// in the presence of potentially adversarial nodes.
215 ///
216 /// When enabled the number of disjoint paths used equals the configured
217 /// parallelism.
218 ///
219 /// See the S/Kademlia paper for more information on the high level design
220 /// as well as its security improvements.
221 pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
222 self.query_config.disjoint_query_paths = enabled;
223 self
224 }
225
226 /// Sets the TTL for stored records.
227 ///
228 /// The TTL should be significantly longer than the (re-)publication
229 /// interval, to avoid premature expiration of records. The default is 36
230 /// hours.
231 ///
232 /// `None` means records never expire.
233 ///
234 /// Does not apply to provider records.
235 pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
236 self.record_ttl = record_ttl;
237 self
238 }
239
240 /// Sets the (re-)replication interval for stored records.
241 ///
242 /// Periodic replication of stored records ensures that the records
243 /// are always replicated to the available nodes closest to the key in the
244 /// context of DHT topology changes (i.e. nodes joining and leaving), thus
245 /// ensuring persistence until the record expires. Replication does not
246 /// prolong the regular lifetime of a record (for otherwise it would live
247 /// forever regardless of the configured TTL). The expiry of a record
248 /// is only extended through re-publication.
249 ///
250 /// This interval should be significantly shorter than the publication
251 /// interval, to ensure persistence between re-publications. The default
252 /// is 1 hour.
253 ///
254 /// `None` means that stored records are never re-replicated.
255 ///
256 /// Does not apply to provider records.
257 pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
258 self.record_replication_interval = interval;
259 self
260 }
261
262 /// Sets the (re-)publication interval of stored records.
263 ///
264 /// Records persist in the DHT until they expire. By default, published
265 /// records are re-published in regular intervals for as long as the record
266 /// exists in the local storage of the original publisher, thereby extending
267 /// the records lifetime.
268 ///
269 /// This interval should be significantly shorter than the record TTL, to
270 /// ensure records do not expire prematurely. The default is 24 hours.
271 ///
272 /// `None` means that stored records are never automatically re-published.
273 ///
274 /// Does not apply to provider records.
275 pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
276 self.record_publication_interval = interval;
277 self
278 }
279
280 /// Sets the TTL for provider records.
281 ///
282 /// `None` means that stored provider records never expire.
283 ///
284 /// Must be significantly larger than the provider publication interval.
285 pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
286 self.provider_record_ttl = ttl;
287 self
288 }
289
290 /// Sets the interval at which provider records for keys provided
291 /// by the local node are re-published.
292 ///
293 /// `None` means that stored provider records are never automatically
294 /// re-published.
295 ///
296 /// Must be significantly less than the provider record TTL.
297 pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
298 self.provider_publication_interval = interval;
299 self
300 }
301
302 /// Sets the amount of time to keep connections alive when they're idle.
303 pub fn set_connection_idle_timeout(&mut self, duration: Duration) -> &mut Self {
304 self.connection_idle_timeout = duration;
305 self
306 }
307
308 /// Modifies the maximum allowed size of individual Kademlia packets.
309 ///
310 /// It might be necessary to increase this value if trying to put large
311 /// records.
312 pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
313 self.protocol_config.set_max_packet_size(size);
314 self
315 }
316
317 /// Sets the k-bucket insertion strategy for the Kademlia routing table.
318 pub fn set_kbucket_inserts(&mut self, inserts: KademliaBucketInserts) -> &mut Self {
319 self.kbucket_inserts = inserts;
320 self
321 }
322}
323
324impl<TStore> Kademlia<TStore>
325where
326 for<'a> TStore: RecordStore<'a>
327{
328 /// Creates a new `Kademlia` network behaviour with a default configuration.
329 pub fn new(id: PeerId, store: TStore) -> Self {
330 Self::with_config(id, store, Default::default())
331 }
332
333 /// Get the protocol name of this kademlia instance.
334 pub fn protocol_name(&self) -> &[u8] {
335 self.protocol_config.protocol_name()
336 }
337
338 /// Creates a new `Kademlia` network behaviour with the given configuration.
339 pub fn with_config(id: PeerId, store: TStore, config: KademliaConfig) -> Self {
340 let local_key = kbucket::Key::from(id);
341
342 let put_record_job = config
343 .record_replication_interval
344 .or(config.record_publication_interval)
345 .map(|interval| PutRecordJob::new(
346 id,
347 interval,
348 config.record_publication_interval,
349 config.record_ttl,
350 ));
351
352 let add_provider_job = config
353 .provider_publication_interval
354 .map(AddProviderJob::new);
355
356 Kademlia {
357 store,
358 kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
359 kbucket_inserts: config.kbucket_inserts,
360 protocol_config: config.protocol_config,
361 queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
362 queries: QueryPool::new(config.query_config),
363 connected_peers: Default::default(),
364 add_provider_job,
365 put_record_job,
366 record_ttl: config.record_ttl,
367 provider_record_ttl: config.provider_record_ttl,
368 connection_idle_timeout: config.connection_idle_timeout,
369 local_addrs: HashSet::new()
370 }
371 }
372
373 /// Gets an iterator over immutable references to all running queries.
374 pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
375 self.queries.iter().filter_map(|query|
376 if !query.is_finished() {
377 Some(QueryRef { query })
378 } else {
379 None
380 })
381 }
382
383 /// Gets an iterator over mutable references to all running queries.
384 pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
385 self.queries.iter_mut().filter_map(|query|
386 if !query.is_finished() {
387 Some(QueryMut { query })
388 } else {
389 None
390 })
391 }
392
393 /// Gets an immutable reference to a running query, if it exists.
394 pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
395 self.queries.get(id).and_then(|query|
396 if !query.is_finished() {
397 Some(QueryRef { query })
398 } else {
399 None
400 })
401 }
402
403 /// Gets a mutable reference to a running query, if it exists.
404 pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
405 self.queries.get_mut(id).and_then(|query|
406 if !query.is_finished() {
407 Some(QueryMut { query })
408 } else {
409 None
410 })
411 }
412
413 /// Adds a known listen address of a peer participating in the DHT to the
414 /// routing table.
415 ///
416 /// Explicitly adding addresses of peers serves two purposes:
417 ///
418 /// 1. In order for a node to join the DHT, it must know about at least
419 /// one other node of the DHT.
420 ///
421 /// 2. When a remote peer initiates a connection and that peer is not
422 /// yet in the routing table, the `Kademlia` behaviour must be
423 /// informed of an address on which that peer is listening for
424 /// connections before it can be added to the routing table
425 /// from where it can subsequently be discovered by all peers
426 /// in the DHT.
427 ///
428 /// If the routing table has been updated as a result of this operation,
429 /// a [`KademliaEvent::RoutingUpdated`] event is emitted.
430 pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
431 let key = kbucket::Key::from(*peer);
432 match self.kbuckets.entry(&key) {
433 kbucket::Entry::Present(mut entry, _) => {
434 if entry.value().insert(address) {
435 self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
436 KademliaEvent::RoutingUpdated {
437 peer: *peer,
438 addresses: entry.value().clone(),
439 old_peer: None,
440 }
441 ))
442 }
443 RoutingUpdate::Success
444 }
445 kbucket::Entry::Pending(mut entry, _) => {
446 entry.value().insert(address);
447 RoutingUpdate::Pending
448 }
449 kbucket::Entry::Absent(entry) => {
450 let addresses = Addresses::new(address);
451 let status =
452 if self.connected_peers.contains(peer) {
453 NodeStatus::Connected
454 } else {
455 NodeStatus::Disconnected
456 };
457 match entry.insert(addresses.clone(), status) {
458 kbucket::InsertResult::Inserted => {
459 self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
460 KademliaEvent::RoutingUpdated {
461 peer: *peer,
462 addresses,
463 old_peer: None,
464 }
465 ));
466 RoutingUpdate::Success
467 },
468 kbucket::InsertResult::Full => {
469 debug!("Bucket full. Peer not added to routing table: {}", peer);
470 RoutingUpdate::Failed
471 },
472 kbucket::InsertResult::Pending { disconnected } => {
473 self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
474 peer_id: disconnected.into_preimage(),
475 condition: DialPeerCondition::Disconnected
476 });
477 RoutingUpdate::Pending
478 },
479 }
480 },
481 kbucket::Entry::SelfEntry => RoutingUpdate::Failed,
482 }
483 }
484
485 /// Removes an address of a peer from the routing table.
486 ///
487 /// If the given address is the last address of the peer in the
488 /// routing table, the peer is removed from the routing table
489 /// and `Some` is returned with a view of the removed entry.
490 /// The same applies if the peer is currently pending insertion
491 /// into the routing table.
492 ///
493 /// If the given peer or address is not in the routing table,
494 /// this is a no-op.
495 pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr)
496 -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>>
497 {
498 let key = kbucket::Key::from(*peer);
499 match self.kbuckets.entry(&key) {
500 kbucket::Entry::Present(mut entry, _) => {
501 if entry.value().remove(address).is_err() {
502 Some(entry.remove()) // it is the last address, thus remove the peer.
503 } else {
504 None
505 }
506 }
507 kbucket::Entry::Pending(mut entry, _) => {
508 if entry.value().remove(address).is_err() {
509 Some(entry.remove()) // it is the last address, thus remove the peer.
510 } else {
511 None
512 }
513 }
514 kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => {
515 None
516 }
517 }
518 }
519
520 /// Removes a peer from the routing table.
521 ///
522 /// Returns `None` if the peer was not in the routing table,
523 /// not even pending insertion.
524 pub fn remove_peer(&mut self, peer: &PeerId)
525 -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>>
526 {
527 let key = kbucket::Key::from(*peer);
528 match self.kbuckets.entry(&key) {
529 kbucket::Entry::Present(entry, _) => {
530 Some(entry.remove())
531 }
532 kbucket::Entry::Pending(entry, _) => {
533 Some(entry.remove())
534 }
535 kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => {
536 None
537 }
538 }
539 }
540
541 /// Returns an iterator over all non-empty buckets in the routing table.
542 pub fn kbuckets(&mut self)
543 -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
544 {
545 self.kbuckets.iter().filter(|b| !b.is_empty())
546 }
547
548 /// Returns the k-bucket for the distance to the given key.
549 ///
550 /// Returns `None` if the given key refers to the local key.
551 pub fn kbucket<K>(&mut self, key: K)
552 -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
553 where
554 K: Into<kbucket::Key<K>> + Clone
555 {
556 self.kbuckets.bucket(&key.into())
557 }
558
559 /// Initiates an iterative query for the closest peers to the given key.
560 ///
561 /// The result of the query is delivered in a
562 /// [`KademliaEvent::QueryResult{QueryResult::GetClosestPeers}`].
563 pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
564 where
565 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone
566 {
567 let info = QueryInfo::GetClosestPeers { key: key.clone().into() };
568 let target: kbucket::Key<K> = key.into();
569 let peers = self.kbuckets.closest_keys(&target);
570 let inner = QueryInner::new(info);
571 self.queries.add_iter_closest(target.clone(), peers, inner)
572 }
573
574 /// Performs a lookup for a record in the DHT.
575 ///
576 /// The result of this operation is delivered in a
577 /// [`KademliaEvent::QueryResult{QueryResult::GetRecord}`].
578 pub fn get_record(&mut self, key: &record::Key, quorum: Quorum) -> QueryId {
579 let quorum = quorum.eval(self.queries.config().replication_factor);
580 let mut records = Vec::with_capacity(quorum.get());
581
582 if let Some(record) = self.store.get(key) {
583 if record.is_expired(Instant::now()) {
584 self.store.remove(key)
585 } else {
586 records.push(PeerRecord{ peer: None, record: record.into_owned()});
587 }
588 }
589
590 let done = records.len() >= quorum.get();
591 let target = kbucket::Key::new(key.clone());
592 let info = QueryInfo::GetRecord { key: key.clone(), records, quorum, cache_at: None };
593 let peers = self.kbuckets.closest_keys(&target);
594 let inner = QueryInner::new(info);
595 let id = self.queries.add_iter_closest(target.clone(), peers, inner); // (*)
596
597 // Instantly finish the query if we already have enough records.
598 if done {
599 self.queries.get_mut(&id).expect("by (*)").finish();
600 }
601
602 id
603 }
604
605 /// Stores a record in the DHT.
606 ///
607 /// Returns `Ok` if a record has been stored locally, providing the
608 /// `QueryId` of the initial query that replicates the record in the DHT.
609 /// The result of the query is eventually reported as a
610 /// [`KademliaEvent::QueryResult{QueryResult::PutRecord}`].
611 ///
612 /// The record is always stored locally with the given expiration. If the record's
613 /// expiration is `None`, the common case, it does not expire in local storage
614 /// but is still replicated with the configured record TTL. To remove the record
615 /// locally and stop it from being re-published in the DHT, see [`Kademlia::remove_record`].
616 ///
617 /// After the initial publication of the record, it is subject to (re-)replication
618 /// and (re-)publication as per the configured intervals. Periodic (re-)publication
619 /// does not update the record's expiration in local storage, thus a given record
620 /// with an explicit expiration will always expire at that instant and until then
621 /// is subject to regular (re-)replication and (re-)publication.
622 pub fn put_record(&mut self, mut record: Record, quorum: Quorum) -> Result<QueryId, store::Error> {
623 record.publisher = Some(*self.kbuckets.local_key().preimage());
624 self.store.put(record.clone())?;
625 record.expires = record.expires.or_else(||
626 self.record_ttl.map(|ttl| Instant::now() + ttl));
627 let quorum = quorum.eval(self.queries.config().replication_factor);
628 let target = kbucket::Key::new(record.key.clone());
629 let peers = self.kbuckets.closest_keys(&target);
630 let context = PutRecordContext::Publish;
631 let info = QueryInfo::PutRecord {
632 context,
633 record,
634 quorum,
635 phase: PutRecordPhase::GetClosestPeers
636 };
637 let inner = QueryInner::new(info);
638 Ok(self.queries.add_iter_closest(target.clone(), peers, inner))
639 }
640
641 /// Removes the record with the given key from _local_ storage,
642 /// if the local node is the publisher of the record.
643 ///
644 /// Has no effect if a record for the given key is stored locally but
645 /// the local node is not a publisher of the record.
646 ///
647 /// This is a _local_ operation. However, it also has the effect that
648 /// the record will no longer be periodically re-published, allowing the
649 /// record to eventually expire throughout the DHT.
650 pub fn remove_record(&mut self, key: &record::Key) {
651 if let Some(r) = self.store.get(key) {
652 if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
653 self.store.remove(key)
654 }
655 }
656 }
657
658 /// Gets a mutable reference to the record store.
659 pub fn store_mut(&mut self) -> &mut TStore {
660 &mut self.store
661 }
662
663 /// Bootstraps the local node to join the DHT.
664 ///
665 /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
666 /// own ID in the DHT. This introduces the local node to the other nodes
667 /// in the DHT and populates its routing table with the closest neighbours.
668 ///
669 /// Subsequently, all buckets farther from the bucket of the closest neighbour are
670 /// refreshed by initiating an additional bootstrapping query for each such
671 /// bucket with random keys.
672 ///
673 /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
674 /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
675 /// reported via [`KademliaEvent::QueryResult{QueryResult::Bootstrap}`] events,
676 /// with one such event per bootstrapping query.
677 ///
678 /// Returns `Err` if bootstrapping is impossible due an empty routing table.
679 ///
680 /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
681 /// > See [`Kademlia::add_address`].
682 pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
683 let local_key = self.kbuckets.local_key().clone();
684 let info = QueryInfo::Bootstrap {
685 peer: *local_key.preimage(),
686 remaining: None
687 };
688 let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
689 if peers.is_empty() {
690 Err(NoKnownPeers())
691 } else {
692 let inner = QueryInner::new(info);
693 Ok(self.queries.add_iter_closest(local_key, peers, inner))
694 }
695 }
696
697 /// Establishes the local node as a provider of a value for the given key.
698 ///
699 /// This operation publishes a provider record with the given key and
700 /// identity of the local node to the peers closest to the key, thus establishing
701 /// the local node as a provider.
702 ///
703 /// Returns `Ok` if a provider record has been stored locally, providing the
704 /// `QueryId` of the initial query that announces the local node as a provider.
705 ///
706 /// The publication of the provider records is periodically repeated as per the
707 /// configured interval, to renew the expiry and account for changes to the DHT
708 /// topology. A provider record may be removed from local storage and
709 /// thus no longer re-published by calling [`Kademlia::stop_providing`].
710 ///
711 /// In contrast to the standard Kademlia push-based model for content distribution
712 /// implemented by [`Kademlia::put_record`], the provider API implements a
713 /// pull-based model that may be used in addition or as an alternative.
714 /// The means by which the actual value is obtained from a provider is out of scope
715 /// of the libp2p Kademlia provider API.
716 ///
717 /// The results of the (repeated) provider announcements sent by this node are
718 /// reported via [`KademliaEvent::QueryResult{QueryResult::StartProviding}`].
719 pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
720 // Note: We store our own provider records locally without local addresses
721 // to avoid redundant storage and outdated addresses. Instead these are
722 // acquired on demand when returning a `ProviderRecord` for the local node.
723 let local_addrs = Vec::new();
724 let record = ProviderRecord::new(
725 key.clone(),
726 *self.kbuckets.local_key().preimage(),
727 local_addrs);
728 self.store.add_provider(record)?;
729 let target = kbucket::Key::new(key.clone());
730 let peers = self.kbuckets.closest_keys(&target);
731 let context = AddProviderContext::Publish;
732 let info = QueryInfo::AddProvider {
733 context,
734 key,
735 phase: AddProviderPhase::GetClosestPeers
736 };
737 let inner = QueryInner::new(info);
738 let id = self.queries.add_iter_closest(target.clone(), peers, inner);
739 Ok(id)
740 }
741
742 /// Stops the local node from announcing that it is a provider for the given key.
743 ///
744 /// This is a local operation. The local node will still be considered as a
745 /// provider for the key by other nodes until these provider records expire.
746 pub fn stop_providing(&mut self, key: &record::Key) {
747 self.store.remove_provider(key, self.kbuckets.local_key().preimage());
748 }
749
750 /// Performs a lookup for providers of a value to the given key.
751 ///
752 /// The result of this operation is delivered in a
753 /// reported via [`KademliaEvent::QueryResult{QueryResult::GetProviders}`].
754 pub fn get_providers(&mut self, key: record::Key) -> QueryId {
755 let info = QueryInfo::GetProviders {
756 key: key.clone(),
757 providers: HashSet::new(),
758 };
759 let target = kbucket::Key::new(key);
760 let peers = self.kbuckets.closest_keys(&target);
761 let inner = QueryInner::new(info);
762 self.queries.add_iter_closest(target.clone(), peers, inner)
763 }
764
765 /// Processes discovered peers from a successful request in an iterative `Query`.
766 fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
767 where
768 I: Iterator<Item = &'a KadPeer> + Clone
769 {
770 let local_id = self.kbuckets.local_key().preimage();
771 let others_iter = peers.filter(|p| &p.node_id != local_id);
772 if let Some(query) = self.queries.get_mut(query_id) {
773 log::trace!("Request to {:?} in query {:?} succeeded.", source, query_id);
774 for peer in others_iter.clone() {
775 log::trace!("Peer {:?} reported by {:?} in query {:?}.",
776 peer, source, query_id);
777 let addrs = peer.multiaddrs.iter().cloned().collect();
778 query.inner.addresses.insert(peer.node_id, addrs);
779 }
780 query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
781 }
782 }
783
784 /// Finds the closest peers to a `target` in the context of a request by
785 /// the `source` peer, such that the `source` peer is never included in the
786 /// result.
787 fn find_closest<T: Clone>(&mut self, target: &kbucket::Key<T>, source: &PeerId) -> Vec<KadPeer> {
788 if target == self.kbuckets.local_key() {
789 Vec::new()
790 } else {
791 self.kbuckets
792 .closest(target)
793 .filter(|e| e.node.key.preimage() != source)
794 .take(self.queries.config().replication_factor.get())
795 .map(KadPeer::from)
796 .collect()
797 }
798 }
799
800 /// Collects all peers who are known to be providers of the value for a given `Multihash`.
801 fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
802 let kbuckets = &mut self.kbuckets;
803 let connected = &mut self.connected_peers;
804 let local_addrs = &self.local_addrs;
805 self.store.providers(key)
806 .into_iter()
807 .filter_map(move |p|
808 if &p.provider != source {
809 let node_id = p.provider;
810 let multiaddrs = p.addresses;
811 let connection_ty = if connected.contains(&node_id) {
812 KadConnectionType::Connected
813 } else {
814 KadConnectionType::NotConnected
815 };
816 if multiaddrs.is_empty() {
817 // The provider is either the local node and we fill in
818 // the local addresses on demand, or it is a legacy
819 // provider record without addresses, in which case we
820 // try to find addresses in the routing table, as was
821 // done before provider records were stored along with
822 // their addresses.
823 if &node_id == kbuckets.local_key().preimage() {
824 Some(local_addrs.iter().cloned().collect::<Vec<_>>())
825 } else {
826 let key = kbucket::Key::from(node_id);
827 kbuckets.entry(&key).view().map(|e| e.node.value.clone().into_vec())
828 }
829 } else {
830 Some(multiaddrs)
831 }
832 .map(|multiaddrs| {
833 KadPeer {
834 node_id,
835 multiaddrs,
836 connection_ty,
837 }
838 })
839 } else {
840 None
841 })
842 .take(self.queries.config().replication_factor.get())
843 .collect()
844 }
845
846 /// Starts an iterative `ADD_PROVIDER` query for the given key.
847 fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
848 let info = QueryInfo::AddProvider {
849 context,
850 key: key.clone(),
851 phase: AddProviderPhase::GetClosestPeers
852 };
853 let target = kbucket::Key::new(key);
854 let peers = self.kbuckets.closest_keys(&target);
855 let inner = QueryInner::new(info);
856 self.queries.add_iter_closest(target.clone(), peers, inner);
857 }
858
859 /// Starts an iterative `PUT_VALUE` query for the given record.
860 fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
861 let quorum = quorum.eval(self.queries.config().replication_factor);
862 let target = kbucket::Key::new(record.key.clone());
863 let peers = self.kbuckets.closest_keys(&target);
864 let info = QueryInfo::PutRecord {
865 record, quorum, context, phase: PutRecordPhase::GetClosestPeers
866 };
867 let inner = QueryInner::new(info);
868 self.queries.add_iter_closest(target.clone(), peers, inner);
869 }
870
871 /// Updates the routing table with a new connection status and address of a peer.
872 fn connection_updated(&mut self, peer: PeerId, address: Option<Multiaddr>, new_status: NodeStatus) {
873 let key = kbucket::Key::from(peer);
874 match self.kbuckets.entry(&key) {
875 kbucket::Entry::Present(mut entry, old_status) => {
876 if let Some(address) = address {
877 if entry.value().insert(address) {
878 self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
879 KademliaEvent::RoutingUpdated {
880 peer,
881 addresses: entry.value().clone(),
882 old_peer: None,
883 }
884 ))
885 }
886 }
887 if old_status != new_status {
888 entry.update(new_status);
889 }
890 },
891
892 kbucket::Entry::Pending(mut entry, old_status) => {
893 if let Some(address) = address {
894 entry.value().insert(address);
895 }
896 if old_status != new_status {
897 entry.update(new_status);
898 }
899 },
900
901 kbucket::Entry::Absent(entry) => {
902 // Only connected nodes with a known address are newly inserted.
903 if new_status != NodeStatus::Connected {
904 return
905 }
906 match (address, self.kbucket_inserts) {
907 (None, _) => {
908 self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
909 KademliaEvent::UnroutablePeer { peer }
910 ));
911 }
912 (Some(a), KademliaBucketInserts::Manual) => {
913 self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
914 KademliaEvent::RoutablePeer { peer, address: a }
915 ));
916 }
917 (Some(a), KademliaBucketInserts::OnConnected) => {
918 let addresses = Addresses::new(a);
919 match entry.insert(addresses.clone(), new_status) {
920 kbucket::InsertResult::Inserted => {
921 let event = KademliaEvent::RoutingUpdated {
922 peer,
923 addresses,
924 old_peer: None,
925 };
926 self.queued_events.push_back(
927 NetworkBehaviourAction::GenerateEvent(event));
928 },
929 kbucket::InsertResult::Full => {
930 debug!("Bucket full. Peer not added to routing table: {}", peer);
931 let address = addresses.first().clone();
932 self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
933 KademliaEvent::RoutablePeer { peer, address }
934 ));
935 },
936 kbucket::InsertResult::Pending { disconnected } => {
937 debug_assert!(!self.connected_peers.contains(disconnected.preimage()));
938 let address = addresses.first().clone();
939 self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
940 KademliaEvent::PendingRoutablePeer { peer, address }
941 ));
942 self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
943 peer_id: disconnected.into_preimage(),
944 condition: DialPeerCondition::Disconnected
945 })
946 },
947 }
948 }
949 }
950 },
951 _ => {}
952 }
953 }
954
    /// Handles a finished (i.e. successful) query.
    ///
    /// Consumes the query and dispatches on its `QueryInfo`. Depending on the
    /// kind and phase of the query this either returns the
    /// `KademliaEvent::QueryResult` to report, or continues the query with a
    /// follow-up phase under the same query ID and returns `None`.
    /// Record replication and caching (`PutRecordContext::Replicate` /
    /// `PutRecordContext::Cache`) are only logged and never produce an event.
    fn query_finished(&mut self, q: Query<QueryInner>, params: &mut impl PollParameters)
        -> Option<KademliaEvent>
    {
        let query_id = q.id();
        log::trace!("Query {:?} finished.", query_id);
        let result = q.into_result();
        match result.inner.info {
            QueryInfo::Bootstrap { peer, remaining } => {
                let local_key = self.kbuckets.local_key().clone();
                // `remaining` is `None` only for the very first bootstrap
                // lookup, i.e. the one for the local key; in that case the
                // list of bucket refresh targets is generated here.
                let mut remaining = remaining.unwrap_or_else(|| {
                    debug_assert_eq!(&peer, local_key.preimage());
                    // The lookup for the local key finished. To complete the bootstrap process,
                    // a bucket refresh should be performed for every bucket farther away than
                    // the first non-empty bucket (which are most likely no more than the last
                    // few, i.e. farthest, buckets).
                    self.kbuckets.iter()
                        .skip_while(|b| b.is_empty())
                        .skip(1) // Skip the bucket with the closest neighbour.
                        .map(|b| {
                            // Try to find a key that falls into the bucket. While such keys can
                            // be generated fully deterministically, the current libp2p kademlia
                            // wire protocol requires transmission of the preimages of the actual
                            // keys in the DHT keyspace, hence for now this is just a "best effort"
                            // to find a key that hashes into a specific bucket. The probabilities
                            // of finding a key in the bucket `b` with at most 16 trials are as
                            // follows:
                            //
                            // Pr(bucket-255) = 1 - (1/2)^16   ~= 1
                            // Pr(bucket-254) = 1 - (3/4)^16   ~= 1
                            // Pr(bucket-253) = 1 - (7/8)^16   ~= 0.88
                            // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
                            // ...
                            let mut target = kbucket::Key::from(PeerId::random());
                            for _ in 0 .. 16 {
                                let d = local_key.distance(&target);
                                if b.contains(&d) {
                                    break;
                                }
                                target = kbucket::Key::from(PeerId::random());
                            }
                            // If no trial matched, the last random key is used as is.
                            target
                        }).collect::<Vec<_>>().into_iter()
                });

                // Number of targets left after the one that is started next.
                let num_remaining = remaining.len().saturating_sub(1) as u32;

                // Continue the bootstrap with the next target, reusing the query ID.
                if let Some(target) = remaining.next() {
                    let info = QueryInfo::Bootstrap {
                        peer: target.clone().into_preimage(),
                        remaining: Some(remaining)
                    };
                    let peers = self.kbuckets.closest_keys(&target);
                    let inner = QueryInner::new(info);
                    self.queries.continue_iter_closest(query_id, target.clone(), peers, inner);
                }

                // Report the result of the lookup that just finished.
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::Bootstrap(Ok(BootstrapOk { peer, num_remaining }))
                })
            }

            QueryInfo::GetClosestPeers { key, .. } => {
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetClosestPeers(Ok(
                        GetClosestPeersOk { key, peers: result.peers.collect() }
                    ))
                })
            }

            QueryInfo::GetProviders { key, providers } => {
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetProviders(Ok(
                        GetProvidersOk {
                            key,
                            providers,
                            closest_peers: result.peers.collect()
                        }
                    ))
                })
            }

            // First phase of `ADD_PROVIDER` done: the closest peers are known.
            // Continue with the second phase, which contacts exactly those
            // peers, and carry over the statistics of the first phase.
            QueryInfo::AddProvider {
                context,
                key,
                phase: AddProviderPhase::GetClosestPeers
            } => {
                let provider_id = *params.local_peer_id();
                let external_addresses = params.external_addresses().map(|r| r.addr).collect();
                let inner = QueryInner::new(QueryInfo::AddProvider {
                    context,
                    key,
                    phase: AddProviderPhase::AddProvider {
                        provider_id,
                        external_addresses,
                        get_closest_peers_stats: result.stats
                    }
                });
                self.queries.continue_fixed(query_id, result.peers, inner);
                None
            }

            // Second phase of `ADD_PROVIDER` done: report the result together
            // with the merged statistics of both phases.
            QueryInfo::AddProvider {
                context,
                key,
                phase: AddProviderPhase::AddProvider { get_closest_peers_stats, .. }
            } => {
                match context {
                    AddProviderContext::Publish => {
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::StartProviding(Ok(AddProviderOk { key }))
                        })
                    }
                    AddProviderContext::Republish => {
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::RepublishProvider(Ok(AddProviderOk { key }))
                        })
                    }
                }
            }

            QueryInfo::GetRecord { key, records, quorum, cache_at } => {
                // `quorum` is non-zero, hence reaching it implies that
                // `records` is not empty.
                let results = if records.len() >= quorum.get() { // [not empty]
                    if let Some(cache_key) = cache_at {
                        // Cache the record at the closest node to the key that
                        // did not return the record.
                        let record = records.first().expect("[not empty]").record.clone();
                        let quorum = NonZeroUsize::new(1).expect("1 > 0");
                        let context = PutRecordContext::Cache;
                        // The caching query starts directly in the `PutRecord`
                        // phase as a new, separate query to a single peer.
                        let info = QueryInfo::PutRecord {
                            context,
                            record,
                            quorum,
                            phase: PutRecordPhase::PutRecord {
                                success: vec![],
                                get_closest_peers_stats: QueryStats::empty()
                            }
                        };
                        let inner = QueryInner::new(info);
                        self.queries.add_fixed(iter::once(cache_key.into_preimage()), inner);
                    }
                    Ok(GetRecordOk { records })
                } else if records.is_empty() {
                    Err(GetRecordError::NotFound {
                        key,
                        closest_peers: result.peers.collect()
                    })
                } else {
                    // Some records were found, but fewer than the quorum.
                    Err(GetRecordError::QuorumFailed { key, records, quorum })
                };
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetRecord(results)
                })
            }

            // First phase of `PUT_VALUE` done: continue with the second phase,
            // which sends the record to exactly the peers found.
            QueryInfo::PutRecord {
                context,
                record,
                quorum,
                phase: PutRecordPhase::GetClosestPeers
            } => {
                let info = QueryInfo::PutRecord {
                    context,
                    record,
                    quorum,
                    phase: PutRecordPhase::PutRecord {
                        success: vec![],
                        get_closest_peers_stats: result.stats
                    }
                };
                let inner = QueryInner::new(info);
                self.queries.continue_fixed(query_id, result.peers, inner);
                None
            }

            // Second phase of `PUT_VALUE` done: success depends on whether
            // the number of successful peers reached the quorum.
            QueryInfo::PutRecord {
                context,
                record,
                quorum,
                phase: PutRecordPhase::PutRecord { success, get_closest_peers_stats }
            } => {
                let mk_result = |key: record::Key| {
                    if success.len() >= quorum.get() {
                        Ok(PutRecordOk { key })
                    } else {
                        Err(PutRecordError::QuorumFailed { key, quorum, success })
                    }
                };
                match context {
                    PutRecordContext::Publish =>
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::PutRecord(mk_result(record.key))
                        }),
                    PutRecordContext::Republish =>
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::RepublishRecord(mk_result(record.key))
                        }),
                    // Replication and caching only log; no event is emitted.
                    PutRecordContext::Replicate => {
                        debug!("Record replicated: {:?}", record.key);
                        None
                    }
                    PutRecordContext::Cache => {
                        debug!("Record cached: {:?}", record.key);
                        None
                    }
                }
            }
        }
    }
1180
    /// Handles a query that timed out.
    ///
    /// Consumes the query and returns the `KademliaEvent::QueryResult`
    /// carrying the corresponding timeout error, if any. A timed out
    /// bootstrap lookup still continues with the next remaining target.
    /// Timeouts of record replication and caching are only logged and yield
    /// `None`.
    fn query_timeout(&mut self, query: Query<QueryInner>) -> Option<KademliaEvent> {
        let query_id = query.id();
        log::trace!("Query {:?} timed out.", query_id);
        let result = query.into_result();
        match result.inner.info {
            QueryInfo::Bootstrap { peer, mut remaining } => {
                // `None` if the list of remaining targets was not yet
                // generated, otherwise the number of targets left after the
                // one that is started next.
                let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);

                if let Some(mut remaining) = remaining.take() {
                    // Continue with the next bootstrap query if `remaining` is not empty.
                    if let Some(target) = remaining.next() {
                        let info = QueryInfo::Bootstrap {
                            peer: target.clone().into_preimage(),
                            remaining: Some(remaining)
                        };
                        let peers = self.kbuckets.closest_keys(&target);
                        let inner = QueryInner::new(info);
                        self.queries.continue_iter_closest(query_id, target.clone(), peers, inner);
                    }
                }

                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::Bootstrap(Err(
                        BootstrapError::Timeout { peer, num_remaining }
                    ))
                })
            }

            // The timeout is reported the same way regardless of the phase.
            QueryInfo::AddProvider { context, key, .. } =>
                Some(match context {
                    AddProviderContext::Publish =>
                        KademliaEvent::QueryResult {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::StartProviding(Err(
                                AddProviderError::Timeout { key }
                            ))
                        },
                    AddProviderContext::Republish =>
                        KademliaEvent::QueryResult {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::RepublishProvider(Err(
                                AddProviderError::Timeout { key }
                            ))
                        }
                }),

            QueryInfo::GetClosestPeers { key } => {
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetClosestPeers(Err(
                        GetClosestPeersError::Timeout {
                            key,
                            peers: result.peers.collect()
                        }
                    ))
                })
            },

            QueryInfo::PutRecord { record, quorum, context, phase } => {
                // The error lists the peers that confirmed the record so far;
                // there are none if the query was still locating peers.
                let err = Err(PutRecordError::Timeout {
                    key: record.key,
                    quorum,
                    success: match phase {
                        PutRecordPhase::GetClosestPeers => vec![],
                        PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
                    }
                });
                match context {
                    PutRecordContext::Publish =>
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::PutRecord(err)
                        }),
                    PutRecordContext::Republish =>
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::RepublishRecord(err)
                        }),
                    // Failed replication is only logged, at a level that
                    // depends on the phase in which the timeout occurred.
                    PutRecordContext::Replicate => match phase {
                        PutRecordPhase::GetClosestPeers => {
                            warn!("Locating closest peers for replication failed: {:?}", err);
                            None
                        }
                        PutRecordPhase::PutRecord { .. } => {
                            debug!("Replicating record failed: {:?}", err);
                            None
                        }
                    }
                    PutRecordContext::Cache => match phase {
                        PutRecordPhase::GetClosestPeers => {
                            // Caching a record at the closest peer to a key that did not return
                            // a record is never preceded by a lookup for the closest peers, i.e.
                            // it is a direct query to a single peer.
                            unreachable!()
                        }
                        PutRecordPhase::PutRecord { .. } => {
                            debug!("Caching record failed: {:?}", err);
                            None
                        }
                    }
                }
            }

            // The records collected before the timeout are handed to the caller.
            QueryInfo::GetRecord { key, records, quorum, .. } =>
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetRecord(Err(
                        GetRecordError::Timeout { key, records, quorum },
                    ))
                }),

            // The providers collected before the timeout are handed to the caller.
            QueryInfo::GetProviders { key, providers } =>
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetProviders(Err(
                        GetProvidersError::Timeout {
                            key,
                            providers,
                            closest_peers: result.peers.collect()
                        }
                    ))
                })
        }
    }
1315
1316 /// Processes a record received from a peer.
1317 fn record_received(
1318 &mut self,
1319 source: PeerId,
1320 connection: ConnectionId,
1321 request_id: KademliaRequestId,
1322 mut record: Record
1323 ) {
1324 if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
1325 // If the (alleged) publisher is the local node, do nothing. The record of
1326 // the original publisher should never change as a result of replication
1327 // and the publisher is always assumed to have the "right" value.
1328 self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1329 peer_id: source,
1330 handler: NotifyHandler::One(connection),
1331 event: KademliaHandlerIn::PutRecordRes {
1332 key: record.key,
1333 value: record.value,
1334 request_id,
1335 },
1336 });
1337 return
1338 }
1339
1340 let now = Instant::now();
1341
1342 // Calculate the expiration exponentially inversely proportional to the
1343 // number of nodes between the local node and the closest node to the key
1344 // (beyond the replication factor). This ensures avoiding over-caching
1345 // outside of the k closest nodes to a key.
1346 let target = kbucket::Key::new(record.key.clone());
1347 let num_between = self.kbuckets.count_nodes_between(&target);
1348 let k = self.queries.config().replication_factor.get();
1349 let num_beyond_k = (usize::max(k, num_between) - k) as u32;
1350 let expiration = self.record_ttl.map(|ttl| now + exp_decrease(ttl, num_beyond_k));
1351 // The smaller TTL prevails. Only if neither TTL is set is the record
1352 // stored "forever".
1353 record.expires = record.expires.or(expiration).min(expiration);
1354
1355 if let Some(job) = self.put_record_job.as_mut() {
1356 // Ignore the record in the next run of the replication
1357 // job, since we can assume the sender replicated the
1358 // record to the k closest peers. Effectively, only
1359 // one of the k closest peers performs a replication
1360 // in the configured interval, assuming a shared interval.
1361 job.skip(record.key.clone())
1362 }
1363
1364 // While records received from a publisher, as well as records that do
1365 // not exist locally should always (attempted to) be stored, there is a
1366 // choice here w.r.t. the handling of replicated records whose keys refer
1367 // to records that exist locally: The value and / or the publisher may
1368 // either be overridden or left unchanged. At the moment and in the
1369 // absence of a decisive argument for another option, both are always
1370 // overridden as it avoids having to load the existing record in the
1371 // first place.
1372
1373 if !record.is_expired(now) {
1374 // The record is cloned because of the weird libp2p protocol
1375 // requirement to send back the value in the response, although this
1376 // is a waste of resources.
1377 match self.store.put(record.clone()) {
1378 Ok(()) => debug!("Record stored: {:?}; {} bytes", record.key, record.value.len()),
1379 Err(e) => {
1380 info!("Record not stored: {:?}", e);
1381 self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1382 peer_id: source,
1383 handler: NotifyHandler::One(connection),
1384 event: KademliaHandlerIn::Reset(request_id)
1385 });
1386
1387 return
1388 }
1389 }
1390 }
1391
1392 // The remote receives a [`KademliaHandlerIn::PutRecordRes`] even in the
1393 // case where the record is discarded due to being expired. Given that
1394 // the remote sent the local node a [`KademliaHandlerEvent::PutRecord`]
1395 // request, the remote perceives the local node as one node among the k
1396 // closest nodes to the target. In addition returning
1397 // [`KademliaHandlerIn::PutRecordRes`] does not reveal any internal
1398 // information to a possibly malicious remote node.
1399 self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1400 peer_id: source,
1401 handler: NotifyHandler::One(connection),
1402 event: KademliaHandlerIn::PutRecordRes {
1403 key: record.key,
1404 value: record.value,
1405 request_id,
1406 },
1407 })
1408 }
1409
1410 /// Processes a provider record received from a peer.
1411 fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
1412 if &provider.node_id != self.kbuckets.local_key().preimage() {
1413 let record = ProviderRecord {
1414 key,
1415 provider: provider.node_id,
1416 expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1417 addresses: provider.multiaddrs,
1418 };
1419 if let Err(e) = self.store.add_provider(record) {
1420 info!("Provider record not stored: {:?}", e);
1421 }
1422 }
1423 }
1424}
1425
/// Exponentially decreases the given duration (base 2), i.e. halves the
/// whole seconds of `ttl` once per unit of `exp`.
///
/// Sub-second precision is dropped. An exponent of 64 or more yields a
/// zero duration.
fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
    let secs = match ttl.as_secs().checked_shr(exp) {
        Some(shifted) => shifted,
        None => 0,
    };
    Duration::from_secs(secs)
}
1430
1431impl<TStore> NetworkBehaviour for Kademlia<TStore>
1432where
1433 for<'a> TStore: RecordStore<'a>,
1434 TStore: Send + 'static,
1435{
1436 type ProtocolsHandler = KademliaHandlerProto<QueryId>;
1437 type OutEvent = KademliaEvent;
1438
1439 fn new_handler(&mut self) -> Self::ProtocolsHandler {
1440 KademliaHandlerProto::new(KademliaHandlerConfig {
1441 protocol_config: self.protocol_config.clone(),
1442 allow_listening: true,
1443 idle_timeout: self.connection_idle_timeout,
1444 })
1445 }
1446
1447 fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
1448 // We should order addresses from decreasing likelyhood of connectivity, so start with
1449 // the addresses of that peer in the k-buckets.
1450 let key = kbucket::Key::from(*peer_id);
1451 let mut peer_addrs =
1452 if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) {
1453 let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
1454 debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
1455 addrs
1456 } else {
1457 Vec::new()
1458 };
1459
1460 // We add to that a temporary list of addresses from the ongoing queries.
1461 for query in self.queries.iter() {
1462 if let Some(addrs) = query.inner.addresses.get(peer_id) {
1463 peer_addrs.extend(addrs.iter().cloned())
1464 }
1465 }
1466
1467 peer_addrs
1468 }
1469
    /// Intentionally does nothing when a connection is established.
    fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {
        // When a connection is established, we don't know yet whether the
        // remote supports the configured protocol name. Only once a connection
        // handler reports [`KademliaHandlerEvent::ProtocolConfirmed`] do we
        // update the local routing table.
    }
1476
1477 fn inject_connected(&mut self, peer: &PeerId) {
1478 // Queue events for sending pending RPCs to the connected peer.
1479 // There can be only one pending RPC for a particular peer and query per definition.
1480 for (peer_id, event) in self.queries.iter_mut().filter_map(|q|
1481 q.inner.pending_rpcs.iter()
1482 .position(|(p, _)| p == peer)
1483 .map(|p| q.inner.pending_rpcs.remove(p)))
1484 {
1485 self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1486 peer_id, event, handler: NotifyHandler::Any
1487 });
1488 }
1489
1490 self.connected_peers.insert(*peer);
1491 }
1492
1493 fn inject_address_change(
1494 &mut self,
1495 peer: &PeerId,
1496 _: &ConnectionId,
1497 old: &ConnectedPoint,
1498 new: &ConnectedPoint
1499 ) {
1500 let (old, new) = (old.get_remote_address(), new.get_remote_address());
1501
1502 // Update routing table.
1503 if let Some(addrs) = self.kbuckets.entry(&kbucket::Key::from(*peer)).value() {
1504 if addrs.replace(old, new) {
1505 debug!("Address '{}' replaced with '{}' for peer '{}'.", old, new, peer);
1506 } else {
1507 debug!(
1508 "Address '{}' not replaced with '{}' for peer '{}' as old address wasn't \
1509 present.",
1510 old, new, peer,
1511 );
1512 }
1513 } else {
1514 debug!(
1515 "Address '{}' not replaced with '{}' for peer '{}' as peer is not present in the \
1516 routing table.",
1517 old, new, peer,
1518 );
1519 }
1520
1521 // Update query address cache.
1522 //
1523 // Given two connected nodes: local node A and remote node B. Say node B
1524 // is not in node A's routing table. Additionally node B is part of the
1525 // `QueryInner::addresses` list of an ongoing query on node A. Say Node
1526 // B triggers an address change and then disconnects. Later on the
1527 // earlier mentioned query on node A would like to connect to node B.
1528 // Without replacing the address in the `QueryInner::addresses` set node
1529 // A would attempt to dial the old and not the new address.
1530 //
1531 // While upholding correctness, iterating through all discovered
1532 // addresses of a peer in all currently ongoing queries might have a
1533 // large performance impact. If so, the code below might be worth
1534 // revisiting.
1535 for query in self.queries.iter_mut() {
1536 if let Some(addrs) = query.inner.addresses.get_mut(peer) {
1537 for addr in addrs.iter_mut() {
1538 if addr == old {
1539 *addr = new.clone();
1540 }
1541 }
1542 }
1543 }
1544 }
1545
1546 fn inject_addr_reach_failure(
1547 &mut self,
1548 peer_id: Option<&PeerId>,
1549 addr: &Multiaddr,
1550 err: &dyn error::Error
1551 ) {
1552 if let Some(peer_id) = peer_id {
1553 let key = kbucket::Key::from(*peer_id);
1554
1555 if let Some(addrs) = self.kbuckets.entry(&key).value() {
1556 // TODO: Ideally, the address should only be removed if the error can
1557 // be classified as "permanent" but since `err` is currently a borrowed
1558 // trait object without a `'static` bound, even downcasting for inspection
1559 // of the error is not possible (and also not truly desirable or ergonomic).
1560 // The error passed in should rather be a dedicated enum.
1561 if addrs.remove(addr).is_ok() {
1562 debug!("Address '{}' removed from peer '{}' due to error: {}.",
1563 addr, peer_id, err);
1564 } else {
1565 // Despite apparently having no reachable address (any longer),
1566 // the peer is kept in the routing table with the last address to avoid
1567 // (temporary) loss of network connectivity to "flush" the routing
1568 // table. Once in, a peer is only removed from the routing table
1569 // if it is the least recently connected peer, currently disconnected
1570 // and is unreachable in the context of another peer pending insertion
1571 // into the same bucket. This is handled transparently by the
1572 // `KBucketsTable` and takes effect through `KBucketsTable::take_applied_pending`
1573 // within `Kademlia::poll`.
1574 debug!("Last remaining address '{}' of peer '{}' is unreachable: {}.",
1575 addr, peer_id, err)
1576 }
1577 }
1578
1579 for query in self.queries.iter_mut() {
1580 if let Some(addrs) = query.inner.addresses.get_mut(peer_id) {
1581 addrs.retain(|a| a != addr);
1582 }
1583 }
1584 }
1585 }
1586
1587 fn inject_dial_failure(&mut self, peer_id: &PeerId) {
1588 for query in self.queries.iter_mut() {
1589 query.on_failure(peer_id);
1590 }
1591 }
1592
1593 fn inject_disconnected(&mut self, id: &PeerId) {
1594 for query in self.queries.iter_mut() {
1595 query.on_failure(id);
1596 }
1597 self.connection_updated(*id, None, NodeStatus::Disconnected);
1598 self.connected_peers.remove(id);
1599 }
1600
    /// Handles an event reported by the connection handler of the
    /// `connection` to the peer `source`.
    ///
    /// Inbound requests (`*Req`, `GetRecord`, `PutRecord`, `AddProvider`) are
    /// answered from local state, with any response queued for the handler
    /// of the same connection. Responses and errors (`*Res`, `QueryError`)
    /// are fed into the query identified by `user_data`, if it still exists.
    fn inject_event(
        &mut self,
        source: PeerId,
        connection: ConnectionId,
        event: KademliaHandlerEvent<QueryId>
    ) {
        match event {
            KademliaHandlerEvent::ProtocolConfirmed { endpoint } => {
                debug_assert!(self.connected_peers.contains(&source));
                // The remote's address can only be put into the routing table,
                // and thus shared with other nodes, if the local node is the dialer,
                // since the remote address on an inbound connection may be specific
                // to that connection (e.g. typically the TCP port numbers).
                let address = match endpoint {
                    ConnectedPoint::Dialer { address } => Some(address),
                    ConnectedPoint::Listener { .. } => None,
                };
                self.connection_updated(source, address, NodeStatus::Connected);
            }

            KademliaHandlerEvent::FindNodeReq { key, request_id } => {
                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
                self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: KademliaHandlerIn::FindNodeRes {
                        closer_peers,
                        request_id,
                    },
                });
            }

            KademliaHandlerEvent::FindNodeRes {
                closer_peers,
                user_data,
            } => {
                self.discovered(&user_data, &source, closer_peers.iter());
            }

            KademliaHandlerEvent::GetProvidersReq { key, request_id } => {
                let provider_peers = self.provider_peers(&key, &source);
                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
                self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: KademliaHandlerIn::GetProvidersRes {
                        closer_peers,
                        provider_peers,
                        request_id,
                    },
                });
            }

            KademliaHandlerEvent::GetProvidersRes {
                closer_peers,
                provider_peers,
                user_data,
            } => {
                // Both the closer peers and the providers count as discovered peers.
                let peers = closer_peers.iter().chain(provider_peers.iter());
                self.discovered(&user_data, &source, peers);
                // Additionally remember the reported providers in the query.
                if let Some(query) = self.queries.get_mut(&user_data) {
                    if let QueryInfo::GetProviders {
                        providers, ..
                    } = &mut query.inner.info {
                        for peer in provider_peers {
                            providers.insert(peer.node_id);
                        }
                    }
                }
            }

            KademliaHandlerEvent::QueryError { user_data, error } => {
                log::debug!("Request to {:?} in query {:?} failed with {:?}",
                    source, user_data, error);
                // If the query to which the error relates is still active,
                // signal the failure w.r.t. `source`.
                if let Some(query) = self.queries.get_mut(&user_data) {
                    query.on_failure(&source)
                }
            }

            KademliaHandlerEvent::AddProvider { key, provider } => {
                // Only accept a provider record from a legitimate peer.
                if provider.node_id != source {
                    return
                }

                self.provider_received(key, provider)
            }

            KademliaHandlerEvent::GetRecord { key, request_id } => {
                // Lookup the record locally.
                let record = match self.store.get(&key) {
                    Some(record) => {
                        // An expired record is removed from the store and
                        // treated as not found.
                        if record.is_expired(Instant::now()) {
                            self.store.remove(&key);
                            None
                        } else {
                            Some(record.into_owned())
                        }
                    },
                    None => None
                };

                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);

                self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: KademliaHandlerIn::GetRecordRes {
                        record,
                        closer_peers,
                        request_id,
                    },
                });
            }

            KademliaHandlerEvent::GetRecordRes {
                record,
                closer_peers,
                user_data,
            } => {
                if let Some(query) = self.queries.get_mut(&user_data) {
                    if let QueryInfo::GetRecord {
                        key, records, quorum, cache_at
                    } = &mut query.inner.info {
                        if let Some(record) = record {
                            records.push(PeerRecord{ peer: Some(source), record });

                            let quorum = quorum.get();
                            if records.len() >= quorum {
                                // Desired quorum reached. The query may finish. See
                                // [`Query::try_finish`] for details.
                                let peers = records.iter()
                                    .filter_map(|PeerRecord{ peer, .. }| peer.as_ref())
                                    .cloned()
                                    .collect::<Vec<_>>();
                                let finished = query.try_finish(peers.iter());
                                if !finished {
                                    debug!(
                                        "GetRecord query ({:?}) reached quorum ({}/{}) with \
                                         response from peer {} but could not yet finish.",
                                        user_data, peers.len(), quorum, source,
                                    );
                                }
                            }
                        } else if quorum.get() == 1 {
                            // It is a "standard" Kademlia query, for which the
                            // closest node to the key that did *not* return the
                            // value is tracked in order to cache the record on
                            // that node if the query turns out to be successful.
                            let source_key = kbucket::Key::from(source);
                            if let Some(cache_key) = cache_at {
                                let key = kbucket::Key::new(key.clone());
                                if source_key.distance(&key) < cache_key.distance(&key) {
                                    *cache_at = Some(source_key)
                                }
                            } else {
                                *cache_at = Some(source_key)
                            }
                        }
                    }
                }

                self.discovered(&user_data, &source, closer_peers.iter());
            }

            KademliaHandlerEvent::PutRecord {
                record,
                request_id
            } => {
                self.record_received(source, connection, request_id, record);
            }

            KademliaHandlerEvent::PutRecordRes {
                user_data, ..
            } => {
                if let Some(query) = self.queries.get_mut(&user_data) {
                    // The response reports no further peers.
                    query.on_success(&source, vec![]);
                    if let QueryInfo::PutRecord {
                        phase: PutRecordPhase::PutRecord { success, .. }, quorum, ..
                    } = &mut query.inner.info {
                        success.push(source);

                        // Once the quorum is reached, try to finish the query.
                        let quorum = quorum.get();
                        if success.len() >= quorum {
                            let peers = success.clone();
                            let finished = query.try_finish(peers.iter());
                            if !finished {
                                debug!(
                                    "PutRecord query ({:?}) reached quorum ({}/{}) with response \
                                     from peer {} but could not yet finish.",
                                    user_data, peers.len(), quorum, source,
                                );
                            }
                        }
                    }
                }
            }
        };
    }
1802
    /// Starts tracking `addr` as a local address by inserting a copy of it
    /// into `local_addrs`.
    fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
        self.local_addrs.insert(addr.clone());
    }
1806
    /// Stops tracking `addr` by removing it from `local_addrs`.
    fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
        self.local_addrs.remove(addr);
    }
1810
1811 fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
1812 if self.local_addrs.len() < MAX_LOCAL_EXTERNAL_ADDRS {
1813 self.local_addrs.insert(addr.clone());
1814 }
1815 }
1816
    /// Drives the behaviour forward.
    ///
    /// First gives the periodic background jobs (provider announcement and
    /// record replication / publication) a chance to start new queries, then
    /// loops over, in this order: queued events, applied pending routing
    /// table entries and the query pool. Returns `Poll::Ready` with the first
    /// action produced, or `Poll::Pending` once the query pool has nothing
    /// more to do and no events have been queued.
    fn poll(&mut self, cx: &mut Context<'_>, parameters: &mut impl PollParameters) -> Poll<
        NetworkBehaviourAction<
            KademliaHandlerIn<QueryId>,
            Self::OutEvent,
        >,
    > {
        // A single timestamp is used for the jobs and the query pool below.
        let now = Instant::now();

        // Calculate the available capacity for queries triggered by background jobs.
        let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());

        // Run the periodic provider announcement job.
        //
        // The job is moved out of `self` while it is polled (it needs
        // `&mut self.store`, and starting a query needs `&mut self`) and is
        // put back afterwards.
        if let Some(mut job) = self.add_provider_job.take() {
            // At most `JOBS_MAX_NEW_QUERIES` new queries per call, and never
            // more than the remaining capacity.
            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
            for _ in 0 .. num {
                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                    self.start_add_provider(r.key, AddProviderContext::Republish)
                } else {
                    // The job has nothing (more) ready right now.
                    break
                }
            }
            // The whole budget `num` is deducted, even if the loop stopped
            // early. This cannot underflow because `num <= jobs_query_capacity`.
            jobs_query_capacity -= num;
            self.add_provider_job = Some(job);
        }

        // Run the periodic record replication / publication job.
        if let Some(mut job) = self.put_record_job.take() {
            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
            for _ in 0 .. num {
                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                    // Records whose publisher is the local peer are
                    // republished; all other records are replicated.
                    let context = if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
                        PutRecordContext::Republish
                    } else {
                        PutRecordContext::Replicate
                    };
                    self.start_put_record(r, Quorum::All, context)
                } else {
                    break
                }
            }
            // `jobs_query_capacity` is not read again in this function, so it
            // is not reduced here.
            self.put_record_job = Some(job);
        }

        loop {
            // Drain queued events first.
            if let Some(event) = self.queued_events.pop_front() {
                return Poll::Ready(event);
            }

            // Drain applied pending entries from the routing table.
            if let Some(entry) = self.kbuckets.take_applied_pending() {
                let kbucket::Node { key, value } = entry.inserted;
                let event = KademliaEvent::RoutingUpdated {
                    peer: key.into_preimage(),
                    addresses: value,
                    old_peer: entry.evicted.map(|n| n.key.into_preimage())
                };
                return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
            }

            // Look for a finished query.
            loop {
                match self.queries.poll(now) {
                    QueryPoolState::Finished(q) => {
                        // `query_finished` may yield no event, in which case
                        // the pool is polled again.
                        if let Some(event) = self.query_finished(q, parameters) {
                            return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
                        }
                    }
                    QueryPoolState::Timeout(q) => {
                        if let Some(event) = self.query_timeout(q) {
                            return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
                        }
                    }
                    // A query wants to contact `peer_id`.
                    QueryPoolState::Waiting(Some((query, peer_id))) => {
                        let event = query.inner.info.to_request(query.id());
                        // TODO: AddProvider requests yield no response, so the query completes
                        // as soon as all requests have been sent. However, the handler should
                        // better emit an event when the request has been sent (and report
                        // an error if sending fails), instead of immediately reporting
                        // "success" somewhat prematurely here.
                        if let QueryInfo::AddProvider {
                            phase: AddProviderPhase::AddProvider { .. },
                            ..
                        } = &query.inner.info {
                            query.on_success(&peer_id, vec![])
                        }
                        if self.connected_peers.contains(&peer_id) {
                            // Already connected: hand the request to any handler of the peer.
                            self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
                                peer_id, event, handler: NotifyHandler::Any
                            });
                        } else if &peer_id != self.kbuckets.local_key().preimage() {
                            // Not connected and not the local peer: park the
                            // request on the query and request a dial.
                            query.inner.pending_rpcs.push((peer_id, event));
                            self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
                                peer_id, condition: DialPeerCondition::Disconnected
                            });
                        }
                        // Otherwise the target is the local peer itself and
                        // the request is dropped.
                    }
                    QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
                }
            }

            // No immediate event was produced as a result of a finished query.
            // If no new events have been queued either, signal `NotReady` to
            // be polled again later.
            if self.queued_events.is_empty() {
                return Poll::Pending
            }
            // Otherwise loop around so the newly queued events are drained.
        }
    }
1926}
1927
/// A quorum w.r.t. the configured replication factor specifies the minimum
/// number of distinct nodes that must be successfully contacted in order
/// for a query to succeed.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Quorum {
    /// A single node suffices.
    One,
    /// More than half of the total, i.e. `total / 2 + 1` nodes.
    Majority,
    /// All nodes of the total.
    All,
    /// The given number of nodes, capped at the total.
    N(NonZeroUsize),
}

impl Quorum {
    /// Evaluate the quorum w.r.t a given total (number of peers).
    ///
    /// The result is never larger than `total` and never zero.
    fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
        let all = total.get();
        let required = match *self {
            Quorum::One => 1,
            Quorum::Majority => all / 2 + 1,
            Quorum::All => all,
            Quorum::N(n) => usize::min(all, n.get()),
        };
        // Every arm above yields at least 1 because `total` and `n` are non-zero.
        NonZeroUsize::new(required).expect("a quorum is never zero")
    }
}
1950
/// A record either received by the given peer or retrieved from the local
/// record store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerRecord {
    /// The peer from whom the record was received. `None` if the record was
    /// retrieved from local storage.
    pub peer: Option<PeerId>,
    /// The record itself.
    pub record: Record,
}
1960
1961//////////////////////////////////////////////////////////////////////////////
1962// Events
1963
/// The events produced by the `Kademlia` behaviour.
///
/// See [`NetworkBehaviour::poll`].
#[derive(Debug)]
pub enum KademliaEvent {
    /// A query has produced a result.
    QueryResult {
        /// The ID of the query that finished.
        id: QueryId,
        /// The result of the query.
        result: QueryResult,
        /// Execution statistics from the query.
        stats: QueryStats
    },

    /// The routing table has been updated with a new peer and / or
    /// address, thereby possibly evicting another peer.
    RoutingUpdated {
        /// The ID of the peer that was added or updated.
        peer: PeerId,
        /// The full list of known addresses of `peer`.
        addresses: Addresses,
        /// The ID of the peer that was evicted from the routing table to make
        /// room for the new peer, if any.
        old_peer: Option<PeerId>,
    },

    /// A peer has connected for whom no listen address is known.
    ///
    /// If the peer is to be added to the routing table, a known
    /// listen address for the peer must be provided via [`Kademlia::add_address`].
    UnroutablePeer {
        /// The ID of the peer that connected.
        peer: PeerId
    },

    /// A connection to a peer has been established for whom a listen address
    /// is known but the peer has not been added to the routing table either
    /// because [`KademliaBucketInserts::Manual`] is configured or because
    /// the corresponding bucket is full.
    ///
    /// If the peer is to be included in the routing table, it
    /// must be explicitly added via [`Kademlia::add_address`], possibly after
    /// removing another peer.
    ///
    /// See [`Kademlia::kbucket`] for insight into the contents of
    /// the k-bucket of `peer`.
    RoutablePeer {
        /// The ID of the peer that connected.
        peer: PeerId,
        /// An address of `peer` (presumably the known listen address
        /// mentioned above; confirm against the emitting code).
        address: Multiaddr,
    },

    /// A connection to a peer has been established for whom a listen address
    /// is known but the peer is only pending insertion into the routing table
    /// if the least-recently disconnected peer is unresponsive, i.e. the peer
    /// may not make it into the routing table.
    ///
    /// If the peer is to be unconditionally included in the routing table,
    /// it should be explicitly added via [`Kademlia::add_address`] after
    /// removing another peer.
    ///
    /// See [`Kademlia::kbucket`] for insight into the contents of
    /// the k-bucket of `peer`.
    PendingRoutablePeer {
        /// The ID of the peer that connected.
        peer: PeerId,
        /// An address of `peer` (presumably the known listen address
        /// mentioned above; confirm against the emitting code).
        address: Multiaddr,
    }
}
2031
/// The results of Kademlia queries.
///
/// Carried by [`KademliaEvent::QueryResult`]; each variant wraps the
/// `Result` type of the corresponding kind of query.
#[derive(Debug)]
pub enum QueryResult {
    /// The result of [`Kademlia::bootstrap`].
    Bootstrap(BootstrapResult),

    /// The result of [`Kademlia::get_closest_peers`].
    GetClosestPeers(GetClosestPeersResult),

    /// The result of [`Kademlia::get_providers`].
    GetProviders(GetProvidersResult),

    /// The result of [`Kademlia::start_providing`].
    StartProviding(AddProviderResult),

    /// The result of a (automatic) republishing of a provider record.
    RepublishProvider(AddProviderResult),

    /// The result of [`Kademlia::get_record`].
    GetRecord(GetRecordResult),

    /// The result of [`Kademlia::put_record`].
    PutRecord(PutRecordResult),

    /// The result of a (automatic) republishing of a (value-)record.
    RepublishRecord(PutRecordResult),
}
2059
/// The result of [`Kademlia::get_record`].
pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;

/// The successful result of [`Kademlia::get_record`].
#[derive(Debug, Clone)]
pub struct GetRecordOk {
    /// The records that were found, each paired with the peer it came from
    /// (see [`PeerRecord`]).
    pub records: Vec<PeerRecord>
}

/// The error result of [`Kademlia::get_record`].
#[derive(Debug, Clone)]
pub enum GetRecordError {
    /// No record was found for the key.
    NotFound {
        /// The key of the record that was looked up.
        key: record::Key,
        /// Presumably the closest peers to `key` that the query reached;
        /// confirm against the code constructing this error.
        closest_peers: Vec<PeerId>
    },
    /// The query finished without satisfying the quorum.
    QuorumFailed {
        /// The key of the record that was looked up.
        key: record::Key,
        /// The records collected by the query.
        records: Vec<PeerRecord>,
        /// The quorum of the query.
        quorum: NonZeroUsize
    },
    /// The query timed out.
    Timeout {
        /// The key of the record that was looked up.
        key: record::Key,
        /// The records collected by the query.
        records: Vec<PeerRecord>,
        /// The quorum of the query.
        quorum: NonZeroUsize
    }
}
2087
2088impl GetRecordError {
2089 /// Gets the key of the record for which the operation failed.
2090 pub fn key(&self) -> &record::Key {
2091 match self {
2092 GetRecordError::QuorumFailed { key, .. } => key,
2093 GetRecordError::Timeout { key, .. } => key,
2094 GetRecordError::NotFound { key, .. } => key,
2095 }
2096 }
2097
2098 /// Extracts the key of the record for which the operation failed,
2099 /// consuming the error.
2100 pub fn into_key(self) -> record::Key {
2101 match self {
2102 GetRecordError::QuorumFailed { key, .. } => key,
2103 GetRecordError::Timeout { key, .. } => key,
2104 GetRecordError::NotFound { key, .. } => key,
2105 }
2106 }
2107}
2108
2109/// The result of [`Kademlia::put_record`].
2110pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;
2111
2112/// The successful result of [`Kademlia::put_record`].
2113#[derive(Debug, Clone)]
2114pub struct PutRecordOk {
2115 pub key: record::Key
2116}
2117
2118/// The error result of [`Kademlia::put_record`].
2119#[derive(Debug)]
2120pub enum PutRecordError {
2121 QuorumFailed {
2122 key: record::Key,
2123 /// [`PeerId`]s of the peers the record was successfully stored on.
2124 success: Vec<PeerId>,
2125 quorum: NonZeroUsize
2126 },
2127 Timeout {
2128 key: record::Key,
2129 /// [`PeerId`]s of the peers the record was successfully stored on.
2130 success: Vec<PeerId>,
2131 quorum: NonZeroUsize
2132 },
2133}
2134
2135impl PutRecordError {
2136 /// Gets the key of the record for which the operation failed.
2137 pub fn key(&self) -> &record::Key {
2138 match self {
2139 PutRecordError::QuorumFailed { key, .. } => key,
2140 PutRecordError::Timeout { key, .. } => key,
2141 }
2142 }
2143
2144 /// Extracts the key of the record for which the operation failed,
2145 /// consuming the error.
2146 pub fn into_key(self) -> record::Key {
2147 match self {
2148 PutRecordError::QuorumFailed { key, .. } => key,
2149 PutRecordError::Timeout { key, .. } => key,
2150 }
2151 }
2152}
2153
/// The result of [`Kademlia::bootstrap`].
pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;

/// The successful result of [`Kademlia::bootstrap`].
#[derive(Debug, Clone)]
pub struct BootstrapOk {
    /// The peer ID targeted by the bootstrap query (compare
    /// `QueryInfo::Bootstrap::peer`).
    pub peer: PeerId,
    /// Presumably the number of bootstrap lookups still to be run after
    /// this one; confirm against the code constructing this value.
    pub num_remaining: u32,
}

/// The error result of [`Kademlia::bootstrap`].
#[derive(Debug, Clone)]
pub enum BootstrapError {
    /// The query timed out.
    Timeout {
        /// The peer ID targeted by the bootstrap query.
        peer: PeerId,
        /// Presumably the number of bootstrap lookups still to be run, if
        /// known; confirm against the code constructing this value.
        num_remaining: Option<u32>,
    }
}
2172
/// The result of [`Kademlia::get_closest_peers`].
pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;

/// The successful result of [`Kademlia::get_closest_peers`].
#[derive(Debug, Clone)]
pub struct GetClosestPeersOk {
    /// The key that was looked up.
    pub key: Vec<u8>,
    /// The peers found by the query.
    pub peers: Vec<PeerId>
}

/// The error result of [`Kademlia::get_closest_peers`].
#[derive(Debug, Clone)]
pub enum GetClosestPeersError {
    /// The query timed out.
    Timeout {
        /// The key that was looked up.
        key: Vec<u8>,
        /// Presumably the peers found before the timeout; confirm against
        /// the code constructing this error.
        peers: Vec<PeerId>
    }
}
2191
2192impl GetClosestPeersError {
2193 /// Gets the key for which the operation failed.
2194 pub fn key(&self) -> &Vec<u8> {
2195 match self {
2196 GetClosestPeersError::Timeout { key, .. } => key,
2197 }
2198 }
2199
2200 /// Extracts the key for which the operation failed,
2201 /// consuming the error.
2202 pub fn into_key(self) -> Vec<u8> {
2203 match self {
2204 GetClosestPeersError::Timeout { key, .. } => key,
2205 }
2206 }
2207}
2208
/// The result of [`Kademlia::get_providers`].
pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;

/// The successful result of [`Kademlia::get_providers`].
#[derive(Debug, Clone)]
pub struct GetProvidersOk {
    /// The key for which providers were searched.
    pub key: record::Key,
    /// The providers that were found.
    pub providers: HashSet<PeerId>,
    /// Presumably the closest peers to `key` that the query reached;
    /// confirm against the code constructing this value.
    pub closest_peers: Vec<PeerId>
}

/// The error result of [`Kademlia::get_providers`].
#[derive(Debug, Clone)]
pub enum GetProvidersError {
    /// The query timed out.
    Timeout {
        /// The key for which providers were searched.
        key: record::Key,
        /// Presumably the providers found before the timeout; confirm
        /// against the code constructing this error.
        providers: HashSet<PeerId>,
        /// Presumably the closest peers to `key` reached before the
        /// timeout; confirm against the code constructing this error.
        closest_peers: Vec<PeerId>
    }
}
2229
2230impl GetProvidersError {
2231 /// Gets the key for which the operation failed.
2232 pub fn key(&self) -> &record::Key {
2233 match self {
2234 GetProvidersError::Timeout { key, .. } => key,
2235 }
2236 }
2237
2238 /// Extracts the key for which the operation failed,
2239 /// consuming the error.
2240 pub fn into_key(self) -> record::Key {
2241 match self {
2242 GetProvidersError::Timeout { key, .. } => key,
2243 }
2244 }
2245}
2246
2247/// The result of publishing a provider record.
2248pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;
2249
2250/// The successful result of publishing a provider record.
2251#[derive(Debug, Clone)]
2252pub struct AddProviderOk {
2253 pub key: record::Key,
2254}
2255
2256/// The possible errors when publishing a provider record.
2257#[derive(Debug)]
2258pub enum AddProviderError {
2259 /// The query timed out.
2260 Timeout {
2261 key: record::Key,
2262 },
2263}
2264
2265impl AddProviderError {
2266 /// Gets the key for which the operation failed.
2267 pub fn key(&self) -> &record::Key {
2268 match self {
2269 AddProviderError::Timeout { key, .. } => key,
2270 }
2271 }
2272
2273 /// Extracts the key for which the operation failed,
2274 pub fn into_key(self) -> record::Key {
2275 match self {
2276 AddProviderError::Timeout { key, .. } => key,
2277 }
2278 }
2279}
2280
2281impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
2282 fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
2283 KadPeer {
2284 node_id: e.node.key.into_preimage(),
2285 multiaddrs: e.node.value.into_vec(),
2286 connection_ty: match e.status {
2287 NodeStatus::Connected => KadConnectionType::Connected,
2288 NodeStatus::Disconnected => KadConnectionType::NotConnected
2289 }
2290 }
2291 }
2292}
2293
2294//////////////////////////////////////////////////////////////////////////////
2295// Internal query state
2296
/// The behaviour-specific state attached to every running query.
struct QueryInner {
    /// The query-specific state.
    info: QueryInfo,
    /// Addresses of peers discovered during a query.
    addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
    /// A list of pending requests to peers.
    ///
    /// A request is pending if the targeted peer is not currently connected
    /// and these requests are sent as soon as a connection to the peer is established.
    pending_rpcs: SmallVec<[(PeerId, KademliaHandlerIn<QueryId>); K_VALUE.get()]>
}
2308
2309impl QueryInner {
2310 fn new(info: QueryInfo) -> Self {
2311 QueryInner {
2312 info,
2313 addresses: Default::default(),
2314 pending_rpcs: SmallVec::default()
2315 }
2316 }
2317}
2318
/// The context of a [`QueryInfo::AddProvider`] query.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AddProviderContext {
    /// Presumably an explicit request to start providing a key; confirm
    /// against the callers of `start_add_provider`.
    Publish,
    /// The query was started by the periodic provider announcement job.
    Republish,
}
2325
/// The context of a [`QueryInfo::PutRecord`] query.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PutRecordContext {
    /// Presumably an explicit request to store a record; confirm against
    /// the callers of `start_put_record`.
    Publish,
    /// The periodic record job is re-sending a record whose publisher is
    /// the local node.
    Republish,
    /// The periodic record job is re-sending a record whose publisher is
    /// not the local node.
    Replicate,
    /// Presumably the record is being cached on a peer after a successful
    /// lookup (compare `QueryInfo::GetRecord::cache_at`); confirm against
    /// the code starting such queries.
    Cache,
}
2334
/// Information about a running query.
#[derive(Debug, Clone)]
pub enum QueryInfo {
    /// A query initiated by [`Kademlia::bootstrap`].
    Bootstrap {
        /// The targeted peer ID.
        peer: PeerId,
        /// The remaining random peer IDs to query, one per
        /// bucket that still needs refreshing.
        ///
        /// This is `None` if the initial self-lookup has not
        /// yet completed and `Some` with an exhausted iterator
        /// if bootstrapping is complete.
        remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>
    },

    /// A query initiated by [`Kademlia::get_closest_peers`].
    GetClosestPeers {
        /// The key whose closest peers are searched for.
        key: Vec<u8>
    },

    /// A query initiated by [`Kademlia::get_providers`].
    GetProviders {
        /// The key for which to search for providers.
        key: record::Key,
        /// The found providers.
        providers: HashSet<PeerId>,
    },

    /// A (repeated) query initiated by [`Kademlia::start_providing`].
    AddProvider {
        /// The record key.
        key: record::Key,
        /// The current phase of the query.
        phase: AddProviderPhase,
        /// The execution context of the query.
        context: AddProviderContext,
    },

    /// A (repeated) query initiated by [`Kademlia::put_record`].
    PutRecord {
        /// The record being stored; a copy of it is sent with every
        /// `PutRecord` request of the query.
        record: Record,
        /// The expected quorum of responses w.r.t. the replication factor.
        quorum: NonZeroUsize,
        /// The current phase of the query.
        phase: PutRecordPhase,
        /// The execution context of the query.
        context: PutRecordContext,
    },

    /// A query initiated by [`Kademlia::get_record`].
    GetRecord {
        /// The key to look for.
        key: record::Key,
        /// The records with the id of the peer that returned them. `None` when
        /// the record was found in the local store.
        records: Vec<PeerRecord>,
        /// The number of records to look for.
        quorum: NonZeroUsize,
        /// The closest peer to `key` that did not return a record.
        ///
        /// When a record is found in a standard Kademlia query (quorum == 1),
        /// it is cached at this peer as soon as a record is found.
        cache_at: Option<kbucket::Key<PeerId>>,
    },
}
2399
2400impl QueryInfo {
2401 /// Creates an event for a handler to issue an outgoing request in the
2402 /// context of a query.
2403 fn to_request(&self, query_id: QueryId) -> KademliaHandlerIn<QueryId> {
2404 match &self {
2405 QueryInfo::Bootstrap { peer, .. } => KademliaHandlerIn::FindNodeReq {
2406 key: peer.to_bytes(),
2407 user_data: query_id,
2408 },
2409 QueryInfo::GetClosestPeers { key, .. } => KademliaHandlerIn::FindNodeReq {
2410 key: key.clone(),
2411 user_data: query_id,
2412 },
2413 QueryInfo::GetProviders { key, .. } => KademliaHandlerIn::GetProvidersReq {
2414 key: key.clone(),
2415 user_data: query_id,
2416 },
2417 QueryInfo::AddProvider { key, phase, .. } => match phase {
2418 AddProviderPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq {
2419 key: key.to_vec(),
2420 user_data: query_id,
2421 },
2422 AddProviderPhase::AddProvider { provider_id, external_addresses, .. } => {
2423 KademliaHandlerIn::AddProvider {
2424 key: key.clone(),
2425 provider: crate::protocol::KadPeer {
2426 node_id: *provider_id,
2427 multiaddrs: external_addresses.clone(),
2428 connection_ty: crate::protocol::KadConnectionType::Connected,
2429 }
2430 }
2431 }
2432 },
2433 QueryInfo::GetRecord { key, .. } => KademliaHandlerIn::GetRecord {
2434 key: key.clone(),
2435 user_data: query_id,
2436 },
2437 QueryInfo::PutRecord { record, phase, .. } => match phase {
2438 PutRecordPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq {
2439 key: record.key.to_vec(),
2440 user_data: query_id,
2441 },
2442 PutRecordPhase::PutRecord { .. } => KademliaHandlerIn::PutRecord {
2443 record: record.clone(),
2444 user_data: query_id
2445 }
2446 }
2447 }
2448 }
2449}
2450
/// The phases of a [`QueryInfo::AddProvider`] query.
///
/// The phase determines which request `QueryInfo::to_request` produces: a
/// `FindNodeReq` in the first phase, an `AddProvider` request in the second.
#[derive(Debug, Clone)]
pub enum AddProviderPhase {
    /// The query is searching for the closest nodes to the record key.
    GetClosestPeers,

    /// The query advertises the local node as a provider for the key to
    /// the closest nodes to the key.
    AddProvider {
        /// The local peer ID that is advertised as a provider.
        provider_id: PeerId,
        /// The external addresses of the provider being advertised.
        external_addresses: Vec<Multiaddr>,
        /// Query statistics from the finished `GetClosestPeers` phase.
        get_closest_peers_stats: QueryStats,
    },
}
2468
/// The phases of a [`QueryInfo::PutRecord`] query.
///
/// The phase determines which request `QueryInfo::to_request` produces: a
/// `FindNodeReq` in the first phase, a `PutRecord` request in the second.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PutRecordPhase {
    /// The query is searching for the closest nodes to the record key.
    GetClosestPeers,

    /// The query is replicating the record to the closest nodes to the key.
    PutRecord {
        /// A list of peers the given record has been successfully replicated to.
        success: Vec<PeerId>,
        /// Query statistics from the finished `GetClosestPeers` phase.
        get_closest_peers_stats: QueryStats,
    },
}
2483
/// A mutable reference to a running query.
pub struct QueryMut<'a> {
    /// The underlying query that all methods delegate to.
    query: &'a mut Query<QueryInner>,
}

impl<'a> QueryMut<'a> {
    /// Gets the ID of the query.
    pub fn id(&self) -> QueryId {
        self.query.id()
    }

    /// Gets information about the type and state of the query.
    pub fn info(&self) -> &QueryInfo {
        &self.query.inner.info
    }

    /// Gets execution statistics about the query.
    ///
    /// For a multi-phase query such as `put_record`, these are the
    /// statistics of the current phase.
    pub fn stats(&self) -> &QueryStats {
        self.query.stats()
    }

    /// Finishes the query asap, without waiting for the
    /// regular termination conditions.
    pub fn finish(&mut self) {
        self.query.finish()
    }
}
2513
/// An immutable reference to a running query.
pub struct QueryRef<'a> {
    /// The underlying query that all methods delegate to.
    query: &'a Query<QueryInner>,
}

impl<'a> QueryRef<'a> {
    /// Gets the ID of the query.
    pub fn id(&self) -> QueryId {
        self.query.id()
    }

    /// Gets information about the type and state of the query.
    pub fn info(&self) -> &QueryInfo {
        &self.query.inner.info
    }

    /// Gets execution statistics about the query.
    ///
    /// For a multi-phase query such as `put_record`, these are the
    /// statistics of the current phase.
    pub fn stats(&self) -> &QueryStats {
        self.query.stats()
    }
}
2537
/// An operation failed due to no known peers in the routing table.
#[derive(Debug, Clone)]
pub struct NoKnownPeers();

/// Renders the fixed, human-readable message for this error.
impl fmt::Display for NoKnownPeers {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("No known peers.")
    }
}

impl std::error::Error for NoKnownPeers {}
2549
/// The possible outcomes of [`Kademlia::add_address`].
///
/// Derives `Debug`, `Clone`, `Copy`, `PartialEq` and `Eq` so that callers
/// can log an outcome and compare it against an expected one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RoutingUpdate {
    /// The given peer and address has been added to the routing
    /// table.
    Success,
    /// The peer and address is pending insertion into
    /// the routing table, if a disconnected peer fails
    /// to respond. If the given peer and address ends up
    /// in the routing table, [`KademliaEvent::RoutingUpdated`]
    /// is eventually emitted.
    Pending,
    /// The routing table update failed, either because the
    /// corresponding bucket for the peer is full and the
    /// pending slot(s) are occupied, or because the given
    /// peer ID is deemed invalid (e.g. refers to the local
    /// peer ID).
    Failed,
}
2568
/// The maximum number of local external addresses. When reached any
/// further externally reported addresses are ignored. The behaviour always
/// tracks all its listen addresses.
///
/// Note that `inject_new_external_addr` compares this limit against the
/// size of the whole `local_addrs` set, which also holds the listen
/// addresses, so tracked listen addresses count towards the limit.
const MAX_LOCAL_EXTERNAL_ADDRS: usize = 20;