ant_libp2p_kad/
behaviour.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the `Kademlia` network behaviour.
22
23mod test;
24
25use ant_libp2p_core as libp2p_core;
26use ant_libp2p_swarm as libp2p_swarm;
27
28use std::{
29    collections::{BTreeMap, HashMap, HashSet, VecDeque},
30    fmt,
31    num::NonZeroUsize,
32    task::{Context, Poll, Waker},
33    time::Duration,
34    vec,
35};
36
37use fnv::FnvHashSet;
38use libp2p_core::{transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
39use libp2p_identity::PeerId;
40use libp2p_swarm::{
41    behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
42    dial_opts::{self, DialOpts},
43    ConnectionDenied, ConnectionHandler, ConnectionId, DialError, ExternalAddresses,
44    ListenAddresses, NetworkBehaviour, NotifyHandler, StreamProtocol, THandler, THandlerInEvent,
45    THandlerOutEvent, ToSwarm,
46};
47use thiserror::Error;
48use tracing::Level;
49use web_time::Instant;
50
51pub use crate::query::QueryStats;
52use crate::{
53    addresses::Addresses,
54    bootstrap,
55    handler::{Handler, HandlerEvent, HandlerIn, RequestId},
56    jobs::*,
57    kbucket::{self, Distance, KBucketConfig, KBucketsTable, NodeStatus},
58    protocol,
59    protocol::{ConnectionType, KadPeer, ProtocolConfig},
60    query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState},
61    record::{
62        self,
63        store::{self, RecordStore},
64        ProviderRecord, Record,
65    },
66    K_VALUE,
67};
68
/// `Behaviour` is a `NetworkBehaviour` that implements the libp2p
/// Kademlia protocol.
///
/// Instances are created through [`Behaviour::new`] or
/// [`Behaviour::with_config`]. `TStore` is the type of the record storage
/// kept in the `store` field.
pub struct Behaviour<TStore> {
    /// The Kademlia routing table.
    kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,

    /// The k-bucket insertion strategy.
    kbucket_inserts: BucketInserts,

    /// Configuration of the wire protocol.
    protocol_config: ProtocolConfig,

    /// Configuration of [`RecordStore`] filtering.
    record_filtering: StoreInserts,

    /// The currently active (i.e. in-progress) queries.
    queries: QueryPool,

    /// The currently connected peers.
    ///
    /// This is a superset of the connected peers currently in the routing table.
    connected_peers: FnvHashSet<PeerId>,

    /// Periodic job for re-publication of provider records for keys
    /// provided by the local node.
    ///
    /// `None` when no provider publication interval is configured.
    add_provider_job: Option<AddProviderJob>,

    /// Periodic job for (re-)replication and (re-)publishing of
    /// regular (value-)records.
    ///
    /// `None` when neither a replication nor a publication interval is
    /// configured.
    put_record_job: Option<PutRecordJob>,

    /// The TTL of regular (value-)records.
    record_ttl: Option<Duration>,

    /// The TTL of provider records.
    provider_record_ttl: Option<Duration>,

    /// Queued events to return when the behaviour is being polled.
    queued_events: VecDeque<ToSwarm<Event, HandlerIn>>,

    /// NOTE(review): presumably the addresses the local node listens on, as
    /// reported by the swarm -- TODO confirm against the swarm event handling.
    listen_addresses: ListenAddresses,

    /// NOTE(review): presumably the externally reachable addresses of the local
    /// node, as reported by the swarm -- TODO confirm against the swarm event
    /// handling.
    external_addresses: ExternalAddresses,

    /// Maps a connection ID to a peer ID.
    ///
    /// NOTE(review): presumably the remote peer of each established
    /// connection -- TODO confirm where entries are inserted and removed.
    connections: HashMap<ConnectionId, PeerId>,

    /// See [`Config::caching`].
    caching: Caching,

    /// The peer ID passed to the constructor, i.e. the identity of the local node.
    local_peer_id: PeerId,

    /// The mode the behaviour currently operates in; starts out as `Mode::Client`.
    mode: Mode,
    /// Starts out as `true`.
    ///
    /// NOTE(review): presumably signals that `mode` is selected automatically
    /// rather than pinned by the user -- TODO confirm.
    auto_mode: bool,
    /// Starts out as `None`.
    ///
    /// NOTE(review): presumably the waker stored when polling produced no
    /// events, so the task can be woken later -- TODO confirm against `poll`.
    no_events_waker: Option<Waker>,

    /// The record storage.
    store: TStore,

