// libp2p_kad/behaviour.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the `Kademlia` network behaviour.
22
23#![allow(clippy::needless_lifetimes)]
24
25mod test;
26
27use crate::K_VALUE;
28use crate::addresses::{Addresses, Remove};
29use crate::handler::{
30    KademliaHandlerProto,
31    KademliaHandlerConfig,
32    KademliaRequestId,
33    KademliaHandlerEvent,
34    KademliaHandlerIn
35};
36use crate::jobs::*;
37use crate::kbucket::{self, KBucketsTable, NodeStatus, KBucketRef, KeyBytes};
38use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
39use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState, WeightedPeer};
40use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
41use crate::contact::Contact;
42use fnv::{FnvHashMap, FnvHashSet};
43use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId, multiaddr, identity::PublicKey, identity::Keypair};
44use libp2p_swarm::{
45    DialPeerCondition,
46    NetworkBehaviour,
47    NetworkBehaviourAction,
48    NotifyHandler,
49    PollParameters,
50};
51use log::{info, debug, warn, LevelFilter};
52use smallvec::SmallVec;
53use std::{borrow::Cow, error, iter, time::Duration};
54use std::collections::{HashSet, VecDeque};
55use std::fmt;
56use std::num::NonZeroUsize;
57use std::task::{Context, Poll};
58use std::vec;
59use wasm_timer::Instant;
60use trust_graph::{Certificate};
61use derivative::Derivative;
62use crate::metrics::Metrics;
63
64pub use crate::query::QueryStats;
65
/// Concrete trust-graph type used by this behaviour: the generic
/// `trust_graph::TrustGraph` specialized to the in-memory storage backend.
type TrustGraph = trust_graph::TrustGraph<trust_graph::InMemoryStorage>;
67
/// `Kademlia` is a `NetworkBehaviour` that implements the libp2p
/// Kademlia protocol.
pub struct Kademlia<TStore> {
    /// The Kademlia routing table.
    kbuckets: KBucketsTable<kbucket::Key<PeerId>, Contact>,

    /// The k-bucket insertion strategy.
    kbucket_inserts: KademliaBucketInserts,

    /// Configuration of the wire protocol.
    protocol_config: KademliaProtocolConfig,

    /// The currently active (i.e. in-progress) queries.
    queries: QueryPool<QueryInner>,

    /// The currently connected peers.
    ///
    /// This is a superset of the connected peers currently in the routing table.
    connected_peers: FnvHashSet<PeerId>,

    /// Periodic job for re-publication of provider records for keys
    /// provided by the local node.
    add_provider_job: Option<AddProviderJob>,

    /// Periodic job for (re-)replication and (re-)publishing of
    /// regular (value-)records.
    put_record_job: Option<PutRecordJob>,

    /// The TTL of regular (value-)records.
    record_ttl: Option<Duration>,

    /// The TTL of provider records.
    provider_record_ttl: Option<Duration>,

    /// How long to keep connections alive when they're idle.
    connection_idle_timeout: Duration,

    /// Queued events to return when the behaviour is being polled.
    queued_events: VecDeque<NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>>,

    /// The currently known addresses of the local node.
    local_addrs: HashSet<Multiaddr>,

    /// The record storage.
    store: TStore,

    /// Trust graph consulted when inserting new peers into the routing table
    /// (see `add_address`) and when collecting certificates for providers.
    pub trust: TrustGraph,
    /// Metrics collector; a no-op (`Metrics::disabled()`) until
    /// [`Kademlia::enable_metrics`] is called.
    pub(super) metrics: Metrics,

    // TODO: maintenance job (periodic bootstrap) (first time: after a minute or less)
    // TODO: "small" bootstrap function: lookup yourself
    // TODO: how substrate uses bootstrap? is there a periodic maintenance job?
}
121
/// The configurable strategies for the insertion of peers
/// and their addresses into the k-buckets of the Kademlia
/// routing table.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum KademliaBucketInserts {
    /// Whenever a connection to a peer is established as a
    /// result of a dialing attempt and that peer is not yet
    /// in the routing table, it is inserted as long as there
    /// is a free slot in the corresponding k-bucket. If the
    /// k-bucket is full but still has a free pending slot,
    /// it may be inserted into the routing table at a later time if an unresponsive
    /// disconnected peer is evicted from the bucket.
    OnConnected,
    /// New peers and addresses are only added to the routing table via
    /// explicit calls to [`Kademlia::add_address`].
    ///
    /// > **Note**: Even though peers can only get into the
    /// > routing table as a result of [`Kademlia::add_address`],
    /// > routing table entries are still updated as peers
    /// > connect and disconnect (i.e. the order of the entries
    /// > as well as the network addresses).
    Manual,
}
145
/// The configuration for the `Kademlia` behaviour.
///
/// The configuration is consumed by [`Kademlia::new`].
/// Each field is set via the correspondingly named `set_*` method, which
/// also documents defaults and semantics in detail.
#[derive(Debug, Clone)]
pub struct KademliaConfig {
    /// Timeout for pending entries waiting for a slot in a k-bucket.
    kbucket_pending_timeout: Duration,
    /// Parameters of iterative queries (timeout, parallelism, replication factor, ...).
    query_config: QueryConfig,
    /// Wire-protocol settings (protocol name, max packet size).
    protocol_config: KademliaProtocolConfig,
    /// TTL of stored (value-)records; `None` means records never expire.
    record_ttl: Option<Duration>,
    /// Interval for (re-)replication of stored records; `None` disables it.
    record_replication_interval: Option<Duration>,
    /// Interval for re-publication of own records; `None` disables it.
    record_publication_interval: Option<Duration>,
    /// TTL of provider records; `None` means they never expire.
    provider_record_ttl: Option<Duration>,
    /// Interval for re-publication of provider records; `None` disables it.
    provider_publication_interval: Option<Duration>,
    /// How long idle connections are kept alive.
    connection_idle_timeout: Duration,
    /// Strategy for inserting newly discovered peers into the k-buckets.
    kbucket_inserts: KademliaBucketInserts,
}
162
163impl Default for KademliaConfig {
164    fn default() -> Self {
165        KademliaConfig {
166            kbucket_pending_timeout: Duration::from_secs(60),
167            query_config: QueryConfig::default(),
168            protocol_config: Default::default(),
169            record_ttl: Some(Duration::from_secs(36 * 60 * 60)),
170            record_replication_interval: Some(Duration::from_secs(60 * 60)),
171            record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
172            provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
173            provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
174            connection_idle_timeout: Duration::from_secs(10),
175            kbucket_inserts: KademliaBucketInserts::OnConnected,
176        }
177    }
178}
179
impl KademliaConfig {
    /// Sets a custom protocol name.
    ///
    /// Kademlia nodes only communicate with other nodes using the same protocol
    /// name. Using a custom name therefore allows to segregate the DHT from
    /// others, if that is desired.
    pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) -> &mut Self {
        self.protocol_config.set_protocol_name(name);
        self
    }

    /// Sets the timeout for a single query.
    ///
    /// > **Note**: A single query usually comprises at least as many requests
    /// > as the replication factor, i.e. this is not a request timeout.
    ///
    /// The default is 60 seconds.
    pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.query_config.timeout = timeout;
        self
    }

    /// Sets the replication factor to use.
    ///
    /// The replication factor determines to how many closest peers
    /// a record is replicated. The default is [`K_VALUE`].
    pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
        self.query_config.replication_factor = replication_factor;
        self
    }

    /// Sets the allowed level of parallelism for iterative queries.
    ///
    /// The `α` parameter in the Kademlia paper. The maximum number of peers
    /// that an iterative query is allowed to wait for in parallel while
    /// iterating towards the closest nodes to a target. Defaults to
    /// `ALPHA_VALUE`.
    ///
    /// This only controls the level of parallelism of an iterative query, not
    /// the level of parallelism of a query to a fixed set of peers.
    ///
    /// When used with [`KademliaConfig::disjoint_query_paths`] it equals
    /// the amount of disjoint paths used.
    pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
        self.query_config.parallelism = parallelism;
        self
    }

    /// Require iterative queries to use disjoint paths for increased resiliency
    /// in the presence of potentially adversarial nodes.
    ///
    /// When enabled the number of disjoint paths used equals the configured
    /// parallelism.
    ///
    /// See the S/Kademlia paper for more information on the high level design
    /// as well as its security improvements.
    ///
    /// # Panics
    ///
    /// Panics (via `unimplemented!`) when `enabled` is `true`: disjoint paths
    /// are not yet supported together with this fork's weighted and swamp
    /// buckets. Calling with `false` merely re-assigns the flag.
    pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
        if enabled {
            unimplemented!(
                "TODO FIXME: disjoint paths are not working correctly with weighted \
                    and swamp buckets. Need to fix at least behaviour::test::put_record"
            )
        }
        self.query_config.disjoint_query_paths = enabled;
        self
    }

    /// Sets the TTL for stored records.
    ///
    /// The TTL should be significantly longer than the (re-)publication
    /// interval, to avoid premature expiration of records. The default is 36
    /// hours.
    ///
    /// `None` means records never expire.
    ///
    /// Does not apply to provider records.
    pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
        self.record_ttl = record_ttl;
        self
    }

    /// Sets the (re-)replication interval for stored records.
    ///
    /// Periodic replication of stored records ensures that the records
    /// are always replicated to the available nodes closest to the key in the
    /// context of DHT topology changes (i.e. nodes joining and leaving), thus
    /// ensuring persistence until the record expires. Replication does not
    /// prolong the regular lifetime of a record (for otherwise it would live
    /// forever regardless of the configured TTL). The expiry of a record
    /// is only extended through re-publication.
    ///
    /// This interval should be significantly shorter than the publication
    /// interval, to ensure persistence between re-publications. The default
    /// is 1 hour.
    ///
    /// `None` means that stored records are never re-replicated.
    ///
    /// Does not apply to provider records.
    pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_replication_interval = interval;
        self
    }

    /// Sets the (re-)publication interval of stored records.
    ///
    /// Records persist in the DHT until they expire. By default, published
    /// records are re-published in regular intervals for as long as the record
    /// exists in the local storage of the original publisher, thereby extending
    /// the records lifetime.
    ///
    /// This interval should be significantly shorter than the record TTL, to
    /// ensure records do not expire prematurely. The default is 24 hours.
    ///
    /// `None` means that stored records are never automatically re-published.
    ///
    /// Does not apply to provider records.
    pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_publication_interval = interval;
        self
    }

    /// Sets the TTL for provider records.
    ///
    /// `None` means that stored provider records never expire.
    ///
    /// Must be significantly larger than the provider publication interval.
    pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
        self.provider_record_ttl = ttl;
        self
    }

    /// Sets the interval at which provider records for keys provided
    /// by the local node are re-published.
    ///
    /// `None` means that stored provider records are never automatically
    /// re-published.
    ///
    /// Must be significantly less than the provider record TTL.
    pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.provider_publication_interval = interval;
        self
    }

    /// Sets the amount of time to keep connections alive when they're idle.
    /// The default is 10 seconds.
    pub fn set_connection_idle_timeout(&mut self, duration: Duration) -> &mut Self {
        self.connection_idle_timeout = duration;
        self
    }

    /// Modifies the maximum allowed size of individual Kademlia packets.
    ///
    /// It might be necessary to increase this value if trying to put large
    /// records.
    pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
        self.protocol_config.set_max_packet_size(size);
        self
    }

    /// Sets the k-bucket insertion strategy for the Kademlia routing table.
    /// The default is [`KademliaBucketInserts::OnConnected`].
    pub fn set_kbucket_inserts(&mut self, inserts: KademliaBucketInserts) -> &mut Self {
        self.kbucket_inserts = inserts;
        self
    }
}
344
345impl<TStore> Kademlia<TStore>
346where
347    for<'a> TStore: RecordStore<'a>
348{
349    /// Creates a new `Kademlia` network behaviour with a default configuration.
350    pub fn new(kp: Keypair, id: PeerId, store: TStore, trust: TrustGraph) -> Self {
351        Self::with_config(kp, id, store, Default::default(), trust)
352    }
353
    /// Get the protocol name of this kademlia instance.
    ///
    /// Nodes only interoperate when their protocol names match; see
    /// [`KademliaConfig::set_protocol_name`].
    pub fn protocol_name(&self) -> &[u8] {
        self.protocol_config.protocol_name()
    }
358
359    /// Creates a new `Kademlia` network behaviour with the given configuration.
360    pub fn with_config(kp: Keypair, id: PeerId, store: TStore, config: KademliaConfig, trust: TrustGraph) -> Self {
361        let local_key = kbucket::Key::from(id);
362
363        let put_record_job = config
364            .record_replication_interval
365            .or(config.record_publication_interval)
366            .map(|interval| PutRecordJob::new(
367                id,
368                interval,
369                config.record_publication_interval,
370                config.record_ttl,
371            ));
372
373        let add_provider_job = config
374            .provider_publication_interval
375            .map(AddProviderJob::new);
376
377        Kademlia {
378            store,
379            kbuckets: KBucketsTable::new(kp, local_key, config.kbucket_pending_timeout),
380            kbucket_inserts: config.kbucket_inserts,
381            protocol_config: config.protocol_config,
382            queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
383            queries: QueryPool::new(config.query_config),
384            connected_peers: Default::default(),
385            add_provider_job,
386            put_record_job,
387            record_ttl: config.record_ttl,
388            provider_record_ttl: config.provider_record_ttl,
389            connection_idle_timeout: config.connection_idle_timeout,
390            local_addrs: HashSet::new(),
391            trust,
392            metrics: Metrics::disabled(),
393        }
394    }
395
    /// Enables metrics collection, registering the collectors in `registry`
    /// and labeling them with the local peer id. Until this is called the
    /// behaviour uses `Metrics::disabled()` and metric calls are no-ops.
    pub fn enable_metrics(&mut self, registry: &prometheus::Registry) {
        self.metrics = Metrics::enabled(registry, self.kbuckets.local_key().preimage());
    }
400
401    /// Gets an iterator over immutable references to all running queries.
402    pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
403        self.queries.iter().filter_map(|query|
404            if !query.is_finished() {
405                Some(QueryRef { query })
406            } else {
407                None
408            })
409    }
410
411    /// Gets an iterator over mutable references to all running queries.
412    pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
413        self.queries.iter_mut().filter_map(|query|
414            if !query.is_finished() {
415                Some(QueryMut { query })
416            } else {
417                None
418            })
419    }
420
421    /// Gets an immutable reference to a running query, if it exists.
422    pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
423        self.queries.get(id).and_then(|query|
424            if !query.is_finished() {
425                Some(QueryRef { query })
426            } else {
427                None
428            })
429    }
430
431    /// Gets a mutable reference to a running query, if it exists.
432    pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
433        self.queries.get_mut(id).and_then(|query|
434            if !query.is_finished() {
435                Some(QueryMut { query })
436            } else {
437                None
438            })
439    }
440
    /// Adds a known listen address of a peer participating in the DHT to the
    /// routing table.
    ///
    /// Explicitly adding addresses of peers serves two purposes:
    ///
    ///   1. In order for a node to join the DHT, it must know about at least
    ///      one other node of the DHT.
    ///
    ///   2. When a remote peer initiates a connection and that peer is not
    ///      yet in the routing table, the `Kademlia` behaviour must be
    ///      informed of an address on which that peer is listening for
    ///      connections before it can be added to the routing table
    ///      from where it can subsequently be discovered by all peers
    ///      in the DHT.
    ///
    /// If the routing table has been updated as a result of this operation,
    /// a [`KademliaEvent::RoutingUpdated`] event is emitted.
    ///
    /// `public_key` is used to build the [`Contact`] stored for newly
    /// inserted peers (Fluence-specific: contacts carry the peer's key).
    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr, public_key: PublicKey) -> RoutingUpdate {
        let key = kbucket::Key::from(*peer);
        let result = match self.kbuckets.entry(&key) {
            // Peer already in the table: only record the (possibly new) address.
            kbucket::Entry::Present(mut entry, _) => {
                // `insert` returns true only when the address was not known yet.
                if entry.value().insert(address) {
                    self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
                        KademliaEvent::RoutingUpdated {
                            peer: *peer,
                            addresses: entry.value().clone().into(),
                            old_peer: None,
                        }
                    ))
                }
                RoutingUpdate::Success
            }
            // Peer awaiting insertion: remember the address, no event yet.
            kbucket::Entry::Pending(mut entry, _) => {
                entry.value().insert(address);
                RoutingUpdate::Pending
            }
            // Unknown peer: build a contact and try to insert it.
            kbucket::Entry::Absent(entry) => {
                debug!(
                    "Will insert newly connected node {} with key {}",
                    entry.key().clone().into_preimage(),
                    bs58::encode(entry.key().as_ref()).into_string()
                );
                let contact = Contact {
                    addresses: Addresses::new(address),
                    public_key
                };
                let status =
                    if self.connected_peers.contains(peer) {
                        NodeStatus::Connected
                    } else {
                        NodeStatus::Disconnected
                    };
                // Insertion consults the trust graph and may produce events
                // (e.g. RoutingUpdated / eviction) which are queued for polling.
                let (status, events) = Self::insert_new_peer(entry, contact, status, &self.connected_peers, &self.trust);
                events.into_iter().for_each(|e| self.queued_events.push_back(e));
                status
            },
            // The local node itself can never be inserted.
            kbucket::Entry::SelfEntry => RoutingUpdate::Failed,
        };

        self.print_bucket_table();
        result
    }
