mwc_libp2p_kad/
behaviour.rs

// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Implementation of the `Kademlia` network behaviour.

mod test;

use crate::K_VALUE;
use crate::addresses::Addresses;
use crate::handler::{
    KademliaHandlerProto,
    KademliaHandlerConfig,
    KademliaRequestId,
    KademliaHandlerEvent,
    KademliaHandlerIn
};
use crate::jobs::*;
use crate::kbucket::{self, KBucketsTable, NodeStatus};
use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState};
use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
use fnv::{FnvHashMap, FnvHashSet};
use mwc_libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId};
use mwc_libp2p_swarm::{
    DialPeerCondition,
    NetworkBehaviour,
    NetworkBehaviourAction,
    NotifyHandler,
    PollParameters,
};
use log::{info, debug, warn};
use smallvec::SmallVec;
use std::{borrow::Cow, error, iter, time::Duration};
use std::collections::{HashSet, VecDeque};
use std::fmt;
use std::num::NonZeroUsize;
use std::task::{Context, Poll};
use std::vec;
use wasm_timer::Instant;

pub use crate::query::QueryStats;

/// `Kademlia` is a `NetworkBehaviour` that implements the libp2p
/// Kademlia protocol.
pub struct Kademlia<TStore> {
    /// The Kademlia routing table.
    kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,

    /// The k-bucket insertion strategy.
    kbucket_inserts: KademliaBucketInserts,

    /// Configuration of the wire protocol.
    protocol_config: KademliaProtocolConfig,

    /// The currently active (i.e. in-progress) queries.
    queries: QueryPool<QueryInner>,

    /// The currently connected peers.
    ///
    /// This is a superset of the connected peers currently in the routing table.
    connected_peers: FnvHashSet<PeerId>,

    /// Periodic job for re-publication of provider records for keys
    /// provided by the local node.
    add_provider_job: Option<AddProviderJob>,

    /// Periodic job for (re-)replication and (re-)publishing of
    /// regular (value-)records.
    put_record_job: Option<PutRecordJob>,

    /// The TTL of regular (value-)records.
    record_ttl: Option<Duration>,

    /// The TTL of provider records.
    provider_record_ttl: Option<Duration>,

    /// How long to keep connections alive when they're idle.
    connection_idle_timeout: Duration,

    /// Queued events to return when the behaviour is being polled.
    queued_events: VecDeque<NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>>,

    /// The currently known addresses of the local node.
    local_addrs: HashSet<Multiaddr>,

    /// The record storage.
    store: TStore,
}

/// The configurable strategies for the insertion of peers
/// and their addresses into the k-buckets of the Kademlia
/// routing table.
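///
/// The strategy is selected via [`KademliaConfig::set_kbucket_inserts`]; a
/// minimal sketch, assuming a `KademliaConfig` value named `config`:
///
/// ```ignore
/// config.set_kbucket_inserts(KademliaBucketInserts::Manual);
/// ```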
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum KademliaBucketInserts {
    /// Whenever a connection to a peer is established as a
    /// result of a dialing attempt and that peer is not yet
    /// in the routing table, it is inserted as long as there
    /// is a free slot in the corresponding k-bucket. If the
    /// k-bucket is full but still has a free pending slot,
    /// it may be inserted into the routing table at a later time if an unresponsive
    /// disconnected peer is evicted from the bucket.
    OnConnected,
    /// New peers and addresses are only added to the routing table via
    /// explicit calls to [`Kademlia::add_address`].
    ///
    /// > **Note**: Even though peers can only get into the
    /// > routing table as a result of [`Kademlia::add_address`],
    /// > routing table entries are still updated as peers
    /// > connect and disconnect (i.e. the order of the entries
    /// > as well as the network addresses).
    Manual,
}

/// The configuration for the `Kademlia` behaviour.
///
/// The configuration is consumed by [`Kademlia::with_config`].
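///
/// # Example
///
/// A minimal sketch of building a custom configuration with the setters
/// defined below; the protocol name and the timeout values shown here are
/// illustrative assumptions, not defaults of this crate:
///
/// ```ignore
/// let mut config = KademliaConfig::default();
/// config
///     .set_protocol_name(b"/mwc/kad/1.0.0".to_vec())
///     .set_query_timeout(Duration::from_secs(120))
///     .set_connection_idle_timeout(Duration::from_secs(30));
/// ```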
#[derive(Debug, Clone)]
pub struct KademliaConfig {
    kbucket_pending_timeout: Duration,
    query_config: QueryConfig,
    protocol_config: KademliaProtocolConfig,
    record_ttl: Option<Duration>,
    record_replication_interval: Option<Duration>,
    record_publication_interval: Option<Duration>,
    provider_record_ttl: Option<Duration>,
    provider_publication_interval: Option<Duration>,
    connection_idle_timeout: Duration,
    kbucket_inserts: KademliaBucketInserts,
}

impl Default for KademliaConfig {
    fn default() -> Self {
        KademliaConfig {
            kbucket_pending_timeout: Duration::from_secs(60),
            query_config: QueryConfig::default(),
            protocol_config: Default::default(),
            record_ttl: Some(Duration::from_secs(36 * 60 * 60)),
            record_replication_interval: Some(Duration::from_secs(60 * 60)),
            record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
            provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
            provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
            connection_idle_timeout: Duration::from_secs(10),
            kbucket_inserts: KademliaBucketInserts::OnConnected,
        }
    }
}

impl KademliaConfig {
    /// Sets a custom protocol name.
    ///
    /// Kademlia nodes only communicate with other nodes using the same protocol
    /// name. Using a custom name therefore allows segregating the DHT from
    /// others, if that is desired.
    pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) -> &mut Self {
        self.protocol_config.set_protocol_name(name);
        self
    }

    /// Sets the timeout for a single query.
    ///
    /// > **Note**: A single query usually comprises at least as many requests
    /// > as the replication factor, i.e. this is not a request timeout.
    ///
    /// The default is 60 seconds.
    pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.query_config.timeout = timeout;
        self
    }

    /// Sets the replication factor to use.
    ///
    /// The replication factor determines to how many closest peers
    /// a record is replicated. The default is [`K_VALUE`].
    pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
        self.query_config.replication_factor = replication_factor;
        self
    }

    /// Sets the allowed level of parallelism for iterative queries.
    ///
    /// The `α` parameter in the Kademlia paper. The maximum number of peers
    /// that an iterative query is allowed to wait for in parallel while
    /// iterating towards the closest nodes to a target. Defaults to
    /// `ALPHA_VALUE`.
    ///
    /// This only controls the level of parallelism of an iterative query, not
    /// the level of parallelism of a query to a fixed set of peers.
    ///
    /// When used with [`KademliaConfig::disjoint_query_paths`] it equals
    /// the number of disjoint paths used.
    pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
        self.query_config.parallelism = parallelism;
        self
    }

    /// Require iterative queries to use disjoint paths for increased resiliency
    /// in the presence of potentially adversarial nodes.
    ///
    /// When enabled the number of disjoint paths used equals the configured
    /// parallelism.
    ///
    /// See the S/Kademlia paper for more information on the high level design
    /// as well as its security improvements.
    pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
        self.query_config.disjoint_query_paths = enabled;
        self
    }

    /// Sets the TTL for stored records.
    ///
    /// The TTL should be significantly longer than the (re-)publication
    /// interval, to avoid premature expiration of records. The default is 36
    /// hours.
    ///
    /// `None` means records never expire.
    ///
    /// Does not apply to provider records.
    pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
        self.record_ttl = record_ttl;
        self
    }

    /// Sets the (re-)replication interval for stored records.
    ///
    /// Periodic replication of stored records ensures that the records
    /// are always replicated to the available nodes closest to the key in the
    /// context of DHT topology changes (i.e. nodes joining and leaving), thus
    /// ensuring persistence until the record expires. Replication does not
    /// prolong the regular lifetime of a record (for otherwise it would live
    /// forever regardless of the configured TTL). The expiry of a record
    /// is only extended through re-publication.
    ///
    /// This interval should be significantly shorter than the publication
    /// interval, to ensure persistence between re-publications. The default
    /// is 1 hour.
    ///
    /// `None` means that stored records are never re-replicated.
    ///
    /// Does not apply to provider records.
    pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_replication_interval = interval;
        self
    }

    /// Sets the (re-)publication interval of stored records.
    ///
    /// Records persist in the DHT until they expire. By default, published
    /// records are re-published in regular intervals for as long as the record
    /// exists in the local storage of the original publisher, thereby extending
    /// the record's lifetime.
    ///
    /// This interval should be significantly shorter than the record TTL, to
    /// ensure records do not expire prematurely. The default is 24 hours.
    ///
    /// `None` means that stored records are never automatically re-published.
    ///
    /// Does not apply to provider records.
    pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_publication_interval = interval;
        self
    }

    /// Sets the TTL for provider records.
    ///
    /// `None` means that stored provider records never expire.
    ///
    /// Must be significantly larger than the provider publication interval.
    pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
        self.provider_record_ttl = ttl;
        self
    }

    /// Sets the interval at which provider records for keys provided
    /// by the local node are re-published.
    ///
    /// `None` means that stored provider records are never automatically
    /// re-published.
    ///
    /// Must be significantly less than the provider record TTL.
    pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.provider_publication_interval = interval;
        self
    }

    /// Sets the amount of time to keep connections alive when they're idle.
    pub fn set_connection_idle_timeout(&mut self, duration: Duration) -> &mut Self {
        self.connection_idle_timeout = duration;
        self
    }

    /// Modifies the maximum allowed size of individual Kademlia packets.
    ///
    /// It might be necessary to increase this value if trying to put large
    /// records.
    pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
        self.protocol_config.set_max_packet_size(size);
        self
    }

    /// Sets the k-bucket insertion strategy for the Kademlia routing table.
    pub fn set_kbucket_inserts(&mut self, inserts: KademliaBucketInserts) -> &mut Self {
        self.kbucket_inserts = inserts;
        self
    }
}

impl<TStore> Kademlia<TStore>
where
    for<'a> TStore: RecordStore<'a>
{
    /// Creates a new `Kademlia` network behaviour with a default configuration.
    pub fn new(id: PeerId, store: TStore) -> Self {
        Self::with_config(id, store, Default::default())
    }

    /// Gets the protocol name of this Kademlia instance.
    pub fn protocol_name(&self) -> &[u8] {
        self.protocol_config.protocol_name()
    }

    /// Creates a new `Kademlia` network behaviour with the given configuration.
    pub fn with_config(id: PeerId, store: TStore, config: KademliaConfig) -> Self {
        let local_key = kbucket::Key::from(id);

        let put_record_job = config
            .record_replication_interval
            .or(config.record_publication_interval)
            .map(|interval| PutRecordJob::new(
                id,
                interval,
                config.record_publication_interval,
                config.record_ttl,
            ));

        let add_provider_job = config
            .provider_publication_interval
            .map(AddProviderJob::new);

        Kademlia {
            store,
            kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
            kbucket_inserts: config.kbucket_inserts,
            protocol_config: config.protocol_config,
            queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
            queries: QueryPool::new(config.query_config),
            connected_peers: Default::default(),
            add_provider_job,
            put_record_job,
            record_ttl: config.record_ttl,
            provider_record_ttl: config.provider_record_ttl,
            connection_idle_timeout: config.connection_idle_timeout,
            local_addrs: HashSet::new()
        }
    }

    /// Gets an iterator over immutable references to all running queries.
    pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
        self.queries.iter().filter_map(|query|
            if !query.is_finished() {
                Some(QueryRef { query })
            } else {
                None
            })
    }

    /// Gets an iterator over mutable references to all running queries.
    pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
        self.queries.iter_mut().filter_map(|query|
            if !query.is_finished() {
                Some(QueryMut { query })
            } else {
                None
            })
    }

    /// Gets an immutable reference to a running query, if it exists.
    pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
        self.queries.get(id).and_then(|query|
            if !query.is_finished() {
                Some(QueryRef { query })
            } else {
                None
            })
    }

    /// Gets a mutable reference to a running query, if it exists.
    pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
        self.queries.get_mut(id).and_then(|query|
            if !query.is_finished() {
                Some(QueryMut { query })
            } else {
                None
            })
    }

    /// Adds a known listen address of a peer participating in the DHT to the
    /// routing table.
    ///
    /// Explicitly adding addresses of peers serves two purposes:
    ///
    ///   1. In order for a node to join the DHT, it must know about at least
    ///      one other node of the DHT.
    ///
    ///   2. When a remote peer initiates a connection and that peer is not
    ///      yet in the routing table, the `Kademlia` behaviour must be
    ///      informed of an address on which that peer is listening for
    ///      connections before it can be added to the routing table
    ///      from where it can subsequently be discovered by all peers
    ///      in the DHT.
    ///
    /// If the routing table has been updated as a result of this operation,
    /// a [`KademliaEvent::RoutingUpdated`] event is emitted.
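    ///
    /// # Example
    ///
    /// A minimal sketch of seeding the routing table with a known bootstrap
    /// node; `boot_peer_id` and `boot_addr` are assumed to be obtained
    /// elsewhere, e.g. from configuration:
    ///
    /// ```ignore
    /// match kademlia.add_address(&boot_peer_id, boot_addr.clone()) {
    ///     RoutingUpdate::Success | RoutingUpdate::Pending => {}
    ///     RoutingUpdate::Failed => log::warn!("could not add bootstrap address"),
    /// }
    /// ```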
    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
        let key = kbucket::Key::from(*peer);
        match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(mut entry, _) => {
                if entry.value().insert(address) {
                    self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
                        KademliaEvent::RoutingUpdated {
                            peer: *peer,
                            addresses: entry.value().clone(),
                            old_peer: None,
                        }
                    ))
                }
                RoutingUpdate::Success
            }
            kbucket::Entry::Pending(mut entry, _) => {
                entry.value().insert(address);
                RoutingUpdate::Pending
            }
            kbucket::Entry::Absent(entry) => {
                let addresses = Addresses::new(address);
                let status =
                    if self.connected_peers.contains(peer) {
                        NodeStatus::Connected
                    } else {
                        NodeStatus::Disconnected
                    };
                match entry.insert(addresses.clone(), status) {
                    kbucket::InsertResult::Inserted => {
                        self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
                            KademliaEvent::RoutingUpdated {
                                peer: *peer,
                                addresses,
                                old_peer: None,
                            }
                        ));
                        RoutingUpdate::Success
                    },
                    kbucket::InsertResult::Full => {
                        debug!("Bucket full. Peer not added to routing table: {}", peer);
                        RoutingUpdate::Failed
                    },
                    kbucket::InsertResult::Pending { disconnected } => {
                        self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
                            peer_id: disconnected.into_preimage(),
                            condition: DialPeerCondition::Disconnected
                        });
                        RoutingUpdate::Pending
                    },
                }
            },
            kbucket::Entry::SelfEntry => RoutingUpdate::Failed,
        }
    }

    /// Removes an address of a peer from the routing table.
    ///
    /// If the given address is the last address of the peer in the
    /// routing table, the peer is removed from the routing table
    /// and `Some` is returned with a view of the removed entry.
    /// The same applies if the peer is currently pending insertion
    /// into the routing table.
    ///
    /// If the given peer or address is not in the routing table,
    /// this is a no-op.
    pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr)
        -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>>
    {
        let key = kbucket::Key::from(*peer);
        match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(mut entry, _) => {
                if entry.value().remove(address).is_err() {
                    Some(entry.remove()) // it is the last address, thus remove the peer.
                } else {
                    None
                }
            }
            kbucket::Entry::Pending(mut entry, _) => {
                if entry.value().remove(address).is_err() {
                    Some(entry.remove()) // it is the last address, thus remove the peer.
                } else {
                    None
                }
            }
            kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => {
                None
            }
        }
    }

    /// Removes a peer from the routing table.
    ///
    /// Returns `None` if the peer was not in the routing table,
    /// not even pending insertion.
    pub fn remove_peer(&mut self, peer: &PeerId)
        -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>>
    {
        let key = kbucket::Key::from(*peer);
        match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(entry, _) => {
                Some(entry.remove())
            }
            kbucket::Entry::Pending(entry, _) => {
                Some(entry.remove())
            }
            kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => {
                None
            }
        }
    }

    /// Returns an iterator over all non-empty buckets in the routing table.
    pub fn kbuckets(&mut self)
        -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
    {
        self.kbuckets.iter().filter(|b| !b.is_empty())
    }

    /// Returns the k-bucket for the distance to the given key.
    ///
    /// Returns `None` if the given key refers to the local key.
    pub fn kbucket<K>(&mut self, key: K)
        -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
    where
        K: Into<kbucket::Key<K>> + Clone
    {
        self.kbuckets.bucket(&key.into())
    }

    /// Initiates an iterative query for the closest peers to the given key.
    ///
    /// The result of the query is delivered in a
    /// [`KademliaEvent::QueryResult{QueryResult::GetClosestPeers}`].
    pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
    where
        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone
    {
        let info = QueryInfo::GetClosestPeers { key: key.clone().into() };
        let target: kbucket::Key<K> = key.into();
        let peers = self.kbuckets.closest_keys(&target);
        let inner = QueryInner::new(info);
        self.queries.add_iter_closest(target.clone(), peers, inner)
    }

    /// Performs a lookup for a record in the DHT.
    ///
    /// The result of this operation is delivered in a
    /// [`KademliaEvent::QueryResult{QueryResult::GetRecord}`].
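    ///
    /// # Example
    ///
    /// A minimal sketch of looking up a record by key; the key bytes are a
    /// placeholder, and the `record::Key::new` constructor and `Quorum::One`
    /// variant used here are assumptions about the surrounding APIs:
    ///
    /// ```ignore
    /// let key = record::Key::new(&b"some-key".to_vec());
    /// let query_id = kademlia.get_record(&key, Quorum::One);
    /// // The outcome arrives later as a `KademliaEvent::QueryResult` with
    /// // `QueryResult::GetRecord` carrying this `query_id`.
    /// ```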
    pub fn get_record(&mut self, key: &record::Key, quorum: Quorum) -> QueryId {
        let quorum = quorum.eval(self.queries.config().replication_factor);
        let mut records = Vec::with_capacity(quorum.get());

        if let Some(record) = self.store.get(key) {
            if record.is_expired(Instant::now()) {
                self.store.remove(key)
            } else {
                records.push(PeerRecord{ peer: None, record: record.into_owned()});
            }
        }

        let done = records.len() >= quorum.get();
        let target = kbucket::Key::new(key.clone());
        let info = QueryInfo::GetRecord { key: key.clone(), records, quorum, cache_at: None };
        let peers = self.kbuckets.closest_keys(&target);
        let inner = QueryInner::new(info);
        let id = self.queries.add_iter_closest(target.clone(), peers, inner); // (*)

        // Instantly finish the query if we already have enough records.
        if done {
            self.queries.get_mut(&id).expect("by (*)").finish();
        }

        id
    }

    /// Stores a record in the DHT.
    ///
    /// Returns `Ok` if a record has been stored locally, providing the
    /// `QueryId` of the initial query that replicates the record in the DHT.
    /// The result of the query is eventually reported as a
    /// [`KademliaEvent::QueryResult{QueryResult::PutRecord}`].
    ///
    /// The record is always stored locally with the given expiration. If the record's
    /// expiration is `None`, the common case, it does not expire in local storage
    /// but is still replicated with the configured record TTL. To remove the record
    /// locally and stop it from being re-published in the DHT, see [`Kademlia::remove_record`].
    ///
    /// After the initial publication of the record, it is subject to (re-)replication
    /// and (re-)publication as per the configured intervals. Periodic (re-)publication
    /// does not update the record's expiration in local storage, thus a given record
    /// with an explicit expiration will always expire at that instant and until then
    /// is subject to regular (re-)replication and (re-)publication.
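    ///
    /// # Example
    ///
    /// A minimal sketch of publishing a record; the `Record::new` constructor
    /// and `Quorum::One` variant are assumptions about the surrounding
    /// `record` and `Quorum` APIs, and the key/value bytes are placeholders:
    ///
    /// ```ignore
    /// let record = Record::new(record::Key::new(&b"some-key".to_vec()), b"some-value".to_vec());
    /// match kademlia.put_record(record, Quorum::One) {
    ///     Ok(query_id) => { /* await `QueryResult::PutRecord` for this id */ }
    ///     Err(e) => log::warn!("failed to store record locally: {:?}", e),
    /// }
    /// ```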
    pub fn put_record(&mut self, mut record: Record, quorum: Quorum) -> Result<QueryId, store::Error> {
        record.publisher = Some(*self.kbuckets.local_key().preimage());
        self.store.put(record.clone())?;
        record.expires = record.expires.or_else(||
            self.record_ttl.map(|ttl| Instant::now() + ttl));
        let quorum = quorum.eval(self.queries.config().replication_factor);
        let target = kbucket::Key::new(record.key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let context = PutRecordContext::Publish;
        let info = QueryInfo::PutRecord {
            context,
            record,
            quorum,
            phase: PutRecordPhase::GetClosestPeers
        };
        let inner = QueryInner::new(info);
        Ok(self.queries.add_iter_closest(target.clone(), peers, inner))
    }

    /// Removes the record with the given key from _local_ storage,
    /// if the local node is the publisher of the record.
    ///
    /// Has no effect if a record for the given key is stored locally but
    /// the local node is not a publisher of the record.
    ///
    /// This is a _local_ operation. However, it also has the effect that
    /// the record will no longer be periodically re-published, allowing the
    /// record to eventually expire throughout the DHT.
    pub fn remove_record(&mut self, key: &record::Key) {
        if let Some(r) = self.store.get(key) {
            if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
                self.store.remove(key)
            }
        }
    }

    /// Gets a mutable reference to the record store.
    pub fn store_mut(&mut self) -> &mut TStore {
        &mut self.store
    }

    /// Bootstraps the local node to join the DHT.
    ///
    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
    /// own ID in the DHT. This introduces the local node to the other nodes
    /// in the DHT and populates its routing table with the closest neighbours.
    ///
    /// Subsequently, all buckets farther from the bucket of the closest neighbour are
    /// refreshed by initiating an additional bootstrapping query for each such
    /// bucket with random keys.
    ///
    /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
    /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
    /// reported via [`KademliaEvent::QueryResult{QueryResult::Bootstrap}`] events,
    /// with one such event per bootstrapping query.
    ///
    /// Returns `Err` if bootstrapping is impossible due to an empty routing table.
    ///
    /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
    /// > See [`Kademlia::add_address`].
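    ///
    /// # Example
    ///
    /// A minimal sketch of the typical join sequence; the bootstrap peer id
    /// and address are assumed to be known from configuration:
    ///
    /// ```ignore
    /// kademlia.add_address(&boot_peer_id, boot_addr);
    /// match kademlia.bootstrap() {
    ///     Ok(query_id) => { /* progress arrives as `QueryResult::Bootstrap` events */ }
    ///     Err(NoKnownPeers()) => log::warn!("cannot bootstrap: routing table is empty"),
    /// }
    /// ```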
    pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
        let local_key = self.kbuckets.local_key().clone();
        let info = QueryInfo::Bootstrap {
            peer: *local_key.preimage(),
            remaining: None
        };
        let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
        if peers.is_empty() {
            Err(NoKnownPeers())
        } else {
            let inner = QueryInner::new(info);
            Ok(self.queries.add_iter_closest(local_key, peers, inner))
        }
    }

    /// Establishes the local node as a provider of a value for the given key.
    ///
    /// This operation publishes a provider record with the given key and
    /// identity of the local node to the peers closest to the key, thus establishing
    /// the local node as a provider.
    ///
    /// Returns `Ok` if a provider record has been stored locally, providing the
    /// `QueryId` of the initial query that announces the local node as a provider.
    ///
    /// The publication of the provider records is periodically repeated as per the
    /// configured interval, to renew the expiry and account for changes to the DHT
    /// topology. A provider record may be removed from local storage and
    /// thus no longer re-published by calling [`Kademlia::stop_providing`].
    ///
    /// In contrast to the standard Kademlia push-based model for content distribution
    /// implemented by [`Kademlia::put_record`], the provider API implements a
    /// pull-based model that may be used in addition or as an alternative.
    /// The means by which the actual value is obtained from a provider is out of scope
    /// of the libp2p Kademlia provider API.
    ///
    /// The results of the (repeated) provider announcements sent by this node are
    /// reported via [`KademliaEvent::QueryResult{QueryResult::StartProviding}`].
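    ///
    /// # Example
    ///
    /// A minimal sketch of announcing the local node as a provider; the key
    /// bytes are a placeholder and the `record::Key::new` constructor is an
    /// assumption about the `record` API:
    ///
    /// ```ignore
    /// let key = record::Key::new(&b"content-hash".to_vec());
    /// let query_id = kademlia.start_providing(key)?;
    /// // Announcement results are reported as `QueryResult::StartProviding`.
    /// ```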
    pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
        // Note: We store our own provider records locally without local addresses
        // to avoid redundant storage and outdated addresses. Instead these are
        // acquired on demand when returning a `ProviderRecord` for the local node.
        let local_addrs = Vec::new();
        let record = ProviderRecord::new(
            key.clone(),
            *self.kbuckets.local_key().preimage(),
            local_addrs);
        self.store.add_provider(record)?;
        let target = kbucket::Key::new(key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let context = AddProviderContext::Publish;
        let info = QueryInfo::AddProvider {
            context,
            key,
            phase: AddProviderPhase::GetClosestPeers
        };
        let inner = QueryInner::new(info);
        let id = self.queries.add_iter_closest(target.clone(), peers, inner);
        Ok(id)
    }

    /// Stops the local node from announcing that it is a provider for the given key.
    ///
    /// This is a local operation. The local node will still be considered as a
    /// provider for the key by other nodes until these provider records expire.
    pub fn stop_providing(&mut self, key: &record::Key) {
        self.store.remove_provider(key, self.kbuckets.local_key().preimage());
    }

    /// Performs a lookup for providers of a value to the given key.
    ///
    /// The result of this operation is delivered in a
    /// [`KademliaEvent::QueryResult{QueryResult::GetProviders}`].
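    ///
    /// # Example
    ///
    /// A minimal sketch of looking up providers for a key; the key bytes are a
    /// placeholder and the `record::Key::new` constructor is an assumption:
    ///
    /// ```ignore
    /// let query_id = kademlia.get_providers(record::Key::new(&b"content-hash".to_vec()));
    /// // Providers are reported as `QueryResult::GetProviders` for this `query_id`.
    /// ```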
    pub fn get_providers(&mut self, key: record::Key) -> QueryId {
        let info = QueryInfo::GetProviders {
            key: key.clone(),
            providers: HashSet::new(),
        };
        let target = kbucket::Key::new(key);
        let peers = self.kbuckets.closest_keys(&target);
        let inner = QueryInner::new(info);
        self.queries.add_iter_closest(target.clone(), peers, inner)
    }

    /// Processes discovered peers from a successful request in an iterative `Query`.
    fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
    where
        I: Iterator<Item = &'a KadPeer> + Clone
    {
        let local_id = self.kbuckets.local_key().preimage();
        let others_iter = peers.filter(|p| &p.node_id != local_id);
        if let Some(query) = self.queries.get_mut(query_id) {
            log::trace!("Request to {:?} in query {:?} succeeded.", source, query_id);
            for peer in others_iter.clone() {
                log::trace!("Peer {:?} reported by {:?} in query {:?}.",
                            peer, source, query_id);
                let addrs = peer.multiaddrs.iter().cloned().collect();
                query.inner.addresses.insert(peer.node_id, addrs);
            }
            query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
        }
    }

    /// Finds the closest peers to a `target` in the context of a request by
    /// the `source` peer, such that the `source` peer is never included in the
    /// result.
    fn find_closest<T: Clone>(&mut self, target: &kbucket::Key<T>, source: &PeerId) -> Vec<KadPeer> {
        if target == self.kbuckets.local_key() {
            Vec::new()
        } else {
            self.kbuckets
                .closest(target)
                .filter(|e| e.node.key.preimage() != source)
                .take(self.queries.config().replication_factor.get())
                .map(KadPeer::from)
                .collect()
        }
    }

    /// Collects all peers who are known to be providers of the value for a given `Multihash`.
    fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
        let kbuckets = &mut self.kbuckets;
        let connected = &mut self.connected_peers;
        let local_addrs = &self.local_addrs;
        self.store.providers(key)
            .into_iter()
            .filter_map(move |p|
                if &p.provider != source {
                    let node_id = p.provider;
                    let multiaddrs = p.addresses;
                    let connection_ty = if connected.contains(&node_id) {
                        KadConnectionType::Connected
                    } else {
                        KadConnectionType::NotConnected
                    };
                    if multiaddrs.is_empty() {
                        // The provider is either the local node and we fill in
                        // the local addresses on demand, or it is a legacy
                        // provider record without addresses, in which case we
                        // try to find addresses in the routing table, as was
                        // done before provider records were stored along with
                        // their addresses.
                        if &node_id == kbuckets.local_key().preimage() {
                            Some(local_addrs.iter().cloned().collect::<Vec<_>>())
                        } else {
                            let key = kbucket::Key::from(node_id);
                            kbuckets.entry(&key).view().map(|e| e.node.value.clone().into_vec())
                        }
                    } else {
                        Some(multiaddrs)
                    }
                    .map(|multiaddrs| {
                        KadPeer {
                            node_id,
                            multiaddrs,
                            connection_ty,
                        }
                    })
                } else {
                    None
                })
            .take(self.queries.config().replication_factor.get())
            .collect()
    }

    /// Starts an iterative `ADD_PROVIDER` query for the given key.
    fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
        let info = QueryInfo::AddProvider {
            context,
            key: key.clone(),
            phase: AddProviderPhase::GetClosestPeers
        };
        let target = kbucket::Key::new(key);
        let peers = self.kbuckets.closest_keys(&target);
        let inner = QueryInner::new(info);
        self.queries.add_iter_closest(target.clone(), peers, inner);
    }

    /// Starts an iterative `PUT_VALUE` query for the given record.
    fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
        let quorum = quorum.eval(self.queries.config().replication_factor);
        let target = kbucket::Key::new(record.key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let info = QueryInfo::PutRecord {
            record, quorum, context, phase: PutRecordPhase::GetClosestPeers
        };
        let inner = QueryInner::new(info);
        self.queries.add_iter_closest(target.clone(), peers, inner);
    }

    /// Updates the routing table with a new connection status and address of a peer.
    fn connection_updated(&mut self, peer: PeerId, address: Option<Multiaddr>, new_status: NodeStatus) {
        let key = kbucket::Key::from(peer);
        match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(mut entry, old_status) => {
                if let Some(address) = address {
                    if entry.value().insert(address) {
                        self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
                            KademliaEvent::RoutingUpdated {
                                peer,
                                addresses: entry.value().clone(),
                                old_peer: None,
                            }
                        ))
                    }
                }
                if old_status != new_status {
                    entry.update(new_status);
                }
            },

            kbucket::Entry::Pending(mut entry, old_status) => {
                if let Some(address) = address {
                    entry.value().insert(address);
                }
                if old_status != new_status {
                    entry.update(new_status);
                }
            },

            kbucket::Entry::Absent(entry) => {
                // Only connected nodes with a known address are newly inserted.
                if new_status != NodeStatus::Connected {
                    return
                }
                match (address, self.kbucket_inserts) {
                    (None, _) => {
                        self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
                            KademliaEvent::UnroutablePeer { peer }
                        ));
                    }
                    (Some(a), KademliaBucketInserts::Manual) => {
                        self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
                            KademliaEvent::RoutablePeer { peer, address: a }
                        ));
                    }
                    (Some(a), KademliaBucketInserts::OnConnected) => {
                        let addresses = Addresses::new(a);
                        match entry.insert(addresses.clone(), new_status) {
                            kbucket::InsertResult::Inserted => {
                                let event = KademliaEvent::RoutingUpdated {
                                    peer,
                                    addresses,
                                    old_peer: None,
                                };
                                self.queued_events.push_back(
                                    NetworkBehaviourAction::GenerateEvent(event));
                            },
                            kbucket::InsertResult::Full => {
                                debug!("Bucket full. Peer not added to routing table: {}", peer);
                                let address = addresses.first().clone();
                                self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
                                    KademliaEvent::RoutablePeer { peer, address }
                                ));
                            },
                            kbucket::InsertResult::Pending { disconnected } => {
                                debug_assert!(!self.connected_peers.contains(disconnected.preimage()));
                                let address = addresses.first().clone();
                                self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
                                    KademliaEvent::PendingRoutablePeer { peer, address }
                                ));
                                self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
                                    peer_id: disconnected.into_preimage(),
                                    condition: DialPeerCondition::Disconnected
                                })
                            },
                        }
                    }
                }
            },
            _ => {}
        }
    }

    /// Handles a finished (i.e. successful) query.
    fn query_finished(&mut self, q: Query<QueryInner>, params: &mut impl PollParameters)
        -> Option<KademliaEvent>
    {
        let query_id = q.id();
        log::trace!("Query {:?} finished.", query_id);
        let result = q.into_result();
        match result.inner.info {
            QueryInfo::Bootstrap { peer, remaining } => {
                let local_key = self.kbuckets.local_key().clone();
                let mut remaining = remaining.unwrap_or_else(|| {
                    debug_assert_eq!(&peer, local_key.preimage());
                    // The lookup for the local key finished. To complete the bootstrap process,
                    // a bucket refresh should be performed for every bucket farther away than
                    // the first non-empty bucket (which are most likely no more than the last
                    // few, i.e. farthest, buckets).
                    self.kbuckets.iter()
                        .skip_while(|b| b.is_empty())
                        .skip(1) // Skip the bucket with the closest neighbour.
                        .map(|b| {
                            // Try to find a key that falls into the bucket. While such keys can
                            // be generated fully deterministically, the current libp2p kademlia
                            // wire protocol requires transmission of the preimages of the actual
                            // keys in the DHT keyspace, hence for now this is just a "best effort"
                            // to find a key that hashes into a specific bucket. The probabilities
                            // of finding a key in the bucket `b` with at most 16 trials are as
                            // follows:
                            //
                            // Pr(bucket-255) = 1 - (1/2)^16   ~= 1
                            // Pr(bucket-254) = 1 - (3/4)^16   ~= 1
                            // Pr(bucket-253) = 1 - (7/8)^16   ~= 0.88
                            // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
                            // ...
                            let mut target = kbucket::Key::from(PeerId::random());
                            for _ in 0 .. 16 {
                                let d = local_key.distance(&target);
                                if b.contains(&d) {
                                    break;
                                }
                                target = kbucket::Key::from(PeerId::random());
                            }
                            target
                        }).collect::<Vec<_>>().into_iter()
                });

                let num_remaining = remaining.len().saturating_sub(1) as u32;

                if let Some(target) = remaining.next() {
                    let info = QueryInfo::Bootstrap {
                        peer: target.clone().into_preimage(),
                        remaining: Some(remaining)
                    };
                    let peers = self.kbuckets.closest_keys(&target);
                    let inner = QueryInner::new(info);
                    self.queries.continue_iter_closest(query_id, target.clone(), peers, inner);
                }

                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::Bootstrap(Ok(BootstrapOk { peer, num_remaining }))
                })
            }

            QueryInfo::GetClosestPeers { key, .. } => {
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetClosestPeers(Ok(
                        GetClosestPeersOk { key, peers: result.peers.collect() }
                    ))
                })
            }

            QueryInfo::GetProviders { key, providers } => {
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetProviders(Ok(
                        GetProvidersOk {
                            key,
                            providers,
                            closest_peers: result.peers.collect()
                        }
                    ))
                })
            }

            QueryInfo::AddProvider {
                context,
                key,
                phase: AddProviderPhase::GetClosestPeers
            } => {
                let provider_id = *params.local_peer_id();
                let external_addresses = params.external_addresses().map(|r| r.addr).collect();
                let inner = QueryInner::new(QueryInfo::AddProvider {
                    context,
                    key,
                    phase: AddProviderPhase::AddProvider {
                        provider_id,
                        external_addresses,
                        get_closest_peers_stats: result.stats
                    }
                });
                self.queries.continue_fixed(query_id, result.peers, inner);
                None
            }

            QueryInfo::AddProvider {
                context,
                key,
                phase: AddProviderPhase::AddProvider { get_closest_peers_stats, .. }
            } => {
                match context {
                    AddProviderContext::Publish => {
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::StartProviding(Ok(AddProviderOk { key }))
                        })
                    }
                    AddProviderContext::Republish => {
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::RepublishProvider(Ok(AddProviderOk { key }))
                        })
                    }
                }
            }

            QueryInfo::GetRecord { key, records, quorum, cache_at } => {
                let results = if records.len() >= quorum.get() { // [not empty]
                    if let Some(cache_key) = cache_at {
                        // Cache the record at the closest node to the key that
                        // did not return the record.
                        let record = records.first().expect("[not empty]").record.clone();
                        let quorum = NonZeroUsize::new(1).expect("1 > 0");
                        let context = PutRecordContext::Cache;
                        let info = QueryInfo::PutRecord {
                            context,
                            record,
                            quorum,
                            phase: PutRecordPhase::PutRecord {
                                success: vec![],
                                get_closest_peers_stats: QueryStats::empty()
                            }
                        };
                        let inner = QueryInner::new(info);
                        self.queries.add_fixed(iter::once(cache_key.into_preimage()), inner);
                    }
                    Ok(GetRecordOk { records })
                } else if records.is_empty() {
                    Err(GetRecordError::NotFound {
                        key,
                        closest_peers: result.peers.collect()
                    })
                } else {
                    Err(GetRecordError::QuorumFailed { key, records, quorum })
                };
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetRecord(results)
                })
            }

            QueryInfo::PutRecord {
                context,
                record,
                quorum,
                phase: PutRecordPhase::GetClosestPeers
            } => {
                let info = QueryInfo::PutRecord {
                    context,
                    record,
                    quorum,
                    phase: PutRecordPhase::PutRecord {
                        success: vec![],
                        get_closest_peers_stats: result.stats
                    }
                };
                let inner = QueryInner::new(info);
                self.queries.continue_fixed(query_id, result.peers, inner);
                None
            }

            QueryInfo::PutRecord {
                context,
                record,
                quorum,
                phase: PutRecordPhase::PutRecord { success, get_closest_peers_stats }
            } => {
                let mk_result = |key: record::Key| {
                    if success.len() >= quorum.get() {
                        Ok(PutRecordOk { key })
                    } else {
                        Err(PutRecordError::QuorumFailed { key, quorum, success })
                    }
                };
                match context {
                    PutRecordContext::Publish =>
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::PutRecord(mk_result(record.key))
                        }),
                    PutRecordContext::Republish =>
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::RepublishRecord(mk_result(record.key))
                        }),
                    PutRecordContext::Replicate => {
                        debug!("Record replicated: {:?}", record.key);
                        None
                    }
                    PutRecordContext::Cache => {
                        debug!("Record cached: {:?}", record.key);
                        None
                    }
                }
            }
        }
    }

    /// Handles a query that timed out.
    fn query_timeout(&mut self, query: Query<QueryInner>) -> Option<KademliaEvent> {
        let query_id = query.id();
        log::trace!("Query {:?} timed out.", query_id);
        let result = query.into_result();
        match result.inner.info {
            QueryInfo::Bootstrap { peer, mut remaining } => {
                let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);

                if let Some(mut remaining) = remaining.take() {
                    // Continue with the next bootstrap query if `remaining` is not empty.
                    if let Some(target) = remaining.next() {
                        let info = QueryInfo::Bootstrap {
                            peer: target.clone().into_preimage(),
                            remaining: Some(remaining)
                        };
                        let peers = self.kbuckets.closest_keys(&target);
                        let inner = QueryInner::new(info);
                        self.queries.continue_iter_closest(query_id, target.clone(), peers, inner);
                    }
                }

                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::Bootstrap(Err(
                        BootstrapError::Timeout { peer, num_remaining }
                    ))
                })
            }

            QueryInfo::AddProvider { context, key, .. } =>
                Some(match context {
                    AddProviderContext::Publish =>
                        KademliaEvent::QueryResult {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::StartProviding(Err(
                                AddProviderError::Timeout { key }
                            ))
                        },
                    AddProviderContext::Republish =>
                        KademliaEvent::QueryResult {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::RepublishProvider(Err(
                                AddProviderError::Timeout { key }
                            ))
                        }
                }),

            QueryInfo::GetClosestPeers { key } => {
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetClosestPeers(Err(
                        GetClosestPeersError::Timeout {
                            key,
                            peers: result.peers.collect()
                        }
                    ))
                })
            },

            QueryInfo::PutRecord { record, quorum, context, phase } => {
                let err = Err(PutRecordError::Timeout {
                    key: record.key,
                    quorum,
                    success: match phase {
                        PutRecordPhase::GetClosestPeers => vec![],
                        PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
                    }
                });
                match context {
                    PutRecordContext::Publish =>
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::PutRecord(err)
                        }),
                    PutRecordContext::Republish =>
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::RepublishRecord(err)
                        }),
                    PutRecordContext::Replicate => match phase {
                        PutRecordPhase::GetClosestPeers => {
                            warn!("Locating closest peers for replication failed: {:?}", err);
                            None
                        }
                        PutRecordPhase::PutRecord { .. } => {
                            debug!("Replicating record failed: {:?}", err);
                            None
                        }
                    }
                    PutRecordContext::Cache => match phase {
                        PutRecordPhase::GetClosestPeers => {
                            // Caching a record at the closest peer to a key that did not return
                            // a record is never preceded by a lookup for the closest peers, i.e.
                            // it is a direct query to a single peer.
                            unreachable!()
                        }
                        PutRecordPhase::PutRecord { .. } => {
                            debug!("Caching record failed: {:?}", err);
                            None
                        }
                    }
                }
            }

            QueryInfo::GetRecord { key, records, quorum, .. } =>
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetRecord(Err(
                        GetRecordError::Timeout { key, records, quorum },
                    ))
                }),

            QueryInfo::GetProviders { key, providers } =>
                Some(KademliaEvent::QueryResult {
                    id: query_id,
1304                    stats: result.stats,
1305                    result: QueryResult::GetProviders(Err(
1306                        GetProvidersError::Timeout {
1307                            key,
1308                            providers,
1309                            closest_peers: result.peers.collect()
1310                        }
1311                    ))
1312                })
1313        }
1314    }
1315
1316    /// Processes a record received from a peer.
1317    fn record_received(
1318        &mut self,
1319        source: PeerId,
1320        connection: ConnectionId,
1321        request_id: KademliaRequestId,
1322        mut record: Record
1323    ) {
1324        if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
1325            // If the (alleged) publisher is the local node, do nothing. The record of
1326            // the original publisher should never change as a result of replication
1327            // and the publisher is always assumed to have the "right" value.
1328            self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1329                peer_id: source,
1330                handler: NotifyHandler::One(connection),
1331                event: KademliaHandlerIn::PutRecordRes {
1332                    key: record.key,
1333                    value: record.value,
1334                    request_id,
1335                },
1336            });
1337            return
1338        }
1339
1340        let now = Instant::now();
1341
1342        // Calculate the expiration so that it decreases exponentially with the
1343        // number of nodes between the local node and the closest node to the key
1344        // (beyond the replication factor). This avoids over-caching records
1345        // outside of the k closest nodes to a key.
1346        let target = kbucket::Key::new(record.key.clone());
1347        let num_between = self.kbuckets.count_nodes_between(&target);
1348        let k = self.queries.config().replication_factor.get();
1349        let num_beyond_k = (usize::max(k, num_between) - k) as u32;
1350        let expiration = self.record_ttl.map(|ttl| now + exp_decrease(ttl, num_beyond_k));
1351        // The smaller TTL prevails. Only if neither TTL is set is the record
1352        // stored "forever".
1353        record.expires = record.expires.or(expiration).min(expiration);
1354
1355        if let Some(job) = self.put_record_job.as_mut() {
1356            // Ignore the record in the next run of the replication
1357            // job, since we can assume the sender replicated the
1358            // record to the k closest peers. Effectively, only
1359            // one of the k closest peers performs a replication
1360            // in the configured interval, assuming a shared interval.
1361            job.skip(record.key.clone())
1362        }
1363
1364        // While records received from a publisher, as well as records that do
1365        // not exist locally, should always be stored (or at least an attempt
1366        // should be made to store them), there is a choice here w.r.t. the
1367        // handling of replicated records whose keys refer to records that
1368        // already exist locally: the value and / or the publisher may either be
1369        // overridden or left unchanged. At the moment, and in the absence of a
1370        // decisive argument for another option, both are always overridden, as
1371        // this avoids having to load the existing record in the first place.
1372
1373        if !record.is_expired(now) {
1374            // The record is cloned because of the weird libp2p protocol
1375            // requirement to send back the value in the response, although this
1376            // is a waste of resources.
1377            match self.store.put(record.clone()) {
1378                Ok(()) => debug!("Record stored: {:?}; {} bytes", record.key, record.value.len()),
1379                Err(e) => {
1380                    info!("Record not stored: {:?}", e);
1381                    self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1382                        peer_id: source,
1383                        handler: NotifyHandler::One(connection),
1384                        event: KademliaHandlerIn::Reset(request_id)
1385                    });
1386
1387                    return
1388                }
1389            }
1390        }
1391
1392        // The remote receives a [`KademliaHandlerIn::PutRecordRes`] even in the
1393        // case where the record is discarded due to being expired. Given that
1394        // the remote sent the local node a [`KademliaHandlerEvent::PutRecord`]
1395        // request, the remote perceives the local node as one node among the k
1396        // closest nodes to the target. In addition, returning
1397        // [`KademliaHandlerIn::PutRecordRes`] does not reveal any internal
1398        // information to a possibly malicious remote node.
1399        self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1400            peer_id: source,
1401            handler: NotifyHandler::One(connection),
1402            event: KademliaHandlerIn::PutRecordRes {
1403                key: record.key,
1404                value: record.value,
1405                request_id,
1406            },
1407        })
1408    }
1409
1410    /// Processes a provider record received from a peer.
1411    fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
1412        if &provider.node_id != self.kbuckets.local_key().preimage() {
1413            let record = ProviderRecord {
1414                key,
1415                provider: provider.node_id,
1416                expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1417                addresses: provider.multiaddrs,
1418            };
1419            if let Err(e) = self.store.add_provider(record) {
1420                info!("Provider record not stored: {:?}", e);
1421            }
1422        }
1423    }
1424}
1425
1426/// Exponentially decrease the given duration (base 2).
1427fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
1428    Duration::from_secs(ttl.as_secs().checked_shr(exp).unwrap_or(0))
1429}
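
// The decay above is what `record_received` applies via `num_beyond_k`: the TTL
// is halved once for every node between the local node and the key beyond the
// replication factor k. A minimal sketch of the resulting behaviour (the
// 36-hour TTL is purely illustrative):
//
// ```rust,ignore
// let ttl = Duration::from_secs(36 * 60 * 60);
// assert_eq!(exp_decrease(ttl, 0), ttl);                               // within the k closest
// assert_eq!(exp_decrease(ttl, 1), Duration::from_secs(18 * 60 * 60)); // 1 node beyond k
// assert_eq!(exp_decrease(ttl, 3), Duration::from_secs(4 * 60 * 60 + 30 * 60)); // 3 nodes beyond k
// ```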
1430
1431impl<TStore> NetworkBehaviour for Kademlia<TStore>
1432where
1433    for<'a> TStore: RecordStore<'a>,
1434    TStore: Send + 'static,
1435{
1436    type ProtocolsHandler = KademliaHandlerProto<QueryId>;
1437    type OutEvent = KademliaEvent;
1438
1439    fn new_handler(&mut self) -> Self::ProtocolsHandler {
1440        KademliaHandlerProto::new(KademliaHandlerConfig {
1441            protocol_config: self.protocol_config.clone(),
1442            allow_listening: true,
1443            idle_timeout: self.connection_idle_timeout,
1444        })
1445    }
1446
1447    fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
1448        // Order the addresses by decreasing likelihood of connectivity, starting with
1449        // the addresses of that peer in the k-buckets.
1450        let key = kbucket::Key::from(*peer_id);
1451        let mut peer_addrs =
1452            if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) {
1453                let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
1454                debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
1455                addrs
1456            } else {
1457                Vec::new()
1458            };
1459
1460        // We add to that a temporary list of addresses from the ongoing queries.
1461        for query in self.queries.iter() {
1462            if let Some(addrs) = query.inner.addresses.get(peer_id) {
1463                peer_addrs.extend(addrs.iter().cloned())
1464            }
1465        }
1466
1467        peer_addrs
1468    }
1469
1470    fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {
1471        // When a connection is established, we don't know yet whether the
1472        // remote supports the configured protocol name. Only once a connection
1473        // handler reports [`KademliaHandlerEvent::ProtocolConfirmed`] do we
1474        // update the local routing table.
1475    }
1476
1477    fn inject_connected(&mut self, peer: &PeerId) {
1478        // Queue events for sending pending RPCs to the connected peer.
1479        // By definition, there can be only one pending RPC for a particular peer and query.
1480        for (peer_id, event) in self.queries.iter_mut().filter_map(|q|
1481            q.inner.pending_rpcs.iter()
1482                .position(|(p, _)| p == peer)
1483                .map(|p| q.inner.pending_rpcs.remove(p)))
1484        {
1485            self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1486                peer_id, event, handler: NotifyHandler::Any
1487            });
1488        }
1489
1490        self.connected_peers.insert(*peer);
1491    }
1492
1493    fn inject_address_change(
1494        &mut self,
1495        peer: &PeerId,
1496        _: &ConnectionId,
1497        old: &ConnectedPoint,
1498        new: &ConnectedPoint
1499    ) {
1500        let (old, new) = (old.get_remote_address(), new.get_remote_address());
1501
1502        // Update routing table.
1503        if let Some(addrs) = self.kbuckets.entry(&kbucket::Key::from(*peer)).value() {
1504            if addrs.replace(old, new) {
1505                debug!("Address '{}' replaced with '{}' for peer '{}'.", old, new, peer);
1506            } else {
1507                debug!(
1508                    "Address '{}' not replaced with '{}' for peer '{}' as old address wasn't \
1509                     present.",
1510                    old, new, peer,
1511                );
1512            }
1513        } else {
1514            debug!(
1515                "Address '{}' not replaced with '{}' for peer '{}' as peer is not present in the \
1516                 routing table.",
1517                old, new, peer,
1518            );
1519        }
1520
1521        // Update query address cache.
1522        //
1523        // Given two connected nodes: local node A and remote node B. Say node B
1524        // is not in node A's routing table. Additionally node B is part of the
1525        // `QueryInner::addresses` list of an ongoing query on node A. Say Node
1526        // B triggers an address change and then disconnects. Later on the
1527        // earlier mentioned query on node A would like to connect to node B.
1528        // Without replacing the address in the `QueryInner::addresses` set node
1529        // A would attempt to dial the old and not the new address.
1530        //
1531        // While upholding correctness, iterating through all discovered
1532        // addresses of a peer in all currently ongoing queries might have a
1533        // large performance impact. If so, the code below might be worth
1534        // revisiting.
1535        for query in self.queries.iter_mut() {
1536            if let Some(addrs) = query.inner.addresses.get_mut(peer) {
1537                for addr in addrs.iter_mut() {
1538                    if addr == old {
1539                        *addr = new.clone();
1540                    }
1541                }
1542            }
1543        }
1544    }
1545
1546    fn inject_addr_reach_failure(
1547        &mut self,
1548        peer_id: Option<&PeerId>,
1549        addr: &Multiaddr,
1550        err: &dyn error::Error
1551    ) {
1552        if let Some(peer_id) = peer_id {
1553            let key = kbucket::Key::from(*peer_id);
1554
1555            if let Some(addrs) = self.kbuckets.entry(&key).value() {
1556                // TODO: Ideally, the address should only be removed if the error can
1557                // be classified as "permanent" but since `err` is currently a borrowed
1558                // trait object without a `'static` bound, even downcasting for inspection
1559                // of the error is not possible (and also not truly desirable or ergonomic).
1560                // The error passed in should rather be a dedicated enum.
1561                if addrs.remove(addr).is_ok() {
1562                    debug!("Address '{}' removed from peer '{}' due to error: {}.",
1563                        addr, peer_id, err);
1564                } else {
1565                    // Despite apparently having no reachable address (any longer), the
1566                    // peer is kept in the routing table with its last known address, so
1567                    // that a (temporary) loss of network connectivity does not "flush"
1568                    // the routing table. Once in, a peer is only removed from the routing
1569                    // table if it is the least-recently connected peer, is currently
1570                    // disconnected, and is unreachable in the context of another peer
1571                    // pending insertion into the same bucket. This is handled transparently
1572                    // by the `KBucketsTable` and takes effect through
1573                    // `KBucketsTable::take_applied_pending` within `Kademlia::poll`.
1574                    debug!("Last remaining address '{}' of peer '{}' is unreachable: {}.",
1575                        addr, peer_id, err)
1576                }
1577            }
1578
1579            for query in self.queries.iter_mut() {
1580                if let Some(addrs) = query.inner.addresses.get_mut(peer_id) {
1581                    addrs.retain(|a| a != addr);
1582                }
1583            }
1584        }
1585    }
1586
1587    fn inject_dial_failure(&mut self, peer_id: &PeerId) {
1588        for query in self.queries.iter_mut() {
1589            query.on_failure(peer_id);
1590        }
1591    }
1592
1593    fn inject_disconnected(&mut self, id: &PeerId) {
1594        for query in self.queries.iter_mut() {
1595            query.on_failure(id);
1596        }
1597        self.connection_updated(*id, None, NodeStatus::Disconnected);
1598        self.connected_peers.remove(id);
1599    }
1600
1601    fn inject_event(
1602        &mut self,
1603        source: PeerId,
1604        connection: ConnectionId,
1605        event: KademliaHandlerEvent<QueryId>
1606    ) {
1607        match event {
1608            KademliaHandlerEvent::ProtocolConfirmed { endpoint } => {
1609                debug_assert!(self.connected_peers.contains(&source));
1610                // The remote's address can only be put into the routing table,
1611                // and thus shared with other nodes, if the local node is the dialer,
1612                // since the remote address on an inbound connection may be specific
1613                // to that connection (e.g. typically the TCP port numbers).
1614                let address = match endpoint {
1615                    ConnectedPoint::Dialer { address } => Some(address),
1616                    ConnectedPoint::Listener { .. } => None,
1617                };
1618                self.connection_updated(source, address, NodeStatus::Connected);
1619            }
1620
1621            KademliaHandlerEvent::FindNodeReq { key, request_id } => {
1622                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
1623                self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1624                    peer_id: source,
1625                    handler: NotifyHandler::One(connection),
1626                    event: KademliaHandlerIn::FindNodeRes {
1627                        closer_peers,
1628                        request_id,
1629                    },
1630                });
1631            }
1632
1633            KademliaHandlerEvent::FindNodeRes {
1634                closer_peers,
1635                user_data,
1636            } => {
1637                self.discovered(&user_data, &source, closer_peers.iter());
1638            }
1639
1640            KademliaHandlerEvent::GetProvidersReq { key, request_id } => {
1641                let provider_peers = self.provider_peers(&key, &source);
1642                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
1643                self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1644                    peer_id: source,
1645                    handler: NotifyHandler::One(connection),
1646                    event: KademliaHandlerIn::GetProvidersRes {
1647                        closer_peers,
1648                        provider_peers,
1649                        request_id,
1650                    },
1651                });
1652            }
1653
1654            KademliaHandlerEvent::GetProvidersRes {
1655                closer_peers,
1656                provider_peers,
1657                user_data,
1658            } => {
1659                let peers = closer_peers.iter().chain(provider_peers.iter());
1660                self.discovered(&user_data, &source, peers);
1661                if let Some(query) = self.queries.get_mut(&user_data) {
1662                    if let QueryInfo::GetProviders {
1663                        providers, ..
1664                    } = &mut query.inner.info {
1665                        for peer in provider_peers {
1666                            providers.insert(peer.node_id);
1667                        }
1668                    }
1669                }
1670            }
1671
1672            KademliaHandlerEvent::QueryError { user_data, error } => {
1673                log::debug!("Request to {:?} in query {:?} failed with {:?}",
1674                            source, user_data, error);
1675                // If the query to which the error relates is still active,
1676                // signal the failure w.r.t. `source`.
1677                if let Some(query) = self.queries.get_mut(&user_data) {
1678                    query.on_failure(&source)
1679                }
1680            }
1681
1682            KademliaHandlerEvent::AddProvider { key, provider } => {
1683                // Only accept a provider record from a legitimate peer.
1684                if provider.node_id != source {
1685                    return
1686                }
1687
1688                self.provider_received(key, provider)
1689            }
1690
1691            KademliaHandlerEvent::GetRecord { key, request_id } => {
1692                // Lookup the record locally.
1693                let record = match self.store.get(&key) {
1694                    Some(record) => {
1695                        if record.is_expired(Instant::now()) {
1696                            self.store.remove(&key);
1697                            None
1698                        } else {
1699                            Some(record.into_owned())
1700                        }
1701                    },
1702                    None => None
1703                };
1704
1705                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
1706
1707                self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1708                    peer_id: source,
1709                    handler: NotifyHandler::One(connection),
1710                    event: KademliaHandlerIn::GetRecordRes {
1711                        record,
1712                        closer_peers,
1713                        request_id,
1714                    },
1715                });
1716            }
1717
1718            KademliaHandlerEvent::GetRecordRes {
1719                record,
1720                closer_peers,
1721                user_data,
1722            } => {
1723                if let Some(query) = self.queries.get_mut(&user_data) {
1724                    if let QueryInfo::GetRecord {
1725                        key, records, quorum, cache_at
1726                    } = &mut query.inner.info {
1727                        if let Some(record) = record {
1728                            records.push(PeerRecord{ peer: Some(source), record });
1729
1730                            let quorum = quorum.get();
1731                            if records.len() >= quorum {
1732                                // Desired quorum reached. The query may finish. See
1733                                // [`Query::try_finish`] for details.
1734                                let peers = records.iter()
1735                                    .filter_map(|PeerRecord{ peer, .. }| peer.as_ref())
1736                                    .cloned()
1737                                    .collect::<Vec<_>>();
1738                                let finished = query.try_finish(peers.iter());
1739                                if !finished {
1740                                    debug!(
1741                                        "GetRecord query ({:?}) reached quorum ({}/{}) with \
1742                                         response from peer {} but could not yet finish.",
1743                                        user_data, peers.len(), quorum, source,
1744                                    );
1745                                }
1746                            }
1747                        } else if quorum.get() == 1 {
1748                            // It is a "standard" Kademlia query, for which the
1749                            // closest node to the key that did *not* return the
1750                            // value is tracked in order to cache the record on
1751                            // that node if the query turns out to be successful.
1752                            let source_key = kbucket::Key::from(source);
1753                            if let Some(cache_key) = cache_at {
1754                                let key = kbucket::Key::new(key.clone());
1755                                if source_key.distance(&key) < cache_key.distance(&key) {
1756                                    *cache_at = Some(source_key)
1757                                }
1758                            } else {
1759                                *cache_at = Some(source_key)
1760                            }
1761                        }
1762                    }
1763                }
1764
1765                self.discovered(&user_data, &source, closer_peers.iter());
1766            }
1767
1768            KademliaHandlerEvent::PutRecord {
1769                record,
1770                request_id
1771            } => {
1772                self.record_received(source, connection, request_id, record);
1773            }
1774
1775            KademliaHandlerEvent::PutRecordRes {
1776                user_data, ..
1777            } => {
1778                if let Some(query) = self.queries.get_mut(&user_data) {
1779                    query.on_success(&source, vec![]);
1780                    if let QueryInfo::PutRecord {
1781                        phase: PutRecordPhase::PutRecord { success, .. }, quorum, ..
1782                    } = &mut query.inner.info {
1783                        success.push(source);
1784
1785                        let quorum = quorum.get();
1786                        if success.len() >= quorum {
1787                            let peers = success.clone();
1788                            let finished = query.try_finish(peers.iter());
1789                            if !finished {
1790                                debug!(
1791                                    "PutRecord query ({:?}) reached quorum ({}/{}) with response \
1792                                     from peer {} but could not yet finish.",
1793                                    user_data, peers.len(), quorum, source,
1794                                );
1795                            }
1796                        }
1797                    }
1798                }
1799            }
1800        };
1801    }
1802
1803    fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
1804        self.local_addrs.insert(addr.clone());
1805    }
1806
1807    fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
1808        self.local_addrs.remove(addr);
1809    }
1810
1811    fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
1812        if self.local_addrs.len() < MAX_LOCAL_EXTERNAL_ADDRS {
1813            self.local_addrs.insert(addr.clone());
1814        }
1815    }
1816
1817    fn poll(&mut self, cx: &mut Context<'_>, parameters: &mut impl PollParameters) -> Poll<
1818        NetworkBehaviourAction<
1819            KademliaHandlerIn<QueryId>,
1820            Self::OutEvent,
1821        >,
1822    > {
1823        let now = Instant::now();
1824
1825        // Calculate the available capacity for queries triggered by background jobs.
1826        let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());
1827
1828        // Run the periodic provider announcement job.
1829        if let Some(mut job) = self.add_provider_job.take() {
1830            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
1831            for _ in 0 .. num {
1832                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
1833                    self.start_add_provider(r.key, AddProviderContext::Republish)
1834                } else {
1835                    break
1836                }
1837            }
1838            jobs_query_capacity -= num;
1839            self.add_provider_job = Some(job);
1840        }
1841
1842        // Run the periodic record replication / publication job.
1843        if let Some(mut job) = self.put_record_job.take() {
1844            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
1845            for _ in 0 .. num {
1846                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
1847                    let context = if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
1848                        PutRecordContext::Republish
1849                    } else {
1850                        PutRecordContext::Replicate
1851                    };
1852                    self.start_put_record(r, Quorum::All, context)
1853                } else {
1854                    break
1855                }
1856            }
1857            self.put_record_job = Some(job);
1858        }
1859
1860        loop {
1861            // Drain queued events first.
1862            if let Some(event) = self.queued_events.pop_front() {
1863                return Poll::Ready(event);
1864            }
1865
1866            // Drain applied pending entries from the routing table.
1867            if let Some(entry) = self.kbuckets.take_applied_pending() {
1868                let kbucket::Node { key, value } = entry.inserted;
1869                let event = KademliaEvent::RoutingUpdated {
1870                    peer: key.into_preimage(),
1871                    addresses: value,
1872                    old_peer: entry.evicted.map(|n| n.key.into_preimage())
1873                };
1874                return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
1875            }
1876
1877            // Look for a finished query.
1878            loop {
1879                match self.queries.poll(now) {
1880                    QueryPoolState::Finished(q) => {
1881                        if let Some(event) = self.query_finished(q, parameters) {
1882                            return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
1883                        }
1884                    }
1885                    QueryPoolState::Timeout(q) => {
1886                        if let Some(event) = self.query_timeout(q) {
1887                            return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
1888                        }
1889                    }
1890                    QueryPoolState::Waiting(Some((query, peer_id))) => {
1891                        let event = query.inner.info.to_request(query.id());
1892                        // TODO: AddProvider requests yield no response, so the query completes
1893                        // as soon as all requests have been sent. However, the handler should
1894                        // better emit an event when the request has been sent (and report
1895                        // an error if sending fails), instead of immediately reporting
1896                        // "success" somewhat prematurely here.
1897                        if let QueryInfo::AddProvider {
1898                            phase: AddProviderPhase::AddProvider { .. },
1899                            ..
1900                        } = &query.inner.info {
1901                            query.on_success(&peer_id, vec![])
1902                        }
1903                        if self.connected_peers.contains(&peer_id) {
1904                            self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1905                                peer_id, event, handler: NotifyHandler::Any
1906                            });
1907                        } else if &peer_id != self.kbuckets.local_key().preimage() {
1908                            query.inner.pending_rpcs.push((peer_id, event));
1909                            self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
1910                                peer_id, condition: DialPeerCondition::Disconnected
1911                            });
1912                        }
1913                    }
1914                    QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
1915                }
1916            }
1917
1918            // No immediate event was produced as a result of a finished query.
1919            // If no new events have been queued either, signal `NotReady` to
1920            // be polled again later.
1921            if self.queued_events.is_empty() {
1922                return Poll::Pending
1923            }
1924        }
1925    }
1926}
1927
1928/// A quorum w.r.t. the configured replication factor specifies the minimum
1929/// number of distinct nodes that must be successfully contacted in order
1930/// for a query to succeed.
1931#[derive(Debug, Copy, Clone, PartialEq, Eq)]
1932pub enum Quorum {
1933    One,
1934    Majority,
1935    All,
1936    N(NonZeroUsize)
1937}
1938
1939impl Quorum {
1940    /// Evaluate the quorum w.r.t a given total (number of peers).
1941    fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
1942        match self {
1943            Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
1944            Quorum::Majority => NonZeroUsize::new(total.get() / 2 + 1).expect("n + 1 != 0"),
1945            Quorum::All => total,
1946            Quorum::N(n) => NonZeroUsize::min(total, *n)
1947        }
1948    }
1949}
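
// A small sketch of how `Quorum::eval` resolves against a total number of
// peers, assuming a hypothetical replication factor of 20 (the numbers are
// illustrative only):
//
// ```rust,ignore
// let total = NonZeroUsize::new(20).unwrap();
// assert_eq!(Quorum::One.eval(total).get(), 1);
// assert_eq!(Quorum::Majority.eval(total).get(), 11); // 20 / 2 + 1
// assert_eq!(Quorum::All.eval(total).get(), 20);
// assert_eq!(Quorum::N(NonZeroUsize::new(30).unwrap()).eval(total).get(), 20); // capped at `total`
// ```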
1950
1951/// A record either received from the given peer or retrieved from the local
1952/// record store.
1953#[derive(Debug, Clone, PartialEq, Eq)]
1954pub struct PeerRecord {
1955    /// The peer from whom the record was received. `None` if the record was
1956    /// retrieved from local storage.
1957    pub peer: Option<PeerId>,
1958    pub record: Record,
1959}
1960
1961//////////////////////////////////////////////////////////////////////////////
1962// Events
1963
1964/// The events produced by the `Kademlia` behaviour.
1965///
1966/// See [`NetworkBehaviour::poll`].
1967#[derive(Debug)]
1968pub enum KademliaEvent {
1969    /// A query has produced a result.
1970    QueryResult {
1971        /// The ID of the query that finished.
1972        id: QueryId,
1973        /// The result of the query.
1974        result: QueryResult,
1975        /// Execution statistics from the query.
1976        stats: QueryStats
1977    },
1978
1979    /// The routing table has been updated with a new peer and / or
1980    /// address, thereby possibly evicting another peer.
1981    RoutingUpdated {
1982        /// The ID of the peer that was added or updated.
1983        peer: PeerId,
1984        /// The full list of known addresses of `peer`.
1985        addresses: Addresses,
1986        /// The ID of the peer that was evicted from the routing table to make
1987        /// room for the new peer, if any.
1988        old_peer: Option<PeerId>,
1989    },
1990
1991    /// A peer has connected for whom no listen address is known.
1992    ///
1993    /// If the peer is to be added to the routing table, a known
1994    /// listen address for the peer must be provided via [`Kademlia::add_address`].
1995    UnroutablePeer {
1996        peer: PeerId
1997    },
1998
1999    /// A connection to a peer has been established for whom a listen address
2000    /// is known but the peer has not been added to the routing table either
2001    /// because [`KademliaBucketInserts::Manual`] is configured or because
2002    /// the corresponding bucket is full.
2003    ///
2004    /// If the peer is to be included in the routing table, it must
2005    /// be explicitly added via [`Kademlia::add_address`], possibly after
2006    /// removing another peer.
2007    ///
2008    /// See [`Kademlia::kbucket`] for insight into the contents of
2009    /// the k-bucket of `peer`.
2010    RoutablePeer {
2011        peer: PeerId,
2012        address: Multiaddr,
2013    },
2014
2015    /// A connection to a peer has been established for whom a listen address
2016    /// is known but the peer is only pending insertion into the routing table
2017    /// if the least-recently disconnected peer is unresponsive, i.e. the peer
2018    /// may not make it into the routing table.
2019    ///
2020    /// If the peer is to be unconditionally included in the routing table,
2021    /// it should be explicitly added via [`Kademlia::add_address`] after
2022    /// removing another peer.
2023    ///
2024    /// See [`Kademlia::kbucket`] for insight into the contents of
2025    /// the k-bucket of `peer`.
2026    PendingRoutablePeer {
2027        peer: PeerId,
2028        address: Multiaddr,
2029    }
2030}
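
// A hedged sketch of how an application might consume these events. The
// surrounding types (`Swarm`, `SwarmEvent`) and the way behaviour events are
// surfaced are assumptions about the embedding application, not something
// prescribed by this module:
//
// ```rust,ignore
// match swarm_event {
//     SwarmEvent::Behaviour(KademliaEvent::QueryResult { id, result, stats }) => match result {
//         QueryResult::GetRecord(Ok(ok)) =>
//             println!("query {:?} found {} record(s) ({:?})", id, ok.records.len(), stats),
//         QueryResult::GetRecord(Err(e)) =>
//             println!("get_record for {:?} failed: {:?}", e.key(), e),
//         QueryResult::PutRecord(Ok(PutRecordOk { key })) =>
//             println!("record {:?} stored", key),
//         other =>
//             println!("other query result: {:?}", other),
//     },
//     SwarmEvent::Behaviour(KademliaEvent::RoutingUpdated { peer, .. }) =>
//         println!("routing table updated with {}", peer),
//     _ => {}
// }
// ```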
2031
2032/// The results of Kademlia queries.
2033#[derive(Debug)]
2034pub enum QueryResult {
2035    /// The result of [`Kademlia::bootstrap`].
2036    Bootstrap(BootstrapResult),
2037
2038    /// The result of [`Kademlia::get_closest_peers`].
2039    GetClosestPeers(GetClosestPeersResult),
2040
2041    /// The result of [`Kademlia::get_providers`].
2042    GetProviders(GetProvidersResult),
2043
2044    /// The result of [`Kademlia::start_providing`].
2045    StartProviding(AddProviderResult),
2046
2047    /// The result of an (automatic) republishing of a provider record.
2048    RepublishProvider(AddProviderResult),
2049
2050    /// The result of [`Kademlia::get_record`].
2051    GetRecord(GetRecordResult),
2052
2053    /// The result of [`Kademlia::put_record`].
2054    PutRecord(PutRecordResult),
2055
2056    /// The result of an (automatic) republishing of a (value-)record.
2057    RepublishRecord(PutRecordResult),
2058}
2059
2060/// The result of [`Kademlia::get_record`].
2061pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
2062
2063/// The successful result of [`Kademlia::get_record`].
2064#[derive(Debug, Clone)]
2065pub struct GetRecordOk {
2066    pub records: Vec<PeerRecord>
2067}
2068
2069/// The error result of [`Kademlia::get_record`].
2070#[derive(Debug, Clone)]
2071pub enum GetRecordError {
2072    NotFound {
2073        key: record::Key,
2074        closest_peers: Vec<PeerId>
2075    },
2076    QuorumFailed {
2077        key: record::Key,
2078        records: Vec<PeerRecord>,
2079        quorum: NonZeroUsize
2080    },
2081    Timeout {
2082        key: record::Key,
2083        records: Vec<PeerRecord>,
2084        quorum: NonZeroUsize
2085    }
2086}
2087
2088impl GetRecordError {
2089    /// Gets the key of the record for which the operation failed.
2090    pub fn key(&self) -> &record::Key {
2091        match self {
2092            GetRecordError::QuorumFailed { key, .. } => key,
2093            GetRecordError::Timeout { key, .. } => key,
2094            GetRecordError::NotFound { key, .. } => key,
2095        }
2096    }
2097
2098    /// Extracts the key of the record for which the operation failed,
2099    /// consuming the error.
2100    pub fn into_key(self) -> record::Key {
2101        match self {
2102            GetRecordError::QuorumFailed { key, .. } => key,
2103            GetRecordError::Timeout { key, .. } => key,
2104            GetRecordError::NotFound { key, .. } => key,
2105        }
2106    }
2107}
2108
2109/// The result of [`Kademlia::put_record`].
2110pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;
2111
2112/// The successful result of [`Kademlia::put_record`].
2113#[derive(Debug, Clone)]
2114pub struct PutRecordOk {
2115    pub key: record::Key
2116}
2117
2118/// The error result of [`Kademlia::put_record`].
2119#[derive(Debug)]
2120pub enum PutRecordError {
2121    QuorumFailed {
2122        key: record::Key,
2123        /// [`PeerId`]s of the peers the record was successfully stored on.
2124        success: Vec<PeerId>,
2125        quorum: NonZeroUsize
2126    },
2127    Timeout {
2128        key: record::Key,
2129        /// [`PeerId`]s of the peers the record was successfully stored on.
2130        success: Vec<PeerId>,
2131        quorum: NonZeroUsize
2132    },
2133}
2134
2135impl PutRecordError {
2136    /// Gets the key of the record for which the operation failed.
2137    pub fn key(&self) -> &record::Key {
2138        match self {
2139            PutRecordError::QuorumFailed { key, .. } => key,
2140            PutRecordError::Timeout { key, .. } => key,
2141        }
2142    }
2143
2144    /// Extracts the key of the record for which the operation failed,
2145    /// consuming the error.
2146    pub fn into_key(self) -> record::Key {
2147        match self {
2148            PutRecordError::QuorumFailed { key, .. } => key,
2149            PutRecordError::Timeout { key, .. } => key,
2150        }
2151    }
2152}
2153
2154/// The result of [`Kademlia::bootstrap`].
2155pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;
2156
2157/// The successful result of [`Kademlia::bootstrap`].
2158#[derive(Debug, Clone)]
2159pub struct BootstrapOk {
2160    pub peer: PeerId,
2161    pub num_remaining: u32,
2162}
2163
2164/// The error result of [`Kademlia::bootstrap`].
2165#[derive(Debug, Clone)]
2166pub enum BootstrapError {
2167    Timeout {
2168        peer: PeerId,
2169        num_remaining: Option<u32>,
2170    }
2171}
2172
2173/// The result of [`Kademlia::get_closest_peers`].
2174pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;
2175
2176/// The successful result of [`Kademlia::get_closest_peers`].
2177#[derive(Debug, Clone)]
2178pub struct GetClosestPeersOk {
2179    pub key: Vec<u8>,
2180    pub peers: Vec<PeerId>
2181}
2182
2183/// The error result of [`Kademlia::get_closest_peers`].
2184#[derive(Debug, Clone)]
2185pub enum GetClosestPeersError {
2186    Timeout {
2187        key: Vec<u8>,
2188        peers: Vec<PeerId>
2189    }
2190}
2191
2192impl GetClosestPeersError {
2193    /// Gets the key for which the operation failed.
2194    pub fn key(&self) -> &Vec<u8> {
2195        match self {
2196            GetClosestPeersError::Timeout { key, .. } => key,
2197        }
2198    }
2199
2200    /// Extracts the key for which the operation failed,
2201    /// consuming the error.
2202    pub fn into_key(self) -> Vec<u8> {
2203        match self {
2204            GetClosestPeersError::Timeout { key, .. } => key,
2205        }
2206    }
2207}
2208
2209/// The result of [`Kademlia::get_providers`].
2210pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;
2211
2212/// The successful result of [`Kademlia::get_providers`].
2213#[derive(Debug, Clone)]
2214pub struct GetProvidersOk {
2215    pub key: record::Key,
2216    pub providers: HashSet<PeerId>,
2217    pub closest_peers: Vec<PeerId>
2218}
2219
2220/// The error result of [`Kademlia::get_providers`].
2221#[derive(Debug, Clone)]
2222pub enum GetProvidersError {
2223    Timeout {
2224        key: record::Key,
2225        providers: HashSet<PeerId>,
2226        closest_peers: Vec<PeerId>
2227    }
2228}
2229
2230impl GetProvidersError {
2231    /// Gets the key for which the operation failed.
2232    pub fn key(&self) -> &record::Key {
2233        match self {
2234            GetProvidersError::Timeout { key, .. } => key,
2235        }
2236    }
2237
2238    /// Extracts the key for which the operation failed,
2239    /// consuming the error.
2240    pub fn into_key(self) -> record::Key {
2241        match self {
2242            GetProvidersError::Timeout { key, .. } => key,
2243        }
2244    }
2245}
2246
2247/// The result of publishing a provider record.
2248pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;
2249
2250/// The successful result of publishing a provider record.
2251#[derive(Debug, Clone)]
2252pub struct AddProviderOk {
2253    pub key: record::Key,
2254}
2255
2256/// The possible errors when publishing a provider record.
2257#[derive(Debug)]
2258pub enum AddProviderError {
2259    /// The query timed out.
2260    Timeout {
2261        key: record::Key,
2262    },
2263}
2264
2265impl AddProviderError {
2266    /// Gets the key for which the operation failed.
2267    pub fn key(&self) -> &record::Key {
2268        match self {
2269            AddProviderError::Timeout { key, .. } => key,
2270        }
2271    }
2272
2273    /// Extracts the key for which the operation failed, consuming the error.
2274    pub fn into_key(self) -> record::Key {
2275        match self {
2276            AddProviderError::Timeout { key, .. } => key,
2277        }
2278    }
2279}
2280
2281impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
2282    fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
2283        KadPeer {
2284            node_id: e.node.key.into_preimage(),
2285            multiaddrs: e.node.value.into_vec(),
2286            connection_ty: match e.status {
2287                NodeStatus::Connected => KadConnectionType::Connected,
2288                NodeStatus::Disconnected => KadConnectionType::NotConnected
2289            }
2290        }
2291    }
2292}
2293
2294//////////////////////////////////////////////////////////////////////////////
2295// Internal query state
2296
2297struct QueryInner {
2298    /// The query-specific state.
2299    info: QueryInfo,
2300    /// Addresses of peers discovered during a query.
2301    addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
2302    /// A map of pending requests to peers.
2303    ///
2304    /// A request is pending if the targeted peer is not currently connected;
2305    /// such requests are sent as soon as a connection to the peer is established.
2306    pending_rpcs: SmallVec<[(PeerId, KademliaHandlerIn<QueryId>); K_VALUE.get()]>
2307}
2308
2309impl QueryInner {
2310    fn new(info: QueryInfo) -> Self {
2311        QueryInner {
2312            info,
2313            addresses: Default::default(),
2314            pending_rpcs: SmallVec::default()
2315        }
2316    }
2317}
2318
2319/// The context of a [`QueryInfo::AddProvider`] query.
2320#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2321pub enum AddProviderContext {
2322    Publish,
2323    Republish,
2324}
2325
2326/// The context of a [`QueryInfo::PutRecord`] query.
2327#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2328pub enum PutRecordContext {
2329    Publish,
2330    Republish,
2331    Replicate,
2332    Cache,
2333}
2334
2335/// Information about a running query.
2336#[derive(Debug, Clone)]
2337pub enum QueryInfo {
2338    /// A query initiated by [`Kademlia::bootstrap`].
2339    Bootstrap {
2340        /// The targeted peer ID.
2341        peer: PeerId,
2342        /// The remaining random peer IDs to query, one per
2343        /// bucket that still needs refreshing.
2344        ///
2345        /// This is `None` if the initial self-lookup has not
2346        /// yet completed and `Some` with an exhausted iterator
2347        /// if bootstrapping is complete.
2348        remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>
2349    },
2350
2351    /// A query initiated by [`Kademlia::get_closest_peers`].
2352    GetClosestPeers { key: Vec<u8> },
2353
2354    /// A query initiated by [`Kademlia::get_providers`].
2355    GetProviders {
2356        /// The key for which to search for providers.
2357        key: record::Key,
2358        /// The found providers.
2359        providers: HashSet<PeerId>,
2360    },
2361
2362    /// A (repeated) query initiated by [`Kademlia::start_providing`].
2363    AddProvider {
2364        /// The record key.
2365        key: record::Key,
2366        /// The current phase of the query.
2367        phase: AddProviderPhase,
2368        /// The execution context of the query.
2369        context: AddProviderContext,
2370    },
2371
2372    /// A (repeated) query initiated by [`Kademlia::put_record`].
2373    PutRecord {
2374        record: Record,
2375        /// The expected quorum of responses w.r.t. the replication factor.
2376        quorum: NonZeroUsize,
2377        /// The current phase of the query.
2378        phase: PutRecordPhase,
2379        /// The execution context of the query.
2380        context: PutRecordContext,
2381    },
2382
2383    /// A query initiated by [`Kademlia::get_record`].
2384    GetRecord {
2385        /// The key to look for.
2386        key: record::Key,
2387        /// The records found, each with the id of the peer that returned it.
2388        /// The peer id is `None` if the record came from the local store.
2389        records: Vec<PeerRecord>,
2390        /// The number of records to look for.
2391        quorum: NonZeroUsize,
2392        /// The closest peer to `key` that did not return a record.
2393        ///
2394        /// When a record is found in a standard Kademlia query (quorum == 1),
2395        /// it is cached at this peer.
2396        cache_at: Option<kbucket::Key<PeerId>>,
2397    },
2398}
2399
2400impl QueryInfo {
2401    /// Creates an event for a handler to issue an outgoing request in the
2402    /// context of a query.
2403    fn to_request(&self, query_id: QueryId) -> KademliaHandlerIn<QueryId> {
2404        match &self {
2405            QueryInfo::Bootstrap { peer, .. } => KademliaHandlerIn::FindNodeReq {
2406                key: peer.to_bytes(),
2407                user_data: query_id,
2408            },
2409            QueryInfo::GetClosestPeers { key, .. } => KademliaHandlerIn::FindNodeReq {
2410                key: key.clone(),
2411                user_data: query_id,
2412            },
2413            QueryInfo::GetProviders { key, .. } => KademliaHandlerIn::GetProvidersReq {
2414                key: key.clone(),
2415                user_data: query_id,
2416            },
2417            QueryInfo::AddProvider { key, phase, .. } => match phase {
2418                AddProviderPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq {
2419                    key: key.to_vec(),
2420                    user_data: query_id,
2421                },
2422                AddProviderPhase::AddProvider { provider_id, external_addresses, .. } => {
2423                    KademliaHandlerIn::AddProvider {
2424                        key: key.clone(),
2425                        provider: crate::protocol::KadPeer {
2426                            node_id: *provider_id,
2427                            multiaddrs: external_addresses.clone(),
2428                            connection_ty: crate::protocol::KadConnectionType::Connected,
2429                        }
2430                    }
2431                }
2432            },
2433            QueryInfo::GetRecord { key, .. } => KademliaHandlerIn::GetRecord {
2434                key: key.clone(),
2435                user_data: query_id,
2436            },
2437            QueryInfo::PutRecord { record, phase, .. } => match phase {
2438                PutRecordPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq {
2439                    key: record.key.to_vec(),
2440                    user_data: query_id,
2441                },
2442                PutRecordPhase::PutRecord { .. } => KademliaHandlerIn::PutRecord {
2443                    record: record.clone(),
2444                    user_data: query_id
2445                }
2446            }
2447        }
2448    }
2449}
2450
2451/// The phases of a [`QueryInfo::AddProvider`] query.
2452#[derive(Debug, Clone)]
2453pub enum AddProviderPhase {
2454    /// The query is searching for the closest nodes to the record key.
2455    GetClosestPeers,
2456
2457    /// The query advertises the local node as a provider for the key to
2458    /// the closest nodes to the key.
2459    AddProvider {
2460        /// The local peer ID that is advertised as a provider.
2461        provider_id: PeerId,
2462        /// The external addresses of the provider being advertised.
2463        external_addresses: Vec<Multiaddr>,
2464        /// Query statistics from the finished `GetClosestPeers` phase.
2465        get_closest_peers_stats: QueryStats,
2466    },
2467}
2468
2469/// The phases of a [`QueryInfo::PutRecord`] query.
2470#[derive(Debug, Clone, PartialEq, Eq)]
2471pub enum PutRecordPhase {
2472    /// The query is searching for the closest nodes to the record key.
2473    GetClosestPeers,
2474
2475    /// The query is replicating the record to the closest nodes to the key.
2476    PutRecord {
2477        /// A list of peers the given record has been successfully replicated to.
2478        success: Vec<PeerId>,
2479        /// Query statistics from the finished `GetClosestPeers` phase.
2480        get_closest_peers_stats: QueryStats,
2481    },
2482}
2483
2484/// A mutable reference to a running query.
2485pub struct QueryMut<'a> {
2486    query: &'a mut Query<QueryInner>,
2487}
2488
2489impl<'a> QueryMut<'a> {
2490    pub fn id(&self) -> QueryId {
2491        self.query.id()
2492    }
2493
2494    /// Gets information about the type and state of the query.
2495    pub fn info(&self) -> &QueryInfo {
2496        &self.query.inner.info
2497    }
2498
2499    /// Gets execution statistics about the query.
2500    ///
2501    /// For a multi-phase query such as `put_record`, these are the
2502    /// statistics of the current phase.
2503    pub fn stats(&self) -> &QueryStats {
2504        self.query.stats()
2505    }
2506
2507    /// Finishes the query as soon as possible, without waiting for the
2508    /// regular termination conditions.
2509    pub fn finish(&mut self) {
2510        self.query.finish()
2511    }
2512}
2513
2514/// An immutable reference to a running query.
2515pub struct QueryRef<'a> {
2516    query: &'a Query<QueryInner>,
2517}
2518
2519impl<'a> QueryRef<'a> {
2520    pub fn id(&self) -> QueryId {
2521        self.query.id()
2522    }
2523
2524    /// Gets information about the type and state of the query.
2525    pub fn info(&self) -> &QueryInfo {
2526        &self.query.inner.info
2527    }
2528
2529    /// Gets execution statistics about the query.
2530    ///
2531    /// For a multi-phase query such as `put_record`, these are the
2532    /// statistics of the current phase.
2533    pub fn stats(&self) -> &QueryStats {
2534        self.query.stats()
2535    }
2536}
2537
2538/// An operation failed due to no known peers in the routing table.
2539#[derive(Debug, Clone)]
2540pub struct NoKnownPeers();
2541
2542impl fmt::Display for NoKnownPeers {
2543    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2544        write!(f, "No known peers.")
2545    }
2546}
2547
2548impl std::error::Error for NoKnownPeers {}
2549
2550/// The possible outcomes of [`Kademlia::add_address`].
2551pub enum RoutingUpdate {
2552    /// The given peer and address have been added to the routing
2553    /// table.
2554    Success,
2555    /// The peer and address are pending insertion into
2556    /// the routing table, if a disconnected peer fails
2557    /// to respond. If the given peer and address end up
2558    /// in the routing table, [`KademliaEvent::RoutingUpdated`]
2559    /// is eventually emitted.
2560    Pending,
2561    /// The routing table update failed, either because the
2562    /// corresponding bucket for the peer is full and the
2563    /// pending slot(s) are occupied, or because the given
2564    /// peer ID is deemed invalid (e.g. refers to the local
2565    /// peer ID).
2566    Failed,
2567}
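
// A minimal sketch of reacting to the outcome of `Kademlia::add_address`,
// assuming `kademlia`, `peer_id` and `addr` are already in scope and that
// `add_address` takes a `&PeerId` and a `Multiaddr`:
//
// ```rust,ignore
// match kademlia.add_address(&peer_id, addr) {
//     RoutingUpdate::Success => debug!("{} added to the routing table", peer_id),
//     RoutingUpdate::Pending => debug!("{} pending insertion; a RoutingUpdated event may follow", peer_id),
//     RoutingUpdate::Failed => warn!("could not add {} to the routing table", peer_id),
// }
// ```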
2568
2569/// The maximum number of local external addresses. When reached, any
2570/// further externally reported addresses are ignored. The behaviour always
2571/// tracks all its listen addresses.
2572const MAX_LOCAL_EXTERNAL_ADDRS: usize = 20;