    /// Tracks the status of the current bootstrap.
    bootstrap_status: bootstrap::Status,
}
130
/// The configurable strategies for the insertion of peers
/// and their addresses into the k-buckets of the Kademlia
/// routing table.
///
/// The strategy is selected via [`Config::set_kbucket_inserts`]. A
/// configuration built by [`Config::new`] uses [`BucketInserts::OnConnected`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum BucketInserts {
    /// Whenever a connection to a peer is established as a
    /// result of a dialing attempt and that peer is not yet
    /// in the routing table, it is inserted as long as there
    /// is a free slot in the corresponding k-bucket. If the
    /// k-bucket is full but still has a free pending slot,
    /// it may be inserted into the routing table at a later time if an unresponsive
    /// disconnected peer is evicted from the bucket.
    OnConnected,
    /// New peers and addresses are only added to the routing table via
    /// explicit calls to [`Behaviour::add_address`].
    ///
    /// > **Note**: Even though peers can only get into the
    /// > routing table as a result of [`Behaviour::add_address`],
    /// > routing table entries are still updated as peers
    /// > connect and disconnect (i.e. the order of the entries
    /// > as well as the network addresses).
    Manual,
}
154
/// The configurable filtering strategies for the acceptance of
/// incoming records.
///
/// This can be used for e.g. signature verification or validating
/// the accompanying [`Key`].
///
/// The strategy is selected via [`Config::set_record_filtering`]. A
/// configuration built by [`Config::new`] uses [`StoreInserts::Unfiltered`].
///
/// [`Key`]: crate::record::Key
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StoreInserts {
    /// Whenever a (provider) record is received,
    /// the record is forwarded immediately to the [`RecordStore`].
    Unfiltered,
    /// Whenever a (provider) record is received, an event is emitted.
    /// Provider records generate a [`InboundRequest::AddProvider`] under
    /// [`Event::InboundRequest`], normal records generate a [`InboundRequest::PutRecord`]
    /// under [`Event::InboundRequest`].
    ///
    /// When deemed valid, a (provider) record needs to be explicitly stored in
    /// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`],
    /// whichever is applicable. A mutable reference to the [`RecordStore`] can
    /// be retrieved via [`Behaviour::store_mut`].
    FilterBoth,
}
178
/// The configuration for the `Kademlia` behaviour.
///
/// The configuration is consumed by [`Behaviour::new`].
///
/// Build one with [`Config::new`] and adjust it through the `set_*` methods.
#[derive(Debug, Clone)]
pub struct Config {
    /// Configuration of the k-buckets (bucket size, pending timeout).
    kbucket_config: KBucketConfig,
    /// Configuration of queries (timeout, replication factor, parallelism,
    /// disjoint query paths).
    query_config: QueryConfig,
    /// Configuration of the wire protocol (protocol names, maximum packet size).
    protocol_config: ProtocolConfig,
    /// TTL of regular records; `None` means records never expire.
    record_ttl: Option<Duration>,
    /// Interval of (re-)replication of stored records; `None` disables it.
    record_replication_interval: Option<Duration>,
    /// Interval of (re-)publication of stored records; `None` disables it.
    record_publication_interval: Option<Duration>,
    /// Whether incoming records are filtered before being stored.
    record_filtering: StoreInserts,
    /// TTL of provider records; `None` means they never expire.
    provider_record_ttl: Option<Duration>,
    /// Interval of re-publication of provider records for keys provided by
    /// the local node; `None` disables it.
    provider_publication_interval: Option<Duration>,
    /// The k-bucket insertion strategy for the routing table.
    kbucket_inserts: BucketInserts,
    /// The caching strategy for successful lookups.
    caching: Caching,
    /// Interval of the periodic bootstrap; `None` disables it.
    periodic_bootstrap_interval: Option<Duration>,
    /// Time to wait before bootstrapping after a new peer is inserted into
    /// the routing table; `None` disables automatic bootstrap.
    automatic_bootstrap_throttle: Option<Duration>,
}
198
199impl Default for Config {
200    /// Returns the default configuration.
201    ///
202    /// Deprecated: use `Config::new` instead.
203    fn default() -> Self {
204        Self::new(protocol::DEFAULT_PROTO_NAME)
205    }
206}
207
/// The configuration for Kademlia "write-back" caching after successful
/// lookups via [`Behaviour::get_record`].
///
/// The strategy is selected via [`Config::set_caching`]. A configuration built
/// by [`Config::new`] uses [`Caching::Enabled`] with a `max_peers` of 1.
#[derive(Debug, Clone)]
pub enum Caching {
    /// Caching is disabled and the peers closest to records being looked up
    /// that do not return a record are not tracked, i.e.
    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty.
    Disabled,
    /// Up to `max_peers` peers not returning a record that are closest to the key
    /// being looked up are tracked and returned in
    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`]. The write-back operation must be
    /// performed explicitly, if desired and after choosing a record from the results, via
    /// [`Behaviour::put_record_to`].
    Enabled {
        /// Maximum number of peers that are tracked.
        max_peers: u16,
    },
}
223
224impl Config {
225    /// Builds a new `Config` with the given protocol name.
226    pub fn new(protocol_name: StreamProtocol) -> Self {
227        Config {
228            kbucket_config: KBucketConfig::default(),
229            query_config: QueryConfig::default(),
230            protocol_config: ProtocolConfig::new(protocol_name),
231            record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
232            record_replication_interval: Some(Duration::from_secs(60 * 60)),
233            record_publication_interval: Some(Duration::from_secs(22 * 60 * 60)),
234            record_filtering: StoreInserts::Unfiltered,
235            provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
236            provider_record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
237            kbucket_inserts: BucketInserts::OnConnected,
238            caching: Caching::Enabled { max_peers: 1 },
239            periodic_bootstrap_interval: Some(Duration::from_secs(5 * 60)),
240            automatic_bootstrap_throttle: Some(bootstrap::DEFAULT_AUTOMATIC_THROTTLE),
241        }
242    }
243
244    /// Returns the default configuration.
245    #[deprecated(note = "Use `Config::new` instead")]
246    #[allow(clippy::should_implement_trait)]
247    pub fn default() -> Self {
248        Default::default()
249    }
250
251    /// Sets custom protocol names.
252    ///
253    /// Kademlia nodes only communicate with other nodes using the same protocol
254    /// name. Using custom name(s) therefore allows to segregate the DHT from
255    /// others, if that is desired.
256    ///
257    /// More than one protocol name can be supplied. In this case the node will
258    /// be able to talk to other nodes supporting any of the provided names.
259    /// Multiple names must be used with caution to avoid network partitioning.
260    #[deprecated(note = "Use `Config::new` instead")]
261    #[allow(deprecated)]
262    pub fn set_protocol_names(&mut self, names: Vec<StreamProtocol>) -> &mut Self {
263        self.protocol_config.set_protocol_names(names);
264        self
265    }
266
267    /// Sets the timeout for a single query.
268    ///
269    /// > **Note**: A single query usually comprises at least as many requests
270    /// > as the replication factor, i.e. this is not a request timeout.
271    ///
272    /// The default is 60 seconds.
273    pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
274        self.query_config.timeout = timeout;
275        self
276    }
277
278    /// Sets the replication factor to use.
279    ///
280    /// The replication factor determines to how many closest peers
281    /// a record is replicated. The default is [`crate::K_VALUE`].
282    pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
283        self.query_config.replication_factor = replication_factor;
284        self
285    }
286
287    /// Sets the allowed level of parallelism for iterative queries.
288    ///
289    /// The `α` parameter in the Kademlia paper. The maximum number of peers
290    /// that an iterative query is allowed to wait for in parallel while
291    /// iterating towards the closest nodes to a target. Defaults to
292    /// `ALPHA_VALUE`.
293    ///
294    /// This only controls the level of parallelism of an iterative query, not
295    /// the level of parallelism of a query to a fixed set of peers.
296    ///
297    /// When used with [`Config::disjoint_query_paths`] it equals
298    /// the amount of disjoint paths used.
299    pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
300        self.query_config.parallelism = parallelism;
301        self
302    }
303
304    /// Require iterative queries to use disjoint paths for increased resiliency
305    /// in the presence of potentially adversarial nodes.
306    ///
307    /// When enabled the number of disjoint paths used equals the configured
308    /// parallelism.
309    ///
310    /// See the S/Kademlia paper for more information on the high level design
311    /// as well as its security improvements.
312    pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
313        self.query_config.disjoint_query_paths = enabled;
314        self
315    }
316
317    /// Sets the TTL for stored records.
318    ///
319    /// The TTL should be significantly longer than the (re-)publication
320    /// interval, to avoid premature expiration of records. The default is 36
321    /// hours.
322    ///
323    /// `None` means records never expire.
324    ///
325    /// Does not apply to provider records.
326    pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
327        self.record_ttl = record_ttl;
328        self
329    }
330
331    /// Sets whether or not records should be filtered before being stored.
332    ///
333    /// See [`StoreInserts`] for the different values.
334    /// Defaults to [`StoreInserts::Unfiltered`].
335    pub fn set_record_filtering(&mut self, filtering: StoreInserts) -> &mut Self {
336        self.record_filtering = filtering;
337        self
338    }
339
340    /// Sets the (re-)replication interval for stored records.
341    ///
342    /// Periodic replication of stored records ensures that the records
343    /// are always replicated to the available nodes closest to the key in the
344    /// context of DHT topology changes (i.e. nodes joining and leaving), thus
345    /// ensuring persistence until the record expires. Replication does not
346    /// prolong the regular lifetime of a record (for otherwise it would live
347    /// forever regardless of the configured TTL). The expiry of a record
348    /// is only extended through re-publication.
349    ///
350    /// This interval should be significantly shorter than the publication
351    /// interval, to ensure persistence between re-publications. The default
352    /// is 1 hour.
353    ///
354    /// `None` means that stored records are never re-replicated.
355    ///
356    /// Does not apply to provider records.
357    pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
358        self.record_replication_interval = interval;
359        self
360    }
361
362    /// Sets the (re-)publication interval of stored records.
363    ///
364    /// Records persist in the DHT until they expire. By default, published
365    /// records are re-published in regular intervals for as long as the record
366    /// exists in the local storage of the original publisher, thereby extending
367    /// the records lifetime.
368    ///
369    /// This interval should be significantly shorter than the record TTL, to
370    /// ensure records do not expire prematurely. The default is 24 hours.
371    ///
372    /// `None` means that stored records are never automatically re-published.
373    ///
374    /// Does not apply to provider records.
375    pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
376        self.record_publication_interval = interval;
377        self
378    }
379
380    /// Sets the TTL for provider records.
381    ///
382    /// `None` means that stored provider records never expire.
383    ///
384    /// Must be significantly larger than the provider publication interval.
385    pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
386        self.provider_record_ttl = ttl;
387        self
388    }
389
390    /// Sets the interval at which provider records for keys provided
391    /// by the local node are re-published.
392    ///
393    /// `None` means that stored provider records are never automatically
394    /// re-published.
395    ///
396    /// Must be significantly less than the provider record TTL.
397    pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
398        self.provider_publication_interval = interval;
399        self
400    }
401
402    /// Modifies the maximum allowed size of individual Kademlia packets.
403    ///
404    /// It might be necessary to increase this value if trying to put large
405    /// records.
406    pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
407        self.protocol_config.set_max_packet_size(size);
408        self
409    }
410
411    /// Sets the k-bucket insertion strategy for the Kademlia routing table.
412    pub fn set_kbucket_inserts(&mut self, inserts: BucketInserts) -> &mut Self {
413        self.kbucket_inserts = inserts;
414        self
415    }
416
417    /// Sets the [`Caching`] strategy to use for successful lookups.
418    ///
419    /// The default is [`Caching::Enabled`] with a `max_peers` of 1.
420    /// Hence, with default settings and a lookup quorum of 1, a successful lookup
421    /// will result in the record being cached at the closest node to the key that
422    /// did not return the record, i.e. the standard Kademlia behaviour.
423    pub fn set_caching(&mut self, c: Caching) -> &mut Self {
424        self.caching = c;
425        self
426    }
427
428    /// Sets the interval on which [`Behaviour::bootstrap`] is called periodically.
429    ///
430    /// * Default to `5` minutes.
431    /// * Set to `None` to disable periodic bootstrap.
432    pub fn set_periodic_bootstrap_interval(&mut self, interval: Option<Duration>) -> &mut Self {
433        self.periodic_bootstrap_interval = interval;
434        self
435    }
436
437    /// Sets the configuration for the k-buckets.
438    ///
439    /// * Default to K_VALUE.
440    pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
441        self.kbucket_config.set_bucket_size(size);
442        self
443    }
444
445    /// Sets the timeout duration after creation of a pending entry after which
446    /// it becomes eligible for insertion into a full bucket, replacing the
447    /// least-recently (dis)connected node.
448    ///
449    /// * Default to `60` s.
450    pub fn set_kbucket_pending_timeout(&mut self, timeout: Duration) -> &mut Self {
451        self.kbucket_config.set_pending_timeout(timeout);
452        self
453    }
454
455    /// Sets the time to wait before calling [`Behaviour::bootstrap`] after a new peer is inserted
456    /// in the routing table. This prevent cascading bootstrap requests when multiple peers are
457    /// inserted into the routing table "at the same time". This also allows to wait a little
458    /// bit for other potential peers to be inserted into the routing table before triggering a
459    /// bootstrap, giving more context to the future bootstrap request.
460    ///
461    /// * Default to `500` ms.
462    /// * Set to `Some(Duration::ZERO)` to never wait before triggering a bootstrap request when a
463    ///   new peer is inserted in the routing table.
464    /// * Set to `None` to disable automatic bootstrap (no bootstrap request will be triggered when
465    ///   a new peer is inserted in the routing table).
466    #[cfg(test)]
467    pub(crate) fn set_automatic_bootstrap_throttle(
468        &mut self,
469        duration: Option<Duration>,
470    ) -> &mut Self {
471        self.automatic_bootstrap_throttle = duration;
472        self
473    }
474}
475
476impl<TStore> Behaviour<TStore>
477where
478    TStore: RecordStore + Send + 'static,
479{
480    /// Creates a new `Kademlia` network behaviour with a default configuration.
481    pub fn new(id: PeerId, store: TStore) -> Self {
482        Self::with_config(id, store, Default::default())
483    }
484
    /// Returns the protocol names of this Kademlia instance, as held by its
    /// protocol configuration.
    pub fn protocol_names(&self) -> &[StreamProtocol] {
        self.protocol_config.protocol_names()
    }
489
490    /// Creates a new `Kademlia` network behaviour with the given configuration.
491    pub fn with_config(id: PeerId, store: TStore, config: Config) -> Self {
492        let local_key = kbucket::Key::from(id);
493
494        let put_record_job = config
495            .record_replication_interval
496            .or(config.record_publication_interval)
497            .map(|interval| {
498                PutRecordJob::new(
499                    id,
500                    interval,
501                    config.record_publication_interval,
502                    config.record_ttl,
503                )
504            });
505
506        let add_provider_job = config
507            .provider_publication_interval
508            .map(AddProviderJob::new);
509
510        Behaviour {
511            store,
512            caching: config.caching,
513            kbuckets: KBucketsTable::new(local_key, config.kbucket_config),
514            kbucket_inserts: config.kbucket_inserts,
515            protocol_config: config.protocol_config,
516            record_filtering: config.record_filtering,
517            queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
518            listen_addresses: Default::default(),
519            queries: QueryPool::new(config.query_config),
520            connected_peers: Default::default(),
521            add_provider_job,
522            put_record_job,
523            record_ttl: config.record_ttl,
524            provider_record_ttl: config.provider_record_ttl,
525            external_addresses: Default::default(),
526            local_peer_id: id,
527            connections: Default::default(),
528            mode: Mode::Client,
529            auto_mode: true,
530            no_events_waker: None,
531            bootstrap_status: bootstrap::Status::new(
532                config.periodic_bootstrap_interval,
533                config.automatic_bootstrap_throttle,
534            ),
535        }
536    }
537
538    /// Gets an iterator over immutable references to all running queries.
539    pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
540        self.queries.iter().filter_map(|query| {
541            if !query.is_finished() {
542                Some(QueryRef { query })
543            } else {
544                None
545            }
546        })
547    }
548
549    /// Gets an iterator over mutable references to all running queries.
550    pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
551        self.queries.iter_mut().filter_map(|query| {
552            if !query.is_finished() {
553                Some(QueryMut { query })
554            } else {
555                None
556            }
557        })
558    }
559
560    /// Gets an immutable reference to a running query, if it exists.
561    pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
562        self.queries.get(id).and_then(|query| {
563            if !query.is_finished() {
564                Some(QueryRef { query })
565            } else {
566                None
567            }
568        })
569    }
570
571    /// Gets a mutable reference to a running query, if it exists.
572    pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
573        self.queries.get_mut(id).and_then(|query| {
574            if !query.is_finished() {
575                Some(QueryMut { query })
576            } else {
577                None
578            }
579        })
580    }
581
    /// Adds a known listen address of a peer participating in the DHT to the
    /// routing table.
    ///
    /// Explicitly adding addresses of peers serves two purposes:
    ///
    ///   1. In order for a node to join the DHT, it must know about at least one other node of the
    ///      DHT.
    ///
    ///   2. When a remote peer initiates a connection and that peer is not yet in the routing
    ///      table, the `Kademlia` behaviour must be informed of an address on which that peer is
    ///      listening for connections before it can be added to the routing table from where it can
    ///      subsequently be discovered by all peers in the DHT.
    ///
    /// If the routing table has been updated as a result of this operation,
    /// a [`Event::RoutingUpdated`] event is emitted.
    ///
    /// # Returns
    ///
    /// * [`RoutingUpdate::Success`] if the peer is already in the routing table (whether or not
    ///   the address was new) or if it has just been inserted.
    /// * [`RoutingUpdate::Pending`] if the peer is, or has just become, pending insertion.
    /// * [`RoutingUpdate::Failed`] if the address cannot be combined with the peer ID into a
    ///   `/p2p` address, if the bucket is full, or if the routing table has no entry for the key.
    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
        // Make sure the address is a fully-qualified `/p2p` multiaddr for this peer.
        let Ok(address) = address.with_p2p(*peer) else {
            return RoutingUpdate::Failed;
        };
        let key = kbucket::Key::from(*peer);
        match self.kbuckets.entry(&key) {
            Some(kbucket::Entry::Present(mut entry, _)) => {
                // Only announce a routing update if `insert` reports a change.
                if entry.value().insert(address) {
                    self.queued_events
                        .push_back(ToSwarm::GenerateEvent(Event::RoutingUpdated {
                            peer: *peer,
                            is_new_peer: false,
                            addresses: entry.value().clone(),
                            old_peer: None,
                            bucket_range: self
                                .kbuckets
                                .bucket(&key)
                                .map(|b| b.range())
                                .expect("Not kbucket::Entry::SelfEntry."),
                        }))
                }
                RoutingUpdate::Success
            }
            Some(kbucket::Entry::Pending(mut entry, _)) => {
                // The peer is not in the table yet, so no event is emitted here.
                entry.value().insert(address);
                RoutingUpdate::Pending
            }
            Some(kbucket::Entry::Absent(entry)) => {
                let addresses = Addresses::new(address);
                // Record the new entry with the peer's current connection state.
                let status = if self.connected_peers.contains(peer) {
                    NodeStatus::Connected
                } else {
                    NodeStatus::Disconnected
                };
                match entry.insert(addresses.clone(), status) {
                    kbucket::InsertResult::Inserted => {
                        self.bootstrap_on_low_peers();

                        self.queued_events.push_back(ToSwarm::GenerateEvent(
                            Event::RoutingUpdated {
                                peer: *peer,
                                is_new_peer: true,
                                addresses,
                                old_peer: None,
                                bucket_range: self
                                    .kbuckets
                                    .bucket(&key)
                                    .map(|b| b.range())
                                    .expect("Not kbucket::Entry::SelfEntry."),
                            },
                        ));
                        RoutingUpdate::Success
                    }
                    kbucket::InsertResult::Full => {
                        tracing::debug!(%peer, "Bucket full. Peer not added to routing table");
                        RoutingUpdate::Failed
                    }
                    kbucket::InsertResult::Pending { disconnected } => {
                        // Dial the disconnected peer reported by the bucket.
                        // NOTE(review): presumably to find out whether it is still
                        // reachable before the pending entry may replace it -- confirm
                        // against the k-bucket implementation.
                        self.queued_events.push_back(ToSwarm::Dial {
                            opts: DialOpts::peer_id(disconnected.into_preimage()).build(),
                        });
                        RoutingUpdate::Pending
                    }
                }
            }
            // NOTE(review): judging by the `expect` messages above, `entry` presumably
            // yields `None` only for the local node's own key -- confirm.
            None => RoutingUpdate::Failed,
        }
    }
666
667    /// Removes an address of a peer from the routing table.
668    ///
669    /// If the given address is the last address of the peer in the
670    /// routing table, the peer is removed from the routing table
671    /// and `Some` is returned with a view of the removed entry.
672    /// The same applies if the peer is currently pending insertion
673    /// into the routing table.
674    ///
675    /// If the given peer or address is not in the routing table,
676    /// this is a no-op.
677    pub fn remove_address(
678        &mut self,
679        peer: &PeerId,
680        address: &Multiaddr,
681    ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
682        let address = &address.to_owned().with_p2p(*peer).ok()?;
683        let key = kbucket::Key::from(*peer);
684        match self.kbuckets.entry(&key)? {
685            kbucket::Entry::Present(mut entry, _) => {
686                if entry.value().remove(address).is_err() {
687                    Some(entry.remove()) // it is the last address, thus remove the peer.
688                } else {
689                    None
690                }
691            }
692            kbucket::Entry::Pending(mut entry, _) => {
693                if entry.value().remove(address).is_err() {
694                    Some(entry.remove()) // it is the last address, thus remove the peer.
695                } else {
696                    None
697                }
698            }
699            kbucket::Entry::Absent(..) => None,
700        }
701    }
702
703    /// Removes a peer from the routing table.
704    ///
705    /// Returns `None` if the peer was not in the routing table,
706    /// not even pending insertion.
707    pub fn remove_peer(
708        &mut self,
709        peer: &PeerId,
710    ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
711        let key = kbucket::Key::from(*peer);
712        match self.kbuckets.entry(&key)? {
713            kbucket::Entry::Present(entry, _) => Some(entry.remove()),
714            kbucket::Entry::Pending(entry, _) => Some(entry.remove()),
715            kbucket::Entry::Absent(..) => None,
716        }
717    }
718
    /// Returns an iterator over all non-empty buckets in the routing table.
    ///
    /// Buckets for which `is_empty` returns `true` are skipped.
    pub fn kbuckets(
        &mut self,
    ) -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>> {
        // NOTE(review): `&mut self` suggests iterating the table may update it
        // (e.g. apply pending entries) -- confirm against `KBucketsTable::iter`.
        self.kbuckets.iter().filter(|b| !b.is_empty())
    }
725
726    /// Returns the k-bucket for the distance to the given key.
727    ///
728    /// Returns `None` if the given key refers to the local key.
729    pub fn kbucket<K>(
730        &mut self,
731        key: K,
732    ) -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
733    where
734        K: Into<kbucket::Key<K>> + Clone,
735    {
736        self.kbuckets.bucket(&key.into())
737    }
738
    /// Initiates an iterative query for the closest peers to the given key.
    ///
    /// No explicit number of results is requested; use
    /// [`Behaviour::get_n_closest_peers`] to ask for a specific number.
    ///
    /// The result of the query is delivered in an
    /// [`Event::OutboundQueryProgressed`] event carrying a
    /// [`QueryResult::GetClosestPeers`].
    pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
    where
        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
    {
        self.get_closest_peers_inner(key, None)
    }
749
750    /// Initiates an iterative query for the closest peers to the given key.
751    /// The expected responding peers is specified by `num_results`
752    /// Note that the result is capped after exceeds K_VALUE
753    ///
754    /// The result of the query is delivered in a
755    /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
756    pub fn get_n_closest_peers<K>(&mut self, key: K, num_results: NonZeroUsize) -> QueryId
757    where
758        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
759    {
760        // The inner code never expect higher than K_VALUE results to be returned.
761        // And removing such cap will be tricky,
762        // since it would involve forging a new key and additional requests.
763        // Hence bound to K_VALUE here to set clear expectation and prevent unexpected behaviour.
764        let capped_num_results = std::cmp::min(num_results, K_VALUE);
765        self.get_closest_peers_inner(key, Some(capped_num_results))
766    }
767
768    fn get_closest_peers_inner<K>(&mut self, key: K, num_results: Option<NonZeroUsize>) -> QueryId
769    where
770        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
771    {
772        let target: kbucket::Key<K> = key.clone().into();
773        let key: Vec<u8> = key.into();
774        let info = QueryInfo::GetClosestPeers {
775            key,
776            step: ProgressStep::first(),
777            num_results,
778        };
779        let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
780        self.queries.add_iter_closest(target, peer_keys, info)
781    }
782
783    /// Returns all peers ordered by distance to the given key; takes peers from local routing table
784    /// only.
785    pub fn get_closest_local_peers<'a, K: Clone>(
786        &'a mut self,
787        key: &'a kbucket::Key<K>,
788    ) -> impl Iterator<Item = kbucket::Key<PeerId>> + 'a {
789        self.kbuckets.closest_keys(key)
790    }
791
792    /// Finds the closest peers to a `key` in the context of a request by the `source` peer, such
793    /// that the `source` peer is never included in the result.
794    ///
795    /// Takes peers from local routing table only. Only returns number of peers equal to configured
796    /// replication factor.
797    pub fn find_closest_local_peers<'a, K: Clone>(
798        &'a mut self,
799        key: &'a kbucket::Key<K>,
800        source: &'a PeerId,
801    ) -> impl Iterator<Item = KadPeer> + 'a {
802        self.kbuckets
803            .closest(key)
804            .filter(move |e| e.node.key.preimage() != source)
805            .take(self.queries.config().replication_factor.get())
806            .map(KadPeer::from)
807    }
808
809    /// Performs a lookup for a record in the DHT.
810    ///
811    /// The result of this operation is delivered in a
812    /// [`Event::OutboundQueryProgressed{QueryResult::GetRecord}`].
813    pub fn get_record(&mut self, key: record::Key) -> QueryId {
814        let record = if let Some(record) = self.store.get(&key) {
815            if record.is_expired(Instant::now()) {
816                self.store.remove(&key);
817                None
818            } else {
819                Some(PeerRecord {
820                    peer: None,
821                    record: record.into_owned(),
822                })
823            }
824        } else {
825            None
826        };
827
828        let step = ProgressStep::first();
829
830        let target = kbucket::Key::new(key.clone());
831        let info = if record.is_some() {
832            QueryInfo::GetRecord {
833                key,
834                step: step.next(),
835                found_a_record: true,
836                cache_candidates: BTreeMap::new(),
837            }
838        } else {
839            QueryInfo::GetRecord {
840                key,
841                step: step.clone(),
842                found_a_record: false,
843                cache_candidates: BTreeMap::new(),
844            }
845        };
846        let peers = self.kbuckets.closest_keys(&target);
847        let id = self.queries.add_iter_closest(target.clone(), peers, info);
848
849        // No queries were actually done for the results yet.
850        let stats = QueryStats::empty();
851
852        if let Some(record) = record {
853            self.queued_events
854                .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
855                    id,
856                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))),
857                    step,
858                    stats,
859                }));
860        }
861
862        id
863    }
864
865    /// Stores a record in the DHT, locally as well as at the nodes
866    /// closest to the key as per the xor distance metric.
867    ///
868    /// Returns `Ok` if a record has been stored locally, providing the
869    /// `QueryId` of the initial query that replicates the record in the DHT.
870    /// The result of the query is eventually reported as a
871    /// [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`].
872    ///
873    /// The record is always stored locally with the given expiration. If the record's
874    /// expiration is `None`, the common case, it does not expire in local storage
875    /// but is still replicated with the configured record TTL. To remove the record
876    /// locally and stop it from being re-published in the DHT, see [`Behaviour::remove_record`].
877    ///
878    /// After the initial publication of the record, it is subject to (re-)replication
879    /// and (re-)publication as per the configured intervals. Periodic (re-)publication
880    /// does not update the record's expiration in local storage, thus a given record
881    /// with an explicit expiration will always expire at that instant and until then
882    /// is subject to regular (re-)replication and (re-)publication.
883    pub fn put_record(
884        &mut self,
885        mut record: Record,
886        quorum: Quorum,
887    ) -> Result<QueryId, store::Error> {
888        record.publisher = Some(*self.kbuckets.local_key().preimage());
889        self.store.put(record.clone())?;
890        record.expires = record
891            .expires
892            .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
893        let quorum = quorum.eval(self.queries.config().replication_factor);
894        let target = kbucket::Key::new(record.key.clone());
895        let peers = self.kbuckets.closest_keys(&target);
896        let context = PutRecordContext::Publish;
897        let info = QueryInfo::PutRecord {
898            context,
899            record,
900            quorum,
901            phase: PutRecordPhase::GetClosestPeers,
902        };
903        Ok(self.queries.add_iter_closest(target.clone(), peers, info))
904    }
905
906    /// Stores a record at specific peers, without storing it locally.
907    ///
908    /// The given [`Quorum`] is understood in the context of the total
909    /// number of distinct peers given.
910    ///
911    /// If the record's expiration is `None`, the configured record TTL is used.
912    ///
913    /// > **Note**: This is not a regular Kademlia DHT operation. It needs to be
914    /// > used to selectively update or store a record to specific peers
915    /// > for the purpose of e.g. making sure these peers have the latest
916    /// > "version" of a record or to "cache" a record at further peers
917    /// > to increase the lookup success rate on the DHT for other peers.
918    /// >
919    /// > In particular, there is no automatic storing of records performed, and this
920    /// > method must be used to ensure the standard Kademlia
921    /// > procedure of "caching" (i.e. storing) a found record at the closest
922    /// > node to the key that _did not_ return it.
923    pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
924    where
925        I: ExactSizeIterator<Item = PeerId>,
926    {
927        let quorum = if peers.len() > 0 {
928            quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0"))
929        } else {
930            // If no peers are given, we just let the query fail immediately
931            // due to the fact that the quorum must be at least one, instead of
932            // introducing a new kind of error.
933            NonZeroUsize::new(1).expect("1 > 0")
934        };
935        record.expires = record
936            .expires
937            .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
938        let context = PutRecordContext::Custom;
939        let info = QueryInfo::PutRecord {
940            context,
941            record,
942            quorum,
943            phase: PutRecordPhase::PutRecord {
944                success: Vec::new(),
945                get_closest_peers_stats: QueryStats::empty(),
946            },
947        };
948        self.queries.add_fixed(peers, info)
949    }
950
951    /// Removes the record with the given key from _local_ storage,
952    /// if the local node is the publisher of the record.
953    ///
954    /// Has no effect if a record for the given key is stored locally but
955    /// the local node is not a publisher of the record.
956    ///
957    /// This is a _local_ operation. However, it also has the effect that
958    /// the record will no longer be periodically re-published, allowing the
959    /// record to eventually expire throughout the DHT.
960    pub fn remove_record(&mut self, key: &record::Key) {
961        if let Some(r) = self.store.get(key) {
962            if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
963                self.store.remove(key)
964            }
965        }
966    }
967
968    /// Gets a mutable reference to the record store.
969    pub fn store_mut(&mut self) -> &mut TStore {
970        &mut self.store
971    }
972
973    /// Bootstraps the local node to join the DHT.
974    ///
975    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
976    /// own ID in the DHT. This introduces the local node to the other nodes
977    /// in the DHT and populates its routing table with the closest neighbours.
978    ///
979    /// Subsequently, all buckets farther from the bucket of the closest neighbour are
980    /// refreshed by initiating an additional bootstrapping query for each such
981    /// bucket with random keys.
982    ///
983    /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
984    /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
985    /// reported via [`Event::OutboundQueryProgressed{QueryResult::Bootstrap}`] events,
986    /// with one such event per bootstrapping query.
987    ///
988    /// Returns `Err` if bootstrapping is impossible due an empty routing table.
989    ///
990    /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
991    /// > See [`Behaviour::add_address`].
992    ///
993    /// > **Note**: Bootstrap does not require to be called manually. It is periodically
994    /// > invoked at regular intervals based on the configured `periodic_bootstrap_interval` (see
995    /// > [`Config::set_periodic_bootstrap_interval`] for details) and it is also automatically
996    /// > invoked
997    /// > when a new peer is inserted in the routing table.
998    /// > This parameter is used to call [`Behaviour::bootstrap`] periodically and automatically
999    /// > to ensure a healthy routing table.
1000    pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
1001        let local_key = *self.kbuckets.local_key();
1002        let info = QueryInfo::Bootstrap {
1003            peer: *local_key.preimage(),
1004            remaining: None,
1005            step: ProgressStep::first(),
1006        };
1007        let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
1008        if peers.is_empty() {
1009            self.bootstrap_status.reset_timers();
1010            Err(NoKnownPeers())
1011        } else {
1012            self.bootstrap_status.on_started();
1013            Ok(self.queries.add_iter_closest(local_key, peers, info))
1014        }
1015    }
1016
1017    /// Establishes the local node as a provider of a value for the given key.
1018    ///
1019    /// This operation publishes a provider record with the given key and
1020    /// identity of the local node to the peers closest to the key, thus establishing
1021    /// the local node as a provider.
1022    ///
1023    /// Returns `Ok` if a provider record has been stored locally, providing the
1024    /// `QueryId` of the initial query that announces the local node as a provider.
1025    ///
1026    /// The publication of the provider records is periodically repeated as per the
1027    /// configured interval, to renew the expiry and account for changes to the DHT
1028    /// topology. A provider record may be removed from local storage and
1029    /// thus no longer re-published by calling [`Behaviour::stop_providing`].
1030    ///
1031    /// In contrast to the standard Kademlia push-based model for content distribution
1032    /// implemented by [`Behaviour::put_record`], the provider API implements a
1033    /// pull-based model that may be used in addition or as an alternative.
1034    /// The means by which the actual value is obtained from a provider is out of scope
1035    /// of the libp2p Kademlia provider API.
1036    ///
1037    /// The results of the (repeated) provider announcements sent by this node are
1038    /// reported via [`Event::OutboundQueryProgressed{QueryResult::StartProviding}`].
1039    pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
1040        // Note: We store our own provider records locally without local addresses
1041        // to avoid redundant storage and outdated addresses. Instead these are
1042        // acquired on demand when returning a `ProviderRecord` for the local node.
1043        let local_addrs = Vec::new();
1044        let record = ProviderRecord::new(
1045            key.clone(),
1046            *self.kbuckets.local_key().preimage(),
1047            local_addrs,
1048        );
1049        self.store.add_provider(record)?;
1050        let target = kbucket::Key::new(key.clone());
1051        let peers = self.kbuckets.closest_keys(&target);
1052        let context = AddProviderContext::Publish;
1053        let info = QueryInfo::AddProvider {
1054            context,
1055            key,
1056            phase: AddProviderPhase::GetClosestPeers,
1057        };
1058        let id = self.queries.add_iter_closest(target.clone(), peers, info);
1059        Ok(id)
1060    }
1061
1062    /// Stops the local node from announcing that it is a provider for the given key.
1063    ///
1064    /// This is a local operation. The local node will still be considered as a
1065    /// provider for the key by other nodes until these provider records expire.
1066    pub fn stop_providing(&mut self, key: &record::Key) {
1067        self.store
1068            .remove_provider(key, self.kbuckets.local_key().preimage());
1069    }
1070
1071    /// Performs a lookup for providers of a value to the given key.
1072    ///
1073    /// The result of this operation is delivered in a
1074    /// reported via [`Event::OutboundQueryProgressed{QueryResult::GetProviders}`].
1075    pub fn get_providers(&mut self, key: record::Key) -> QueryId {
1076        let providers: HashSet<_> = self
1077            .store
1078            .providers(&key)
1079            .into_iter()
1080            .filter(|p| !p.is_expired(Instant::now()))
1081            .map(|p| p.provider)
1082            .collect();
1083
1084        let step = ProgressStep::first();
1085
1086        let info = QueryInfo::GetProviders {
1087            key: key.clone(),
1088            providers_found: providers.len(),
1089            step: if providers.is_empty() {
1090                step.clone()
1091            } else {
1092                step.next()
1093            },
1094        };
1095
1096        let target = kbucket::Key::new(key.clone());
1097        let peers = self.kbuckets.closest_keys(&target);
1098        let id = self.queries.add_iter_closest(target.clone(), peers, info);
1099
1100        // No queries were actually done for the results yet.
1101        let stats = QueryStats::empty();
1102
1103        if !providers.is_empty() {
1104            self.queued_events
1105                .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
1106                    id,
1107                    result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
1108                        key,
1109                        providers,
1110                    })),
1111                    step,
1112                    stats,
1113                }));
1114        }
1115        id
1116    }
1117
1118    /// Set the [`Mode`] in which we should operate.
1119    ///
1120    /// By default, we are in [`Mode::Client`] and will swap into [`Mode::Server`] as soon as we
1121    /// have a confirmed, external address via [`FromSwarm::ExternalAddrConfirmed`].
1122    ///
1123    /// Setting a mode via this function disables this automatic behaviour and unconditionally
1124    /// operates in the specified mode. To reactivate the automatic configuration, pass [`None`]
1125    /// instead.
1126    pub fn set_mode(&mut self, mode: Option<Mode>) {
1127        match mode {
1128            Some(mode) => {
1129                self.mode = mode;
1130                self.auto_mode = false;
1131                self.reconfigure_mode();
1132            }
1133            None => {
1134                self.auto_mode = true;
1135                self.determine_mode_from_external_addresses();
1136            }
1137        }
1138
1139        if let Some(waker) = self.no_events_waker.take() {
1140            waker.wake();
1141        }
1142    }
1143
1144    /// Get the [`Mode`] in which the DHT is currently operating.
1145    pub fn mode(&self) -> Mode {
1146        self.mode
1147    }
1148
1149    fn reconfigure_mode(&mut self) {
1150        if self.connections.is_empty() {
1151            return;
1152        }
1153
1154        let num_connections = self.connections.len();
1155
1156        tracing::debug!(
1157            "Re-configuring {} established connection{}",
1158            num_connections,
1159            if num_connections > 1 { "s" } else { "" }
1160        );
1161
1162        self.queued_events
1163            .extend(
1164                self.connections
1165                    .iter()
1166                    .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
1167                        peer_id: *peer_id,
1168                        handler: NotifyHandler::One(*conn_id),
1169                        event: HandlerIn::ReconfigureMode {
1170                            new_mode: self.mode,
1171                        },
1172                    }),
1173            );
1174    }
1175
1176    fn determine_mode_from_external_addresses(&mut self) {
1177        let old_mode = self.mode;
1178
1179        self.mode = match (self.external_addresses.as_slice(), self.mode) {
1180            ([], Mode::Server) => {
1181                tracing::debug!("Switching to client-mode because we no longer have any confirmed external addresses");
1182
1183                Mode::Client
1184            }
1185            ([], Mode::Client) => {
1186                // Previously client-mode, now also client-mode because no external addresses.
1187
1188                Mode::Client
1189            }
1190            (confirmed_external_addresses, Mode::Client) => {
1191                if tracing::enabled!(Level::DEBUG) {
1192                    let confirmed_external_addresses =
1193                        to_comma_separated_list(confirmed_external_addresses);
1194
1195                    tracing::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
1196                }
1197
1198                Mode::Server
1199            }
1200            (confirmed_external_addresses, Mode::Server) => {
1201                debug_assert!(
1202                    !confirmed_external_addresses.is_empty(),
1203                    "Previous match arm handled empty list"
1204                );
1205
1206                // Previously, server-mode, now also server-mode because > 1 external address.
1207                //  Don't log anything to avoid spam.
1208                Mode::Server
1209            }
1210        };
1211
1212        self.reconfigure_mode();
1213
1214        if old_mode != self.mode {
1215            self.queued_events
1216                .push_back(ToSwarm::GenerateEvent(Event::ModeChanged {
1217                    new_mode: self.mode,
1218                }));
1219        }
1220    }
1221
1222    /// Processes discovered peers from a successful request in an iterative `Query`.
1223    fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
1224    where
1225        I: Iterator<Item = &'a KadPeer> + Clone,
1226    {
1227        let local_id = self.kbuckets.local_key().preimage();
1228        let others_iter = peers.filter(|p| &p.node_id != local_id);
1229        if let Some(query) = self.queries.get_mut(query_id) {
1230            tracing::trace!(peer=%source, query=?query_id, "Request to peer in query succeeded");
1231            for peer in others_iter.clone() {
1232                tracing::trace!(
1233                    ?peer,
1234                    %source,
1235                    query=?query_id,
1236                    "Peer reported by source in query"
1237                );
1238                let addrs = peer.multiaddrs.iter().cloned().collect();
1239                query.peers.addresses.insert(peer.node_id, addrs);
1240            }
1241            query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
1242        }
1243    }
1244
1245    /// Collects all peers who are known to be providers of the value for a given `Multihash`.
1246    fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
1247        let kbuckets = &mut self.kbuckets;
1248        let connected = &mut self.connected_peers;
1249        let listen_addresses = &self.listen_addresses;
1250        let external_addresses = &self.external_addresses;
1251
1252        self.store
1253            .providers(key)
1254            .into_iter()
1255            .filter_map(move |p| {
1256                if &p.provider != source {
1257                    let node_id = p.provider;
1258                    let multiaddrs = p.addresses;
1259                    let connection_ty = if connected.contains(&node_id) {
1260                        ConnectionType::Connected
1261                    } else {
1262                        ConnectionType::NotConnected
1263                    };
1264                    if multiaddrs.is_empty() {
1265                        // The provider is either the local node and we fill in
1266                        // the local addresses on demand, or it is a legacy
1267                        // provider record without addresses, in which case we
1268                        // try to find addresses in the routing table, as was
1269                        // done before provider records were stored along with
1270                        // their addresses.
1271                        if &node_id == kbuckets.local_key().preimage() {
1272                            Some(
1273                                listen_addresses
1274                                    .iter()
1275                                    .chain(external_addresses.iter())
1276                                    .cloned()
1277                                    .collect::<Vec<_>>(),
1278                            )
1279                        } else {
1280                            let key = kbucket::Key::from(node_id);
1281                            kbuckets
1282                                .entry(&key)
1283                                .as_mut()
1284                                .and_then(|e| e.view())
1285                                .map(|e| e.node.value.clone().into_vec())
1286                        }
1287                    } else {
1288                        Some(multiaddrs)
1289                    }
1290                    .map(|multiaddrs| KadPeer {
1291                        node_id,
1292                        multiaddrs,
1293                        connection_ty,
1294                    })
1295                } else {
1296                    None
1297                }
1298            })
1299            .take(self.queries.config().replication_factor.get())
1300            .collect()
1301    }
1302
1303    /// Starts an iterative `ADD_PROVIDER` query for the given key.
1304    fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
1305        let info = QueryInfo::AddProvider {
1306            context,
1307            key: key.clone(),
1308            phase: AddProviderPhase::GetClosestPeers,
1309        };
1310        let target = kbucket::Key::new(key);
1311        let peers = self.kbuckets.closest_keys(&target);
1312        self.queries.add_iter_closest(target.clone(), peers, info);
1313    }
1314
1315    /// Starts an iterative `PUT_VALUE` query for the given record.
1316    fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
1317        let quorum = quorum.eval(self.queries.config().replication_factor);
1318        let target = kbucket::Key::new(record.key.clone());
1319        let peers = self.kbuckets.closest_keys(&target);
1320        let info = QueryInfo::PutRecord {
1321            record,
1322            quorum,
1323            context,
1324            phase: PutRecordPhase::GetClosestPeers,
1325        };
1326        self.queries.add_iter_closest(target.clone(), peers, info);
1327    }
1328
    /// Updates the routing table with a new connection status and address of a peer.
    ///
    /// * `peer` - the peer whose connection state changed.
    /// * `address` - an address of the peer to record, if one is known.
    /// * `new_status` - the connection status to store for the peer.
    ///
    /// What happens depends on the peer's current routing-table entry:
    ///
    /// * present: the status is updated if it differs, and an [`Event::RoutingUpdated`] is
    ///   queued when inserting `address` reports `true`;
    /// * pending: the address and status are updated without queuing an event;
    /// * absent: only peers with `NodeStatus::Connected` are considered. Depending on the
    ///   address and the configured `kbucket_inserts` policy, either an event is queued or
    ///   an insertion is attempted.
    fn connection_updated(
        &mut self,
        peer: PeerId,
        address: Option<Multiaddr>,
        new_status: NodeStatus,
    ) {
        let key = kbucket::Key::from(peer);
        match self.kbuckets.entry(&key) {
            Some(kbucket::Entry::Present(mut entry, old_status)) => {
                if old_status != new_status {
                    entry.update(new_status)
                }
                if let Some(address) = address {
                    // NOTE(review): `insert` presumably returns `true` only when the address
                    // was not already known for this peer — confirm against `Addresses`.
                    if entry.value().insert(address) {
                        self.queued_events.push_back(ToSwarm::GenerateEvent(
                            Event::RoutingUpdated {
                                peer,
                                is_new_peer: false,
                                addresses: entry.value().clone(),
                                old_peer: None,
                                bucket_range: self
                                    .kbuckets
                                    .bucket(&key)
                                    .map(|b| b.range())
                                    .expect("Not kbucket::Entry::SelfEntry."),
                            },
                        ))
                    }
                }
            }

            Some(kbucket::Entry::Pending(mut entry, old_status)) => {
                // Pending entries are updated in place; no event is queued for them.
                if let Some(address) = address {
                    entry.value().insert(address);
                }
                if old_status != new_status {
                    entry.update(new_status);
                }
            }

            Some(kbucket::Entry::Absent(entry)) => {
                // Only connected nodes with a known address are newly inserted.
                if new_status != NodeStatus::Connected {
                    return;
                }
                match (address, self.kbucket_inserts) {
                    // Connected, but without an address to route to.
                    (None, _) => {
                        self.queued_events
                            .push_back(ToSwarm::GenerateEvent(Event::UnroutablePeer { peer }));
                    }
                    // Manual insertion policy: only report the peer, do not insert it here.
                    (Some(a), BucketInserts::Manual) => {
                        self.queued_events
                            .push_back(ToSwarm::GenerateEvent(Event::RoutablePeer {
                                peer,
                                address: a,
                            }));
                    }
                    // Insert-on-connect policy: try to insert the peer right away.
                    (Some(a), BucketInserts::OnConnected) => {
                        let addresses = Addresses::new(a);
                        match entry.insert(addresses.clone(), new_status) {
                            kbucket::InsertResult::Inserted => {
                                // A new peer was added; trigger a bootstrap if the table is
                                // still small.
                                self.bootstrap_on_low_peers();

                                let event = Event::RoutingUpdated {
                                    peer,
                                    is_new_peer: true,
                                    addresses,
                                    old_peer: None,
                                    bucket_range: self
                                        .kbuckets
                                        .bucket(&key)
                                        .map(|b| b.range())
                                        .expect("Not kbucket::Entry::SelfEntry."),
                                };
                                self.queued_events.push_back(ToSwarm::GenerateEvent(event));
                            }
                            kbucket::InsertResult::Full => {
                                tracing::debug!(
                                    %peer,
                                    "Bucket full. Peer not added to routing table"
                                );
                                let address = addresses.first().clone();
                                self.queued_events.push_back(ToSwarm::GenerateEvent(
                                    Event::RoutablePeer { peer, address },
                                ));
                            }
                            kbucket::InsertResult::Pending { disconnected } => {
                                let address = addresses.first().clone();
                                self.queued_events.push_back(ToSwarm::GenerateEvent(
                                    Event::PendingRoutablePeer { peer, address },
                                ));

                                // `disconnected` might already be in the process of re-connecting.
                                // In other words `disconnected` might have already re-connected but
                                // is not yet confirmed to support the Kademlia protocol via
                                // [`HandlerEvent::ProtocolConfirmed`].
                                //
                                // Only try dialing peer if not currently connected.
                                if !self.connected_peers.contains(disconnected.preimage()) {
                                    self.queued_events.push_back(ToSwarm::Dial {
                                        opts: DialOpts::peer_id(disconnected.into_preimage())
                                            .build(),
                                    })
                                }
                            }
                        }
                    }
                }
            }
            // NOTE(review): the remaining case is `entry` returning `None`, presumably
            // because `peer` is the local node — confirm against `KBucketsTable::entry`.
            _ => {}
        }
    }