503
504    /// Removes an address of a peer from the routing table.
505    ///
506    /// If the given address is the last address of the peer in the
507    /// routing table, the peer is removed from the routing table
508    /// and `Some` is returned with a view of the removed entry.
509    /// The same applies if the peer is currently pending insertion
510    /// into the routing table.
511    ///
512    /// If the given peer or address is not in the routing table,
513    /// this is a no-op.
514    pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr)
515        -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Contact>>
516    {
517        let key = kbucket::Key::from(*peer);
518        match self.kbuckets.entry(&key) {
519            kbucket::Entry::Present(mut entry, _) => {
520                if entry.value().addresses.remove(address, Remove::Completely).is_err() {
521                    Some(entry.remove()) // it is the last address, thus remove the peer.
522                } else {
523                    None
524                }
525            }
526            kbucket::Entry::Pending(mut entry, _) => {
527                if entry.value().addresses.remove(address, Remove::Completely).is_err() {
528                    Some(entry.remove()) // it is the last address, thus remove the peer.
529                } else {
530                    None
531                }
532            }
533            kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => {
534                None
535            }
536        }
537    }
538
539    /// Removes a peer from the routing table.
540    ///
541    /// Returns `None` if the peer was not in the routing table,
542    /// not even pending insertion.
543    pub fn remove_peer(&mut self, peer: &PeerId)
544        -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Contact>>
545    {
546        let key = kbucket::Key::from(*peer);
547        match self.kbuckets.entry(&key) {
548            kbucket::Entry::Present(entry, _) => {
549                Some(entry.remove())
550            }
551            kbucket::Entry::Pending(entry, _) => {
552                Some(entry.remove())
553            }
554            kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => {
555                None
556            }
557        }
558    }
559
    /// Returns an iterator over all non-empty buckets in the routing table.
    ///
    /// NOTE(review): `&mut self` is what `KBucketsTable::iter` requires here —
    /// presumably because iteration may apply pending entries; confirm in
    /// `kbucket.rs`.
    pub fn kbuckets(&mut self)
        -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Contact>>
    {
        self.kbuckets.iter().filter(|b| !b.is_empty())
    }
566
    /// Returns the k-bucket for the distance to the given key.
    ///
    /// Returns `None` if the given key refers to the local key.
    pub fn kbucket<K>(&mut self, key: K)
        -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Contact>>
    where
        K: Into<kbucket::Key<K>> + Clone
    {
        self.kbuckets.bucket(&key.into())
    }
577
578    /// Initiates an iterative query for the closest peers to the given key.
579    ///
580    /// The result of the query is delivered in a
581    /// [`KademliaEvent::QueryResult{QueryResult::GetClosestPeers}`].
582    pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
583    where
584        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone
585    {
586        let info = QueryInfo::GetClosestPeers { key: key.clone().into() };
587        let target: kbucket::Key<K> = key.into();
588        let peers = Self::closest_keys(&mut self.kbuckets, &target);
589        let inner = QueryInner::new(info);
590        self.queries.add_iter_closest(target.clone(), peers, inner)
591    }
592
    /// Returns closest peers to the given key; takes peers from local routing table
    /// only — no network queries are issued.
    pub fn local_closest_peers<'s, 'k : 's, K: 's>(&'s mut self, key: &'k kbucket::Key<K>)
        -> impl Iterator<Item = WeightedPeer> + 's
        where
            K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone {
        Self::closest_keys(&mut self.kbuckets, key)
    }
600
    /// Performs a lookup for a record in the DHT.
    ///
    /// The result of this operation is delivered in a
    /// [`KademliaEvent::QueryResult{QueryResult::GetRecord}`].
    pub fn get_record(&mut self, key: &record::Key, quorum: Quorum) -> QueryId {
        let quorum = quorum.eval(self.queries.config().replication_factor);
        let mut records = Vec::with_capacity(quorum.get());

        // A locally stored copy counts towards the quorum (peer: None marks
        // it as local); expired copies are evicted instead.
        if let Some(record) = self.store.get(key) {
            if record.is_expired(Instant::now()) {
                self.store.remove(key);
                self.metrics.record_removed();
            } else {
                records.push(PeerRecord{ peer: None, record: record.into_owned()});
            }
        }

        let done = records.len() >= quorum.get();
        let target = kbucket::Key::new(key.clone());
        let info = QueryInfo::GetRecord { key: key.clone(), records, quorum, cache_at: None };
        let peers = Self::closest_keys(&mut self.kbuckets, &target);
        let inner = QueryInner::new(info);
        // The query is always registered, even when already satisfied,
        // so that the result is reported through the usual event flow.
        let id = self.queries.add_iter_closest(target.clone(), peers, inner); // (*)

        // Instantly finish the query if we already have enough records.
        if done {
            self.queries.get_mut(&id).expect("by (*)").finish();
        }

        id
    }
632
633    /// Stores a record in the DHT.
634    ///
635    /// Returns `Ok` if a record has been stored locally, providing the
636    /// `QueryId` of the initial query that replicates the record in the DHT.
637    /// The result of the query is eventually reported as a
638    /// [`KademliaEvent::QueryResult{QueryResult::PutRecord}`].
639    ///
640    /// The record is always stored locally with the given expiration. If the record's
641    /// expiration is `None`, the common case, it does not expire in local storage
642    /// but is still replicated with the configured record TTL. To remove the record
643    /// locally and stop it from being re-published in the DHT, see [`Kademlia::remove_record`].
644    ///
645    /// After the initial publication of the record, it is subject to (re-)replication
646    /// and (re-)publication as per the configured intervals. Periodic (re-)publication
647    /// does not update the record's expiration in local storage, thus a given record
648    /// with an explicit expiration will always expire at that instant and until then
649    /// is subject to regular (re-)replication and (re-)publication.
650    pub fn put_record(&mut self, mut record: Record, quorum: Quorum) -> Result<QueryId, store::Error> {
651        record.publisher = Some(*self.kbuckets.local_key().preimage());
652        self.store.put(record.clone())?;
653        self.metrics.store_put();
654        record.expires = record.expires.or_else(||
655            self.record_ttl.map(|ttl| Instant::now() + ttl));
656        let quorum = quorum.eval(self.queries.config().replication_factor);
657        let target = kbucket::Key::new(record.key.clone());
658
659        let peers = Self::closest_keys(&mut self.kbuckets, &target);
660        let context = PutRecordContext::Publish;
661        let info = QueryInfo::PutRecord {
662            context,
663            record,
664            quorum,
665            phase: PutRecordPhase::GetClosestPeers
666        };
667        let inner = QueryInner::new(info);
668        Ok(self.queries.add_iter_closest(target.clone(), peers, inner))
669    }
670
671    /// Removes the record with the given key from _local_ storage,
672    /// if the local node is the publisher of the record.
673    ///
674    /// Has no effect if a record for the given key is stored locally but
675    /// the local node is not a publisher of the record.
676    ///
677    /// This is a _local_ operation. However, it also has the effect that
678    /// the record will no longer be periodically re-published, allowing the
679    /// record to eventually expire throughout the DHT.
680    pub fn remove_record(&mut self, key: &record::Key) {
681        if let Some(r) = self.store.get(key) {
682            if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
683                self.store.remove(key);
684                self.metrics.record_removed();
685            }
686        }
687    }
688
    /// Gets a mutable reference to the record store.
    pub fn store_mut(&mut self) -> &mut TStore {
        &mut self.store
    }
693
694    /// Bootstraps the local node to join the DHT.
695    ///
696    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
697    /// own ID in the DHT. This introduces the local node to the other nodes
698    /// in the DHT and populates its routing table with the closest neighbours.
699    ///
700    /// Subsequently, all buckets farther from the bucket of the closest neighbour are
701    /// refreshed by initiating an additional bootstrapping query for each such
702    /// bucket with random keys.
703    ///
704    /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
705    /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
706    /// reported via [`KademliaEvent::QueryResult{QueryResult::Bootstrap}`] events,
707    /// with one such event per bootstrapping query.
708    ///
709    /// Returns `Err` if bootstrapping is impossible due an empty routing table.
710    ///
711    /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
712    /// > See [`Kademlia::add_address`].
713    pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
714        let local_key = self.kbuckets.local_key().clone();
715        let info = QueryInfo::Bootstrap {
716            peer: *local_key.preimage(),
717            remaining: None
718        };
719        let peers = Self::closest_keys(&mut self.kbuckets, &local_key).collect::<Vec<_>>();
720        if peers.is_empty() {
721            Err(NoKnownPeers())
722        } else {
723            let inner = QueryInner::new(info);
724            Ok(self.queries.add_iter_closest(local_key, peers, inner))
725        }
726    }
727
728    /// Establishes the local node as a provider of a value for the given key.
729    ///
730    /// This operation publishes a provider record with the given key and
731    /// identity of the local node to the peers closest to the key, thus establishing
732    /// the local node as a provider.
733    ///
734    /// Returns `Ok` if a provider record has been stored locally, providing the
735    /// `QueryId` of the initial query that announces the local node as a provider.
736    ///
737    /// The publication of the provider records is periodically repeated as per the
738    /// configured interval, to renew the expiry and account for changes to the DHT
739    /// topology. A provider record may be removed from local storage and
740    /// thus no longer re-published by calling [`Kademlia::stop_providing`].
741    ///
742    /// In contrast to the standard Kademlia push-based model for content distribution
743    /// implemented by [`Kademlia::put_record`], the provider API implements a
744    /// pull-based model that may be used in addition or as an alternative.
745    /// The means by which the actual value is obtained from a provider is out of scope
746    /// of the libp2p Kademlia provider API.
747    ///
748    /// The results of the (repeated) provider announcements sent by this node are
749    /// reported via [`KademliaEvent::QueryResult{QueryResult::StartProviding}`].
750    pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
751        self.print_bucket_table();
752
753        // Note: We store our own provider records locally without local addresses
754        // to avoid redundant storage and outdated addresses. Instead these are
755        // acquired on demand when returning a `ProviderRecord` for the local node.
756        let local_addrs = Vec::new();
757        // TODO: calculate weight for self?
758        let record = ProviderRecord::new(
759            key.clone(),
760            *self.kbuckets.local_key().preimage(),
761            local_addrs);
762        self.store.add_provider(record)?;
763        let target = kbucket::Key::new(key.clone());
764        debug!(
765            "start_providing for key {} ; kademlia key {}",
766            bs58::encode(target.preimage().as_ref()).into_string(), // peer id
767            bs58::encode(target.as_ref()).into_string(), // sha256
768        );
769        let provider_key = self.kbuckets.local_public_key();
770        let certificates = self.get_certificates(&provider_key);
771        let peers = Self::closest_keys(&mut self.kbuckets, &target);
772        let context = AddProviderContext::Publish;
773        let info = QueryInfo::AddProvider {
774            context,
775            key,
776            phase: AddProviderPhase::GetClosestPeers,
777            provider_key,
778            certificates
779        };
780        let inner = QueryInner::new(info);
781        let id = self.queries.add_iter_closest(target.clone(), peers, inner);
782        Ok(id)
783    }
784
785    /// Stops the local node from announcing that it is a provider for the given key.
786    ///
787    /// This is a local operation. The local node will still be considered as a
788    /// provider for the key by other nodes until these provider records expire.
789    pub fn stop_providing(&mut self, key: &record::Key) {
790        let target = kbucket::Key::new(key.clone());
791        debug!(
792            "stop_providing for key {} ; kademlia key {}",
793            bs58::encode(key.as_ref()).into_string(), // peer id
794            bs58::encode(target.as_ref()).into_string(), // sha256
795        );
796        self.store.remove_provider(key, self.kbuckets.local_key().preimage());
797    }
798
799    /// Performs a lookup for providers of a value to the given key.
800    ///
801    /// The result of this operation is delivered in a
802    /// reported via [`KademliaEvent::QueryResult{QueryResult::GetProviders}`].
803    pub fn get_providers(&mut self, key: record::Key) -> QueryId {
804        self.print_bucket_table();
805        let info = QueryInfo::GetProviders {
806            key: key.clone(),
807            providers: HashSet::new(),
808        };
809        let target = kbucket::Key::new(key);
810        debug!(
811            "get_providers for key {} ; kademlia key {}",
812            bs58::encode(target.preimage().as_ref()).into_string(), // peer id
813            bs58::encode(target.as_ref()).into_string(), // sha256
814        );
815        let peers = Self::closest_keys(&mut self.kbuckets, &target);
816        let inner = QueryInner::new(info);
817        self.queries.add_iter_closest(target.clone(), peers, inner)
818    }
819
820    /// Forces replication of a single record
821    pub fn replicate_record(&mut self, key: record::Key) {
822        if let Some(rec) = self.store.get(&key).map(|r| r.into_owned()) {
823            self.start_put_record(rec, Quorum::All, PutRecordContext::Replicate);
824            if let Some(job) = self.put_record_job.as_mut() {
825                // Skip replication for this key next time
826                job.skip(key)
827            }
828        }
829    }
830
    /// Processes discovered peers from a successful request in an iterative `Query`.
    ///
    /// Certificates carried by the reported peers are first folded into the local
    /// trust graph. The reported peers (excluding the local node) are then recorded
    /// as contacts of the query and fed back into the query iterator, each weighted
    /// by the trust attached to its public key.
    fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
    where
        I: Iterator<Item = &'a KadPeer> + Clone
    {
        // Add certificates to trust graph.
        // `peers` is consumed several times below, hence the `Clone` bound:
        // the iterator is cloned here so the original survives for later passes.
        let cur_time = trust_graph::current_time();
        for peer in peers.clone() {
            for cert in peer.certificates.iter() {
                match self.trust.add(cert, cur_time) {
                    Ok(_) => log::trace!("{} added cert {:?} from {}", self.kbuckets.local_key().preimage(), cert, source),
                    // A rejected certificate is logged but does not abort processing
                    // of the remaining peers/certificates.
                    Err(err) => log::info!("Unable to add certificate for peer {}: {}", peer.node_id, err),
                }
            }
        }

        // Never feed the local node back into its own query.
        let local_id = self.kbuckets.local_key().preimage();
        let others_iter = peers.filter(|p| &p.node_id != local_id);
        // Reborrow the trust graph so the closure below does not capture `self`
        // while `self.queries` is mutably borrowed.
        let trust = &self.trust;
        if let Some(query) = self.queries.get_mut(query_id) {
            log::trace!("Request to {:?} in query {:?} succeeded.", source, query_id);
            for peer in others_iter.clone() {
                log::trace!("Peer {:?} reported by {:?} in query {:?}.", peer, source, query_id);
                // Remember the full contact info so later phases (e.g. PUT/ADD_PROVIDER)
                // can look up addresses and public keys by peer id.
                query.inner.contacts.insert(peer.node_id, peer.clone().into());
            }
            // Report the (weighted) peers to the query's peer iterator.
            query.on_success(source, others_iter.map(|kp| WeightedPeer {
                peer_id: kp.node_id.clone().into(),
                weight: get_weight(trust, &kp.public_key),
            }))
        }
    }
862
863    /// Finds the closest peers to a `target` in the context of a request by
864    /// the `source` peer, such that the `source` peer is never included in the
865    /// result.
866    fn find_closest<T: Clone>(&mut self, target: &kbucket::Key<T>, source: &PeerId) -> Vec<KadPeer> {
867        if target == self.kbuckets.local_key() {
868            Vec::new()
869        } else {
870            let mut peers: Vec<_> = self.kbuckets
871                .closest(target)
872                .filter(|e| e.node.key.preimage() != source)
873                .take(self.queries.config().replication_factor.get())
874                .map(KadPeer::from)
875                .collect();
876            peers.iter_mut().for_each(|mut peer|
877                peer.certificates = self.get_certificates(&peer.public_key)
878            );
879            peers
880        }
881    }
882
    /// Collects all peers who are known to be providers of the value for a given `Multihash`.
    ///
    /// The requesting `source` peer is excluded from the result. The local node,
    /// when it is a provider, is reported with its current local addresses; any
    /// other provider is reported only if it is present in the routing table.
    /// At most `replication_factor` providers are returned.
    fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
        // Split field borrows up front so the `move` closure below can capture
        // them individually while `self.store` is borrowed by `providers()`.
        let kbuckets = &mut self.kbuckets;
        let connected = &mut self.connected_peers;
        let local_addrs = &self.local_addrs;
        let trust = &self.trust;
        self.store.providers(key)
            .into_iter()
            .filter_map(move |p| {
                // Only pay for the `to_string` when debug logging is enabled;
                // the value is used solely in the `debug!` call below.
                let provider_id = if log::max_level() >= LevelFilter::Debug {
                    p.provider.to_string()
                } else {
                    String::new()
                };

                // The requester itself is never returned as a provider.
                let kad_peer = if &p.provider != source {
                    let node_id = p.provider;
                    let connection_ty = if connected.contains(&node_id) {
                        KadConnectionType::Connected
                    } else {
                        KadConnectionType::NotConnected
                    };

                    if &node_id == kbuckets.local_key().preimage() {
                        // The provider is either the local node and we fill in
                        // the local addresses on demand,
                        let self_key = kbuckets.local_public_key();
                        Some(KadPeer {
                            node_id,
                            connection_ty,
                            multiaddrs: local_addrs.iter().cloned().collect::<Vec<_>>(),
                            certificates: get_certificates(&trust, &self_key),
                            public_key: self_key,
                        })
                    } else {
                        // ... or another peer, which must be in the routing table
                        // for us to report it (otherwise `view()` yields `None`).
                        let key = kbucket::Key::from(node_id);
                        kbuckets.entry(&key).view().map(|e| {
                            let contact = e.node.value;
                            let multiaddrs = if p.addresses.is_empty() {
                                // This is a legacy (pre-#1708) provider without addresses,
                                // so take addresses from the routing table
                                contact.addresses.clone().into_vec()
                            } else {
                                p.addresses
                            };
                            // Certificates can only be looked up when the peer id
                            // inlines its public key.
                            let certificates = {
                                match node_id.as_public_key() {
                                    Some(pk) =>
                                        get_certificates(&trust, &pk),
                                    None => {
                                        log::warn!("Provider {} has a non-inlined public key: {:?}", node_id, key);
                                        vec![]
                                    }
                                }
                            };

                            KadPeer {
                                node_id,
                                multiaddrs,
                                public_key: contact.public_key.clone(),
                                connection_ty: match e.status {
                                    NodeStatus::Connected => KadConnectionType::Connected,
                                    NodeStatus::Disconnected => KadConnectionType::NotConnected
                                },
                                certificates
                            }
                        })
                    }
                } else {
                    None
                };
                debug!(
                    "Local provider for {}: {}; source: {}; found? {}",
                    bs58::encode(key).into_string(),
                    provider_id,
                    source,
                    kad_peer.is_some()
                );
                kad_peer
            })
            .take(self.queries.config().replication_factor.get())
            .collect::<Vec<_>>()
    }
966
967    /// Starts an iterative `ADD_PROVIDER` query for the given key.
968    fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
969        let provider_key = self.kbuckets.local_public_key();
970        let certificates = self.get_certificates(&provider_key);
971        let info = QueryInfo::AddProvider {
972            context,
973            key: key.clone(),
974            phase: AddProviderPhase::GetClosestPeers,
975            provider_key,
976            certificates
977        };
978        let target = kbucket::Key::new(key);
979        let peers = Self::closest_keys(&mut self.kbuckets, &target);
980        let inner = QueryInner::new(info);
981        self.queries.add_iter_closest(target.clone(), peers, inner);
982    }
983
984    /// Starts an iterative `PUT_VALUE` query for the given record.
985    fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
986        let quorum = quorum.eval(self.queries.config().replication_factor);
987        let target = kbucket::Key::new(record.key.clone());
988        let peers = Self::closest_keys(&mut self.kbuckets, &target);
989        let info = QueryInfo::PutRecord {
990            record, quorum, context, phase: PutRecordPhase::GetClosestPeers
991        };
992        let inner = QueryInner::new(info);
993        self.queries.add_iter_closest(target.clone(), peers, inner);
994    }
995
    /// Updates the routing table with a new connection status and address of a peer.
    ///
    /// Depending on the current state of the peer's entry in the k-buckets
    /// (present, pending, or absent), this either updates the stored contact and
    /// status, or — for newly seen, connected peers — attempts an insertion,
    /// emitting the corresponding `KademliaEvent`s through `queued_events`.
    fn connection_updated(&mut self, peer: PeerId, contact: Option<Contact>, new_status: NodeStatus) {
        let key = kbucket::Key::from(peer);
        match self.kbuckets.entry(&key) {
            // Peer is already in a bucket: refresh its contact info and status.
            kbucket::Entry::Present(mut entry, old_status) => {
                if let Some(contact) = contact {
                    if *entry.value() != contact { // TODO: what about public key change?
                        *entry.value() = contact; // TODO: is there a better way to do that?
                        // The contact changed, so report the updated addresses.
                        self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
                            KademliaEvent::RoutingUpdated {
                                peer,
                                addresses: entry.value().addresses.clone(),
                                old_peer: None,
                            }
                        ))
                    }
                }
                if old_status != new_status {
                    entry.update(new_status);
                }
            },

            // Peer is waiting to be inserted into a full bucket: just keep its
            // contact and status current; no event is emitted here.
            kbucket::Entry::Pending(mut entry, old_status) => {
                if let Some(contact) = contact {
                    *entry.value() = contact;
                }
                if old_status != new_status {
                    entry.update(new_status);
                }
            },

            // Peer is not in the routing table yet.
            kbucket::Entry::Absent(entry) => {
                // Only connected nodes with a known address are newly inserted.
                if new_status != NodeStatus::Connected {
                    return
                }

                match (contact, self.kbucket_inserts) {
                    // No address known: the peer cannot be routed to.
                    (None, _) => {
                        self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
                            KademliaEvent::UnroutablePeer { peer }
                        ));
                    }
                    // Manual insertion mode: only surface the peer to the user.
                    (Some(c), KademliaBucketInserts::Manual) => {
                        let address = c.addresses.iter().last().expect("addresses can't be empty here").clone();
                        self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
                            KademliaEvent::RoutablePeer { peer, address }
                        ));
                    }
                    // Automatic mode: insert immediately and queue the events
                    // produced by the insertion attempt.
                    (Some(contact), KademliaBucketInserts::OnConnected) => {
                        // Only connected nodes with a known address are newly inserted.
                        Self::insert_new_peer(entry, contact, new_status, &self.connected_peers, &self.trust)
                            .1
                            .into_iter()
                            .for_each(|e|
                                self.queued_events.push_back(e)
                            );
                    }
                }
            },
            _ => {}
        }

        self.print_bucket_table();
    }