1442
1443    /// A new peer has been inserted in the routing table but we check if the routing
1444    /// table is currently small (less that `K_VALUE` peers are present) and only
1445    /// trigger a bootstrap in that case
1446    fn bootstrap_on_low_peers(&mut self) {
1447        if self
1448            .kbuckets()
1449            .map(|kbucket| kbucket.num_entries())
1450            .sum::<usize>()
1451            < K_VALUE.get()
1452        {
1453            self.bootstrap_status.trigger();
1454        }
1455    }
1456
    /// Handles a finished (i.e. successful) query.
    ///
    /// Dispatches on the kind of query (`q.info`) and returns the event to
    /// report for it, if any:
    ///
    /// * `Bootstrap`: always reports a `BootstrapOk` progress event. If refresh
    ///   targets remain, the query is continued under the same `query_id` with
    ///   the next target; otherwise the step is marked as last and the bootstrap
    ///   status is notified.
    /// * `GetClosestPeers`, `GetProviders`, `GetRecord`: report the final
    ///   (`step.last = true`) progress event.
    /// * `AddProvider` / `PutRecord` in their `GetClosestPeers` phase: the query
    ///   is continued as a fixed-peer query over the peers found and `None` is
    ///   returned.
    /// * `AddProvider` / `PutRecord` in their second phase: report the final
    ///   result, except for `PutRecordContext::Replicate` which is only logged
    ///   and yields `None`.
    fn query_finished(&mut self, q: Query) -> Option<Event> {
        let query_id = q.id();
        tracing::trace!(query=?query_id, "Query finished");
        match q.info {
            QueryInfo::Bootstrap {
                peer,
                remaining,
                mut step,
            } => {
                let local_key = *self.kbuckets.local_key();
                // `remaining` is `None` when the refresh targets have not been
                // computed yet; the debug assertion below documents that this is
                // expected only for the lookup of the local key.
                let mut remaining = remaining.unwrap_or_else(|| {
                    debug_assert_eq!(&peer, local_key.preimage());
                    // The lookup for the local key finished. To complete the bootstrap process,
                    // a bucket refresh should be performed for every bucket farther away than
                    // the first non-empty bucket (which are most likely no more than the last
                    // few, i.e. farthest, buckets).
                    self.kbuckets
                        .iter()
                        .skip_while(|b| b.is_empty())
                        .skip(1) // Skip the bucket with the closest neighbour.
                        .map(|b| {
                            // Try to find a key that falls into the bucket. While such keys can
                            // be generated fully deterministically, the current libp2p kademlia
                            // wire protocol requires transmission of the preimages of the actual
                            // keys in the DHT keyspace, hence for now this is just a "best effort"
                            // to find a key that hashes into a specific bucket. The probabilities
                            // of finding a key in the bucket `b` with at most 16 trials are as
                            // follows:
                            //
                            // Pr(bucket-255) = 1 - (1/2)^16   ~= 1
                            // Pr(bucket-254) = 1 - (3/4)^16   ~= 1
                            // Pr(bucket-253) = 1 - (7/8)^16   ~= 0.88
                            // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
                            // ...
                            let mut target = kbucket::Key::from(PeerId::random());
                            for _ in 0..16 {
                                let d = local_key.distance(&target);
                                if b.contains(&d) {
                                    break;
                                }
                                target = kbucket::Key::from(PeerId::random());
                            }
                            // If no trial matched, the last random key is used anyway.
                            target
                        })
                        .collect::<Vec<_>>()
                        .into_iter()
                });

                // Taken before the next target is removed from the iterator, so
                // the count includes the lookup that is started below (if any).
                let num_remaining = remaining.len() as u32;

                if let Some(target) = remaining.next() {
                    // Continue the same query with the next refresh target.
                    let info = QueryInfo::Bootstrap {
                        peer: *target.preimage(),
                        remaining: Some(remaining),
                        step: step.next(),
                    };
                    let peers = self.kbuckets.closest_keys(&target);
                    self.queries
                        .continue_iter_closest(query_id, target, peers, info);
                } else {
                    // No targets left: this is the last step of the bootstrap.
                    step.last = true;
                    self.bootstrap_status.on_finish();
                };

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: q.stats,
                    result: QueryResult::Bootstrap(Ok(BootstrapOk {
                        peer,
                        num_remaining,
                    })),
                    step,
                })
            }

            QueryInfo::GetClosestPeers { key, mut step, .. } => {
                step.last = true;

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: q.stats,
                    result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk {
                        key,
                        peers: q.peers.into_peerinfos_iter().collect(),
                    })),
                    step,
                })
            }

            QueryInfo::GetProviders { mut step, .. } => {
                step.last = true;

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: q.stats,
                    result: QueryResult::GetProviders(Ok(
                        GetProvidersOk::FinishedWithNoAdditionalRecord {
                            closest_peers: q.peers.into_peerids_iter().collect(),
                        },
                    )),
                    step,
                })
            }

            // First phase of adding a provider finished: the closest peers are
            // known, so continue the same query as a fixed-peer query over them.
            QueryInfo::AddProvider {
                context,
                key,
                phase: AddProviderPhase::GetClosestPeers,
            } => {
                let provider_id = self.local_peer_id;
                let external_addresses = self.external_addresses.iter().cloned().collect();
                let info = QueryInfo::AddProvider {
                    context,
                    key,
                    phase: AddProviderPhase::AddProvider {
                        provider_id,
                        external_addresses,
                        // Kept so the final event can report merged statistics.
                        get_closest_peers_stats: q.stats,
                    },
                };
                self.queries
                    .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
                None
            }

            // Second phase of adding a provider finished: report the result with
            // the statistics of both phases merged.
            QueryInfo::AddProvider {
                context,
                key,
                phase:
                    AddProviderPhase::AddProvider {
                        get_closest_peers_stats,
                        ..
                    },
            } => match context {
                AddProviderContext::Publish => Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: get_closest_peers_stats.merge(q.stats),
                    result: QueryResult::StartProviding(Ok(AddProviderOk { key })),
                    step: ProgressStep::first_and_last(),
                }),
                AddProviderContext::Republish => Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: get_closest_peers_stats.merge(q.stats),
                    result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })),
                    step: ProgressStep::first_and_last(),
                }),
            },

            QueryInfo::GetRecord {
                key,
                mut step,
                found_a_record,
                cache_candidates,
            } => {
                step.last = true;

                // A finished lookup that never saw a record is reported as
                // `NotFound`, together with the closest peers that were queried.
                let results = if found_a_record {
                    Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates })
                } else {
                    Err(GetRecordError::NotFound {
                        key,
                        closest_peers: q.peers.into_peerids_iter().collect(),
                    })
                };
                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: q.stats,
                    result: QueryResult::GetRecord(results),
                    step,
                })
            }

            // First phase of putting a record finished: continue the same query
            // as a fixed-peer query over the closest peers found.
            QueryInfo::PutRecord {
                context,
                record,
                quorum,
                phase: PutRecordPhase::GetClosestPeers,
            } => {
                let info = QueryInfo::PutRecord {
                    context,
                    record,
                    quorum,
                    phase: PutRecordPhase::PutRecord {
                        success: vec![],
                        // Kept so the final event can report merged statistics.
                        get_closest_peers_stats: q.stats,
                    },
                };
                self.queries
                    .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
                None
            }

            // Second phase of putting a record finished.
            QueryInfo::PutRecord {
                context,
                record,
                quorum,
                phase:
                    PutRecordPhase::PutRecord {
                        success,
                        get_closest_peers_stats,
                    },
            } => {
                // The put is successful only if at least `quorum` peers are
                // listed in `success`; otherwise a `QuorumFailed` error carrying
                // that list is produced. The closure consumes `success`, hence
                // it is invoked at most once below.
                let mk_result = |key: record::Key| {
                    if success.len() >= quorum.get() {
                        Ok(PutRecordOk { key })
                    } else {
                        Err(PutRecordError::QuorumFailed {
                            key,
                            quorum,
                            success,
                        })
                    }
                };
                match context {
                    PutRecordContext::Publish | PutRecordContext::Custom => {
                        Some(Event::OutboundQueryProgressed {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(q.stats),
                            result: QueryResult::PutRecord(mk_result(record.key)),
                            step: ProgressStep::first_and_last(),
                        })
                    }
                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
                        id: query_id,
                        stats: get_closest_peers_stats.merge(q.stats),
                        result: QueryResult::RepublishRecord(mk_result(record.key)),
                        step: ProgressStep::first_and_last(),
                    }),
                    // Replication is not reported as an event, only logged.
                    PutRecordContext::Replicate => {
                        tracing::debug!(record=?record.key, "Record replicated");
                        None
                    }
                }
            }
        }
    }