1061
    /// Attempts to insert a previously absent peer into the routing table.
    ///
    /// The peer's weight is derived from the trust graph before insertion.
    /// Returns the `RoutingUpdate` outcome together with the behaviour actions
    /// (events and, for the pending case, a dial request) that the caller is
    /// expected to enqueue.
    fn insert_new_peer(
        entry: kbucket::AbsentEntry<kbucket::Key<PeerId>, Contact>,
        contact: Contact,
        status: NodeStatus,
        connected_peers: &FnvHashSet<PeerId>,
        trust: &TrustGraph
    ) -> (RoutingUpdate, Vec<NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>>)
    {
        let addresses = contact.addresses.clone();
        let peer = entry.key().preimage().clone();
        let weight = get_weight(&trust, &contact.public_key);
        debug!(
            "Calculated weight for {} pk {}: {}",
            entry.key().preimage(),
            bs58::encode(contact.public_key.clone().into_protobuf_encoding()).into_string(),
            weight
        );
        // TODO: how to avoid clone when bucket isn't Full?
        let address = contact.addresses.iter().last().expect("addresses can't be empty here").clone();
        match entry.insert(contact, status, weight) {
            // The peer went straight into a bucket.
            kbucket::InsertResult::Inserted => {
                (
                    RoutingUpdate::Success,
                    vec![
                        NetworkBehaviourAction::GenerateEvent(
                            KademliaEvent::RoutingUpdated {
                                peer,
                                addresses,
                                old_peer: None,
                            }
                        )
                    ]
                )
            },
            // The bucket is full and no entry could be evicted.
            kbucket::InsertResult::Full => {
                debug!("Bucket full. Peer not added to routing table: {}", peer);
                (
                    RoutingUpdate::Failed,
                    vec![NetworkBehaviourAction::GenerateEvent(
                        KademliaEvent::RoutablePeer { peer, address }
                    )]
                )
            },
            kbucket::InsertResult::Pending { disconnected } => { // least recently connected peer is returned
                // The evicted candidate must really be disconnected, otherwise
                // dialing it below would be pointless.
                debug_assert!(!connected_peers.contains(disconnected.preimage()));
                let address = addresses.first().clone();
                (
                    RoutingUpdate::Pending,
                    vec![
                        // TODO: 'A connection to a peer has been established' isn't true at this point
                        NetworkBehaviourAction::GenerateEvent(
                            KademliaEvent::PendingRoutablePeer { peer, address }
                        ),
                        // Dial the disconnected peer to decide whether it gets
                        // evicted in favour of the pending one.
                        NetworkBehaviourAction::DialPeer {
                            peer_id: disconnected.into_preimage(),
                            condition: DialPeerCondition::Disconnected
                        },
                    ]
                )
            },
        }
    }
1124
    /// Handles a finished (i.e. successful) query.
    ///
    /// Depending on the query's `QueryInfo`, this either produces a terminal
    /// `KademliaEvent::QueryResult` or transitions the query into its next
    /// phase (returning `None`), e.g. from `GetClosestPeers` to the actual
    /// `AddProvider`/`PutRecord` phase.
    fn query_finished(&mut self, q: Query<QueryInner>, params: &mut impl PollParameters)
        -> Option<KademliaEvent>
    {
        let query_id = q.id();
        let result = q.into_result();

        // Log only the first line of the (multi-line) pretty-printed QueryInfo.
        log::trace!("Query {} ({:?}) finished", format!("{:#?}", result.inner.info).lines().take(1).next().unwrap(), query_id);
        match result.inner.info {
            QueryInfo::Bootstrap { peer, remaining } => {
                self.print_bucket_table();
                let local_key = self.kbuckets.local_key().clone();
                // `remaining == None` means the initial self-lookup just
                // finished; compute the refresh targets for the other buckets.
                let mut remaining = remaining.unwrap_or_else(|| {
                    debug_assert_eq!(&peer, local_key.preimage());
                    // The lookup for the local key finished. To complete the bootstrap process,
                    // a bucket refresh should be performed for every bucket farther away than
                    // the first non-empty bucket (which are most likely no more than the last
                    // few, i.e. farthest, buckets).
                    self.kbuckets.iter()
                        .skip_while(|b| b.is_empty())
                        .skip(1) // Skip the bucket with the closest neighbour.
                        .map(|b| {
                            // Try to find a key that falls into the bucket. While such keys can
                            // be generated fully deterministically, the current libp2p kademlia
                            // wire protocol requires transmission of the preimages of the actual
                            // keys in the DHT keyspace, hence for now this is just a "best effort"
                            // to find a key that hashes into a specific bucket. The probabilities
                            // of finding a key in the bucket `b` with as most 16 trials are as
                            // follows:
                            //
                            // Pr(bucket-255) = 1 - (1/2)^16   ~= 1
                            // Pr(bucket-254) = 1 - (3/4)^16   ~= 1
                            // Pr(bucket-253) = 1 - (7/8)^16   ~= 0.88
                            // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
                            // ...
                            let mut target = kbucket::Key::from(PeerId::random());
                            for _ in 0 .. 16 {
                                let d = local_key.distance(&target);
                                if b.contains(&d) {
                                    break;
                                }
                                target = kbucket::Key::from(PeerId::random());
                            }
                            target
                        }).collect::<Vec<_>>().into_iter()
                });

                let num_remaining = remaining.len().saturating_sub(1) as u32;

                // Kick off the next bucket-refresh lookup under the same query id.
                if let Some(target) = remaining.next() {
                    let info = QueryInfo::Bootstrap {
                        peer: target.clone().into_preimage(),
                        remaining: Some(remaining)
                    };
                    let peers = Self::closest_keys(&mut self.kbuckets, &target);
                    let inner = QueryInner::new(info);
                    self.queries.continue_iter_closest(query_id, target.clone(), peers, inner);
                }

                // One Bootstrap result event is emitted per bootstrapping query.
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::Bootstrap(Ok(BootstrapOk { peer, num_remaining }))
                })
            }

            QueryInfo::GetClosestPeers { key, .. } => {
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetClosestPeers(Ok(
                        GetClosestPeersOk { key, peers: result.peers.collect() }
                    ))
                })
            }

            QueryInfo::GetProviders { key, providers } => {
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetProviders(Ok(
                        GetProvidersOk {
                            key,
                            providers,
                            closest_peers: result.peers.collect()
                        }
                    ))
                })
            }

            // First AddProvider phase done: the closest peers are known, so
            // transition the same query into the phase that actually sends
            // the provider record. No user-visible event yet.
            QueryInfo::AddProvider {
                context,
                key,
                phase: AddProviderPhase::GetClosestPeers,
                ..
            } => {
                let provider_id = *params.local_peer_id();
                let external_addresses = params.external_addresses().map(|r| r.addr).collect();
                let provider_key = self.kbuckets.local_public_key();
                let certificates = self.get_certificates(&provider_key);
                let inner = QueryInner::new(QueryInfo::AddProvider {
                    context,
                    key,
                    phase: AddProviderPhase::AddProvider {
                        provider_id,
                        external_addresses,
                        get_closest_peers_stats: result.stats
                    },
                    provider_key,
                    certificates
                });
                // Re-weight the discovered peers from the contacts collected
                // during the first phase; unknown contacts get a default weight.
                let contacts = &result.inner.contacts;
                let trust = &self.trust;
                let peers = result.peers.into_iter().map(|peer_id| {
                    let weight = contacts
                        .get(&peer_id)
                        .map(|c| get_weight(&trust, &c.public_key))
                        .unwrap_or_default();
                    WeightedPeer {
                        peer_id: peer_id.into(),
                        weight,
                    }
                });
                self.queries.continue_fixed(query_id, peers, inner);
                None
            }

            // Final AddProvider phase done: emit the terminal event, folding in
            // the stats of the preceding GetClosestPeers phase.
            QueryInfo::AddProvider {
                context,
                key,
                phase: AddProviderPhase::AddProvider { get_closest_peers_stats, .. },
                ..
            } => {
                log::debug!("AddProvider finished {:?}!", context);
                match context {
                    AddProviderContext::Publish => {
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::StartProviding(Ok(AddProviderOk { key }))
                        })
                    }
                    AddProviderContext::Republish => {
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::RepublishProvider(Ok(AddProviderOk { key }))
                        })
                    }
                }
            }

            QueryInfo::GetRecord { key, records, quorum, cache_at } => {
                let results = if records.len() >= quorum.get() { // [not empty]
                    if let Some(cache_key) = cache_at {
                        // Cache the record at the closest node to the key that
                        // did not return the record.
                        let record = records.first().expect("[not empty]").record.clone();
                        let quorum = NonZeroUsize::new(1).expect("1 > 0");
                        let context = PutRecordContext::Cache;
                        let info = QueryInfo::PutRecord {
                            context,
                            record,
                            quorum,
                            phase: PutRecordPhase::PutRecord {
                                success: vec![],
                                get_closest_peers_stats: QueryStats::empty()
                            }
                        };
                        let inner = QueryInner::new(info);
                        let peer_id = cache_key.preimage();
                        let trust = &self.trust;
                        let weight =
                            result.inner.contacts.get(peer_id)
                                .map(|c| get_weight(&trust, &c.public_key))
                                .unwrap_or_default();
                        let peer = WeightedPeer {
                            weight,
                            peer_id: cache_key
                        };
                        // The cache write runs as a separate fixed query; its
                        // outcome does not affect this GetRecord result.
                        self.queries.add_fixed(iter::once(peer), inner);
                    }
                    Ok(GetRecordOk { records })
                } else if records.is_empty() {
                    Err(GetRecordError::NotFound {
                        key,
                        closest_peers: result.peers.collect()
                    })
                } else {
                    // Some records were found, but fewer than the quorum requires.
                    Err(GetRecordError::QuorumFailed { key, records, quorum })
                };
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetRecord(results)
                })
            }

            // First PutRecord phase done: move on to actually storing the record
            // at the discovered peers. No user-visible event yet.
            QueryInfo::PutRecord {
                context,
                record,
                quorum,
                phase: PutRecordPhase::GetClosestPeers
            } => {
                let info = QueryInfo::PutRecord {
                    context,
                    record,
                    quorum,
                    phase: PutRecordPhase::PutRecord {
                        success: vec![],
                        get_closest_peers_stats: result.stats
                    }
                };
                let inner = QueryInner::new(info);
                let contacts = &result.inner.contacts;
                let trust = &self.trust;
                let peers = result.peers.into_iter().map(|peer_id| {
                    let weight =
                        contacts
                            .get(&peer_id)
                            .map(|c| get_weight(&trust, &c.public_key))
                            .unwrap_or_default();

                    WeightedPeer {
                        peer_id: peer_id.into(),
                        weight,
                    }
                });
                self.queries.continue_fixed(query_id, peers, inner);
                None
            }

            // Final PutRecord phase done: report success or quorum failure,
            // depending on how many peers acknowledged the record.
            QueryInfo::PutRecord {
                context,
                record,
                quorum,
                phase: PutRecordPhase::PutRecord { success, get_closest_peers_stats }
            } => {
                let mk_result = |key: record::Key| {
                    if success.len() >= quorum.get() {
                        Ok(PutRecordOk { key })
                    } else {
                        Err(PutRecordError::QuorumFailed { key, quorum, success })
                    }
                };
                match context {
                    PutRecordContext::Publish =>
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::PutRecord(mk_result(record.key))
                        }),
                    PutRecordContext::Republish =>
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::RepublishRecord(mk_result(record.key))
                        }),
                    // Background replication/caching completes silently.
                    PutRecordContext::Replicate => {
                        debug!("Record replicated: {:?}", record.key);
                        None
                    }
                    PutRecordContext::Cache => {
                        debug!("Record cached: {:?}", record.key);
                        None
                    }
                }
            }
        }
    }
1395
    /// Handles a query that timed out.
    ///
    /// Translates the timed-out query into the matching timeout-error
    /// `KademliaEvent`, or into `None` for purely internal maintenance
    /// queries (record replication / caching) whose failure is only logged.
    fn query_timeout(&mut self, query: Query<QueryInner>) -> Option<KademliaEvent> {
        let query_id = query.id();
        log::trace!("Query {:?} timed out.", query_id);
        let result = query.into_result();
        match result.inner.info {
            QueryInfo::Bootstrap { peer, mut remaining } => {
                // Bootstrap targets still pending after the one continued below.
                let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);

                if let Some(mut remaining) = remaining.take() {
                    // Continue with the next bootstrap query if `remaining` is not empty.
                    if let Some(target) = remaining.next() {
                        let info = QueryInfo::Bootstrap {
                            peer: target.clone().into_preimage(),
                            remaining: Some(remaining)
                        };
                        let peers = Self::closest_keys(&mut self.kbuckets, &target);
                        let inner = QueryInner::new(info);
                        // Reuses the same query id, so the whole bootstrap
                        // appears as one logical query to the caller.
                        self.queries.continue_iter_closest(query_id, target.clone(), peers, inner);
                    }
                }

                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::Bootstrap(Err(
                        BootstrapError::Timeout { peer, num_remaining }
                    ))
                })
            }

            QueryInfo::AddProvider { context, key, .. } =>
                // Provider announcements report a timeout error variant that
                // matches whether this was a publication or a republication.
                Some(match context {
                    AddProviderContext::Publish =>
                        KademliaEvent::QueryResult {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::StartProviding(Err(
                                AddProviderError::Timeout { key }
                            ))
                        },
                    AddProviderContext::Republish =>
                        KademliaEvent::QueryResult {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::RepublishProvider(Err(
                                AddProviderError::Timeout { key }
                            ))
                        }
                }),

            QueryInfo::GetClosestPeers { key } => {
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetClosestPeers(Err(
                        GetClosestPeersError::Timeout {
                            key,
                            // The peers discovered before the timeout are
                            // still reported to the caller.
                            peers: result.peers.collect()
                        }
                    ))
                })
            },

            QueryInfo::PutRecord { record, quorum, context, phase } => {
                // `success` carries the peers that stored the record before
                // the timeout; it is empty while still locating closest peers.
                let err = Err(PutRecordError::Timeout {
                    key: record.key,
                    quorum,
                    success: match phase {
                        PutRecordPhase::GetClosestPeers => vec![],
                        PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
                    }
                });
                match context {
                    PutRecordContext::Publish =>
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::PutRecord(err)
                        }),
                    PutRecordContext::Republish =>
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::RepublishRecord(err)
                        }),
                    // Replication and caching are background maintenance:
                    // their timeouts are logged but emit no event.
                    PutRecordContext::Replicate => match phase {
                        PutRecordPhase::GetClosestPeers => {
                            warn!("Locating closest peers for replication failed: {:?}", err);
                            None
                        }
                        PutRecordPhase::PutRecord { .. } => {
                            debug!("Replicating record failed: {:?}", err);
                            None
                        }
                    }
                    PutRecordContext::Cache => match phase {
                        PutRecordPhase::GetClosestPeers => {
                            // Caching a record at the closest peer to a key that did not return
                            // a record is never preceded by a lookup for the closest peers, i.e.
                            // it is a direct query to a single peer.
                            unreachable!()
                        }
                        PutRecordPhase::PutRecord { .. } => {
                            debug!("Caching record failed: {:?}", err);
                            None
                        }
                    }
                }
            }

            QueryInfo::GetRecord { key, records, quorum, .. } =>
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetRecord(Err(
                        GetRecordError::Timeout { key, records, quorum },
                    ))
                }),

            QueryInfo::GetProviders { key, providers } =>
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetProviders(Err(
                        GetProvidersError::Timeout {
                            key,
                            providers,
                            closest_peers: result.peers.collect()
                        }
                    ))
                })
            }
    }
1530
    /// Processes a record received from a peer.
    ///
    /// Attempts to store the record locally (subject to a distance-scaled
    /// TTL) and answers `source` with either a `PutRecordRes` or, if the
    /// store rejected the record, a stream reset.
    fn record_received(
        &mut self,
        source: PeerId,
        connection: ConnectionId,
        request_id: KademliaRequestId,
        mut record: Record
    ) {
        if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
            // If the (alleged) publisher is the local node, do nothing. The record of
            // the original publisher should never change as a result of replication
            // and the publisher is always assumed to have the "right" value.
            self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
                peer_id: source,
                handler: NotifyHandler::One(connection),
                event: KademliaHandlerIn::PutRecordRes {
                    key: record.key,
                    value: record.value,
                    request_id,
                },
            });
            return
        }

        let now = Instant::now();

        // Calculate the expiration exponentially inversely proportional to the
        // number of nodes between the local node and the closest node to the key
        // (beyond the replication factor). This ensures avoiding over-caching
        // outside of the k closest nodes to a key.
        let target = kbucket::Key::new(record.key.clone());
        let num_between = self.kbuckets.count_nodes_between(&target);
        let k = self.queries.config().replication_factor.get();
        // Zero while within the k closest nodes; each node beyond k halves the TTL.
        let num_beyond_k = (usize::max(k, num_between) - k) as u32;
        let expiration = self.record_ttl.map(|ttl| now + exp_decrease(ttl, num_beyond_k));
        // The smaller TTL prevails. Only if neither TTL is set is the record
        // stored "forever".
        // NOTE(review): `Option::min` orders `None` below `Some`, so when the
        // local `record_ttl` is unset (`expiration == None`) this yields `None`
        // and discards any expiry the sender set — confirm that is intended.
        record.expires = record.expires.or(expiration).min(expiration);

        if let Some(job) = self.put_record_job.as_mut() {
            // Ignore the record in the next run of the replication
            // job, since we can assume the sender replicated the
            // record to the k closest peers. Effectively, only
            // one of the k closest peers performs a replication
            // in the configured interval, assuming a shared interval.
            job.skip(record.key.clone())
        }

        // While records received from a publisher, as well as records that do
        // not exist locally should always (attempted to) be stored, there is a
        // choice here w.r.t. the handling of replicated records whose keys refer
        // to records that exist locally: The value and / or the publisher may
        // either be overridden or left unchanged. At the moment and in the
        // absence of a decisive argument for another option, both are always
        // overridden as it avoids having to load the existing record in the
        // first place.

        if !record.is_expired(now) {
            // The record is cloned because of the weird libp2p protocol
            // requirement to send back the value in the response, although this
            // is a waste of resources.
            match self.store.put(record.clone()) {
                Ok(()) => {
                    self.metrics.store_put();
                    debug!("Record stored: {:?}; {} bytes", record.key, record.value.len());
                },
                Err(e) => {
                    info!("Record not stored: {:?}", e);
                    // A store failure resets the substream instead of
                    // acknowledging the put.
                    self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
                        peer_id: source,
                        handler: NotifyHandler::One(connection),
                        event: KademliaHandlerIn::Reset(request_id)
                    });

                    return
                }
            }
        }

        // The remote receives a [`KademliaHandlerIn::PutRecordRes`] even in the
        // case where the record is discarded due to being expired. Given that
        // the remote sent the local node a [`KademliaHandlerEvent::PutRecord`]
        // request, the remote perceives the local node as one node among the k
        // closest nodes to the target. In addition returning
        // [`KademliaHandlerIn::PutRecordRes`] does not reveal any internal
        // information to a possibly malicious remote node.
        self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
            peer_id: source,
            handler: NotifyHandler::One(connection),
            event: KademliaHandlerIn::PutRecordRes {
                key: record.key,
                value: record.value,
                request_id,
            },
        })
    }
1627
1628    fn closest_keys<'a, T>(table: &'a mut KBucketsTable<kbucket::Key<PeerId>, Contact>, target: &'a T)
1629        -> impl Iterator<Item = WeightedPeer> + 'a
1630        where
1631            T: Clone + AsRef<KeyBytes>,
1632    {
1633        table.closest(target).map(|e| WeightedPeer {
1634            peer_id: e.node.key,
1635            // TODO: is node weight up to date?
1636            weight: e.node.weight
1637        })
1638    }
1639
1640    /// Processes a provider record received from a peer.
1641    fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
1642        // Add certificates to trust graph
1643        let cur_time = trust_graph::current_time();
1644        for cert in provider.certificates.iter() {
1645            self.trust.add(cert, cur_time).unwrap_or_else(|err| {
1646                log::warn!("unable to add certificate for peer {}: {}", provider.node_id, err);
1647            });
1648        }
1649
1650        if &provider.node_id != self.kbuckets.local_key().preimage() {
1651            // TODO: calculate weight
1652            let record = ProviderRecord {
1653                key,
1654                provider: provider.node_id,
1655                expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1656                addresses: provider.multiaddrs,
1657            };
1658            if let Err(e) = self.store.add_provider(record) {
1659                info!("Provider record not stored: {:?}", e);
1660            }
1661        }
1662    }
1663
1664    fn print_bucket_table(&mut self) {
1665        use log::trace as log;
1666        use log::Level::Trace as TargetLevel;
1667
1668        if log::max_level() < TargetLevel {
1669            return
1670        }
1671
1672        let mut size = 0;
1673        let buckets = self.kbuckets.iter().filter_map(|KBucketRef { index, bucket }| {
1674            use multiaddr::Protocol::{Ip4, Ip6, Tcp};
1675            let elems = bucket.iter().collect::<Vec<_>>();
1676            if elems.len() == 0 {
1677                return None
1678            } else {
1679                size += elems.len();
1680            }
1681
1682            let header = format!("Bucket {:?}, elements: {}", index.get(), elems.len());
1683            let elems = elems.into_iter().map(|(node, status)| {
1684                let status_s = match status {
1685                    NodeStatus::Connected => "C",
1686                    NodeStatus::Disconnected => "D"
1687                };
1688
1689                let address_s = node.value.addresses
1690                    .iter()
1691                    .next()
1692                    .map(|ma|
1693                        ma.iter().fold(String::new(), |acc, proto|
1694                            match proto {
1695                                Ip4(addr) => format!("{}", addr),
1696                                Ip6(addr) => format!("{}", addr),
1697                                Tcp(port) => format!("{}:{}", acc, port),
1698                                _ => acc
1699                            }
1700                        )
1701                    ).unwrap_or("NOADDR".to_string());
1702
1703                let address_plus = node.value.addresses
1704                    .len()
1705                    .checked_sub(1)
1706                    .filter(|l| *l != 0)
1707                    .map(|l| format!(" (+{})", l))
1708                    .unwrap_or("".to_string());
1709
1710                let kademlia_key = bs58::encode(node.key.as_ref()).into_string();
1711                let len = kademlia_key.len();
1712                let kademlia_key = &kademlia_key[len - 10..];
1713
1714                let peer_id = node.key.preimage().to_string();
1715                let len = peer_id.len();
1716                let peer_id = &peer_id[len - 10..];
1717
1718                format!(
1719                    "[bcktdbg]\t{} {} {} {}{} {}\n",
1720                    status_s,
1721                    node.weight,
1722                    peer_id,
1723                    address_s,
1724                    address_plus,
1725                    kademlia_key
1726                )
1727            }).collect::<String>();
1728
1729            Some(format!("[bcktdbg] {}\n{}\n", header, elems))
1730        }).collect::<String>();
1731
1732        self.metrics.report_routing_table_size(size);
1733
1734        if size == 0 {
1735            log!("[bcktdbg] Bucket table is empty.")
1736        } else {
1737            log!("\n{}", buckets);
1738        }
1739    }
1740
    /// Returns all certificates known for `key` in the local trust graph
    /// (empty on lookup failure; see the free function of the same name).
    fn get_certificates(&self, key: &PublicKey) -> Vec<Certificate> {
        get_certificates(&self.trust, key)
    }
1744
    /// Returns the trust weight of `key` from the local trust graph,
    /// defaulting to 0 when the weight is unknown or the lookup fails.
    fn get_weight(&self, key: &PublicKey) -> u32 {
        get_weight(&self.trust, key)
    }
1748}
1749
1750fn get_certificates(trust: &TrustGraph, key: &PublicKey) -> Vec<Certificate> {
1751    trust.get_all_certs(fluence_identity::PublicKey::from(key.clone()), &[]).unwrap_or_default()
1752}
1753
1754fn get_weight(trust: &TrustGraph, key: &PublicKey) -> u32 {
1755    trust.weight(fluence_identity::PublicKey::from(key.clone())).unwrap_or_default().unwrap_or_default()
1756}
1757
/// Exponentially decrease the given duration (base 2).
///
/// Only the whole-second part of `ttl` is halved `exp` times; shifting by
/// the full width of `u64` or more collapses the result to zero.
fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
    let secs = match ttl.as_secs().checked_shr(exp) {
        Some(shifted) => shifted,
        None => 0,
    };
    Duration::from_secs(secs)
}
1762
1763impl<TStore> NetworkBehaviour for Kademlia<TStore>
1764where
1765    for<'a> TStore: RecordStore<'a>,
1766    TStore: Send + 'static,
1767{
1768    type ProtocolsHandler = KademliaHandlerProto<QueryId>;
1769    type OutEvent = KademliaEvent;
1770
1771    fn new_handler(&mut self) -> Self::ProtocolsHandler {
1772        KademliaHandlerProto::new(KademliaHandlerConfig {
1773            protocol_config: self.protocol_config.clone(),
1774            allow_listening: true,
1775            idle_timeout: self.connection_idle_timeout,
1776        })
1777    }
1778
1779    fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
1780        // We should order addresses from decreasing likelyhood of connectivity, so start with
1781        // the addresses of that peer in the k-buckets.
1782        let key = kbucket::Key::from(*peer_id);
1783        let mut peer_addrs =
1784            if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) {
1785                let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
1786                debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
1787                addrs
1788            } else {
1789                Vec::new()
1790            };
1791
1792        // We add to that a temporary list of addresses from the ongoing queries.
1793        for query in self.queries.iter() {
1794            if let Some(addrs) = query.inner.contacts.get(peer_id) {
1795                peer_addrs.extend(addrs.iter().cloned())
1796            }
1797        }
1798
1799        peer_addrs
1800    }
1801
    /// Intentionally a no-op; see the comment in the body.
    fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {
        // When a connection is established, we don't know yet whether the
        // remote supports the configured protocol name. Only once a connection
        // handler reports [`KademliaHandlerEvent::ProtocolConfirmed`] do we
        // update the local routing table.
    }