1694
    /// Handles a query that timed out.
    ///
    /// Dispatches on the kind of query (`query.info`) and returns the event to
    /// report for it, if any. Every kind reports a `Timeout` error through
    /// [`Event::OutboundQueryProgressed`], with two exceptions:
    ///
    /// * A timed out `Bootstrap` lookup still continues with the next refresh
    ///   target (if one is left) under the same `query_id`, in addition to
    ///   reporting the timeout.
    /// * A timed out `PutRecord` with `PutRecordContext::Replicate` is only
    ///   logged and yields `None`.
    fn query_timeout(&mut self, query: Query) -> Option<Event> {
        let query_id = query.id();
        tracing::trace!(query=?query_id, "Query timed out");
        match query.info {
            QueryInfo::Bootstrap {
                peer,
                mut remaining,
                mut step,
            } => {
                // `None` if the refresh targets were never computed; otherwise the
                // number of targets left after the one taken below (saturating at 0).
                let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);

                // Continue with the next bootstrap query if `remaining` is not empty.
                if let Some((target, remaining)) =
                    remaining.take().and_then(|mut r| Some((r.next()?, r)))
                {
                    let info = QueryInfo::Bootstrap {
                        peer: target.into_preimage(),
                        remaining: Some(remaining),
                        step: step.next(),
                    };
                    let peers = self.kbuckets.closest_keys(&target);
                    self.queries
                        .continue_iter_closest(query_id, target, peers, info);
                } else {
                    // Nothing left to look up: the bootstrap ends with this step.
                    step.last = true;
                    self.bootstrap_status.on_finish();
                }

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: query.stats,
                    result: QueryResult::Bootstrap(Err(BootstrapError::Timeout {
                        peer,
                        num_remaining,
                    })),
                    step,
                })
            }

            // The timeout is reported the same way regardless of the phase the
            // query was in (the phase is ignored via `..`).
            QueryInfo::AddProvider { context, key, .. } => Some(match context {
                AddProviderContext::Publish => Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: query.stats,
                    result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })),
                    step: ProgressStep::first_and_last(),
                },
                AddProviderContext::Republish => Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: query.stats,
                    result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })),
                    step: ProgressStep::first_and_last(),
                },
            }),

            QueryInfo::GetClosestPeers { key, mut step, .. } => {
                step.last = true;
                // The peers collected so far are handed out with the error.
                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: query.stats,
                    result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout {
                        key,
                        peers: query.peers.into_peerinfos_iter().collect(),
                    })),
                    step,
                })
            }

            QueryInfo::PutRecord {
                record,
                quorum,
                context,
                phase,
            } => {
                // The error carries the peers that already acknowledged the
                // record; none can have done so while still locating peers.
                // `phase` is only borrowed here because it is matched again below.
                let err = Err(PutRecordError::Timeout {
                    key: record.key,
                    quorum,
                    success: match phase {
                        PutRecordPhase::GetClosestPeers => vec![],
                        PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
                    },
                });
                match context {
                    PutRecordContext::Publish | PutRecordContext::Custom => {
                        Some(Event::OutboundQueryProgressed {
                            id: query_id,
                            stats: query.stats,
                            result: QueryResult::PutRecord(err),
                            step: ProgressStep::first_and_last(),
                        })
                    }
                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
                        id: query_id,
                        stats: query.stats,
                        result: QueryResult::RepublishRecord(err),
                        step: ProgressStep::first_and_last(),
                    }),
                    // Failed replication is not reported as an event, only
                    // logged; the log level depends on the phase that timed out.
                    PutRecordContext::Replicate => match phase {
                        PutRecordPhase::GetClosestPeers => {
                            tracing::warn!(
                                "Locating closest peers for replication failed: {:?}",
                                err
                            );
                            None
                        }
                        PutRecordPhase::PutRecord { .. } => {
                            tracing::debug!("Replicating record failed: {:?}", err);
                            None
                        }
                    },
                }
            }

            QueryInfo::GetRecord { key, mut step, .. } => {
                step.last = true;

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: query.stats,
                    result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })),
                    step,
                })
            }

            QueryInfo::GetProviders { key, mut step, .. } => {
                step.last = true;

                // The peers collected so far are handed out with the error.
                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: query.stats,
                    result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
                        key,
                        closest_peers: query.peers.into_peerids_iter().collect(),
                    })),
                    step,
                })
            }
        }
    }