1808
1809    fn inject_connected(&mut self, peer: &PeerId) {
1810        // Queue events for sending pending RPCs to the connected peer.
1811        // There can be only one pending RPC for a particular peer and query per definition.
1812        for (peer_id, event) in self.queries.iter_mut().filter_map(|q|
1813            q.inner.pending_rpcs.iter()
1814                .position(|(p, _)| p == peer)
1815                .map(|p| q.inner.pending_rpcs.remove(p)))
1816        {
1817            self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1818                peer_id, event, handler: NotifyHandler::Any
1819            });
1820        }
1821
1822        self.connected_peers.insert(*peer);
1823        self.metrics.node_connected();
1824    }
1825
    /// Keeps the routing table and the per-query contact caches in sync
    /// when an existing connection's remote address changes from `old`
    /// to `new`.
    fn inject_address_change(
        &mut self,
        peer: &PeerId,
        _: &ConnectionId,
        old: &ConnectedPoint,
        new: &ConnectedPoint
    ) {
        let (old, new) = (old.get_remote_address(), new.get_remote_address());

        // Update routing table.
        if let Some(contact) = self.kbuckets.entry(&kbucket::Key::from(*peer)).value() {
            if contact.addresses.replace(old, new) {
                debug!("Address '{}' replaced with '{}' for peer '{}'.", old, new, peer);
            } else {
                debug!(
                    "Address '{}' not replaced with '{}' for peer '{}' as old address wasn't \
                     present.",
                    old, new, peer,
                );
            }
        } else {
            debug!(
                "Address '{}' not replaced with '{}' for peer '{}' as peer is not present in the \
                 routing table.",
                old, new, peer,
            );
        }

        // Update query address cache.
        //
        // Given two connected nodes: local node A and remote node B. Say node B
        // is not in node A's routing table. Additionally node B is part of the
        // `QueryInner::addresses` list of an ongoing query on node A. Say Node
        // B triggers an address change and then disconnects. Later on the
        // earlier mentioned query on node A would like to connect to node B.
        // Without replacing the address in the `QueryInner::addresses` set node
        // A would attempt to dial the old and not the new address.
        //
        // While upholding correctness, iterating through all discovered
        // addresses of a peer in all currently ongoing queries might have a
        // large performance impact. If so, the code below might be worth
        // revisiting.
        for query in self.queries.iter_mut() {
            if let Some(contact) = query.inner.contacts.get_mut(peer) {
                for addr in contact.addresses.iter_mut() {
                    if addr == old {
                        *addr = new.clone();
                    }
                }
            }
        }
    }
1878
    /// Handles a failure to reach `addr` while dialing `peer_id`.
    ///
    /// Removes the failed address from the routing table (always keeping
    /// the peer's last remaining address) and removes it unconditionally
    /// from all ongoing queries' contact caches.
    fn inject_addr_reach_failure(
        &mut self,
        peer_id: Option<&PeerId>,
        addr: &Multiaddr,
        err: &dyn error::Error
    ) {
        if let Some(peer_id) = peer_id {
            let key = kbucket::Key::from(*peer_id);

            if let Some(contact) = self.kbuckets.entry(&key).value() {
                // TODO: Ideally, the address should only be removed if the error can
                // be classified as "permanent" but since `err` is currently a borrowed
                // trait object without a `'static` bound, even downcasting for inspection
                // of the error is not possible (and also not truly desirable or ergonomic).
                // The error passed in should rather be a dedicated enum.
                if contact.addresses.remove(addr, Remove::KeepLast).is_ok() {
                    debug!("Address '{}' removed from peer '{}' due to error: {}.",
                        addr, peer_id, err);
                } else {
                    // Despite apparently having no reachable address (any longer),
                    // the peer is kept in the routing table with the last address to avoid
                    // (temporary) loss of network connectivity to "flush" the routing
                    // table. Once in, a peer is only removed from the routing table
                    // if it is the least recently connected peer, currently disconnected
                    // and is unreachable in the context of another peer pending insertion
                    // into the same bucket. This is handled transparently by the
                    // `KBucketsTable` and takes effect through `KBucketsTable::take_applied_pending`
                    // within `Kademlia::poll`.
                    debug!("Last remaining address '{}' of peer '{}' is unreachable: {}.",
                        addr, peer_id, err)
                }
            }

            for query in self.queries.iter_mut() {
                if let Some(contact) = query.inner.contacts.get_mut(peer_id) {
                    // It's safe to unwrap because there's no errors on Remove::Completely
                    contact.addresses.remove(addr, Remove::Completely).unwrap();
                }
            }
        }
    }
1920
1921    fn inject_dial_failure(&mut self, peer_id: &PeerId) {
1922        for query in self.queries.iter_mut() {
1923            query.on_failure(peer_id);
1924        }
1925    }
1926
1927    fn inject_disconnected(&mut self, id: &PeerId) {
1928        for query in self.queries.iter_mut() {
1929            query.on_failure(id);
1930        }
1931        self.connection_updated(*id, None, NodeStatus::Disconnected);
1932        self.connected_peers.remove(id);
1933    }
1934
1935    fn inject_event(
1936        &mut self,
1937        source: PeerId,
1938        connection: ConnectionId,
1939        event: KademliaHandlerEvent<QueryId>
1940    ) {
1941        self.metrics.received(&event);
1942        match event {
1943            KademliaHandlerEvent::ProtocolConfirmed { endpoint } => {
1944                debug_assert!(self.connected_peers.contains(&source));
1945                // The remote's address can only be put into the routing table,
1946                // and thus shared with other nodes, if the local node is the dialer,
1947                // since the remote address on an inbound connection may be specific
1948                // to that connection (e.g. typically the TCP port numbers).
1949                let new_address = match endpoint {
1950                    ConnectedPoint::Dialer { address } => Some(address),
1951                    ConnectedPoint::Listener { .. } => None,
1952                };
1953
1954                let contact = self.queries
1955                    .iter_mut()
1956                    .find_map(|q| q.inner.contacts.get(&source))
1957                    .cloned()
1958                    .and_then(|mut c|
1959                        new_address.as_ref().map(|addr| {
1960                            c.insert(addr.clone());
1961                            c
1962                        }));
1963
1964                let contact = contact.or({
1965                    let pk = source.as_public_key();
1966                    let address = new_address.map(Addresses::new);
1967                    address.zip(pk).map(|(addr, pk)| Contact::new(addr, pk))
1968                });
1969
1970                self.connection_updated(source, contact, NodeStatus::Connected);
1971            }
1972
1973            KademliaHandlerEvent::FindNodeReq { key, request_id } => {
1974                self.print_bucket_table();
1975                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
1976                self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
1977                    peer_id: source,
1978                    handler: NotifyHandler::One(connection),
1979                    event: KademliaHandlerIn::FindNodeRes {
1980                        closer_peers,
1981                        request_id,
1982                    },
1983                });
1984            }
1985
1986            KademliaHandlerEvent::FindNodeRes {
1987                closer_peers,
1988                user_data,
1989            } => {
1990                self.discovered(&user_data, &source, closer_peers.iter());
1991            }
1992
1993            KademliaHandlerEvent::GetProvidersReq { key, request_id } => {
1994                self.print_bucket_table();
1995                let provider_peers = self.provider_peers(&key, &source);
1996                debug!(
1997                    "provider peers: {}",
1998                    provider_peers.iter().map(|p| p.node_id.to_string() + ", ").collect::<String>()
1999                );
2000                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
2001                self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
2002                    peer_id: source,
2003                    handler: NotifyHandler::One(connection),
2004                    event: KademliaHandlerIn::GetProvidersRes {
2005                        closer_peers,
2006                        provider_peers,
2007                        request_id,
2008                    },
2009                });
2010            }
2011
2012            KademliaHandlerEvent::GetProvidersRes {
2013                closer_peers,
2014                provider_peers,
2015                user_data,
2016            } => {
2017                let peers = closer_peers.iter().chain(provider_peers.iter());
2018                self.discovered(&user_data, &source, peers);
2019                if let Some(query) = self.queries.get_mut(&user_data) {
2020                    if let QueryInfo::GetProviders {
2021                        providers, ..
2022                    } = &mut query.inner.info {
2023                        for peer in provider_peers {
2024                            providers.insert(peer.node_id);
2025                        }
2026                    }
2027                }
2028            }
2029
2030            KademliaHandlerEvent::QueryError { user_data, error } => {
2031                log::debug!("Request to {:?} in query {:?} failed with {:?}",
2032                            source, user_data, error);
2033                // If the query to which the error relates is still active,
2034                // signal the failure w.r.t. `source`.
2035                if let Some(query) = self.queries.get_mut(&user_data) {
2036                    query.on_failure(&source)
2037                }
2038            }
2039
2040            KademliaHandlerEvent::AddProvider { key, provider } => {
2041                // Only accept a provider record from a legitimate peer.
2042                if provider.node_id != source {
2043                    return
2044                }
2045
2046                self.provider_received(key, provider)
2047            }
2048
2049            KademliaHandlerEvent::GetRecord { key, request_id } => {
2050                // Lookup the record locally.
2051                let record = match self.store.get(&key) {
2052                    Some(record) => {
2053                        if record.is_expired(Instant::now()) {
2054                            self.store.remove(&key);
2055                            self.metrics.record_removed();
2056                            None
2057                        } else {
2058                            Some(record.into_owned())
2059                        }
2060                    },
2061                    None => None
2062                };
2063
2064                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
2065
2066                self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
2067                    peer_id: source,
2068                    handler: NotifyHandler::One(connection),
2069                    event: KademliaHandlerIn::GetRecordRes {
2070                        record,
2071                        closer_peers,
2072                        request_id,
2073                    },
2074                });
2075            }
2076
2077            KademliaHandlerEvent::GetRecordRes {
2078                record,
2079                closer_peers,
2080                user_data,
2081            } => {
2082                if let Some(query) = self.queries.get_mut(&user_data) {
2083                    if let QueryInfo::GetRecord {
2084                        key, records, quorum, cache_at
2085                    } = &mut query.inner.info {
2086                        if let Some(record) = record {
2087                            records.push(PeerRecord{ peer: Some(source), record });
2088
2089                            let quorum = quorum.get();
2090                            if records.len() >= quorum {
2091                                // Desired quorum reached. The query may finish. See
2092                                // [`Query::try_finish`] for details.
2093                                let peers = records.iter()
2094                                    .filter_map(|PeerRecord{ peer, .. }| peer.as_ref())
2095                                    .cloned()
2096                                    .collect::<Vec<_>>();
2097                                let finished = query.try_finish(peers.iter());
2098                                if !finished {
2099                                    debug!(
2100                                        "GetRecord query ({:?}) reached quorum ({}/{}) with \
2101                                         response from peer {} but could not yet finish.",
2102                                        user_data, peers.len(), quorum, source,
2103                                    );
2104                                }
2105                            }
2106                        } else if quorum.get() == 1 {
2107                            // It is a "standard" Kademlia query, for which the
2108                            // closest node to the key that did *not* return the
2109                            // value is tracked in order to cache the record on
2110                            // that node if the query turns out to be successful.
2111                            let source_key = kbucket::Key::from(source);
2112                            if let Some(cache_key) = cache_at {
2113                                let key = kbucket::Key::new(key.clone());
2114                                if source_key.distance(&key) < cache_key.distance(&key) {
2115                                    *cache_at = Some(source_key)
2116                                }
2117                            } else {
2118                                *cache_at = Some(source_key)
2119                            }
2120                        }
2121                    }
2122                }
2123
2124                self.discovered(&user_data, &source, closer_peers.iter());
2125            }
2126
2127            KademliaHandlerEvent::PutRecord {
2128                record,
2129                request_id
2130            } => {
2131                self.record_received(source, connection, request_id, record);
2132            }
2133
2134            KademliaHandlerEvent::PutRecordRes {
2135                user_data, ..
2136            } => {
2137                if let Some(query) = self.queries.get_mut(&user_data) {
2138                    query.on_success(&source, vec![]);
2139                    if let QueryInfo::PutRecord {
2140                        phase: PutRecordPhase::PutRecord { success, .. }, quorum, ..
2141                    } = &mut query.inner.info {
2142                        success.push(source);
2143
2144                        let quorum = quorum.get();
2145                        if success.len() >= quorum {
2146                            let peers = success.clone();
2147                            let finished = query.try_finish(peers.iter());
2148                            if !finished {
2149                                debug!(
2150                                    "PutRecord query ({:?}) reached quorum ({}/{}) with response \
2151                                     from peer {} but could not yet finish.",
2152                                    user_data, peers.len(), quorum, source,
2153                                );
2154                            }
2155                        }
2156                    }
2157                }
2158            }
2159        };
2160    }
2161
2162    fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
2163        self.local_addrs.insert(addr.clone());
2164    }
2165
2166    fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
2167        self.local_addrs.remove(addr);
2168    }
2169
2170    fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
2171        if self.local_addrs.len() < MAX_LOCAL_EXTERNAL_ADDRS {
2172            self.local_addrs.insert(addr.clone());
2173        }
2174    }
2175
    /// Drives the behaviour: runs background jobs, advances the query pool,
    /// and surfaces queued events to the swarm.
    fn poll(&mut self, cx: &mut Context<'_>, parameters: &mut impl PollParameters) -> Poll<
        NetworkBehaviourAction<
            KademliaHandlerIn<QueryId>,
            Self::OutEvent,
        >,
    > {
        let now = Instant::now();

        // Calculate the available capacity for queries triggered by background jobs.
        let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());

        // Run the periodic provider announcement job.
        // The job is temporarily moved out of `self` so that `self` can be
        // mutably borrowed for starting queries while the job is polled.
        if let Some(mut job) = self.add_provider_job.take() {
            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
            for _ in 0 .. num {
                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                    self.start_add_provider(r.key, AddProviderContext::Republish)
                } else {
                    break
                }
            }
            // NOTE(review): `num` is subtracted even if the loop above broke
            // early (fewer than `num` queries started), so the remaining
            // capacity is a conservative lower bound.
            jobs_query_capacity -= num;
            self.add_provider_job = Some(job);
        }

        // Run the periodic record replication / publication job.
        if let Some(mut job) = self.put_record_job.take() {
            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
            for _ in 0 .. num {
                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                    // Records published by the local node are re-published;
                    // all other records are replicated.
                    let context = if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
                        PutRecordContext::Republish
                    } else {
                        PutRecordContext::Replicate
                    };
                    self.start_put_record(r, Quorum::All, context)
                } else {
                    break
                }
            }
            // `jobs_query_capacity` is not decremented here because it is not
            // read again below; keep that in mind if another job is added.
            self.put_record_job = Some(job);
        }

        loop {
            // Drain queued events first.
            if let Some(event) = self.queued_events.pop_front() {
                self.metrics.polled_event(&event);
                return Poll::Ready(event);
            }

            // Drain applied pending entries from the routing table.
            if let Some(entry) = self.kbuckets.take_applied_pending() {
                self.print_bucket_table();
                let kbucket::Node { key, value, .. } = entry.inserted;
                let event = KademliaEvent::RoutingUpdated {
                    peer: key.into_preimage(),
                    addresses: value.into(),
                    old_peer: entry.evicted.map(|n| n.key.into_preimage())
                };
                let event = NetworkBehaviourAction::GenerateEvent(event);
                self.metrics.polled_event(&event);
                return Poll::Ready(event);
            }

            // Look for a finished query.
            loop {
                match self.queries.poll(now) {
                    QueryPoolState::Finished(mut q) => {
                        q.finish();
                        if let Some(event) = self.query_finished(q, parameters) {
                            let event = NetworkBehaviourAction::GenerateEvent(event);
                            self.metrics.polled_event(&event);
                            return Poll::Ready(event);
                        }
                    }
                    QueryPoolState::Timeout(mut q) => {
                        q.finish();
                        if let Some(event) = self.query_timeout(q) {
                            let event = NetworkBehaviourAction::GenerateEvent(event);
                            self.metrics.polled_event(&event);
                            return Poll::Ready(event);
                        }
                    }
                    QueryPoolState::Waiting(Some((query, peer_id))) => {
                        let event = query.inner.info.to_request(query.id());
                        // TODO: AddProvider requests yield no response, so the query completes
                        // as soon as all requests have been sent. However, the handler should
                        // better emit an event when the request has been sent (and report
                        // an error if sending fails), instead of immediately reporting
                        // "success" somewhat prematurely here.
                        if let QueryInfo::AddProvider {
                            phase: AddProviderPhase::AddProvider { .. },
                            ..
                        } = &query.inner.info {
                            query.on_success(&peer_id, vec![])
                        }
                        // If connected, send the request right away; otherwise
                        // queue it as pending and dial the peer — unless the
                        // peer is the local node itself.
                        if self.connected_peers.contains(&peer_id) {
                            self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
                                peer_id, event, handler: NotifyHandler::Any
                            });
                        } else if &peer_id != self.kbuckets.local_key().preimage() {
                            query.inner.pending_rpcs.push((peer_id, event));
                            self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
                                peer_id, condition: DialPeerCondition::Disconnected
                            });
                        }
                    }
                    QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
                }
            }

            // No immediate event was produced as a result of a finished query.
            // If no new events have been queued either, signal `NotReady` to
            // be polled again later.
            if self.queued_events.is_empty() {
                return Poll::Pending
            }
        }
    }