1834
1835    /// Processes a record received from a peer.
1836    fn record_received(
1837        &mut self,
1838        source: PeerId,
1839        connection: ConnectionId,
1840        request_id: RequestId,
1841        mut record: Record,
1842    ) {
1843        if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
1844            // If the (alleged) publisher is the local node, do nothing. The record of
1845            // the original publisher should never change as a result of replication
1846            // and the publisher is always assumed to have the "right" value.
1847            self.queued_events.push_back(ToSwarm::NotifyHandler {
1848                peer_id: source,
1849                handler: NotifyHandler::One(connection),
1850                event: HandlerIn::PutRecordRes {
1851                    key: record.key,
1852                    value: record.value,
1853                    request_id,
1854                },
1855            });
1856            return;
1857        }
1858
1859        let now = Instant::now();
1860
1861        // Calculate the expiration exponentially inversely proportional to the
1862        // number of nodes between the local node and the closest node to the key
1863        // (beyond the replication factor). This ensures avoiding over-caching
1864        // outside of the k closest nodes to a key.
1865        let target = kbucket::Key::new(record.key.clone());
1866        let num_between = self.kbuckets.count_nodes_between(&target);
1867        let k = self.queries.config().replication_factor.get();
1868        let num_beyond_k = (usize::max(k, num_between) - k) as u32;
1869        let expiration = self
1870            .record_ttl
1871            .map(|ttl| now + exp_decrease(ttl, num_beyond_k));
1872        // The smaller TTL prevails. Only if neither TTL is set is the record
1873        // stored "forever".
1874        record.expires = record.expires.or(expiration).min(expiration);
1875
1876        if let Some(job) = self.put_record_job.as_mut() {
1877            // Ignore the record in the next run of the replication
1878            // job, since we can assume the sender replicated the
1879            // record to the k closest peers. Effectively, only
1880            // one of the k closest peers performs a replication
1881            // in the configured interval, assuming a shared interval.
1882            job.skip(record.key.clone())
1883        }
1884
1885        // While records received from a publisher, as well as records that do
1886        // not exist locally should always (attempted to) be stored, there is a
1887        // choice here w.r.t. the handling of replicated records whose keys refer
1888        // to records that exist locally: The value and / or the publisher may
1889        // either be overridden or left unchanged. At the moment and in the
1890        // absence of a decisive argument for another option, both are always
1891        // overridden as it avoids having to load the existing record in the
1892        // first place.
1893
1894        if !record.is_expired(now) {
1895            // The record is cloned because of the weird libp2p protocol
1896            // requirement to send back the value in the response, although this
1897            // is a waste of resources.
1898            match self.record_filtering {
1899                StoreInserts::Unfiltered => match self.store.put(record.clone()) {
1900                    Ok(()) => {
1901                        tracing::debug!(
1902                            record=?record.key,
1903                            "Record stored: {} bytes",
1904                            record.value.len()
1905                        );
1906                        self.queued_events.push_back(ToSwarm::GenerateEvent(
1907                            Event::InboundRequest {
1908                                request: InboundRequest::PutRecord {
1909                                    source,
1910                                    connection,
1911                                    record: None,
1912                                },
1913                            },
1914                        ));
1915                    }
1916                    Err(e) => {
1917                        tracing::info!("Record not stored: {:?}", e);
1918                        self.queued_events.push_back(ToSwarm::NotifyHandler {
1919                            peer_id: source,
1920                            handler: NotifyHandler::One(connection),
1921                            event: HandlerIn::Reset(request_id),
1922                        });
1923
1924                        return;
1925                    }
1926                },
1927                StoreInserts::FilterBoth => {
1928                    self.queued_events
1929                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1930                            request: InboundRequest::PutRecord {
1931                                source,
1932                                connection,
1933                                record: Some(record.clone()),
1934                            },
1935                        }));
1936                }
1937            }
1938        }
1939
1940        // The remote receives a [`HandlerIn::PutRecordRes`] even in the
1941        // case where the record is discarded due to being expired. Given that
1942        // the remote sent the local node a [`HandlerEvent::PutRecord`]
1943        // request, the remote perceives the local node as one node among the k
1944        // closest nodes to the target. In addition returning
1945        // [`HandlerIn::PutRecordRes`] does not reveal any internal
1946        // information to a possibly malicious remote node.
1947        self.queued_events.push_back(ToSwarm::NotifyHandler {
1948            peer_id: source,
1949            handler: NotifyHandler::One(connection),
1950            event: HandlerIn::PutRecordRes {
1951                key: record.key,
1952                value: record.value,
1953                request_id,
1954            },
1955        })
1956    }
1957
1958    /// Processes a provider record received from a peer.
1959    fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
1960        if &provider.node_id != self.kbuckets.local_key().preimage() {
1961            let record = ProviderRecord {
1962                key,
1963                provider: provider.node_id,
1964                expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1965                addresses: provider.multiaddrs,
1966            };
1967            match self.record_filtering {
1968                StoreInserts::Unfiltered => {
1969                    if let Err(e) = self.store.add_provider(record) {
1970                        tracing::info!("Provider record not stored: {:?}", e);
1971                        return;
1972                    }
1973
1974                    self.queued_events
1975                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1976                            request: InboundRequest::AddProvider { record: None },
1977                        }));
1978                }
1979                StoreInserts::FilterBoth => {
1980                    self.queued_events
1981                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1982                            request: InboundRequest::AddProvider {
1983                                record: Some(record),
1984                            },
1985                        }));
1986                }
1987            }
1988        }
1989    }
1990
1991    fn address_failed(&mut self, peer_id: PeerId, address: &Multiaddr) {
1992        let key = kbucket::Key::from(peer_id);
1993
1994        if let Some(addrs) = self.kbuckets.entry(&key).as_mut().and_then(|e| e.value()) {
1995            // TODO: Ideally, the address should only be removed if the error can
1996            // be classified as "permanent" but since `err` is currently a borrowed
1997            // trait object without a `'static` bound, even downcasting for inspection
1998            // of the error is not possible (and also not truly desirable or ergonomic).
1999            // The error passed in should rather be a dedicated enum.
2000            if addrs.remove(address).is_ok() {
2001                tracing::debug!(
2002                    peer=%peer_id,
2003                    %address,
2004                    "Address removed from peer due to error."
2005                );
2006            } else {
2007                // Despite apparently having no reachable address (any longer),
2008                // the peer is kept in the routing table with the last address to avoid
2009                // (temporary) loss of network connectivity to "flush" the routing
2010                // table. Once in, a peer is only removed from the routing table
2011                // if it is the least recently connected peer, currently disconnected
2012                // and is unreachable in the context of another peer pending insertion
2013                // into the same bucket. This is handled transparently by the
2014                // `KBucketsTable` and takes effect through `KBucketsTable::take_applied_pending`
2015                // within `Behaviour::poll`.
2016                tracing::debug!(
2017                    peer=%peer_id,
2018                    %address,
2019                    "Last remaining address of peer is unreachable."
2020                );
2021            }
2022        }
2023
2024        for query in self.queries.iter_mut() {
2025            if let Some(addrs) = query.peers.addresses.get_mut(&peer_id) {
2026                addrs.retain(|a| a != address);
2027            }
2028        }
2029    }
2030
2031    fn on_connection_established(
2032        &mut self,
2033        ConnectionEstablished {
2034            peer_id,
2035            failed_addresses,
2036            other_established,
2037            ..
2038        }: ConnectionEstablished,
2039    ) {
2040        for addr in failed_addresses {
2041            self.address_failed(peer_id, addr);
2042        }
2043
2044        // Peer's first connection.
2045        if other_established == 0 {
2046            self.connected_peers.insert(peer_id);
2047        }
2048    }
2049
2050    fn on_address_change(
2051        &mut self,
2052        AddressChange {
2053            peer_id: peer,
2054            old,
2055            new,
2056            ..
2057        }: AddressChange,
2058    ) {
2059        let (old, new) = (old.get_remote_address(), new.get_remote_address());
2060
2061        // Update routing table.
2062        if let Some(addrs) = self
2063            .kbuckets
2064            .entry(&kbucket::Key::from(peer))
2065            .as_mut()
2066            .and_then(|e| e.value())
2067        {
2068            if addrs.replace(old, new) {
2069                tracing::debug!(
2070                    %peer,
2071                    old_address=%old,
2072                    new_address=%new,
2073                    "Old address replaced with new address for peer."
2074                );
2075            } else {
2076                tracing::debug!(
2077                    %peer,
2078                    old_address=%old,
2079                    new_address=%new,
2080                    "Old address not replaced with new address for peer as old address wasn't present.",
2081                );
2082            }
2083        } else {
2084            tracing::debug!(
2085                %peer,
2086                old_address=%old,
2087                new_address=%new,
2088                "Old address not replaced with new address for peer as peer is not present in the \
2089                 routing table."
2090            );
2091        }
2092
2093        // Update query address cache.
2094        //
2095        // Given two connected nodes: local node A and remote node B. Say node B
2096        // is not in node A's routing table. Additionally node B is part of the
2097        // `Query::addresses` list of an ongoing query on node A. Say Node
2098        // B triggers an address change and then disconnects. Later on the
2099        // earlier mentioned query on node A would like to connect to node B.
2100        // Without replacing the address in the `Query::addresses` set node
2101        // A would attempt to dial the old and not the new address.
2102        //
2103        // While upholding correctness, iterating through all discovered
2104        // addresses of a peer in all currently ongoing queries might have a
2105        // large performance impact. If so, the code below might be worth
2106        // revisiting.
2107        for query in self.queries.iter_mut() {
2108            if let Some(addrs) = query.peers.addresses.get_mut(&peer) {
2109                for addr in addrs.iter_mut() {
2110                    if addr == old {
2111                        *addr = new.clone();
2112                    }
2113                }
2114            }
2115        }
2116    }
2117
2118    fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) {
2119        let Some(peer_id) = peer_id else { return };
2120
2121        match error {
2122            DialError::LocalPeerId { .. }
2123            | DialError::WrongPeerId { .. }
2124            | DialError::Aborted
2125            | DialError::Denied { .. }
2126            | DialError::Transport(_)
2127            | DialError::NoAddresses => {
2128                if let DialError::Transport(addresses) = error {
2129                    for (addr, _) in addresses {
2130                        self.address_failed(peer_id, addr)
2131                    }
2132                }
2133
2134                for query in self.queries.iter_mut() {
2135                    query.on_failure(&peer_id);
2136                }
2137            }
2138            DialError::DialPeerConditionFalse(
2139                dial_opts::PeerCondition::Disconnected
2140                | dial_opts::PeerCondition::NotDialing
2141                | dial_opts::PeerCondition::DisconnectedAndNotDialing,
2142            ) => {
2143                // We might (still) be connected, or about to be connected, thus do not report the
2144                // failure to the queries.
2145            }
2146            DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
2147                unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse.");
2148            }
2149        }
2150    }
2151
2152    fn on_connection_closed(
2153        &mut self,
2154        ConnectionClosed {
2155            peer_id,
2156            remaining_established,
2157            connection_id,
2158            ..
2159        }: ConnectionClosed,
2160    ) {
2161        self.connections.remove(&connection_id);
2162
2163        if remaining_established == 0 {
2164            for query in self.queries.iter_mut() {
2165                query.on_failure(&peer_id);
2166            }
2167            self.connection_updated(peer_id, None, NodeStatus::Disconnected);
2168            self.connected_peers.remove(&peer_id);
2169        }
2170    }
2171
2172    /// Preloads a new [`Handler`] with requests that are waiting
2173    /// to be sent to the newly connected peer.
2174    fn preload_new_handler(
2175        &mut self,
2176        handler: &mut Handler,
2177        connection_id: ConnectionId,
2178        peer: PeerId,
2179    ) {
2180        self.connections.insert(connection_id, peer);
2181        // Queue events for sending pending RPCs to the connected peer.
2182        // There can be only one pending RPC for a particular peer and query per definition.
2183        for (_peer_id, event) in self.queries.iter_mut().filter_map(|q| {
2184            q.pending_rpcs
2185                .iter()
2186                .position(|(p, _)| p == &peer)
2187                .map(|p| q.pending_rpcs.remove(p))
2188        }) {
2189            handler.on_behaviour_event(event)
2190        }
2191    }
2192}
2193
/// Halves the given duration `exp` times (exponential decrease, base 2).
///
/// Only the whole seconds of `ttl` are taken into account; any sub-second part
/// is dropped. If `exp` is at least the bit width of the seconds counter (64),
/// the result is a zero duration.
fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
    let whole_secs = ttl.as_secs();
    // `checked_shr` yields `None` for an oversized shift, which maps to zero seconds.
    let decreased_secs = whole_secs.checked_shr(exp).unwrap_or_default();
    Duration::from_secs(decreased_secs)
}
2198
2199impl<TStore> NetworkBehaviour for Behaviour<TStore>
2200where
2201    TStore: RecordStore + Send + 'static,
2202{
2203    type ConnectionHandler = Handler;
2204    type ToSwarm = Event;
2205
2206    fn handle_established_inbound_connection(
2207        &mut self,
2208        connection_id: ConnectionId,
2209        peer: PeerId,
2210        local_addr: &Multiaddr,
2211        remote_addr: &Multiaddr,
2212    ) -> Result<THandler<Self>, ConnectionDenied> {
2213        let connected_point = ConnectedPoint::Listener {
2214            local_addr: local_addr.clone(),
2215            send_back_addr: remote_addr.clone(),
2216        };
2217
2218        let mut handler = Handler::new(
2219            self.protocol_config.clone(),
2220            connected_point,
2221            peer,
2222            self.mode,
2223        );
2224        self.preload_new_handler(&mut handler, connection_id, peer);
2225
2226        Ok(handler)
2227    }
2228
2229    fn handle_established_outbound_connection(
2230        &mut self,
2231        connection_id: ConnectionId,
2232        peer: PeerId,
2233        addr: &Multiaddr,
2234        role_override: Endpoint,
2235        port_use: PortUse,
2236    ) -> Result<THandler<Self>, ConnectionDenied> {
2237        let connected_point = ConnectedPoint::Dialer {
2238            address: addr.clone(),
2239            role_override,
2240            port_use,
2241        };
2242
2243        let mut handler = Handler::new(
2244            self.protocol_config.clone(),
2245            connected_point,
2246            peer,
2247            self.mode,
2248        );
2249        self.preload_new_handler(&mut handler, connection_id, peer);
2250
2251        Ok(handler)
2252    }
2253
2254    fn handle_pending_outbound_connection(
2255        &mut self,
2256        _connection_id: ConnectionId,
2257        maybe_peer: Option<PeerId>,
2258        _addresses: &[Multiaddr],
2259        _effective_role: Endpoint,
2260    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
2261        let peer_id = match maybe_peer {
2262            None => return Ok(vec![]),
2263            Some(peer) => peer,
2264        };
2265
2266        // We should order addresses from decreasing likelihood of connectivity, so start with
2267        // the addresses of that peer in the k-buckets.
2268        let key = kbucket::Key::from(peer_id);
2269        let mut peer_addrs =
2270            if let Some(kbucket::Entry::Present(mut entry, _)) = self.kbuckets.entry(&key) {
2271                let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
2272                debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
2273                addrs
2274            } else {
2275                Vec::new()
2276            };
2277
2278        // We add to that a temporary list of addresses from the ongoing queries.
2279        for query in self.queries.iter() {
2280            if let Some(addrs) = query.peers.addresses.get(&peer_id) {
2281                peer_addrs.extend(addrs.iter().cloned())
2282            }
2283        }
2284
2285        Ok(peer_addrs)
2286    }
2287
2288    fn on_connection_handler_event(
2289        &mut self,
2290        source: PeerId,
2291        connection: ConnectionId,
2292        event: THandlerOutEvent<Self>,
2293    ) {
2294        match event {
2295            HandlerEvent::ProtocolConfirmed { endpoint } => {
2296                debug_assert!(self.connected_peers.contains(&source));
2297                // The remote's address can only be put into the routing table,
2298                // and thus shared with other nodes, if the local node is the dialer,
2299                // since the remote address on an inbound connection may be specific
2300                // to that connection (e.g. typically the TCP port numbers).
2301                let address = match endpoint {
2302                    ConnectedPoint::Dialer { address, .. } => Some(address),
2303                    ConnectedPoint::Listener { .. } => None,
2304                };
2305
2306                self.connection_updated(source, address, NodeStatus::Connected);
2307            }
2308
2309            HandlerEvent::ProtocolNotSupported { endpoint } => {
2310                let address = match endpoint {
2311                    ConnectedPoint::Dialer { address, .. } => Some(address),
2312                    ConnectedPoint::Listener { .. } => None,
2313                };
2314                self.connection_updated(source, address, NodeStatus::Disconnected);
2315            }
2316
2317            HandlerEvent::FindNodeReq { key, request_id } => {
2318                let closer_peers = self
2319                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
2320                    .collect::<Vec<_>>();
2321
2322                self.queued_events
2323                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2324                        request: InboundRequest::FindNode {
2325                            num_closer_peers: closer_peers.len(),
2326                        },
2327                    }));
2328
2329                self.queued_events.push_back(ToSwarm::NotifyHandler {
2330                    peer_id: source,
2331                    handler: NotifyHandler::One(connection),
2332                    event: HandlerIn::FindNodeRes {
2333                        closer_peers,
2334                        request_id,
2335                    },
2336                });
2337            }
2338
2339            HandlerEvent::FindNodeRes {
2340                closer_peers,
2341                query_id,
2342            } => {
2343                self.discovered(&query_id, &source, closer_peers.iter());
2344            }
2345
2346            HandlerEvent::GetProvidersReq { key, request_id } => {
2347                let provider_peers = self.provider_peers(&key, &source);
2348                let closer_peers = self
2349                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
2350                    .collect::<Vec<_>>();
2351
2352                self.queued_events
2353                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2354                        request: InboundRequest::GetProvider {
2355                            num_closer_peers: closer_peers.len(),
2356                            num_provider_peers: provider_peers.len(),
2357                        },
2358                    }));
2359
2360                self.queued_events.push_back(ToSwarm::NotifyHandler {
2361                    peer_id: source,
2362                    handler: NotifyHandler::One(connection),
2363                    event: HandlerIn::GetProvidersRes {
2364                        closer_peers,
2365                        provider_peers,
2366                        request_id,
2367                    },
2368                });
2369            }
2370
2371            HandlerEvent::GetProvidersRes {
2372                closer_peers,
2373                provider_peers,
2374                query_id,
2375            } => {
2376                let peers = closer_peers.iter().chain(provider_peers.iter());
2377                self.discovered(&query_id, &source, peers);
2378                if let Some(query) = self.queries.get_mut(&query_id) {
2379                    let stats = query.stats().clone();
2380                    if let QueryInfo::GetProviders {
2381                        ref key,
2382                        ref mut providers_found,
2383                        ref mut step,
2384                        ..
2385                    } = query.info
2386                    {
2387                        *providers_found += provider_peers.len();
2388                        let providers = provider_peers.iter().map(|p| p.node_id).collect();
2389
2390                        self.queued_events.push_back(ToSwarm::GenerateEvent(
2391                            Event::OutboundQueryProgressed {
2392                                id: query_id,
2393                                result: QueryResult::GetProviders(Ok(
2394                                    GetProvidersOk::FoundProviders {
2395                                        key: key.clone(),
2396                                        providers,
2397                                    },
2398                                )),
2399                                step: step.clone(),
2400                                stats,
2401                            },
2402                        ));
2403                        *step = step.next();
2404                    }
2405                }
2406            }
2407            HandlerEvent::QueryError { query_id, error } => {
2408                tracing::debug!(
2409                    peer=%source,
2410                    query=?query_id,
2411                    "Request to peer in query failed with {:?}",
2412                    error
2413                );
2414                // If the query to which the error relates is still active,
2415                // signal the failure w.r.t. `source`.
2416                if let Some(query) = self.queries.get_mut(&query_id) {
2417                    query.on_failure(&source)
2418                }
2419            }
2420
2421            HandlerEvent::AddProvider { key, provider } => {
2422                // Only accept a provider record from a legitimate peer.
2423                if provider.node_id != source {
2424                    return;
2425                }
2426
2427                self.provider_received(key, provider);
2428            }
2429
2430            HandlerEvent::GetRecord { key, request_id } => {
2431                // Lookup the record locally.
2432                let record = match self.store.get(&key) {
2433                    Some(record) => {
2434                        if record.is_expired(Instant::now()) {
2435                            self.store.remove(&key);
2436                            None
2437                        } else {
2438                            Some(record.into_owned())
2439                        }
2440                    }
2441                    None => None,
2442                };
2443
2444                let closer_peers = self
2445                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
2446                    .collect::<Vec<_>>();
2447
2448                self.queued_events
2449                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
2450                        request: InboundRequest::GetRecord {
2451                            num_closer_peers: closer_peers.len(),
2452                            present_locally: record.is_some(),
2453                        },
2454                    }));
2455
2456                self.queued_events.push_back(ToSwarm::NotifyHandler {
2457                    peer_id: source,
2458                    handler: NotifyHandler::One(connection),
2459                    event: HandlerIn::GetRecordRes {
2460                        record,
2461                        closer_peers,
2462                        request_id,
2463                    },
2464                });
2465            }
2466
2467            HandlerEvent::GetRecordRes {
2468                record,
2469                closer_peers,
2470                query_id,
2471            } => {
2472                if let Some(query) = self.queries.get_mut(&query_id) {
2473                    let stats = query.stats().clone();
2474                    if let QueryInfo::GetRecord {
2475                        key,
2476                        ref mut step,
2477                        ref mut found_a_record,
2478                        cache_candidates,
2479                    } = &mut query.info
2480                    {
2481                        if let Some(record) = record {
2482                            *found_a_record = true;
2483                            let record = PeerRecord {
2484                                peer: Some(source),
2485                                record,
2486                            };
2487
2488                            self.queued_events.push_back(ToSwarm::GenerateEvent(
2489                                Event::OutboundQueryProgressed {
2490                                    id: query_id,
2491                                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(
2492                                        record,
2493                                    ))),
2494                                    step: step.clone(),
2495                                    stats,
2496                                },
2497                            ));
2498
2499                            *step = step.next();
2500                        } else {
2501                            tracing::trace!(record=?key, %source, "Record not found at source");
2502                            if let Caching::Enabled { max_peers } = self.caching {
2503                                let source_key = kbucket::Key::from(source);
2504                                let target_key = kbucket::Key::from(key.clone());
2505                                let distance = source_key.distance(&target_key);
2506                                cache_candidates.insert(distance, source);
2507                                if cache_candidates.len() > max_peers as usize {
2508                                    // TODO: `pop_last()` would be nice once stabilised.
2509                                    // See https://github.com/rust-lang/rust/issues/62924.
2510                                    let last =
2511                                        *cache_candidates.keys().next_back().expect("len > 0");
2512                                    cache_candidates.remove(&last);
2513                                }
2514                            }
2515                        }
2516                    }
2517                }
2518
2519                self.discovered(&query_id, &source, closer_peers.iter());
2520            }
2521
2522            HandlerEvent::PutRecord { record, request_id } => {
2523                self.record_received(source, connection, request_id, record);
2524            }
2525
2526            HandlerEvent::PutRecordRes { query_id, .. } => {
2527                if let Some(query) = self.queries.get_mut(&query_id) {
2528                    query.on_success(&source, vec![]);
2529                    if let QueryInfo::PutRecord {
2530                        phase: PutRecordPhase::PutRecord { success, .. },
2531                        quorum,
2532                        ..
2533                    } = &mut query.info
2534                    {
2535                        success.push(source);
2536
2537                        let quorum = quorum.get();
2538                        if success.len() >= quorum {
2539                            let peers = success.clone();
2540                            let finished = query.try_finish(peers.iter());
2541                            if !finished {
2542                                tracing::debug!(
2543                                    peer=%source,
2544                                    query=?query_id,
2545                                    "PutRecord query reached quorum ({}/{}) with response \
2546                                     from peer but could not yet finish.",
2547                                    peers.len(),
2548                                    quorum,
2549                                );
2550                            }
2551                        }
2552                    }
2553                }
2554            }
2555        };
2556    }
2557
2558    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
2559    fn poll(
2560        &mut self,
2561        cx: &mut Context<'_>,
2562    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
2563        let now = Instant::now();
2564
2565        // Calculate the available capacity for queries triggered by background jobs.
2566        let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());
2567
2568        // Run the periodic provider announcement job.
2569        if let Some(mut job) = self.add_provider_job.take() {
2570            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2571            for i in 0..num {
2572                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2573                    self.start_add_provider(r.key, AddProviderContext::Republish)
2574                } else {
2575                    jobs_query_capacity -= i;
2576                    break;
2577                }
2578            }
2579            self.add_provider_job = Some(job);
2580        }
2581
2582        // Run the periodic record replication / publication job.
2583        if let Some(mut job) = self.put_record_job.take() {
2584            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
2585            for _ in 0..num {
2586                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
2587                    let context =
2588                        if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
2589                            PutRecordContext::Republish
2590                        } else {
2591                            PutRecordContext::Replicate
2592                        };
2593                    self.start_put_record(r, Quorum::All, context)
2594                } else {
2595                    break;
2596                }
2597            }
2598            self.put_record_job = Some(job);
2599        }
2600
2601        // Poll bootstrap periodically and automatically.
2602        if let Poll::Ready(()) = self.bootstrap_status.poll_next_bootstrap(cx) {
2603            if let Err(e) = self.bootstrap() {
2604                tracing::warn!("Failed to trigger bootstrap: {e}");
2605            }
2606        }
2607
2608        loop {
2609            // Drain queued events first.
2610            if let Some(event) = self.queued_events.pop_front() {
2611                return Poll::Ready(event);
2612            }
2613
2614            // Drain applied pending entries from the routing table.
2615            if let Some(entry) = self.kbuckets.take_applied_pending() {
2616                let kbucket::Node { key, value } = entry.inserted;
2617                let peer_id = key.into_preimage();
2618                self.queued_events
2619                    .push_back(ToSwarm::NewExternalAddrOfPeer {
2620                        peer_id,
2621                        address: value.first().clone(),
2622                    });
2623                let event = Event::RoutingUpdated {
2624                    bucket_range: self
2625                        .kbuckets
2626                        .bucket(&key)
2627                        .map(|b| b.range())
2628                        .expect("Self to never be applied from pending."),
2629                    peer: peer_id,
2630                    is_new_peer: true,
2631                    addresses: value,
2632                    old_peer: entry.evicted.map(|n| n.key.into_preimage()),
2633                };
2634                return Poll::Ready(ToSwarm::GenerateEvent(event));
2635            }
2636
2637            // Look for a finished query.
2638            loop {
2639                match self.queries.poll(now) {
2640                    QueryPoolState::Finished(q) => {
2641                        if let Some(event) = self.query_finished(q) {
2642                            return Poll::Ready(ToSwarm::GenerateEvent(event));
2643                        }
2644                    }
2645                    QueryPoolState::Timeout(q) => {
2646                        if let Some(event) = self.query_timeout(q) {
2647                            return Poll::Ready(ToSwarm::GenerateEvent(event));
2648                        }
2649                    }
2650                    QueryPoolState::Waiting(Some((query, peer_id))) => {
2651                        let event = query.info.to_request(query.id());
2652                        // TODO: AddProvider requests yield no response, so the query completes
2653                        // as soon as all requests have been sent. However, the handler should
2654                        // better emit an event when the request has been sent (and report
2655                        // an error if sending fails), instead of immediately reporting
2656                        // "success" somewhat prematurely here.
2657                        if let QueryInfo::AddProvider {
2658                            phase: AddProviderPhase::AddProvider { .. },
2659                            ..
2660                        } = &query.info
2661                        {
2662                            query.on_success(&peer_id, vec![])
2663                        }
2664
2665                        if self.connected_peers.contains(&peer_id) {
2666                            self.queued_events.push_back(ToSwarm::NotifyHandler {
2667                                peer_id,
2668                                event,
2669                                handler: NotifyHandler::Any,
2670                            });
2671                        } else if &peer_id != self.kbuckets.local_key().preimage() {
2672                            query.pending_rpcs.push((peer_id, event));
2673                            self.queued_events.push_back(ToSwarm::Dial {
2674                                opts: DialOpts::peer_id(peer_id).build(),
2675                            });
2676                        }
2677                    }
2678                    QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
2679                }
2680            }
2681
2682            // No immediate event was produced as a result of a finished query.
2683            // If no new events have been queued either, signal `NotReady` to
2684            // be polled again later.
2685            if self.queued_events.is_empty() {
2686                self.no_events_waker = Some(cx.waker().clone());
2687
2688                return Poll::Pending;
2689            }
2690        }
2691    }
2692
2693    fn on_swarm_event(&mut self, event: FromSwarm) {
2694        self.listen_addresses.on_swarm_event(&event);
2695        let external_addresses_changed = self.external_addresses.on_swarm_event(&event);
2696
2697        if self.auto_mode && external_addresses_changed {
2698            self.determine_mode_from_external_addresses();
2699        }
2700
2701        match event {
2702            FromSwarm::ConnectionEstablished(connection_established) => {
2703                self.on_connection_established(connection_established)
2704            }
2705            FromSwarm::ConnectionClosed(connection_closed) => {
2706                self.on_connection_closed(connection_closed)
2707            }
2708            FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
2709            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
2710            FromSwarm::NewListenAddr(_) if self.connected_peers.is_empty() => {
2711                // A new listen addr was just discovered and we have no connected peers,
2712                // it can mean that our network interfaces were not up but they are now
2713                // so it might be a good idea to trigger a bootstrap.
2714                self.bootstrap_status.trigger();
2715            }
2716            _ => {}
2717        }
2718    }
2719}
2720
/// Peer Info combines a Peer ID with a set of multiaddrs that the peer is listening on.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerInfo {
    /// The ID of the peer.
    pub peer_id: PeerId,
    /// The multiaddrs that the peer is listening on.
    pub addrs: Vec<Multiaddr>,
}
2727
/// The minimum number of distinct nodes that must be successfully contacted
/// for a query to succeed, expressed relative to the configured replication
/// factor.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Quorum {
    /// A single node suffices.
    One,
    /// More than half of the nodes (`total / 2 + 1`).
    Majority,
    /// All nodes.
    All,
    /// The given number of nodes, capped at the total.
    N(NonZeroUsize),
}

impl Quorum {
    /// Resolves the quorum to a concrete number of peers, given the `total`
    /// number of peers it is evaluated against.
    fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
        match *self {
            Quorum::All => total,
            // An explicit count can never require more peers than there are.
            Quorum::N(limit) => total.min(limit),
            Quorum::Majority => {
                let half_plus_one = total.get() / 2 + 1;
                NonZeroUsize::new(half_plus_one).expect("n + 1 != 0")
            }
            Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
        }
    }
}
2750
/// A record either received from the given peer or retrieved from the local
/// record store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerRecord {
    /// The peer from whom the record was received. `None` if the record was
    /// retrieved from local storage.
    pub peer: Option<PeerId>,
    /// The record itself.
    pub record: Record,
}
2760
2761//////////////////////////////////////////////////////////////////////////////
2762// Events
2763
/// The events produced by the `Kademlia` behaviour.
///
/// See [`NetworkBehaviour::poll`].
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
    /// An inbound request has been received and handled.
    // Note on the difference between 'request' and 'query': A request is a
    // single request-response style exchange with a single remote peer. A query
    // is made of multiple requests across multiple remote peers.
    InboundRequest { request: InboundRequest },

    /// An outbound query has made progress.
    OutboundQueryProgressed {
        /// The ID of the query that made progress.
        id: QueryId,
        /// The intermediate result of the query.
        result: QueryResult,
        /// Execution statistics from the query.
        stats: QueryStats,
        /// Indicates which event this is, if there are multiple responses for a single query.
        step: ProgressStep,
    },

    /// The routing table has been updated with a new peer and / or
    /// address, thereby possibly evicting another peer.
    RoutingUpdated {
        /// The ID of the peer that was added or updated.
        peer: PeerId,
        /// Whether this is a new peer and was thus just added to the routing
        /// table, or whether it is an existing peer whose addresses changed.
        is_new_peer: bool,
        /// The full list of known addresses of `peer`.
        addresses: Addresses,
        /// The minimum inclusive and maximum inclusive distance of
        /// the bucket of the peer.
        bucket_range: (Distance, Distance),
        /// The ID of the peer that was evicted from the routing table to make
        /// room for the new peer, if any.
        old_peer: Option<PeerId>,
    },

    /// A peer has connected for whom no listen address is known.
    ///
    /// If the peer is to be added to the routing table, a known
    /// listen address for the peer must be provided via [`Behaviour::add_address`].
    UnroutablePeer { peer: PeerId },

    /// A connection to a peer has been established for whom a listen address
    /// is known but the peer has not been added to the routing table either
    /// because [`BucketInserts::Manual`] is configured or because
    /// the corresponding bucket is full.
    ///
    /// If the peer is to be included in the routing table, it
    /// must be explicitly added via [`Behaviour::add_address`], possibly after
    /// removing another peer.
    ///
    /// See [`Behaviour::kbucket`] for insight into the contents of
    /// the k-bucket of `peer`.
    RoutablePeer { peer: PeerId, address: Multiaddr },

    /// A connection to a peer has been established for whom a listen address
    /// is known but the peer is only pending insertion into the routing table
    /// if the least-recently disconnected peer is unresponsive, i.e. the peer
    /// may not make it into the routing table.
    ///
    /// If the peer is to be unconditionally included in the routing table,
    /// it should be explicitly added via [`Behaviour::add_address`] after
    /// removing another peer.
    ///
    /// See [`Behaviour::kbucket`] for insight into the contents of
    /// the k-bucket of `peer`.
    PendingRoutablePeer { peer: PeerId, address: Multiaddr },

    /// This peer's mode has been updated automatically.
    ///
    /// This happens in response to an external
    /// address being added or removed.
    ModeChanged { new_mode: Mode },
}
2844
/// Position of a progress event within the sequence of events that a single
/// query produces.
#[derive(Debug, Clone)]
pub struct ProgressStep {
    /// The 1-based index of the event.
    pub count: NonZeroUsize,
    /// Whether this is the final event.
    pub last: bool,
}