2295}
2296
/// A quorum w.r.t. the configured replication factor specifies the minimum
/// number of distinct nodes that must be successfully contacted in order
/// for a query to succeed.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Quorum {
    /// A single successful response suffices.
    One,
    /// Strictly more than half of the total number of nodes must respond.
    Majority,
    /// Every node must respond.
    All,
    /// A fixed number of nodes must respond, capped at the total.
    N(NonZeroUsize)
}

impl Quorum {
    /// Evaluate the quorum w.r.t a given total (number of peers).
    fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
        match *self {
            Self::One => NonZeroUsize::new(1).expect("1 != 0"),
            Self::Majority => NonZeroUsize::new(total.get() / 2 + 1).expect("n + 1 != 0"),
            Self::All => total,
            // A fixed quorum can never exceed the total number of peers.
            Self::N(n) => total.min(n),
        }
    }
}
2319
/// A record either received by the given peer or retrieved from the local
/// record store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerRecord {
    /// The peer from whom the record was received. `None` if the record was
    /// retrieved from local storage.
    pub peer: Option<PeerId>,
    /// The (value-)record itself.
    pub record: Record,
}
2329
2330//////////////////////////////////////////////////////////////////////////////
2331// Events
2332
/// The events produced by the `Kademlia` behaviour.
///
/// See [`NetworkBehaviour::poll`].
#[derive(Derivative)]
#[derivative(Debug)]
pub enum KademliaEvent {
    /// A query has produced a result.
    QueryResult {
        /// The ID of the query that finished.
        id: QueryId,
        /// The result of the query.
        result: QueryResult,
        /// Execution statistics from the query.
        stats: QueryStats
    },

    /// The routing table has been updated with a new peer and / or
    /// address, thereby possibly evicting another peer.
    RoutingUpdated {
        /// The ID of the peer that was added or updated.
        peer: PeerId,
        /// The full list of known addresses of `peer`.
        addresses: Addresses,
        /// The ID of the peer that was evicted from the routing table to make
        /// room for the new peer, if any.
        old_peer: Option<PeerId>,
    },

    /// A peer has connected for whom no listen address is known.
    ///
    /// If the peer is to be added to the routing table, a known
    /// listen address for the peer must be provided via [`Kademlia::add_address`].
    UnroutablePeer {
        /// The ID of the connected peer.
        peer: PeerId
    },

    /// A connection to a peer has been established for whom a listen address
    /// is known but the peer has not been added to the routing table either
    /// because [`KademliaBucketInserts::Manual`] is configured or because
    /// the corresponding bucket is full.
    ///
    /// If the peer is to be included in the routing table, it must
    /// be explicitly added via [`Kademlia::add_address`], possibly after
    /// removing another peer.
    ///
    /// See [`Kademlia::kbucket`] for insight into the contents of
    /// the k-bucket of `peer`.
    RoutablePeer {
        /// The ID of the peer.
        peer: PeerId,
        /// A known listen address of the peer.
        address: Multiaddr,
    },

    /// A connection to a peer has been established for whom a listen address
    /// is known but the peer is only pending insertion into the routing table
    /// if the least-recently disconnected peer is unresponsive, i.e. the peer
    /// may not make it into the routing table.
    ///
    /// If the peer is to be unconditionally included in the routing table,
    /// it should be explicitly added via [`Kademlia::add_address`] after
    /// removing another peer.
    ///
    /// See [`Kademlia::kbucket`] for insight into the contents of
    /// the k-bucket of `peer`.
    PendingRoutablePeer {
        /// The ID of the peer.
        peer: PeerId,
        /// A known listen address of the peer.
        address: Multiaddr,
    }
}
2401
/// The results of Kademlia queries.
///
/// Delivered to the user via [`KademliaEvent::QueryResult`].
#[derive(Debug)]
pub enum QueryResult {
    /// The result of [`Kademlia::bootstrap`].
    Bootstrap(BootstrapResult),

    /// The result of [`Kademlia::get_closest_peers`].
    GetClosestPeers(GetClosestPeersResult),

    /// The result of [`Kademlia::get_providers`].
    GetProviders(GetProvidersResult),

    /// The result of [`Kademlia::start_providing`].
    StartProviding(AddProviderResult),

    /// The result of a (automatic) republishing of a provider record.
    RepublishProvider(AddProviderResult),

    /// The result of [`Kademlia::get_record`].
    GetRecord(GetRecordResult),

    /// The result of [`Kademlia::put_record`].
    PutRecord(PutRecordResult),

    /// The result of a (automatic) republishing of a (value-)record.
    RepublishRecord(PutRecordResult),
}
2429
/// The result of [`Kademlia::get_record`].
pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;

/// The successful result of [`Kademlia::get_record`].
#[derive(Debug, Clone)]
pub struct GetRecordOk {
    /// The found records, each together with the peer that returned it.
    pub records: Vec<PeerRecord>
}
2438
/// The error result of [`Kademlia::get_record`].
#[derive(Debug, Clone)]
pub enum GetRecordError {
    /// No record with the given key was found.
    NotFound {
        key: record::Key,
        closest_peers: Vec<PeerId>
    },
    /// The query finished with fewer records than required by the quorum.
    QuorumFailed {
        key: record::Key,
        records: Vec<PeerRecord>,
        quorum: NonZeroUsize
    },
    /// The query timed out before the quorum was reached.
    Timeout {
        key: record::Key,
        records: Vec<PeerRecord>,
        quorum: NonZeroUsize
    }
}
2457
2458impl GetRecordError {
2459    /// Gets the key of the record for which the operation failed.
2460    pub fn key(&self) -> &record::Key {
2461        match self {
2462            GetRecordError::QuorumFailed { key, .. } => key,
2463            GetRecordError::Timeout { key, .. } => key,
2464            GetRecordError::NotFound { key, .. } => key,
2465        }
2466    }
2467
2468    /// Extracts the key of the record for which the operation failed,
2469    /// consuming the error.
2470    pub fn into_key(self) -> record::Key {
2471        match self {
2472            GetRecordError::QuorumFailed { key, .. } => key,
2473            GetRecordError::Timeout { key, .. } => key,
2474            GetRecordError::NotFound { key, .. } => key,
2475        }
2476    }
2477}
2478
/// The result of [`Kademlia::put_record`].
pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;

/// The successful result of [`Kademlia::put_record`].
#[derive(Debug, Clone)]
pub struct PutRecordOk {
    /// The key of the record that was stored.
    pub key: record::Key
}
2487
2488/// The error result of [`Kademlia::put_record`].
2489#[derive(Debug)]
2490pub enum PutRecordError {
2491    QuorumFailed {
2492        key: record::Key,
2493        /// [`PeerId`]s of the peers the record was successfully stored on.
2494        success: Vec<PeerId>,
2495        quorum: NonZeroUsize
2496    },
2497    Timeout {
2498        key: record::Key,
2499        /// [`PeerId`]s of the peers the record was successfully stored on.
2500        success: Vec<PeerId>,
2501        quorum: NonZeroUsize
2502    },
2503}
2504
2505impl PutRecordError {
2506    /// Gets the key of the record for which the operation failed.
2507    pub fn key(&self) -> &record::Key {
2508        match self {
2509            PutRecordError::QuorumFailed { key, .. } => key,
2510            PutRecordError::Timeout { key, .. } => key,
2511        }
2512    }
2513
2514    /// Extracts the key of the record for which the operation failed,
2515    /// consuming the error.
2516    pub fn into_key(self) -> record::Key {
2517        match self {
2518            PutRecordError::QuorumFailed { key, .. } => key,
2519            PutRecordError::Timeout { key, .. } => key,
2520        }
2521    }
2522}
2523
/// The result of [`Kademlia::bootstrap`].
pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;

/// The successful result of [`Kademlia::bootstrap`].
#[derive(Debug, Clone)]
pub struct BootstrapOk {
    /// The peer that was targeted by this bootstrap lookup.
    pub peer: PeerId,
    /// The number of remaining bootstrap lookups, one per bucket
    /// that still needs refreshing.
    pub num_remaining: u32,
}
2533
/// The error result of [`Kademlia::bootstrap`].
#[derive(Debug, Clone)]
pub enum BootstrapError {
    /// The bootstrap lookup for `peer` timed out.
    Timeout {
        peer: PeerId,
        /// The number of remaining bootstrap lookups, if known.
        num_remaining: Option<u32>,
    }
}
2542
/// The result of [`Kademlia::get_closest_peers`].
pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;

/// The successful result of [`Kademlia::get_closest_peers`].
#[derive(Debug, Clone)]
pub struct GetClosestPeersOk {
    /// The key whose closest peers were searched for.
    pub key: Vec<u8>,
    /// The peers found closest to `key`.
    pub peers: Vec<PeerId>
}
2552
/// The error result of [`Kademlia::get_closest_peers`].
#[derive(Debug, Clone)]
pub enum GetClosestPeersError {
    /// The query timed out.
    Timeout {
        key: Vec<u8>,
        /// The peers found up to the point of the timeout.
        peers: Vec<PeerId>
    }
}
2561
2562impl GetClosestPeersError {
2563    /// Gets the key for which the operation failed.
2564    pub fn key(&self) -> &Vec<u8> {
2565        match self {
2566            GetClosestPeersError::Timeout { key, .. } => key,
2567        }
2568    }
2569
2570    /// Extracts the key for which the operation failed,
2571    /// consuming the error.
2572    pub fn into_key(self) -> Vec<u8> {
2573        match self {
2574            GetClosestPeersError::Timeout { key, .. } => key,
2575        }
2576    }
2577}
2578
/// The result of [`Kademlia::get_providers`].
pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;