impl ProgressStep {
    /// The step of the first event of a query, not marked as the last one.
    fn first() -> Self {
        let count = NonZeroUsize::new(1).expect("1 to be greater than 0.");

        Self { count, last: false }
    }

    /// The step of a query that produces exactly one event.
    fn first_and_last() -> Self {
        Self {
            last: true,
            ..ProgressStep::first()
        }
    }

    /// The step following this one.
    ///
    /// # Panics
    ///
    /// Panics if this step is already marked as the last one.
    fn next(&self) -> Self {
        assert!(!self.last);
        let successor = self.count.get() + 1;

        Self {
            count: NonZeroUsize::new(successor).expect("Adding 1 not to result in 0."),
            last: false,
        }
    }
}
2875
/// Information about a received and handled inbound request.
#[derive(Debug, Clone)]
pub enum InboundRequest {
    /// Request for the list of nodes whose IDs are the closest to `key`.
    ///
    /// `num_closer_peers` is the number of closer peers sent back in the response.
    FindNode { num_closer_peers: usize },
    /// Same as `FindNode`, but should also return the entries of the local
    /// providers list for this key.
    ///
    /// The counts are the numbers of closer peers and of provider peers sent
    /// back in the response.
    GetProvider {
        num_closer_peers: usize,
        num_provider_peers: usize,
    },
    /// A peer sent an add provider request.
    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is
    /// included.
    ///
    /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details.
    AddProvider { record: Option<ProviderRecord> },
    /// Request to retrieve a record.
    ///
    /// `num_closer_peers` is the number of closer peers sent back in the response;
    /// `present_locally` tells whether an unexpired record was found in the
    /// local store.
    GetRecord {
        num_closer_peers: usize,
        present_locally: bool,
    },
    /// A peer sent a put record request.
    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`Record`] is included.
    ///
    /// See [`StoreInserts`] and [`Config::set_record_filtering`].
    PutRecord {
        source: PeerId,
        connection: ConnectionId,
        record: Option<Record>,
    },
}
2908
/// The results of Kademlia queries.
///
/// Each variant wraps the `Result` alias of the operation it belongs to.
#[derive(Debug, Clone)]
pub enum QueryResult {
    /// The result of [`Behaviour::bootstrap`].
    Bootstrap(BootstrapResult),

    /// The result of [`Behaviour::get_closest_peers`].
    GetClosestPeers(GetClosestPeersResult),

    /// The result of [`Behaviour::get_providers`].
    GetProviders(GetProvidersResult),

    /// The result of [`Behaviour::start_providing`].
    StartProviding(AddProviderResult),

    /// The result of an (automatic) republishing of a provider record.
    RepublishProvider(AddProviderResult),

    /// The result of [`Behaviour::get_record`].
    GetRecord(GetRecordResult),

    /// The result of [`Behaviour::put_record`].
    PutRecord(PutRecordResult),

    /// The result of an (automatic) republishing of a (value-)record.
    RepublishRecord(PutRecordResult),
}
2936
/// The result of [`Behaviour::get_record`]: a [`GetRecordOk`] on success,
/// a [`GetRecordError`] on failure.
pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;

/// The successful result of [`Behaviour::get_record`].
#[derive(Debug, Clone)]
pub enum GetRecordOk {
    /// A record was found; it is carried as a [`PeerRecord`].
    FoundRecord(PeerRecord),
    // NOTE(review): judging by the name, this is reported once the query has
    // finished and there is no further record to deliver — confirm.
    FinishedWithNoAdditionalRecord {
        /// If caching is enabled, these are the peers closest
        /// _to the record key_ (not the local node) that were queried but
        /// did not return the record, sorted by distance to the record key
        /// from closest to farthest. How many of these are tracked is configured
        /// by [`Config::set_caching`].
        ///
        /// Writing back the cache at these peers is a manual operation,
        /// i.e. you may wish to use these candidates with [`Behaviour::put_record_to`]
        /// after selecting one of the returned records.
        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
    },
}
2957
/// The error result of [`Behaviour::get_record`].
///
/// Every variant carries the key of the record for which the operation
/// failed; see [`GetRecordError::key`] and [`GetRecordError::into_key`].
#[derive(Debug, Clone, Error)]
pub enum GetRecordError {
    /// The record was not found.
    #[error("the record was not found")]
    NotFound {
        /// The key of the record for which the operation failed.
        key: record::Key,
        // NOTE(review): presumably the closest peers to `key` that the
        // query reached — confirm.
        closest_peers: Vec<PeerId>,
    },
    /// The quorum failed.
    #[error("the quorum failed; needed {quorum} peers")]
    QuorumFailed {
        /// The key of the record for which the operation failed.
        key: record::Key,
        // NOTE(review): presumably the records gathered before the query
        // gave up — confirm.
        records: Vec<PeerRecord>,
        /// The number of peers that was needed, as shown in the error message.
        quorum: NonZeroUsize,
    },
    /// The request timed out.
    #[error("the request timed out")]
    Timeout {
        /// The key of the record for which the operation failed.
        key: record::Key,
    },
}
2975
2976impl GetRecordError {
2977    /// Gets the key of the record for which the operation failed.
2978    pub fn key(&self) -> &record::Key {
2979        match self {
2980            GetRecordError::QuorumFailed { key, .. } => key,
2981            GetRecordError::Timeout { key, .. } => key,
2982            GetRecordError::NotFound { key, .. } => key,
2983        }
2984    }
2985
2986    /// Extracts the key of the record for which the operation failed,
2987    /// consuming the error.
2988    pub fn into_key(self) -> record::Key {
2989        match self {
2990            GetRecordError::QuorumFailed { key, .. } => key,
2991            GetRecordError::Timeout { key, .. } => key,
2992            GetRecordError::NotFound { key, .. } => key,
2993        }
2994    }
2995}
2996
/// The result of [`Behaviour::put_record`]: a [`PutRecordOk`] on success,
/// a [`PutRecordError`] on failure.
pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;

/// The successful result of [`Behaviour::put_record`].
#[derive(Debug, Clone)]
pub struct PutRecordOk {
    // NOTE(review): presumably the key of the record that was stored — confirm.
    pub key: record::Key,
}
3005
/// The error result of [`Behaviour::put_record`].
///
/// Both variants carry the same fields; they differ only in the reason
/// for the failure.
#[derive(Debug, Clone, Error)]
pub enum PutRecordError {
    /// The quorum failed.
    #[error("the quorum failed; needed {quorum} peers")]
    QuorumFailed {
        /// The key of the record for which the operation failed.
        key: record::Key,
        /// [`PeerId`]s of the peers the record was successfully stored on.
        success: Vec<PeerId>,
        /// The number of peers that was needed, as shown in the error message.
        quorum: NonZeroUsize,
    },
    /// The request timed out.
    #[error("the request timed out")]
    Timeout {
        /// The key of the record for which the operation failed.
        key: record::Key,
        /// [`PeerId`]s of the peers the record was successfully stored on.
        success: Vec<PeerId>,
        // NOTE(review): presumably the quorum that was requested — confirm.
        quorum: NonZeroUsize,
    },
}
3024
3025impl PutRecordError {
3026    /// Gets the key of the record for which the operation failed.
3027    pub fn key(&self) -> &record::Key {
3028        match self {
3029            PutRecordError::QuorumFailed { key, .. } => key,
3030            PutRecordError::Timeout { key, .. } => key,
3031        }
3032    }
3033
3034    /// Extracts the key of the record for which the operation failed,
3035    /// consuming the error.
3036    pub fn into_key(self) -> record::Key {
3037        match self {
3038            PutRecordError::QuorumFailed { key, .. } => key,
3039            PutRecordError::Timeout { key, .. } => key,
3040        }
3041    }
3042}
3043
/// The result of [`Behaviour::bootstrap`]: a [`BootstrapOk`] on success,
/// a [`BootstrapError`] on failure.
pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;

/// The successful result of [`Behaviour::bootstrap`].
#[derive(Debug, Clone)]
pub struct BootstrapOk {
    // NOTE(review): presumably the peer ID that was the target of the
    // finished lookup — confirm.
    pub peer: PeerId,
    // NOTE(review): presumably the number of lookups still outstanding for
    // this bootstrap — confirm.
    pub num_remaining: u32,
}
3053
/// The error result of [`Behaviour::bootstrap`].
#[derive(Debug, Clone, Error)]
pub enum BootstrapError {
    /// The request timed out.
    #[error("the request timed out")]
    Timeout {
        // NOTE(review): presumably the peer ID that was the target of the
        // lookup that timed out — confirm.
        peer: PeerId,
        // NOTE(review): presumably the number of lookups still outstanding,
        // `None` when that number is not known — confirm.
        num_remaining: Option<u32>,
    },
}
3063
/// The result of [`Behaviour::get_closest_peers`]: a [`GetClosestPeersOk`]
/// on success, a [`GetClosestPeersError`] on failure.
pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;

/// The successful result of [`Behaviour::get_closest_peers`].
#[derive(Debug, Clone)]
pub struct GetClosestPeersOk {
    // NOTE(review): presumably the key the lookup was performed for — confirm.
    pub key: Vec<u8>,
    // NOTE(review): presumably the closest peers that were found — confirm.
    pub peers: Vec<PeerInfo>,
}
3073
/// The error result of [`Behaviour::get_closest_peers`].
#[derive(Debug, Clone, Error)]
pub enum GetClosestPeersError {
    /// The request timed out.
    ///
    /// `key` is the key for which the operation failed.
    // NOTE(review): `peers` presumably holds the peers found before the
    // timeout — confirm.
    #[error("the request timed out")]
    Timeout { key: Vec<u8>, peers: Vec<PeerInfo> },
}
3080
3081impl GetClosestPeersError {
3082    /// Gets the key for which the operation failed.
3083    pub fn key(&self) -> &Vec<u8> {
3084        match self {
3085            GetClosestPeersError::Timeout { key, .. } => key,
3086        }
3087    }
3088
3089    /// Extracts the key for which the operation failed,
3090    /// consuming the error.
3091    pub fn into_key(self) -> Vec<u8> {
3092        match self {
3093            GetClosestPeersError::Timeout { key, .. } => key,
3094        }
3095    }
3096}
3097
/// The result of [`Behaviour::get_providers`]: a [`GetProvidersOk`] on
/// success, a [`GetProvidersError`] on failure.
pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;

/// The successful result of [`Behaviour::get_providers`].
#[derive(Debug, Clone)]
pub enum GetProvidersOk {
    // NOTE(review): judging by the field docs, reported when new providers
    // were discovered for `key` — confirm.
    FoundProviders {
        // NOTE(review): presumably the key the providers were looked up for — confirm.
        key: record::Key,
        /// The new set of providers discovered.
        providers: HashSet<PeerId>,
    },
    // NOTE(review): judging by the name, reported once the query has finished
    // and there are no further providers to deliver — confirm.
    FinishedWithNoAdditionalRecord {
        // NOTE(review): presumably the closest peers to the key that the
        // query reached — confirm.
        closest_peers: Vec<PeerId>,
    },
}
3113
/// The error result of [`Behaviour::get_providers`].
#[derive(Debug, Clone, Error)]
pub enum GetProvidersError {
    /// The request timed out.
    #[error("the request timed out")]
    Timeout {
        /// The key for which the operation failed.
        key: record::Key,
        // NOTE(review): presumably the closest peers to the key that were
        // reached before the timeout — confirm.
        closest_peers: Vec<PeerId>,
    },
}
3123
3124impl GetProvidersError {
3125    /// Gets the key for which the operation failed.
3126    pub fn key(&self) -> &record::Key {
3127        match self {
3128            GetProvidersError::Timeout { key, .. } => key,
3129        }
3130    }
3131
3132    /// Extracts the key for which the operation failed,
3133    /// consuming the error.
3134    pub fn into_key(self) -> record::Key {
3135        match self {
3136            GetProvidersError::Timeout { key, .. } => key,
3137        }
3138    }
3139}
3140
/// The result of publishing a provider record: an [`AddProviderOk`] on
/// success, an [`AddProviderError`] on failure.
pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;

/// The successful result of publishing a provider record.
#[derive(Debug, Clone)]
pub struct AddProviderOk {
    // NOTE(review): presumably the key the provider record was published for — confirm.
    pub key: record::Key,
}
3149
/// The possible errors when publishing a provider record.
#[derive(Debug, Clone, Error)]
pub enum AddProviderError {
    /// The request timed out.
    ///
    /// `key` is the key for which the operation failed.
    #[error("the request timed out")]
    Timeout { key: record::Key },
}
3156
3157impl AddProviderError {
3158    /// Gets the key for which the operation failed.
3159    pub fn key(&self) -> &record::Key {
3160        match self {
3161            AddProviderError::Timeout { key, .. } => key,
3162        }
3163    }
3164
3165    /// Extracts the key for which the operation failed,
3166    pub fn into_key(self) -> record::Key {
3167        match self {
3168            AddProviderError::Timeout { key, .. } => key,
3169        }
3170    }
3171}
3172
3173impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
3174    fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
3175        KadPeer {
3176            node_id: e.node.key.into_preimage(),
3177            multiaddrs: e.node.value.into_vec(),
3178            connection_ty: match e.status {
3179                NodeStatus::Connected => ConnectionType::Connected,
3180                NodeStatus::Disconnected => ConnectionType::NotConnected,
3181            },
3182        }
3183    }
3184}
3185
/// The context of a [`QueryInfo::AddProvider`] query.
///
/// Distinguishes an explicitly requested announcement from a periodic
/// re-announcement.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AddProviderContext {
    /// The context is a [`Behaviour::start_providing`] operation.
    Publish,
    /// The context is periodic republishing of provider announcements
    /// initiated earlier via [`Behaviour::start_providing`].
    Republish,
}
3195
/// The context of a [`QueryInfo::PutRecord`] query.
///
/// Tells why a record is being stored: on explicit request, as part of
/// periodic republishing or replication, or towards specific peers.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PutRecordContext {
    /// The context is a [`Behaviour::put_record`] operation.
    Publish,
    /// The context is periodic republishing of records stored
    /// earlier via [`Behaviour::put_record`].
    Republish,
    /// The context is periodic replication (i.e. without extending
    /// the record TTL) of stored records received earlier from another peer.
    Replicate,
    /// The context is a custom store operation targeting specific
    /// peers initiated by [`Behaviour::put_record_to`].
    Custom,
}
3211
/// Information about a running query.
///
/// Each variant corresponds to one kind of query and holds the state
/// specific to it.
#[derive(Debug, Clone)]
pub enum QueryInfo {
    /// A query initiated by [`Behaviour::bootstrap`].
    Bootstrap {
        /// The targeted peer ID.
        peer: PeerId,
        /// The remaining random peer IDs to query, one per
        /// bucket that still needs refreshing.
        ///
        /// This is `None` if the initial self-lookup has not
        /// yet completed and `Some` with an exhausted iterator
        /// if bootstrapping is complete.
        remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>,
        /// Current index of events.
        step: ProgressStep,
    },

    /// A (repeated) query initiated by [`Behaviour::get_closest_peers`].
    GetClosestPeers {
        /// The key being queried (the preimage).
        key: Vec<u8>,
        /// Current index of events.
        step: ProgressStep,
        /// If required, `num_results` specifies expected responding peers
        num_results: Option<NonZeroUsize>,
    },

    /// A (repeated) query initiated by [`Behaviour::get_providers`].
    GetProviders {
        /// The key for which to search for providers.
        key: record::Key,
        /// The number of providers found so far.
        providers_found: usize,
        /// Current index of events.
        step: ProgressStep,
    },

    /// A (repeated) query initiated by [`Behaviour::start_providing`].
    AddProvider {
        /// The record key.
        key: record::Key,
        /// The current phase of the query.
        phase: AddProviderPhase,
        /// The execution context of the query.
        context: AddProviderContext,
    },

    /// A (repeated) query initiated by [`Behaviour::put_record`].
    PutRecord {
        /// The record being stored. Its key is the lookup target while the
        /// query is in the `GetClosestPeers` phase; the record itself is sent
        /// in the `PutRecord` phase.
        record: Record,
        /// The expected quorum of responses w.r.t. the replication factor.
        quorum: NonZeroUsize,
        /// The current phase of the query.
        phase: PutRecordPhase,
        /// The execution context of the query.
        context: PutRecordContext,
    },

    /// A (repeated) query initiated by [`Behaviour::get_record`].
    GetRecord {
        /// The key to look for.
        key: record::Key,
        /// Current index of events.
        step: ProgressStep,
        /// Did we find at least one record?
        found_a_record: bool,
        /// The peers closest to the `key` that were queried but did not return a record,
        /// i.e. the peers that are candidates for caching the record.
        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
    },
}
3283
3284impl QueryInfo {
3285    /// Creates an event for a handler to issue an outgoing request in the
3286    /// context of a query.
3287    fn to_request(&self, query_id: QueryId) -> HandlerIn {
3288        match &self {
3289            QueryInfo::Bootstrap { peer, .. } => HandlerIn::FindNodeReq {
3290                key: peer.to_bytes(),
3291                query_id,
3292            },
3293            QueryInfo::GetClosestPeers { key, .. } => HandlerIn::FindNodeReq {
3294                key: key.clone(),
3295                query_id,
3296            },
3297            QueryInfo::GetProviders { key, .. } => HandlerIn::GetProvidersReq {
3298                key: key.clone(),
3299                query_id,
3300            },
3301            QueryInfo::AddProvider { key, phase, .. } => match phase {
3302                AddProviderPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3303                    key: key.to_vec(),
3304                    query_id,
3305                },
3306                AddProviderPhase::AddProvider {
3307                    provider_id,
3308                    external_addresses,
3309                    ..
3310                } => HandlerIn::AddProvider {
3311                    key: key.clone(),
3312                    provider: crate::protocol::KadPeer {
3313                        node_id: *provider_id,
3314                        multiaddrs: external_addresses.clone(),
3315                        connection_ty: crate::protocol::ConnectionType::Connected,
3316                    },
3317                    query_id,
3318                },
3319            },
3320            QueryInfo::GetRecord { key, .. } => HandlerIn::GetRecord {
3321                key: key.clone(),
3322                query_id,
3323            },
3324            QueryInfo::PutRecord { record, phase, .. } => match phase {
3325                PutRecordPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3326                    key: record.key.to_vec(),
3327                    query_id,
3328                },
3329                PutRecordPhase::PutRecord { .. } => HandlerIn::PutRecord {
3330                    record: record.clone(),
3331                    query_id,
3332                },
3333            },
3334        }
3335    }
3336}
3337
/// The phases of a [`QueryInfo::AddProvider`] query.
///
/// The query first looks up the closest nodes to the record key and then
/// advertises the provider to them.
#[derive(Debug, Clone)]
pub enum AddProviderPhase {
    /// The query is searching for the closest nodes to the record key.
    GetClosestPeers,

    /// The query advertises the local node as a provider for the key to
    /// the closest nodes to the key.
    AddProvider {
        /// The local peer ID that is advertised as a provider.
        provider_id: PeerId,
        /// The external addresses of the provider being advertised.
        external_addresses: Vec<Multiaddr>,
        /// Query statistics from the finished `GetClosestPeers` phase.
        get_closest_peers_stats: QueryStats,
    },
}
3355
/// The phases of a [`QueryInfo::PutRecord`] query.
///
/// The query first looks up the closest nodes to the record key and then
/// replicates the record to them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PutRecordPhase {
    /// The query is searching for the closest nodes to the record key.
    GetClosestPeers,

    /// The query is replicating the record to the closest nodes to the key.
    PutRecord {
        /// A list of peers the given record has been successfully replicated to.
        success: Vec<PeerId>,
        /// Query statistics from the finished `GetClosestPeers` phase.
        get_closest_peers_stats: QueryStats,
    },
}
3370
/// A mutable reference to a running query.
///
/// Besides read access to the query's ID, info and statistics, it allows
/// finishing the query early via [`QueryMut::finish`].
pub struct QueryMut<'a> {
    /// The wrapped query; every method delegates to it.
    query: &'a mut Query,
}

impl QueryMut<'_> {
    /// Returns the [`QueryId`] of the underlying query.
    pub fn id(&self) -> QueryId {
        self.query.id()
    }

    /// Gets information about the type and state of the query.
    pub fn info(&self) -> &QueryInfo {
        &self.query.info
    }

    /// Gets execution statistics about the query.
    ///
    /// For a multi-phase query such as `put_record`, these are the
    /// statistics of the current phase.
    pub fn stats(&self) -> &QueryStats {
        self.query.stats()
    }

    /// Finishes the query asap, without waiting for the
    /// regular termination conditions.
    pub fn finish(&mut self) {
        self.query.finish()
    }
}
3400
/// An immutable reference to a running query.
///
/// Offers read-only access to the query's ID, info and statistics.
pub struct QueryRef<'a> {
    /// The wrapped query; every method delegates to it.
    query: &'a Query,
}

impl QueryRef<'_> {
    /// Returns the [`QueryId`] of the underlying query.
    pub fn id(&self) -> QueryId {
        self.query.id()
    }

    /// Gets information about the type and state of the query.
    pub fn info(&self) -> &QueryInfo {
        &self.query.info
    }

    /// Gets execution statistics about the query.
    ///
    /// For a multi-phase query such as `put_record`, these are the
    /// statistics of the current phase.
    pub fn stats(&self) -> &QueryStats {
        self.query.stats()
    }
}
3424
/// An operation failed due to no known peers in the routing table.
#[derive(Debug, Clone)]
pub struct NoKnownPeers();

/// Renders the fixed, human-readable message for this error.
impl fmt::Display for NoKnownPeers {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("No known peers.")
    }
}

/// Marks [`NoKnownPeers`] as a standard error; it has no underlying source.
impl std::error::Error for NoKnownPeers {}
3436
/// The possible outcomes of [`Behaviour::add_address`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RoutingUpdate {
    /// The given peer and address have been added to the routing
    /// table.
    Success,
    /// The peer and address are pending insertion into
    /// the routing table, if a disconnected peer fails
    /// to respond. If the given peer and address end up
    /// in the routing table, [`Event::RoutingUpdated`]
    /// is eventually emitted.
    Pending,
    /// The routing table update failed, either because the
    /// corresponding bucket for the peer is full and the
    /// pending slot(s) are occupied, or because the given
    /// peer ID is deemed invalid (e.g. refers to the local
    /// peer ID).
    Failed,
}
3456
/// The mode of a peer, either client or server.
///
/// A change of mode is reported through a `ModeChanged` event.
#[derive(PartialEq, Copy, Clone, Debug)]
pub enum Mode {
    Client,
    Server,
}

/// Renders the mode as its lowercase name: `client` or `server`.
impl fmt::Display for Mode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Mode::Client => "client",
            Mode::Server => "server",
        };
        f.write_str(label)
    }
}
3471
/// Renders every element of the slice via [`ToString`] and joins the
/// results with `", "`.
///
/// An empty slice yields an empty string.
fn to_comma_separated_list<T>(confirmed_external_addresses: &[T]) -> String
where
    T: ToString,
{
    let mut rendered = String::new();
    for (position, item) in confirmed_external_addresses.iter().enumerate() {
        // The separator goes between elements only, never before the first.
        if position != 0 {
            rendered.push_str(", ");
        }
        rendered.push_str(&item.to_string());
    }
    rendered
}