/// The successful result of [`Kademlia::get_providers`].
#[derive(Debug, Clone)]
pub struct GetProvidersOk {
    /// The key for which providers were searched.
    pub key: record::Key,
    /// The providers found for `key`.
    pub providers: HashSet<PeerId>,
    /// The peers closest to `key` encountered during the query.
    pub closest_peers: Vec<PeerId>
}
2589
/// The error result of [`Kademlia::get_providers`].
#[derive(Debug, Clone)]
pub enum GetProvidersError {
    /// The query timed out.
    Timeout {
        key: record::Key,
        /// The providers found up to the point of the timeout.
        providers: HashSet<PeerId>,
        /// The closest peers encountered up to the point of the timeout.
        closest_peers: Vec<PeerId>
    }
}
2599
2600impl GetProvidersError {
2601    /// Gets the key for which the operation failed.
2602    pub fn key(&self) -> &record::Key {
2603        match self {
2604            GetProvidersError::Timeout { key, .. } => key,
2605        }
2606    }
2607
2608    /// Extracts the key for which the operation failed,
2609    /// consuming the error.
2610    pub fn into_key(self) -> record::Key {
2611        match self {
2612            GetProvidersError::Timeout { key, .. } => key,
2613        }
2614    }
2615}
2616
/// The result of publishing a provider record.
pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;

/// The successful result of publishing a provider record.
#[derive(Debug, Clone)]
pub struct AddProviderOk {
    /// The key under which the provider record was published.
    pub key: record::Key,
}
2625
2626/// The possible errors when publishing a provider record.
2627#[derive(Debug)]
2628pub enum AddProviderError {
2629    /// The query timed out.
2630    Timeout {
2631        key: record::Key,
2632    },
2633}
2634
2635impl AddProviderError {
2636    /// Gets the key for which the operation failed.
2637    pub fn key(&self) -> &record::Key {
2638        match self {
2639            AddProviderError::Timeout { key, .. } => key,
2640        }
2641    }
2642
2643    /// Extracts the key for which the operation failed,
2644    pub fn into_key(self) -> record::Key {
2645        match self {
2646            AddProviderError::Timeout { key, .. } => key,
2647        }
2648    }
2649}
2650
2651impl From<kbucket::EntryView<kbucket::Key<PeerId>, Contact>> for KadPeer {
2652    fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Contact>) -> KadPeer {
2653        let Contact { addresses, public_key } = e.node.value;
2654        KadPeer {
2655            public_key,
2656            node_id: e.node.key.into_preimage(),
2657            multiaddrs: addresses.into_vec(),
2658            connection_ty: match e.status {
2659                NodeStatus::Connected => KadConnectionType::Connected,
2660                NodeStatus::Disconnected => KadConnectionType::NotConnected
2661            },
2662            certificates: vec![]
2663        }
2664    }
2665}
2666
2667//////////////////////////////////////////////////////////////////////////////
2668// Internal query state
2669
/// The internal, per-query state kept alongside the query in the pool.
struct QueryInner {
    /// The query-specific state.
    info: QueryInfo,
    /// Contacts of peers discovered during a query.
    contacts: FnvHashMap<PeerId, Contact>,
    /// A map of pending requests to peers.
    ///
    /// A request is pending if the targeted peer is not currently connected
    /// and these requests are sent as soon as a connection to the peer is established.
    ///
    /// Up to `K_VALUE` entries are stored inline before spilling to the heap.
    pending_rpcs: SmallVec<[(PeerId, KademliaHandlerIn<QueryId>); K_VALUE.get()]>
}
2681
2682impl QueryInner {
2683    fn new(info: QueryInfo) -> Self {
2684        QueryInner {
2685            info,
2686            contacts: Default::default(),
2687            pending_rpcs: SmallVec::default()
2688        }
2689    }
2690}
2691
/// The context of a [`QueryInfo::AddProvider`] query.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AddProviderContext {
    /// Publication of a new provider record.
    /// NOTE(review): presumably initiated via [`Kademlia::start_providing`] —
    /// confirm against the callers of `start_add_provider`.
    Publish,
    /// Periodic re-publication of an existing provider record, driven by
    /// the add-provider background job in `poll`.
    Republish,
}
2698
/// The context of a [`QueryInfo::PutRecord`] query.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PutRecordContext {
    /// Initial publication of a record.
    /// NOTE(review): presumably initiated via [`Kademlia::put_record`] —
    /// confirm against the callers of `start_put_record`.
    Publish,
    /// Periodic re-publication of a record originally published by the
    /// local node (see the put-record background job in `poll`).
    Republish,
    /// Periodic replication of a record published by another node
    /// (see the put-record background job in `poll`).
    Replicate,
    /// Caching of a record at a nearby peer.
    /// NOTE(review): presumably tied to the `cache_at` tracking of
    /// `GetRecord` queries — confirm.
    Cache,
}
2707
/// Information about a running query.
#[derive(Debug, Clone)]
pub enum QueryInfo {
    /// A query initiated by [`Kademlia::bootstrap`].
    Bootstrap {
        /// The targeted peer ID.
        peer: PeerId,
        /// The remaining random peer IDs to query, one per
        /// bucket that still needs refreshing.
        ///
        /// This is `None` if the initial self-lookup has not
        /// yet completed and `Some` with an exhausted iterator
        /// if bootstrapping is complete.
        remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>
    },

    /// A query initiated by [`Kademlia::get_closest_peers`].
    GetClosestPeers { key: Vec<u8> },

    /// A query initiated by [`Kademlia::get_providers`].
    GetProviders {
        /// The key for which to search for providers.
        key: record::Key,
        /// The found providers.
        providers: HashSet<PeerId>,
    },

    /// A (repeated) query initiated by [`Kademlia::start_providing`].
    AddProvider {
        /// The record key.
        key: record::Key,
        /// The current phase of the query.
        phase: AddProviderPhase,
        /// The execution context of the query.
        context: AddProviderContext,
        /// Public key of the provider.
        provider_key: PublicKey,
        /// Certificates known for the provider.
        certificates: Vec<Certificate>,
    },

    /// A (repeated) query initiated by [`Kademlia::put_record`].
    PutRecord {
        /// The record being stored.
        record: Record,
        /// The expected quorum of responses w.r.t. the replication factor.
        quorum: NonZeroUsize,
        /// The current phase of the query.
        phase: PutRecordPhase,
        /// The execution context of the query.
        context: PutRecordContext,
    },

    /// A query initiated by [`Kademlia::get_record`].
    GetRecord {
        /// The key to look for.
        key: record::Key,
        /// The records with the id of the peer that returned them. `None` when
        /// the record was found in the local store.
        records: Vec<PeerRecord>,
        /// The number of records to look for.
        quorum: NonZeroUsize,
        /// The closest peer to `key` that did not return a record.
        ///
        /// When a record is found in a standard Kademlia query (quorum == 1),
        /// it is cached at this peer as soon as a record is found.
        cache_at: Option<kbucket::Key<PeerId>>,
    },
}
2776
2777impl QueryInfo {
2778    /// Creates an event for a handler to issue an outgoing request in the
2779    /// context of a query.
2780    fn to_request(&self, query_id: QueryId) -> KademliaHandlerIn<QueryId> {
2781        match &self {
2782            QueryInfo::Bootstrap { peer, .. } => KademliaHandlerIn::FindNodeReq {
2783                key: peer.to_bytes(),
2784                user_data: query_id,
2785            },
2786            QueryInfo::GetClosestPeers { key, .. } => KademliaHandlerIn::FindNodeReq {
2787                key: key.clone(),
2788                user_data: query_id,
2789            },
2790            QueryInfo::GetProviders { key, .. } => KademliaHandlerIn::GetProvidersReq {
2791                key: key.clone(),
2792                user_data: query_id,
2793            },
2794            QueryInfo::AddProvider { key, phase, provider_key, certificates, .. } => match phase {
2795                AddProviderPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq {
2796                    key: key.to_vec(),
2797                    user_data: query_id,
2798                },
2799                AddProviderPhase::AddProvider {
2800                    provider_id,
2801                    external_addresses,
2802                    ..
2803                } => {
2804                    KademliaHandlerIn::AddProvider {
2805                        key: key.clone(),
2806                        provider: crate::protocol::KadPeer {
2807                            public_key: provider_key.clone(),
2808                            node_id: *provider_id,
2809                            multiaddrs: external_addresses.clone(),
2810                            connection_ty: crate::protocol::KadConnectionType::Connected,
2811                            certificates: certificates.clone(),
2812                        }
2813                    }
2814                }
2815            },
2816            QueryInfo::GetRecord { key, .. } => KademliaHandlerIn::GetRecord {
2817                key: key.clone(),
2818                user_data: query_id,
2819            },
2820            QueryInfo::PutRecord { record, phase, .. } => match phase {
2821                PutRecordPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq {
2822                    key: record.key.to_vec(),
2823                    user_data: query_id,
2824                },
2825                PutRecordPhase::PutRecord { .. } => KademliaHandlerIn::PutRecord {
2826                    record: record.clone(),
2827                    user_data: query_id
2828                }
2829            }
2830        }
2831    }
2832}
2833
/// The phases of a [`QueryInfo::AddProvider`] query.
#[derive(Debug, Clone)]
pub enum AddProviderPhase {
    /// The query is searching for the closest nodes to the record key
    /// (first phase).
    GetClosestPeers,

    /// The query advertises the local node as a provider for the key to
    /// the closest nodes to the key (second phase).
    AddProvider {
        /// The local peer ID that is advertised as a provider.
        provider_id: PeerId,
        /// The external addresses of the provider being advertised.
        external_addresses: Vec<Multiaddr>,
        /// Query statistics from the finished `GetClosestPeers` phase.
        get_closest_peers_stats: QueryStats,
    },
}
2851
/// The phases of a [`QueryInfo::PutRecord`] query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PutRecordPhase {
    /// The query is searching for the closest nodes to the record key
    /// (first phase).
    GetClosestPeers,

    /// The query is replicating the record to the closest nodes to the key
    /// (second phase).
    PutRecord {
        /// A list of peers the given record has been successfully replicated to.
        success: Vec<PeerId>,
        /// Query statistics from the finished `GetClosestPeers` phase.
        get_closest_peers_stats: QueryStats,
    },
}
2866
/// A mutable reference to a running query.
pub struct QueryMut<'a> {
    query: &'a mut Query<QueryInner>,
}

impl<'a> QueryMut<'a> {
    /// Gets the unique ID of the query.
    pub fn id(&self) -> QueryId {
        self.query.id()
    }

    /// Gets information about the type and state of the query.
    pub fn info(&self) -> &QueryInfo {
        &self.query.inner.info
    }

    /// Gets execution statistics about the query.
    ///
    /// For a multi-phase query such as `put_record`, these are the
    /// statistics of the current phase.
    pub fn stats(&self) -> &QueryStats {
        self.query.stats()
    }

    /// Finishes the query asap, without waiting for the
    /// regular termination conditions.
    pub fn finish(&mut self) {
        self.query.finish()
    }
}
2896
/// An immutable reference to a running query.
pub struct QueryRef<'a> {
    query: &'a Query<QueryInner>,
}

impl<'a> QueryRef<'a> {
    /// Gets the unique ID of the query.
    pub fn id(&self) -> QueryId {
        self.query.id()
    }

    /// Gets information about the type and state of the query.
    pub fn info(&self) -> &QueryInfo {
        &self.query.inner.info
    }

    /// Gets execution statistics about the query.
    ///
    /// For a multi-phase query such as `put_record`, these are the
    /// statistics of the current phase.
    pub fn stats(&self) -> &QueryStats {
        self.query.stats()
    }
}
2920
/// An operation failed because the routing table contains no known peers.
#[derive(Debug, Clone)]
pub struct NoKnownPeers();

impl fmt::Display for NoKnownPeers {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("No known peers.")
    }
}

impl std::error::Error for NoKnownPeers {}
2932
/// The possible outcomes of [`Kademlia::add_address`].
///
/// Returned synchronously by the call; see the individual variants for
/// follow-up events that may still be emitted.
pub enum RoutingUpdate {
    /// The given peer and address has been added to the routing
    /// table.
    Success,
    /// The peer and address is pending insertion into
    /// the routing table, if a disconnected peer fails
    /// to respond. If the given peer and address ends up
    /// in the routing table, [`KademliaEvent::RoutingUpdated`]
    /// is eventually emitted.
    Pending,
    /// The routing table update failed, either because the
    /// corresponding bucket for the peer is full and the
    /// pending slot(s) are occupied, or because the given
    /// peer ID is deemed invalid (e.g. refers to the local
    /// peer ID).
    Failed,
}
2951
/// The maximum number of local external addresses. When reached any
/// further externally reported addresses are ignored. The behaviour always
/// tracks all its listen addresses. Enforced in `inject_new_external_addr`.
const MAX_LOCAL_EXTERNAL_ADDRS: usize = 20;