ant_libp2p_kad/behaviour.rs
1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the `Kademlia` network behaviour.
22
23mod test;
24
25use ant_libp2p_core as libp2p_core;
26use ant_libp2p_swarm as libp2p_swarm;
27
28use std::{
29 collections::{BTreeMap, HashMap, HashSet, VecDeque},
30 fmt,
31 num::NonZeroUsize,
32 task::{Context, Poll, Waker},
33 time::Duration,
34 vec,
35};
36
37use fnv::FnvHashSet;
38use libp2p_core::{transport::PortUse, ConnectedPoint, Endpoint, Multiaddr};
39use libp2p_identity::PeerId;
40use libp2p_swarm::{
41 behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
42 dial_opts::{self, DialOpts},
43 ConnectionDenied, ConnectionHandler, ConnectionId, DialError, ExternalAddresses,
44 ListenAddresses, NetworkBehaviour, NotifyHandler, StreamProtocol, THandler, THandlerInEvent,
45 THandlerOutEvent, ToSwarm,
46};
47use thiserror::Error;
48use tracing::Level;
49use web_time::Instant;
50
51pub use crate::query::QueryStats;
52use crate::{
53 addresses::Addresses,
54 bootstrap,
55 handler::{Handler, HandlerEvent, HandlerIn, RequestId},
56 jobs::*,
57 kbucket::{self, Distance, KBucketConfig, KBucketsTable, NodeStatus},
58 protocol,
59 protocol::{ConnectionType, KadPeer, ProtocolConfig},
60 query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState},
61 record::{
62 self,
63 store::{self, RecordStore},
64 ProviderRecord, Record,
65 },
66 K_VALUE,
67};
68
/// `Behaviour` is a `NetworkBehaviour` that implements the libp2p
/// Kademlia protocol.
pub struct Behaviour<TStore> {
    /// The Kademlia routing table.
    kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,

    /// The k-bucket insertion strategy.
    kbucket_inserts: BucketInserts,

    /// Configuration of the wire protocol.
    protocol_config: ProtocolConfig,

    /// Configuration of [`RecordStore`] filtering.
    record_filtering: StoreInserts,

    /// The currently active (i.e. in-progress) queries.
    queries: QueryPool,

    /// The currently connected peers.
    ///
    /// This is a superset of the connected peers currently in the routing table.
    connected_peers: FnvHashSet<PeerId>,

    /// Periodic job for re-publication of provider records for keys
    /// provided by the local node.
    add_provider_job: Option<AddProviderJob>,

    /// Periodic job for (re-)replication and (re-)publishing of
    /// regular (value-)records.
    put_record_job: Option<PutRecordJob>,

    /// The TTL of regular (value-)records.
    record_ttl: Option<Duration>,

    /// The TTL of provider records.
    provider_record_ttl: Option<Duration>,

    /// Queued events to return when the behaviour is being polled.
    queued_events: VecDeque<ToSwarm<Event, HandlerIn>>,

    /// The addresses the local node is currently listening on.
    listen_addresses: ListenAddresses,

    /// The confirmed external addresses of the local node.
    external_addresses: ExternalAddresses,

    /// Maps each established connection to the remote peer on the other end.
    connections: HashMap<ConnectionId, PeerId>,

    /// See [`Config::caching`].
    caching: Caching,

    /// The `PeerId` of the local node.
    local_peer_id: PeerId,

    /// The current mode of operation; initialized to `Mode::Client` in
    /// [`Behaviour::with_config`].
    mode: Mode,
    /// Whether the mode is determined automatically; `true` on construction.
    // NOTE(review): presumably cleared when a mode is set explicitly — the
    // mode-switching logic is outside this view, confirm.
    auto_mode: bool,
    /// Waker used to wake the polling task once new events become available.
    // NOTE(review): assumed from the field name; the poll loop is outside
    // this view — confirm.
    no_events_waker: Option<Waker>,

    /// The record storage.
    store: TStore,

    /// Tracks the status of the current bootstrap.
    bootstrap_status: bootstrap::Status,
}
130
/// The configurable strategies for the insertion of peers
/// and their addresses into the k-buckets of the Kademlia
/// routing table.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum BucketInserts {
    /// Whenever a connection to a peer is established as a
    /// result of a dialing attempt and that peer is not yet
    /// in the routing table, it is inserted as long as there
    /// is a free slot in the corresponding k-bucket. If the
    /// k-bucket is full but still has a free pending slot,
    /// it may be inserted into the routing table at a later time if an unresponsive
    /// disconnected peer is evicted from the bucket.
    ///
    /// This is the default strategy (see [`Config::new`]).
    OnConnected,
    /// New peers and addresses are only added to the routing table via
    /// explicit calls to [`Behaviour::add_address`].
    ///
    /// > **Note**: Even though peers can only get into the
    /// > routing table as a result of [`Behaviour::add_address`],
    /// > routing table entries are still updated as peers
    /// > connect and disconnect (i.e. the order of the entries
    /// > as well as the network addresses).
    Manual,
}
154
/// The configurable filtering strategies for the acceptance of
/// incoming records.
///
/// This can be used for e.g. signature verification or validating
/// the accompanying [`Key`].
///
/// [`Key`]: crate::record::Key
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StoreInserts {
    /// Whenever a (provider) record is received,
    /// the record is forwarded immediately to the [`RecordStore`].
    ///
    /// This is the default strategy (see [`Config::new`]).
    Unfiltered,
    /// Whenever a (provider) record is received, an event is emitted.
    /// Provider records generate a [`InboundRequest::AddProvider`] under
    /// [`Event::InboundRequest`], normal records generate a [`InboundRequest::PutRecord`]
    /// under [`Event::InboundRequest`].
    ///
    /// When deemed valid, a (provider) record needs to be explicitly stored in
    /// the [`RecordStore`] via [`RecordStore::put`] or [`RecordStore::add_provider`],
    /// whichever is applicable. A mutable reference to the [`RecordStore`] can
    /// be retrieved via [`Behaviour::store_mut`].
    FilterBoth,
}
178
/// The configuration for the `Kademlia` behaviour.
///
/// The configuration is consumed by [`Behaviour::new`].
#[derive(Debug, Clone)]
pub struct Config {
    /// k-bucket parameters; see [`Config::set_kbucket_size`] and
    /// [`Config::set_kbucket_pending_timeout`].
    kbucket_config: KBucketConfig,
    /// Iterative-query parameters (timeout, parallelism, replication factor).
    query_config: QueryConfig,
    /// Wire-protocol parameters (protocol names, max packet size).
    protocol_config: ProtocolConfig,
    /// TTL of regular (value-)records; `None` means records never expire.
    record_ttl: Option<Duration>,
    /// Interval for (re-)replication of stored records; `None` disables it.
    record_replication_interval: Option<Duration>,
    /// Interval for re-publication of the local node's records; `None` disables it.
    record_publication_interval: Option<Duration>,
    /// Whether incoming (provider) records are filtered before being stored.
    record_filtering: StoreInserts,
    /// TTL of provider records; `None` means they never expire.
    provider_record_ttl: Option<Duration>,
    /// Interval for re-publication of provider records; `None` disables it.
    provider_publication_interval: Option<Duration>,
    /// Strategy for inserting peers into the routing table.
    kbucket_inserts: BucketInserts,
    /// Write-back caching strategy for successful lookups.
    caching: Caching,
    /// Interval of the periodic bootstrap; `None` disables it.
    periodic_bootstrap_interval: Option<Duration>,
    /// Wait applied before an automatic bootstrap triggered by routing-table
    /// updates; `None` disables automatic bootstrap.
    automatic_bootstrap_throttle: Option<Duration>,
}
198
199impl Default for Config {
200 /// Returns the default configuration.
201 ///
202 /// Deprecated: use `Config::new` instead.
203 fn default() -> Self {
204 Self::new(protocol::DEFAULT_PROTO_NAME)
205 }
206}
207
/// The configuration for Kademlia "write-back" caching after successful
/// lookups via [`Behaviour::get_record`].
///
/// The default is `Enabled { max_peers: 1 }` (see [`Config::new`]).
#[derive(Debug, Clone)]
pub enum Caching {
    /// Caching is disabled and the peers closest to records being looked up
    /// that do not return a record are not tracked, i.e.
    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty.
    Disabled,
    /// Up to `max_peers` peers not returning a record that are closest to the key
    /// being looked up are tracked and returned in
    /// [`GetRecordOk::FinishedWithNoAdditionalRecord`]. The write-back operation must be
    /// performed explicitly, if desired and after choosing a record from the results, via
    /// [`Behaviour::put_record_to`].
    Enabled { max_peers: u16 },
}
223
impl Config {
    /// Builds a new `Config` with the given protocol name.
    pub fn new(protocol_name: StreamProtocol) -> Self {
        Config {
            kbucket_config: KBucketConfig::default(),
            query_config: QueryConfig::default(),
            protocol_config: ProtocolConfig::new(protocol_name),
            // 48 hours
            record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
            // 1 hour
            record_replication_interval: Some(Duration::from_secs(60 * 60)),
            // 22 hours
            record_publication_interval: Some(Duration::from_secs(22 * 60 * 60)),
            record_filtering: StoreInserts::Unfiltered,
            // 12 hours
            provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
            // 48 hours
            provider_record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
            kbucket_inserts: BucketInserts::OnConnected,
            caching: Caching::Enabled { max_peers: 1 },
            periodic_bootstrap_interval: Some(Duration::from_secs(5 * 60)),
            automatic_bootstrap_throttle: Some(bootstrap::DEFAULT_AUTOMATIC_THROTTLE),
        }
    }

    /// Returns the default configuration.
    #[deprecated(note = "Use `Config::new` instead")]
    #[allow(clippy::should_implement_trait)]
    pub fn default() -> Self {
        // Dispatches to the `Default` trait impl (not this inherent method),
        // which in turn calls `Config::new`.
        Default::default()
    }

    /// Sets custom protocol names.
    ///
    /// Kademlia nodes only communicate with other nodes using the same protocol
    /// name. Using custom name(s) therefore allows to segregate the DHT from
    /// others, if that is desired.
    ///
    /// More than one protocol name can be supplied. In this case the node will
    /// be able to talk to other nodes supporting any of the provided names.
    /// Multiple names must be used with caution to avoid network partitioning.
    #[deprecated(note = "Use `Config::new` instead")]
    #[allow(deprecated)]
    pub fn set_protocol_names(&mut self, names: Vec<StreamProtocol>) -> &mut Self {
        self.protocol_config.set_protocol_names(names);
        self
    }

    /// Sets the timeout for a single query.
    ///
    /// > **Note**: A single query usually comprises at least as many requests
    /// > as the replication factor, i.e. this is not a request timeout.
    ///
    /// The default is 60 seconds.
    pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.query_config.timeout = timeout;
        self
    }

    /// Sets the replication factor to use.
    ///
    /// The replication factor determines to how many closest peers
    /// a record is replicated. The default is [`crate::K_VALUE`].
    pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
        self.query_config.replication_factor = replication_factor;
        self
    }

    /// Sets the allowed level of parallelism for iterative queries.
    ///
    /// The `α` parameter in the Kademlia paper. The maximum number of peers
    /// that an iterative query is allowed to wait for in parallel while
    /// iterating towards the closest nodes to a target. Defaults to
    /// `ALPHA_VALUE`.
    ///
    /// This only controls the level of parallelism of an iterative query, not
    /// the level of parallelism of a query to a fixed set of peers.
    ///
    /// When used with [`Config::disjoint_query_paths`] it equals
    /// the amount of disjoint paths used.
    pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
        self.query_config.parallelism = parallelism;
        self
    }

    /// Require iterative queries to use disjoint paths for increased resiliency
    /// in the presence of potentially adversarial nodes.
    ///
    /// When enabled the number of disjoint paths used equals the configured
    /// parallelism.
    ///
    /// See the S/Kademlia paper for more information on the high level design
    /// as well as its security improvements.
    pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
        self.query_config.disjoint_query_paths = enabled;
        self
    }

    /// Sets the TTL for stored records.
    ///
    /// The TTL should be significantly longer than the (re-)publication
    /// interval, to avoid premature expiration of records. The default is 48
    /// hours.
    ///
    /// `None` means records never expire.
    ///
    /// Does not apply to provider records.
    pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
        self.record_ttl = record_ttl;
        self
    }

    /// Sets whether or not records should be filtered before being stored.
    ///
    /// See [`StoreInserts`] for the different values.
    /// Defaults to [`StoreInserts::Unfiltered`].
    pub fn set_record_filtering(&mut self, filtering: StoreInserts) -> &mut Self {
        self.record_filtering = filtering;
        self
    }

    /// Sets the (re-)replication interval for stored records.
    ///
    /// Periodic replication of stored records ensures that the records
    /// are always replicated to the available nodes closest to the key in the
    /// context of DHT topology changes (i.e. nodes joining and leaving), thus
    /// ensuring persistence until the record expires. Replication does not
    /// prolong the regular lifetime of a record (for otherwise it would live
    /// forever regardless of the configured TTL). The expiry of a record
    /// is only extended through re-publication.
    ///
    /// This interval should be significantly shorter than the publication
    /// interval, to ensure persistence between re-publications. The default
    /// is 1 hour.
    ///
    /// `None` means that stored records are never re-replicated.
    ///
    /// Does not apply to provider records.
    pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_replication_interval = interval;
        self
    }

    /// Sets the (re-)publication interval of stored records.
    ///
    /// Records persist in the DHT until they expire. By default, published
    /// records are re-published in regular intervals for as long as the record
    /// exists in the local storage of the original publisher, thereby extending
    /// the records lifetime.
    ///
    /// This interval should be significantly shorter than the record TTL, to
    /// ensure records do not expire prematurely. The default is 22 hours.
    ///
    /// `None` means that stored records are never automatically re-published.
    ///
    /// Does not apply to provider records.
    pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_publication_interval = interval;
        self
    }

    /// Sets the TTL for provider records.
    ///
    /// `None` means that stored provider records never expire.
    ///
    /// Must be significantly larger than the provider publication interval.
    pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
        self.provider_record_ttl = ttl;
        self
    }

    /// Sets the interval at which provider records for keys provided
    /// by the local node are re-published.
    ///
    /// `None` means that stored provider records are never automatically
    /// re-published.
    ///
    /// Must be significantly less than the provider record TTL.
    pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.provider_publication_interval = interval;
        self
    }

    /// Modifies the maximum allowed size of individual Kademlia packets.
    ///
    /// It might be necessary to increase this value if trying to put large
    /// records.
    pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
        self.protocol_config.set_max_packet_size(size);
        self
    }

    /// Sets the k-bucket insertion strategy for the Kademlia routing table.
    pub fn set_kbucket_inserts(&mut self, inserts: BucketInserts) -> &mut Self {
        self.kbucket_inserts = inserts;
        self
    }

    /// Sets the [`Caching`] strategy to use for successful lookups.
    ///
    /// The default is [`Caching::Enabled`] with a `max_peers` of 1.
    /// Hence, with default settings and a lookup quorum of 1, a successful lookup
    /// will result in the record being cached at the closest node to the key that
    /// did not return the record, i.e. the standard Kademlia behaviour.
    pub fn set_caching(&mut self, c: Caching) -> &mut Self {
        self.caching = c;
        self
    }

    /// Sets the interval on which [`Behaviour::bootstrap`] is called periodically.
    ///
    /// * Defaults to `5` minutes.
    /// * Set to `None` to disable periodic bootstrap.
    pub fn set_periodic_bootstrap_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.periodic_bootstrap_interval = interval;
        self
    }

    /// Sets the configuration for the k-buckets.
    ///
    /// * Defaults to `K_VALUE`.
    pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
        self.kbucket_config.set_bucket_size(size);
        self
    }

    /// Sets the timeout duration after creation of a pending entry after which
    /// it becomes eligible for insertion into a full bucket, replacing the
    /// least-recently (dis)connected node.
    ///
    /// * Defaults to `60` s.
    pub fn set_kbucket_pending_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.kbucket_config.set_pending_timeout(timeout);
        self
    }

    /// Sets the time to wait before calling [`Behaviour::bootstrap`] after a new peer is inserted
    /// in the routing table. This prevents cascading bootstrap requests when multiple peers are
    /// inserted into the routing table "at the same time". This also allows to wait a little
    /// bit for other potential peers to be inserted into the routing table before triggering a
    /// bootstrap, giving more context to the future bootstrap request.
    ///
    /// * Defaults to `500` ms.
    /// * Set to `Some(Duration::ZERO)` to never wait before triggering a bootstrap request when a
    ///   new peer is inserted in the routing table.
    /// * Set to `None` to disable automatic bootstrap (no bootstrap request will be triggered when
    ///   a new peer is inserted in the routing table).
    #[cfg(test)]
    pub(crate) fn set_automatic_bootstrap_throttle(
        &mut self,
        duration: Option<Duration>,
    ) -> &mut Self {
        self.automatic_bootstrap_throttle = duration;
        self
    }
}
475
476impl<TStore> Behaviour<TStore>
477where
478 TStore: RecordStore + Send + 'static,
479{
480 /// Creates a new `Kademlia` network behaviour with a default configuration.
481 pub fn new(id: PeerId, store: TStore) -> Self {
482 Self::with_config(id, store, Default::default())
483 }
484
    /// Get the protocol name of this kademlia instance.
    ///
    /// Returns all protocol names configured in the wire-protocol
    /// configuration (see [`Config::set_protocol_names`]).
    pub fn protocol_names(&self) -> &[StreamProtocol] {
        self.protocol_config.protocol_names()
    }
489
    /// Creates a new `Kademlia` network behaviour with the given configuration.
    pub fn with_config(id: PeerId, store: TStore, config: Config) -> Self {
        let local_key = kbucket::Key::from(id);

        // A put-record job runs if replication and/or publication is enabled.
        // The replication interval drives the job when set; otherwise the
        // publication interval does.
        let put_record_job = config
            .record_replication_interval
            .or(config.record_publication_interval)
            .map(|interval| {
                PutRecordJob::new(
                    id,
                    interval,
                    config.record_publication_interval,
                    config.record_ttl,
                )
            });

        // Provider re-publication is only scheduled when an interval is set.
        let add_provider_job = config
            .provider_publication_interval
            .map(AddProviderJob::new);

        Behaviour {
            store,
            caching: config.caching,
            kbuckets: KBucketsTable::new(local_key, config.kbucket_config),
            kbucket_inserts: config.kbucket_inserts,
            protocol_config: config.protocol_config,
            record_filtering: config.record_filtering,
            queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
            listen_addresses: Default::default(),
            queries: QueryPool::new(config.query_config),
            connected_peers: Default::default(),
            add_provider_job,
            put_record_job,
            record_ttl: config.record_ttl,
            provider_record_ttl: config.provider_record_ttl,
            external_addresses: Default::default(),
            local_peer_id: id,
            connections: Default::default(),
            // New instances start in client mode with automatic mode
            // selection enabled.
            mode: Mode::Client,
            auto_mode: true,
            no_events_waker: None,
            bootstrap_status: bootstrap::Status::new(
                config.periodic_bootstrap_interval,
                config.automatic_bootstrap_throttle,
            ),
        }
    }
537
538 /// Gets an iterator over immutable references to all running queries.
539 pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
540 self.queries.iter().filter_map(|query| {
541 if !query.is_finished() {
542 Some(QueryRef { query })
543 } else {
544 None
545 }
546 })
547 }
548
549 /// Gets an iterator over mutable references to all running queries.
550 pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
551 self.queries.iter_mut().filter_map(|query| {
552 if !query.is_finished() {
553 Some(QueryMut { query })
554 } else {
555 None
556 }
557 })
558 }
559
560 /// Gets an immutable reference to a running query, if it exists.
561 pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
562 self.queries.get(id).and_then(|query| {
563 if !query.is_finished() {
564 Some(QueryRef { query })
565 } else {
566 None
567 }
568 })
569 }
570
571 /// Gets a mutable reference to a running query, if it exists.
572 pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
573 self.queries.get_mut(id).and_then(|query| {
574 if !query.is_finished() {
575 Some(QueryMut { query })
576 } else {
577 None
578 }
579 })
580 }
581
    /// Adds a known listen address of a peer participating in the DHT to the
    /// routing table.
    ///
    /// Explicitly adding addresses of peers serves two purposes:
    ///
    /// 1. In order for a node to join the DHT, it must know about at least one other node of the
    ///    DHT.
    ///
    /// 2. When a remote peer initiates a connection and that peer is not yet in the routing
    ///    table, the `Kademlia` behaviour must be informed of an address on which that peer is
    ///    listening for connections before it can be added to the routing table from where it can
    ///    subsequently be discovered by all peers in the DHT.
    ///
    /// If the routing table has been updated as a result of this operation,
    /// a [`Event::RoutingUpdated`] event is emitted.
    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> RoutingUpdate {
        // ensuring address is a fully-qualified /p2p multiaddr
        let Ok(address) = address.with_p2p(*peer) else {
            return RoutingUpdate::Failed;
        };
        let key = kbucket::Key::from(*peer);
        match self.kbuckets.entry(&key) {
            // Peer already in the table: merge the address into its entry and
            // report an update only if it was not known before.
            Some(kbucket::Entry::Present(mut entry, _)) => {
                if entry.value().insert(address) {
                    self.queued_events
                        .push_back(ToSwarm::GenerateEvent(Event::RoutingUpdated {
                            peer: *peer,
                            is_new_peer: false,
                            addresses: entry.value().clone(),
                            old_peer: None,
                            bucket_range: self
                                .kbuckets
                                .bucket(&key)
                                .map(|b| b.range())
                                .expect("Not kbucket::Entry::SelfEntry."),
                        }))
                }
                RoutingUpdate::Success
            }
            // Peer pending insertion: remember the address for when the entry
            // is actually inserted.
            Some(kbucket::Entry::Pending(mut entry, _)) => {
                entry.value().insert(address);
                RoutingUpdate::Pending
            }
            // Peer unknown: attempt a fresh insertion.
            Some(kbucket::Entry::Absent(entry)) => {
                let addresses = Addresses::new(address);
                let status = if self.connected_peers.contains(peer) {
                    NodeStatus::Connected
                } else {
                    NodeStatus::Disconnected
                };
                match entry.insert(addresses.clone(), status) {
                    kbucket::InsertResult::Inserted => {
                        // A new peer entered the routing table; give the
                        // low-peer bootstrap heuristic a chance to run.
                        self.bootstrap_on_low_peers();

                        self.queued_events.push_back(ToSwarm::GenerateEvent(
                            Event::RoutingUpdated {
                                peer: *peer,
                                is_new_peer: true,
                                addresses,
                                old_peer: None,
                                bucket_range: self
                                    .kbuckets
                                    .bucket(&key)
                                    .map(|b| b.range())
                                    .expect("Not kbucket::Entry::SelfEntry."),
                            },
                        ));
                        RoutingUpdate::Success
                    }
                    kbucket::InsertResult::Full => {
                        tracing::debug!(%peer, "Bucket full. Peer not added to routing table");
                        RoutingUpdate::Failed
                    }
                    kbucket::InsertResult::Pending { disconnected } => {
                        // Dial the least-recently seen disconnected peer so a
                        // decision can be made about evicting it.
                        self.queued_events.push_back(ToSwarm::Dial {
                            opts: DialOpts::peer_id(disconnected.into_preimage()).build(),
                        });
                        RoutingUpdate::Pending
                    }
                }
            }
            // The local key itself can never be added.
            None => RoutingUpdate::Failed,
        }
    }
666
667 /// Removes an address of a peer from the routing table.
668 ///
669 /// If the given address is the last address of the peer in the
670 /// routing table, the peer is removed from the routing table
671 /// and `Some` is returned with a view of the removed entry.
672 /// The same applies if the peer is currently pending insertion
673 /// into the routing table.
674 ///
675 /// If the given peer or address is not in the routing table,
676 /// this is a no-op.
677 pub fn remove_address(
678 &mut self,
679 peer: &PeerId,
680 address: &Multiaddr,
681 ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
682 let address = &address.to_owned().with_p2p(*peer).ok()?;
683 let key = kbucket::Key::from(*peer);
684 match self.kbuckets.entry(&key)? {
685 kbucket::Entry::Present(mut entry, _) => {
686 if entry.value().remove(address).is_err() {
687 Some(entry.remove()) // it is the last address, thus remove the peer.
688 } else {
689 None
690 }
691 }
692 kbucket::Entry::Pending(mut entry, _) => {
693 if entry.value().remove(address).is_err() {
694 Some(entry.remove()) // it is the last address, thus remove the peer.
695 } else {
696 None
697 }
698 }
699 kbucket::Entry::Absent(..) => None,
700 }
701 }
702
703 /// Removes a peer from the routing table.
704 ///
705 /// Returns `None` if the peer was not in the routing table,
706 /// not even pending insertion.
707 pub fn remove_peer(
708 &mut self,
709 peer: &PeerId,
710 ) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> {
711 let key = kbucket::Key::from(*peer);
712 match self.kbuckets.entry(&key)? {
713 kbucket::Entry::Present(entry, _) => Some(entry.remove()),
714 kbucket::Entry::Pending(entry, _) => Some(entry.remove()),
715 kbucket::Entry::Absent(..) => None,
716 }
717 }
718
719 /// Returns an iterator over all non-empty buckets in the routing table.
720 pub fn kbuckets(
721 &mut self,
722 ) -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>> {
723 self.kbuckets.iter().filter(|b| !b.is_empty())
724 }
725
726 /// Returns the k-bucket for the distance to the given key.
727 ///
728 /// Returns `None` if the given key refers to the local key.
729 pub fn kbucket<K>(
730 &mut self,
731 key: K,
732 ) -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, Addresses>>
733 where
734 K: Into<kbucket::Key<K>> + Clone,
735 {
736 self.kbuckets.bucket(&key.into())
737 }
738
    /// Initiates an iterative query for the closest peers to the given key.
    ///
    /// Delegates to [`Behaviour::get_closest_peers_inner`] with no explicit
    /// result count.
    ///
    /// The result of the query is delivered in a
    /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
    pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
    where
        K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
    {
        self.get_closest_peers_inner(key, None)
    }
749
750 /// Initiates an iterative query for the closest peers to the given key.
751 /// The expected responding peers is specified by `num_results`
752 /// Note that the result is capped after exceeds K_VALUE
753 ///
754 /// The result of the query is delivered in a
755 /// [`Event::OutboundQueryProgressed{QueryResult::GetClosestPeers}`].
756 pub fn get_n_closest_peers<K>(&mut self, key: K, num_results: NonZeroUsize) -> QueryId
757 where
758 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
759 {
760 // The inner code never expect higher than K_VALUE results to be returned.
761 // And removing such cap will be tricky,
762 // since it would involve forging a new key and additional requests.
763 // Hence bound to K_VALUE here to set clear expectation and prevent unexpected behaviour.
764 let capped_num_results = std::cmp::min(num_results, K_VALUE);
765 self.get_closest_peers_inner(key, Some(capped_num_results))
766 }
767
768 fn get_closest_peers_inner<K>(&mut self, key: K, num_results: Option<NonZeroUsize>) -> QueryId
769 where
770 K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
771 {
772 let target: kbucket::Key<K> = key.clone().into();
773 let key: Vec<u8> = key.into();
774 let info = QueryInfo::GetClosestPeers {
775 key,
776 step: ProgressStep::first(),
777 num_results,
778 };
779 let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
780 self.queries.add_iter_closest(target, peer_keys, info)
781 }
782
    /// Returns all peers ordered by distance to the given key; takes peers from local routing table
    /// only.
    pub fn get_closest_local_peers<'a, K: Clone>(
        &'a mut self,
        key: &'a kbucket::Key<K>,
    ) -> impl Iterator<Item = kbucket::Key<PeerId>> + 'a {
        self.kbuckets.closest_keys(key)
    }
791
792 /// Finds the closest peers to a `key` in the context of a request by the `source` peer, such
793 /// that the `source` peer is never included in the result.
794 ///
795 /// Takes peers from local routing table only. Only returns number of peers equal to configured
796 /// replication factor.
797 pub fn find_closest_local_peers<'a, K: Clone>(
798 &'a mut self,
799 key: &'a kbucket::Key<K>,
800 source: &'a PeerId,
801 ) -> impl Iterator<Item = KadPeer> + 'a {
802 self.kbuckets
803 .closest(key)
804 .filter(move |e| e.node.key.preimage() != source)
805 .take(self.queries.config().replication_factor.get())
806 .map(KadPeer::from)
807 }
808
809 /// Performs a lookup for a record in the DHT.
810 ///
811 /// The result of this operation is delivered in a
812 /// [`Event::OutboundQueryProgressed{QueryResult::GetRecord}`].
813 pub fn get_record(&mut self, key: record::Key) -> QueryId {
814 let record = if let Some(record) = self.store.get(&key) {
815 if record.is_expired(Instant::now()) {
816 self.store.remove(&key);
817 None
818 } else {
819 Some(PeerRecord {
820 peer: None,
821 record: record.into_owned(),
822 })
823 }
824 } else {
825 None
826 };
827
828 let step = ProgressStep::first();
829
830 let target = kbucket::Key::new(key.clone());
831 let info = if record.is_some() {
832 QueryInfo::GetRecord {
833 key,
834 step: step.next(),
835 found_a_record: true,
836 cache_candidates: BTreeMap::new(),
837 }
838 } else {
839 QueryInfo::GetRecord {
840 key,
841 step: step.clone(),
842 found_a_record: false,
843 cache_candidates: BTreeMap::new(),
844 }
845 };
846 let peers = self.kbuckets.closest_keys(&target);
847 let id = self.queries.add_iter_closest(target.clone(), peers, info);
848
849 // No queries were actually done for the results yet.
850 let stats = QueryStats::empty();
851
852 if let Some(record) = record {
853 self.queued_events
854 .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
855 id,
856 result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))),
857 step,
858 stats,
859 }));
860 }
861
862 id
863 }
864
    /// Stores a record in the DHT, locally as well as at the nodes
    /// closest to the key as per the xor distance metric.
    ///
    /// Returns `Ok` if a record has been stored locally, providing the
    /// `QueryId` of the initial query that replicates the record in the DHT.
    /// The result of the query is eventually reported as a
    /// [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`].
    ///
    /// The record is always stored locally with the given expiration. If the record's
    /// expiration is `None`, the common case, it does not expire in local storage
    /// but is still replicated with the configured record TTL. To remove the record
    /// locally and stop it from being re-published in the DHT, see [`Behaviour::remove_record`].
    ///
    /// After the initial publication of the record, it is subject to (re-)replication
    /// and (re-)publication as per the configured intervals. Periodic (re-)publication
    /// does not update the record's expiration in local storage, thus a given record
    /// with an explicit expiration will always expire at that instant and until then
    /// is subject to regular (re-)replication and (re-)publication.
    pub fn put_record(
        &mut self,
        mut record: Record,
        quorum: Quorum,
    ) -> Result<QueryId, store::Error> {
        record.publisher = Some(*self.kbuckets.local_key().preimage());
        // Store locally *before* filling in the TTL-derived expiration, so
        // the local copy keeps the caller-provided expiration (possibly
        // `None`, i.e. no local expiry) while the replicated copy carries
        // the configured record TTL.
        self.store.put(record.clone())?;
        record.expires = record
            .expires
            .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
        let quorum = quorum.eval(self.queries.config().replication_factor);
        let target = kbucket::Key::new(record.key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let context = PutRecordContext::Publish;
        let info = QueryInfo::PutRecord {
            context,
            record,
            quorum,
            phase: PutRecordPhase::GetClosestPeers,
        };
        Ok(self.queries.add_iter_closest(target.clone(), peers, info))
    }
905
906 /// Stores a record at specific peers, without storing it locally.
907 ///
908 /// The given [`Quorum`] is understood in the context of the total
909 /// number of distinct peers given.
910 ///
911 /// If the record's expiration is `None`, the configured record TTL is used.
912 ///
913 /// > **Note**: This is not a regular Kademlia DHT operation. It needs to be
914 /// > used to selectively update or store a record to specific peers
915 /// > for the purpose of e.g. making sure these peers have the latest
916 /// > "version" of a record or to "cache" a record at further peers
917 /// > to increase the lookup success rate on the DHT for other peers.
918 /// >
919 /// > In particular, there is no automatic storing of records performed, and this
920 /// > method must be used to ensure the standard Kademlia
921 /// > procedure of "caching" (i.e. storing) a found record at the closest
922 /// > node to the key that _did not_ return it.
923 pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
924 where
925 I: ExactSizeIterator<Item = PeerId>,
926 {
927 let quorum = if peers.len() > 0 {
928 quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0"))
929 } else {
930 // If no peers are given, we just let the query fail immediately
931 // due to the fact that the quorum must be at least one, instead of
932 // introducing a new kind of error.
933 NonZeroUsize::new(1).expect("1 > 0")
934 };
935 record.expires = record
936 .expires
937 .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
938 let context = PutRecordContext::Custom;
939 let info = QueryInfo::PutRecord {
940 context,
941 record,
942 quorum,
943 phase: PutRecordPhase::PutRecord {
944 success: Vec::new(),
945 get_closest_peers_stats: QueryStats::empty(),
946 },
947 };
948 self.queries.add_fixed(peers, info)
949 }
950
951 /// Removes the record with the given key from _local_ storage,
952 /// if the local node is the publisher of the record.
953 ///
954 /// Has no effect if a record for the given key is stored locally but
955 /// the local node is not a publisher of the record.
956 ///
957 /// This is a _local_ operation. However, it also has the effect that
958 /// the record will no longer be periodically re-published, allowing the
959 /// record to eventually expire throughout the DHT.
960 pub fn remove_record(&mut self, key: &record::Key) {
961 if let Some(r) = self.store.get(key) {
962 if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
963 self.store.remove(key)
964 }
965 }
966 }
967
968 /// Gets a mutable reference to the record store.
969 pub fn store_mut(&mut self) -> &mut TStore {
970 &mut self.store
971 }
972
973 /// Bootstraps the local node to join the DHT.
974 ///
975 /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
976 /// own ID in the DHT. This introduces the local node to the other nodes
977 /// in the DHT and populates its routing table with the closest neighbours.
978 ///
979 /// Subsequently, all buckets farther from the bucket of the closest neighbour are
980 /// refreshed by initiating an additional bootstrapping query for each such
981 /// bucket with random keys.
982 ///
983 /// Returns `Ok` if bootstrapping has been initiated with a self-lookup, providing the
984 /// `QueryId` for the entire bootstrapping process. The progress of bootstrapping is
985 /// reported via [`Event::OutboundQueryProgressed{QueryResult::Bootstrap}`] events,
986 /// with one such event per bootstrapping query.
987 ///
988 /// Returns `Err` if bootstrapping is impossible due an empty routing table.
989 ///
990 /// > **Note**: Bootstrapping requires at least one node of the DHT to be known.
991 /// > See [`Behaviour::add_address`].
992 ///
993 /// > **Note**: Bootstrap does not require to be called manually. It is periodically
994 /// > invoked at regular intervals based on the configured `periodic_bootstrap_interval` (see
995 /// > [`Config::set_periodic_bootstrap_interval`] for details) and it is also automatically
996 /// > invoked
997 /// > when a new peer is inserted in the routing table.
998 /// > This parameter is used to call [`Behaviour::bootstrap`] periodically and automatically
999 /// > to ensure a healthy routing table.
1000 pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
1001 let local_key = *self.kbuckets.local_key();
1002 let info = QueryInfo::Bootstrap {
1003 peer: *local_key.preimage(),
1004 remaining: None,
1005 step: ProgressStep::first(),
1006 };
1007 let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
1008 if peers.is_empty() {
1009 self.bootstrap_status.reset_timers();
1010 Err(NoKnownPeers())
1011 } else {
1012 self.bootstrap_status.on_started();
1013 Ok(self.queries.add_iter_closest(local_key, peers, info))
1014 }
1015 }
1016
1017 /// Establishes the local node as a provider of a value for the given key.
1018 ///
1019 /// This operation publishes a provider record with the given key and
1020 /// identity of the local node to the peers closest to the key, thus establishing
1021 /// the local node as a provider.
1022 ///
1023 /// Returns `Ok` if a provider record has been stored locally, providing the
1024 /// `QueryId` of the initial query that announces the local node as a provider.
1025 ///
1026 /// The publication of the provider records is periodically repeated as per the
1027 /// configured interval, to renew the expiry and account for changes to the DHT
1028 /// topology. A provider record may be removed from local storage and
1029 /// thus no longer re-published by calling [`Behaviour::stop_providing`].
1030 ///
1031 /// In contrast to the standard Kademlia push-based model for content distribution
1032 /// implemented by [`Behaviour::put_record`], the provider API implements a
1033 /// pull-based model that may be used in addition or as an alternative.
1034 /// The means by which the actual value is obtained from a provider is out of scope
1035 /// of the libp2p Kademlia provider API.
1036 ///
1037 /// The results of the (repeated) provider announcements sent by this node are
1038 /// reported via [`Event::OutboundQueryProgressed{QueryResult::StartProviding}`].
1039 pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
1040 // Note: We store our own provider records locally without local addresses
1041 // to avoid redundant storage and outdated addresses. Instead these are
1042 // acquired on demand when returning a `ProviderRecord` for the local node.
1043 let local_addrs = Vec::new();
1044 let record = ProviderRecord::new(
1045 key.clone(),
1046 *self.kbuckets.local_key().preimage(),
1047 local_addrs,
1048 );
1049 self.store.add_provider(record)?;
1050 let target = kbucket::Key::new(key.clone());
1051 let peers = self.kbuckets.closest_keys(&target);
1052 let context = AddProviderContext::Publish;
1053 let info = QueryInfo::AddProvider {
1054 context,
1055 key,
1056 phase: AddProviderPhase::GetClosestPeers,
1057 };
1058 let id = self.queries.add_iter_closest(target.clone(), peers, info);
1059 Ok(id)
1060 }
1061
1062 /// Stops the local node from announcing that it is a provider for the given key.
1063 ///
1064 /// This is a local operation. The local node will still be considered as a
1065 /// provider for the key by other nodes until these provider records expire.
1066 pub fn stop_providing(&mut self, key: &record::Key) {
1067 self.store
1068 .remove_provider(key, self.kbuckets.local_key().preimage());
1069 }
1070
1071 /// Performs a lookup for providers of a value to the given key.
1072 ///
1073 /// The result of this operation is delivered in a
1074 /// reported via [`Event::OutboundQueryProgressed{QueryResult::GetProviders}`].
1075 pub fn get_providers(&mut self, key: record::Key) -> QueryId {
1076 let providers: HashSet<_> = self
1077 .store
1078 .providers(&key)
1079 .into_iter()
1080 .filter(|p| !p.is_expired(Instant::now()))
1081 .map(|p| p.provider)
1082 .collect();
1083
1084 let step = ProgressStep::first();
1085
1086 let info = QueryInfo::GetProviders {
1087 key: key.clone(),
1088 providers_found: providers.len(),
1089 step: if providers.is_empty() {
1090 step.clone()
1091 } else {
1092 step.next()
1093 },
1094 };
1095
1096 let target = kbucket::Key::new(key.clone());
1097 let peers = self.kbuckets.closest_keys(&target);
1098 let id = self.queries.add_iter_closest(target.clone(), peers, info);
1099
1100 // No queries were actually done for the results yet.
1101 let stats = QueryStats::empty();
1102
1103 if !providers.is_empty() {
1104 self.queued_events
1105 .push_back(ToSwarm::GenerateEvent(Event::OutboundQueryProgressed {
1106 id,
1107 result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
1108 key,
1109 providers,
1110 })),
1111 step,
1112 stats,
1113 }));
1114 }
1115 id
1116 }
1117
1118 /// Set the [`Mode`] in which we should operate.
1119 ///
1120 /// By default, we are in [`Mode::Client`] and will swap into [`Mode::Server`] as soon as we
1121 /// have a confirmed, external address via [`FromSwarm::ExternalAddrConfirmed`].
1122 ///
1123 /// Setting a mode via this function disables this automatic behaviour and unconditionally
1124 /// operates in the specified mode. To reactivate the automatic configuration, pass [`None`]
1125 /// instead.
1126 pub fn set_mode(&mut self, mode: Option<Mode>) {
1127 match mode {
1128 Some(mode) => {
1129 self.mode = mode;
1130 self.auto_mode = false;
1131 self.reconfigure_mode();
1132 }
1133 None => {
1134 self.auto_mode = true;
1135 self.determine_mode_from_external_addresses();
1136 }
1137 }
1138
1139 if let Some(waker) = self.no_events_waker.take() {
1140 waker.wake();
1141 }
1142 }
1143
1144 /// Get the [`Mode`] in which the DHT is currently operating.
1145 pub fn mode(&self) -> Mode {
1146 self.mode
1147 }
1148
1149 fn reconfigure_mode(&mut self) {
1150 if self.connections.is_empty() {
1151 return;
1152 }
1153
1154 let num_connections = self.connections.len();
1155
1156 tracing::debug!(
1157 "Re-configuring {} established connection{}",
1158 num_connections,
1159 if num_connections > 1 { "s" } else { "" }
1160 );
1161
1162 self.queued_events
1163 .extend(
1164 self.connections
1165 .iter()
1166 .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
1167 peer_id: *peer_id,
1168 handler: NotifyHandler::One(*conn_id),
1169 event: HandlerIn::ReconfigureMode {
1170 new_mode: self.mode,
1171 },
1172 }),
1173 );
1174 }
1175
    /// Re-evaluates the [`Mode`] from the set of confirmed external addresses:
    /// no confirmed addresses => [`Mode::Client`], at least one => [`Mode::Server`].
    /// Propagates the mode to all connection handlers and emits
    /// [`Event::ModeChanged`] if the effective mode actually changed.
    fn determine_mode_from_external_addresses(&mut self) {
        let old_mode = self.mode;

        self.mode = match (self.external_addresses.as_slice(), self.mode) {
            ([], Mode::Server) => {
                tracing::debug!("Switching to client-mode because we no longer have any confirmed external addresses");

                Mode::Client
            }
            ([], Mode::Client) => {
                // Previously client-mode, now also client-mode because no external addresses.

                Mode::Client
            }
            (confirmed_external_addresses, Mode::Client) => {
                if tracing::enabled!(Level::DEBUG) {
                    let confirmed_external_addresses =
                        to_comma_separated_list(confirmed_external_addresses);

                    tracing::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
                }

                Mode::Server
            }
            (confirmed_external_addresses, Mode::Server) => {
                debug_assert!(
                    !confirmed_external_addresses.is_empty(),
                    "Previous match arm handled empty list"
                );

                // Previously server-mode, now also server-mode because >= 1 external address.
                // Don't log anything to avoid spam.
                Mode::Server
            }
        };

        // Always notify the handlers, even if the mode did not change.
        self.reconfigure_mode();

        if old_mode != self.mode {
            self.queued_events
                .push_back(ToSwarm::GenerateEvent(Event::ModeChanged {
                    new_mode: self.mode,
                }));
        }
    }
1221
1222 /// Processes discovered peers from a successful request in an iterative `Query`.
1223 fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
1224 where
1225 I: Iterator<Item = &'a KadPeer> + Clone,
1226 {
1227 let local_id = self.kbuckets.local_key().preimage();
1228 let others_iter = peers.filter(|p| &p.node_id != local_id);
1229 if let Some(query) = self.queries.get_mut(query_id) {
1230 tracing::trace!(peer=%source, query=?query_id, "Request to peer in query succeeded");
1231 for peer in others_iter.clone() {
1232 tracing::trace!(
1233 ?peer,
1234 %source,
1235 query=?query_id,
1236 "Peer reported by source in query"
1237 );
1238 let addrs = peer.multiaddrs.iter().cloned().collect();
1239 query.peers.addresses.insert(peer.node_id, addrs);
1240 }
1241 query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
1242 }
1243 }
1244
    /// Collects all peers who are known to be providers of the value for a given `Multihash`.
    ///
    /// The requesting peer `source` is excluded from the result, and at most
    /// `replication_factor` providers are returned.
    fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
        // Re-borrow individual fields so they can be moved into the closure
        // below while `self.store` remains independently borrowable.
        let kbuckets = &mut self.kbuckets;
        let connected = &mut self.connected_peers;
        let listen_addresses = &self.listen_addresses;
        let external_addresses = &self.external_addresses;

        self.store
            .providers(key)
            .into_iter()
            .filter_map(move |p| {
                if &p.provider != source {
                    let node_id = p.provider;
                    let multiaddrs = p.addresses;
                    let connection_ty = if connected.contains(&node_id) {
                        ConnectionType::Connected
                    } else {
                        ConnectionType::NotConnected
                    };
                    if multiaddrs.is_empty() {
                        // The provider is either the local node and we fill in
                        // the local addresses on demand, or it is a legacy
                        // provider record without addresses, in which case we
                        // try to find addresses in the routing table, as was
                        // done before provider records were stored along with
                        // their addresses.
                        if &node_id == kbuckets.local_key().preimage() {
                            Some(
                                listen_addresses
                                    .iter()
                                    .chain(external_addresses.iter())
                                    .cloned()
                                    .collect::<Vec<_>>(),
                            )
                        } else {
                            let key = kbucket::Key::from(node_id);
                            kbuckets
                                .entry(&key)
                                .as_mut()
                                .and_then(|e| e.view())
                                .map(|e| e.node.value.clone().into_vec())
                        }
                    } else {
                        Some(multiaddrs)
                    }
                    // A provider without any known addresses is dropped here.
                    .map(|multiaddrs| KadPeer {
                        node_id,
                        multiaddrs,
                        connection_ty,
                    })
                } else {
                    None
                }
            })
            .take(self.queries.config().replication_factor.get())
            .collect()
    }
1302
1303 /// Starts an iterative `ADD_PROVIDER` query for the given key.
1304 fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
1305 let info = QueryInfo::AddProvider {
1306 context,
1307 key: key.clone(),
1308 phase: AddProviderPhase::GetClosestPeers,
1309 };
1310 let target = kbucket::Key::new(key);
1311 let peers = self.kbuckets.closest_keys(&target);
1312 self.queries.add_iter_closest(target.clone(), peers, info);
1313 }
1314
1315 /// Starts an iterative `PUT_VALUE` query for the given record.
1316 fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
1317 let quorum = quorum.eval(self.queries.config().replication_factor);
1318 let target = kbucket::Key::new(record.key.clone());
1319 let peers = self.kbuckets.closest_keys(&target);
1320 let info = QueryInfo::PutRecord {
1321 record,
1322 quorum,
1323 context,
1324 phase: PutRecordPhase::GetClosestPeers,
1325 };
1326 self.queries.add_iter_closest(target.clone(), peers, info);
1327 }
1328
    /// Updates the routing table with a new connection status and address of a peer.
    ///
    /// Depending on the current bucket entry for `peer`, this updates the
    /// entry in place, inserts a new entry (subject to the configured
    /// [`BucketInserts`] strategy) or emits one of the `RoutingUpdated`,
    /// `RoutablePeer`, `PendingRoutablePeer` or `UnroutablePeer` events.
    fn connection_updated(
        &mut self,
        peer: PeerId,
        address: Option<Multiaddr>,
        new_status: NodeStatus,
    ) {
        let key = kbucket::Key::from(peer);
        match self.kbuckets.entry(&key) {
            // The peer is already in the routing table.
            Some(kbucket::Entry::Present(mut entry, old_status)) => {
                if old_status != new_status {
                    entry.update(new_status)
                }
                if let Some(address) = address {
                    // Only report `RoutingUpdated` if the address is new.
                    if entry.value().insert(address) {
                        self.queued_events.push_back(ToSwarm::GenerateEvent(
                            Event::RoutingUpdated {
                                peer,
                                is_new_peer: false,
                                addresses: entry.value().clone(),
                                old_peer: None,
                                bucket_range: self
                                    .kbuckets
                                    .bucket(&key)
                                    .map(|b| b.range())
                                    .expect("Not kbucket::Entry::SelfEntry."),
                            },
                        ))
                    }
                }
            }

            // The peer is pending insertion into a full bucket.
            Some(kbucket::Entry::Pending(mut entry, old_status)) => {
                if let Some(address) = address {
                    entry.value().insert(address);
                }
                if old_status != new_status {
                    entry.update(new_status);
                }
            }

            // The peer is not yet in the routing table.
            Some(kbucket::Entry::Absent(entry)) => {
                // Only connected nodes with a known address are newly inserted.
                if new_status != NodeStatus::Connected {
                    return;
                }
                match (address, self.kbucket_inserts) {
                    (None, _) => {
                        self.queued_events
                            .push_back(ToSwarm::GenerateEvent(Event::UnroutablePeer { peer }));
                    }
                    (Some(a), BucketInserts::Manual) => {
                        self.queued_events
                            .push_back(ToSwarm::GenerateEvent(Event::RoutablePeer {
                                peer,
                                address: a,
                            }));
                    }
                    (Some(a), BucketInserts::OnConnected) => {
                        let addresses = Addresses::new(a);
                        match entry.insert(addresses.clone(), new_status) {
                            kbucket::InsertResult::Inserted => {
                                self.bootstrap_on_low_peers();

                                let event = Event::RoutingUpdated {
                                    peer,
                                    is_new_peer: true,
                                    addresses,
                                    old_peer: None,
                                    bucket_range: self
                                        .kbuckets
                                        .bucket(&key)
                                        .map(|b| b.range())
                                        .expect("Not kbucket::Entry::SelfEntry."),
                                };
                                self.queued_events.push_back(ToSwarm::GenerateEvent(event));
                            }
                            kbucket::InsertResult::Full => {
                                tracing::debug!(
                                    %peer,
                                    "Bucket full. Peer not added to routing table"
                                );
                                let address = addresses.first().clone();
                                self.queued_events.push_back(ToSwarm::GenerateEvent(
                                    Event::RoutablePeer { peer, address },
                                ));
                            }
                            kbucket::InsertResult::Pending { disconnected } => {
                                let address = addresses.first().clone();
                                self.queued_events.push_back(ToSwarm::GenerateEvent(
                                    Event::PendingRoutablePeer { peer, address },
                                ));

                                // `disconnected` might already be in the process of re-connecting.
                                // In other words `disconnected` might have already re-connected but
                                // is not yet confirmed to support the Kademlia protocol via
                                // [`HandlerEvent::ProtocolConfirmed`].
                                //
                                // Only try dialing peer if not currently connected.
                                if !self.connected_peers.contains(disconnected.preimage()) {
                                    self.queued_events.push_back(ToSwarm::Dial {
                                        opts: DialOpts::peer_id(disconnected.into_preimage())
                                            .build(),
                                    })
                                }
                            }
                        }
                    }
                }
            }
            // The local node itself (`SelfEntry`) or no entry: nothing to do.
            _ => {}
        }
    }
1442
1443 /// A new peer has been inserted in the routing table but we check if the routing
1444 /// table is currently small (less that `K_VALUE` peers are present) and only
1445 /// trigger a bootstrap in that case
1446 fn bootstrap_on_low_peers(&mut self) {
1447 if self
1448 .kbuckets()
1449 .map(|kbucket| kbucket.num_entries())
1450 .sum::<usize>()
1451 < K_VALUE.get()
1452 {
1453 self.bootstrap_status.trigger();
1454 }
1455 }
1456
    /// Handles a finished (i.e. successful) query.
    ///
    /// Depending on the query's phase, this either produces the final
    /// [`Event::OutboundQueryProgressed`] for the caller, or transitions the
    /// query into its next phase (e.g. from locating the closest peers to
    /// sending the actual `PUT_VALUE` / `ADD_PROVIDER` requests) and
    /// returns `None`.
    fn query_finished(&mut self, q: Query) -> Option<Event> {
        let query_id = q.id();
        tracing::trace!(query=?query_id, "Query finished");
        match q.info {
            QueryInfo::Bootstrap {
                peer,
                remaining,
                mut step,
            } => {
                let local_key = *self.kbuckets.local_key();
                let mut remaining = remaining.unwrap_or_else(|| {
                    debug_assert_eq!(&peer, local_key.preimage());
                    // The lookup for the local key finished. To complete the bootstrap process,
                    // a bucket refresh should be performed for every bucket farther away than
                    // the first non-empty bucket (which are most likely no more than the last
                    // few, i.e. farthest, buckets).
                    self.kbuckets
                        .iter()
                        .skip_while(|b| b.is_empty())
                        .skip(1) // Skip the bucket with the closest neighbour.
                        .map(|b| {
                            // Try to find a key that falls into the bucket. While such keys can
                            // be generated fully deterministically, the current libp2p kademlia
                            // wire protocol requires transmission of the preimages of the actual
                            // keys in the DHT keyspace, hence for now this is just a "best effort"
                            // to find a key that hashes into a specific bucket. The probabilities
                            // of finding a key in the bucket `b` with at most 16 trials are as
                            // follows:
                            //
                            // Pr(bucket-255) = 1 - (1/2)^16 ~= 1
                            // Pr(bucket-254) = 1 - (3/4)^16 ~= 1
                            // Pr(bucket-253) = 1 - (7/8)^16 ~= 0.88
                            // Pr(bucket-252) = 1 - (15/16)^16 ~= 0.64
                            // ...
                            let mut target = kbucket::Key::from(PeerId::random());
                            for _ in 0..16 {
                                let d = local_key.distance(&target);
                                if b.contains(&d) {
                                    break;
                                }
                                target = kbucket::Key::from(PeerId::random());
                            }
                            target
                        })
                        .collect::<Vec<_>>()
                        .into_iter()
                });

                // Number of refresh lookups still outstanding before taking
                // the next target below.
                let num_remaining = remaining.len() as u32;

                if let Some(target) = remaining.next() {
                    // Continue the bootstrap with a refresh lookup for the next bucket.
                    let info = QueryInfo::Bootstrap {
                        peer: *target.preimage(),
                        remaining: Some(remaining),
                        step: step.next(),
                    };
                    let peers = self.kbuckets.closest_keys(&target);
                    self.queries
                        .continue_iter_closest(query_id, target, peers, info);
                } else {
                    step.last = true;
                    self.bootstrap_status.on_finish();
                };

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: q.stats,
                    result: QueryResult::Bootstrap(Ok(BootstrapOk {
                        peer,
                        num_remaining,
                    })),
                    step,
                })
            }

            QueryInfo::GetClosestPeers { key, mut step, .. } => {
                step.last = true;

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: q.stats,
                    result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk {
                        key,
                        peers: q.peers.into_peerinfos_iter().collect(),
                    })),
                    step,
                })
            }

            QueryInfo::GetProviders { mut step, .. } => {
                step.last = true;

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: q.stats,
                    result: QueryResult::GetProviders(Ok(
                        GetProvidersOk::FinishedWithNoAdditionalRecord {
                            closest_peers: q.peers.into_peerids_iter().collect(),
                        },
                    )),
                    step,
                })
            }

            // First phase of `ADD_PROVIDER` finished: the closest peers are
            // known, so continue with the actual provider announcements.
            QueryInfo::AddProvider {
                context,
                key,
                phase: AddProviderPhase::GetClosestPeers,
            } => {
                let provider_id = self.local_peer_id;
                let external_addresses = self.external_addresses.iter().cloned().collect();
                let info = QueryInfo::AddProvider {
                    context,
                    key,
                    phase: AddProviderPhase::AddProvider {
                        provider_id,
                        external_addresses,
                        get_closest_peers_stats: q.stats,
                    },
                };
                self.queries
                    .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
                None
            }

            // Second phase of `ADD_PROVIDER` finished: report the result.
            QueryInfo::AddProvider {
                context,
                key,
                phase:
                    AddProviderPhase::AddProvider {
                        get_closest_peers_stats,
                        ..
                    },
            } => match context {
                AddProviderContext::Publish => Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: get_closest_peers_stats.merge(q.stats),
                    result: QueryResult::StartProviding(Ok(AddProviderOk { key })),
                    step: ProgressStep::first_and_last(),
                }),
                AddProviderContext::Republish => Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: get_closest_peers_stats.merge(q.stats),
                    result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })),
                    step: ProgressStep::first_and_last(),
                }),
            },

            QueryInfo::GetRecord {
                key,
                mut step,
                found_a_record,
                cache_candidates,
            } => {
                step.last = true;

                // `found_a_record` tracks whether any earlier progress event
                // already delivered a record for this query.
                let results = if found_a_record {
                    Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates })
                } else {
                    Err(GetRecordError::NotFound {
                        key,
                        closest_peers: q.peers.into_peerids_iter().collect(),
                    })
                };
                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: q.stats,
                    result: QueryResult::GetRecord(results),
                    step,
                })
            }

            // First phase of `PUT_VALUE` finished: the closest peers are
            // known, so continue with sending the record to them.
            QueryInfo::PutRecord {
                context,
                record,
                quorum,
                phase: PutRecordPhase::GetClosestPeers,
            } => {
                let info = QueryInfo::PutRecord {
                    context,
                    record,
                    quorum,
                    phase: PutRecordPhase::PutRecord {
                        success: vec![],
                        get_closest_peers_stats: q.stats,
                    },
                };
                self.queries
                    .continue_fixed(query_id, q.peers.into_peerids_iter(), info);
                None
            }

            // Second phase of `PUT_VALUE` finished: check the quorum and
            // report the result (replications are only logged).
            QueryInfo::PutRecord {
                context,
                record,
                quorum,
                phase:
                    PutRecordPhase::PutRecord {
                        success,
                        get_closest_peers_stats,
                    },
            } => {
                let mk_result = |key: record::Key| {
                    if success.len() >= quorum.get() {
                        Ok(PutRecordOk { key })
                    } else {
                        Err(PutRecordError::QuorumFailed {
                            key,
                            quorum,
                            success,
                        })
                    }
                };
                match context {
                    PutRecordContext::Publish | PutRecordContext::Custom => {
                        Some(Event::OutboundQueryProgressed {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(q.stats),
                            result: QueryResult::PutRecord(mk_result(record.key)),
                            step: ProgressStep::first_and_last(),
                        })
                    }
                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
                        id: query_id,
                        stats: get_closest_peers_stats.merge(q.stats),
                        result: QueryResult::RepublishRecord(mk_result(record.key)),
                        step: ProgressStep::first_and_last(),
                    }),
                    PutRecordContext::Replicate => {
                        tracing::debug!(record=?record.key, "Record replicated");
                        None
                    }
                }
            }
        }
    }
1694
    /// Handles a query that timed out.
    ///
    /// Produces the corresponding timeout error event for the caller, except
    /// for record replication where the failure is only logged. A timed-out
    /// bootstrap query still continues with the next bucket refresh, if any.
    fn query_timeout(&mut self, query: Query) -> Option<Event> {
        let query_id = query.id();
        tracing::trace!(query=?query_id, "Query timed out");
        match query.info {
            QueryInfo::Bootstrap {
                peer,
                mut remaining,
                mut step,
            } => {
                // NOTE(review): the -1 appears to account for the target that
                // is consumed for the continued lookup below — confirm.
                let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);

                // Continue with the next bootstrap query if `remaining` is not empty.
                if let Some((target, remaining)) =
                    remaining.take().and_then(|mut r| Some((r.next()?, r)))
                {
                    let info = QueryInfo::Bootstrap {
                        peer: target.into_preimage(),
                        remaining: Some(remaining),
                        step: step.next(),
                    };
                    let peers = self.kbuckets.closest_keys(&target);
                    self.queries
                        .continue_iter_closest(query_id, target, peers, info);
                } else {
                    step.last = true;
                    self.bootstrap_status.on_finish();
                }

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: query.stats,
                    result: QueryResult::Bootstrap(Err(BootstrapError::Timeout {
                        peer,
                        num_remaining,
                    })),
                    step,
                })
            }

            QueryInfo::AddProvider { context, key, .. } => Some(match context {
                AddProviderContext::Publish => Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: query.stats,
                    result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })),
                    step: ProgressStep::first_and_last(),
                },
                AddProviderContext::Republish => Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: query.stats,
                    result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })),
                    step: ProgressStep::first_and_last(),
                },
            }),

            QueryInfo::GetClosestPeers { key, mut step, .. } => {
                step.last = true;
                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: query.stats,
                    result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout {
                        key,
                        peers: query.peers.into_peerinfos_iter().collect(),
                    })),
                    step,
                })
            }

            QueryInfo::PutRecord {
                record,
                quorum,
                context,
                phase,
            } => {
                // Peers that acknowledged the record before the timeout are
                // still reported in the error.
                let err = Err(PutRecordError::Timeout {
                    key: record.key,
                    quorum,
                    success: match phase {
                        PutRecordPhase::GetClosestPeers => vec![],
                        PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
                    },
                });
                match context {
                    PutRecordContext::Publish | PutRecordContext::Custom => {
                        Some(Event::OutboundQueryProgressed {
                            id: query_id,
                            stats: query.stats,
                            result: QueryResult::PutRecord(err),
                            step: ProgressStep::first_and_last(),
                        })
                    }
                    PutRecordContext::Republish => Some(Event::OutboundQueryProgressed {
                        id: query_id,
                        stats: query.stats,
                        result: QueryResult::RepublishRecord(err),
                        step: ProgressStep::first_and_last(),
                    }),
                    // Replication failures are internal and only logged.
                    PutRecordContext::Replicate => match phase {
                        PutRecordPhase::GetClosestPeers => {
                            tracing::warn!(
                                "Locating closest peers for replication failed: {:?}",
                                err
                            );
                            None
                        }
                        PutRecordPhase::PutRecord { .. } => {
                            tracing::debug!("Replicating record failed: {:?}", err);
                            None
                        }
                    },
                }
            }

            QueryInfo::GetRecord { key, mut step, .. } => {
                step.last = true;

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: query.stats,
                    result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })),
                    step,
                })
            }

            QueryInfo::GetProviders { key, mut step, .. } => {
                step.last = true;

                Some(Event::OutboundQueryProgressed {
                    id: query_id,
                    stats: query.stats,
                    result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
                        key,
                        closest_peers: query.peers.into_peerids_iter().collect(),
                    })),
                    step,
                })
            }
        }
    }
1834
    /// Processes a record received from a peer.
    ///
    /// Acknowledges the inbound `PUT_VALUE` with [`HandlerIn::PutRecordRes`]
    /// in every case except a failed store insertion, which instead resets
    /// the inbound substream via [`HandlerIn::Reset`].
    fn record_received(
        &mut self,
        source: PeerId,
        connection: ConnectionId,
        request_id: RequestId,
        mut record: Record,
    ) {
        if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
            // If the (alleged) publisher is the local node, do nothing. The record of
            // the original publisher should never change as a result of replication
            // and the publisher is always assumed to have the "right" value.
            self.queued_events.push_back(ToSwarm::NotifyHandler {
                peer_id: source,
                handler: NotifyHandler::One(connection),
                event: HandlerIn::PutRecordRes {
                    key: record.key,
                    value: record.value,
                    request_id,
                },
            });
            return;
        }

        let now = Instant::now();

        // Calculate the expiration exponentially inversely proportional to the
        // number of nodes between the local node and the closest node to the key
        // (beyond the replication factor). This ensures avoiding over-caching
        // outside of the k closest nodes to a key.
        let target = kbucket::Key::new(record.key.clone());
        let num_between = self.kbuckets.count_nodes_between(&target);
        let k = self.queries.config().replication_factor.get();
        // Zero while we are within the k closest nodes to the key; each node
        // beyond that halves the TTL once more (see `exp_decrease`).
        let num_beyond_k = (usize::max(k, num_between) - k) as u32;
        let expiration = self
            .record_ttl
            .map(|ttl| now + exp_decrease(ttl, num_beyond_k));
        // The smaller TTL prevails. Only if neither TTL is set is the record
        // stored "forever".
        // NOTE(review): `Option`'s `Ord` places `None` before `Some`, so when
        // `record_ttl` is `None` the trailing `.min(expiration)` yields `None`
        // and discards any expiry the sender set — confirm this is intended.
        record.expires = record.expires.or(expiration).min(expiration);

        if let Some(job) = self.put_record_job.as_mut() {
            // Ignore the record in the next run of the replication
            // job, since we can assume the sender replicated the
            // record to the k closest peers. Effectively, only
            // one of the k closest peers performs a replication
            // in the configured interval, assuming a shared interval.
            job.skip(record.key.clone())
        }

        // While records received from a publisher, as well as records that do
        // not exist locally should always (attempted to) be stored, there is a
        // choice here w.r.t. the handling of replicated records whose keys refer
        // to records that exist locally: The value and / or the publisher may
        // either be overridden or left unchanged. At the moment and in the
        // absence of a decisive argument for another option, both are always
        // overridden as it avoids having to load the existing record in the
        // first place.

        if !record.is_expired(now) {
            // The record is cloned because of the weird libp2p protocol
            // requirement to send back the value in the response, although this
            // is a waste of resources.
            match self.record_filtering {
                StoreInserts::Unfiltered => match self.store.put(record.clone()) {
                    Ok(()) => {
                        tracing::debug!(
                            record=?record.key,
                            "Record stored: {} bytes",
                            record.value.len()
                        );
                        self.queued_events.push_back(ToSwarm::GenerateEvent(
                            Event::InboundRequest {
                                request: InboundRequest::PutRecord {
                                    source,
                                    connection,
                                    record: None,
                                },
                            },
                        ));
                    }
                    Err(e) => {
                        tracing::info!("Record not stored: {:?}", e);
                        // Storing failed: reset the substream instead of
                        // acknowledging, so the sender sees the failure.
                        self.queued_events.push_back(ToSwarm::NotifyHandler {
                            peer_id: source,
                            handler: NotifyHandler::One(connection),
                            event: HandlerIn::Reset(request_id),
                        });

                        return;
                    }
                },
                StoreInserts::FilterBoth => {
                    // Filtering is delegated to the user: surface the record in
                    // an event instead of writing it to the store ourselves.
                    self.queued_events
                        .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
                            request: InboundRequest::PutRecord {
                                source,
                                connection,
                                record: Some(record.clone()),
                            },
                        }));
                }
            }
        }

        // The remote receives a [`HandlerIn::PutRecordRes`] even in the
        // case where the record is discarded due to being expired. Given that
        // the remote sent the local node a [`HandlerEvent::PutRecord`]
        // request, the remote perceives the local node as one node among the k
        // closest nodes to the target. In addition returning
        // [`HandlerIn::PutRecordRes`] does not reveal any internal
        // information to a possibly malicious remote node.
        self.queued_events.push_back(ToSwarm::NotifyHandler {
            peer_id: source,
            handler: NotifyHandler::One(connection),
            event: HandlerIn::PutRecordRes {
                key: record.key,
                value: record.value,
                request_id,
            },
        })
    }
1957
1958 /// Processes a provider record received from a peer.
1959 fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
1960 if &provider.node_id != self.kbuckets.local_key().preimage() {
1961 let record = ProviderRecord {
1962 key,
1963 provider: provider.node_id,
1964 expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
1965 addresses: provider.multiaddrs,
1966 };
1967 match self.record_filtering {
1968 StoreInserts::Unfiltered => {
1969 if let Err(e) = self.store.add_provider(record) {
1970 tracing::info!("Provider record not stored: {:?}", e);
1971 return;
1972 }
1973
1974 self.queued_events
1975 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1976 request: InboundRequest::AddProvider { record: None },
1977 }));
1978 }
1979 StoreInserts::FilterBoth => {
1980 self.queued_events
1981 .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
1982 request: InboundRequest::AddProvider {
1983 record: Some(record),
1984 },
1985 }));
1986 }
1987 }
1988 }
1989 }
1990
1991 fn address_failed(&mut self, peer_id: PeerId, address: &Multiaddr) {
1992 let key = kbucket::Key::from(peer_id);
1993
1994 if let Some(addrs) = self.kbuckets.entry(&key).as_mut().and_then(|e| e.value()) {
1995 // TODO: Ideally, the address should only be removed if the error can
1996 // be classified as "permanent" but since `err` is currently a borrowed
1997 // trait object without a `'static` bound, even downcasting for inspection
1998 // of the error is not possible (and also not truly desirable or ergonomic).
1999 // The error passed in should rather be a dedicated enum.
2000 if addrs.remove(address).is_ok() {
2001 tracing::debug!(
2002 peer=%peer_id,
2003 %address,
2004 "Address removed from peer due to error."
2005 );
2006 } else {
2007 // Despite apparently having no reachable address (any longer),
2008 // the peer is kept in the routing table with the last address to avoid
2009 // (temporary) loss of network connectivity to "flush" the routing
2010 // table. Once in, a peer is only removed from the routing table
2011 // if it is the least recently connected peer, currently disconnected
2012 // and is unreachable in the context of another peer pending insertion
2013 // into the same bucket. This is handled transparently by the
2014 // `KBucketsTable` and takes effect through `KBucketsTable::take_applied_pending`
2015 // within `Behaviour::poll`.
2016 tracing::debug!(
2017 peer=%peer_id,
2018 %address,
2019 "Last remaining address of peer is unreachable."
2020 );
2021 }
2022 }
2023
2024 for query in self.queries.iter_mut() {
2025 if let Some(addrs) = query.peers.addresses.get_mut(&peer_id) {
2026 addrs.retain(|a| a != address);
2027 }
2028 }
2029 }
2030
2031 fn on_connection_established(
2032 &mut self,
2033 ConnectionEstablished {
2034 peer_id,
2035 failed_addresses,
2036 other_established,
2037 ..
2038 }: ConnectionEstablished,
2039 ) {
2040 for addr in failed_addresses {
2041 self.address_failed(peer_id, addr);
2042 }
2043
2044 // Peer's first connection.
2045 if other_established == 0 {
2046 self.connected_peers.insert(peer_id);
2047 }
2048 }
2049
2050 fn on_address_change(
2051 &mut self,
2052 AddressChange {
2053 peer_id: peer,
2054 old,
2055 new,
2056 ..
2057 }: AddressChange,
2058 ) {
2059 let (old, new) = (old.get_remote_address(), new.get_remote_address());
2060
2061 // Update routing table.
2062 if let Some(addrs) = self
2063 .kbuckets
2064 .entry(&kbucket::Key::from(peer))
2065 .as_mut()
2066 .and_then(|e| e.value())
2067 {
2068 if addrs.replace(old, new) {
2069 tracing::debug!(
2070 %peer,
2071 old_address=%old,
2072 new_address=%new,
2073 "Old address replaced with new address for peer."
2074 );
2075 } else {
2076 tracing::debug!(
2077 %peer,
2078 old_address=%old,
2079 new_address=%new,
2080 "Old address not replaced with new address for peer as old address wasn't present.",
2081 );
2082 }
2083 } else {
2084 tracing::debug!(
2085 %peer,
2086 old_address=%old,
2087 new_address=%new,
2088 "Old address not replaced with new address for peer as peer is not present in the \
2089 routing table."
2090 );
2091 }
2092
2093 // Update query address cache.
2094 //
2095 // Given two connected nodes: local node A and remote node B. Say node B
2096 // is not in node A's routing table. Additionally node B is part of the
2097 // `Query::addresses` list of an ongoing query on node A. Say Node
2098 // B triggers an address change and then disconnects. Later on the
2099 // earlier mentioned query on node A would like to connect to node B.
2100 // Without replacing the address in the `Query::addresses` set node
2101 // A would attempt to dial the old and not the new address.
2102 //
2103 // While upholding correctness, iterating through all discovered
2104 // addresses of a peer in all currently ongoing queries might have a
2105 // large performance impact. If so, the code below might be worth
2106 // revisiting.
2107 for query in self.queries.iter_mut() {
2108 if let Some(addrs) = query.peers.addresses.get_mut(&peer) {
2109 for addr in addrs.iter_mut() {
2110 if addr == old {
2111 *addr = new.clone();
2112 }
2113 }
2114 }
2115 }
2116 }
2117
2118 fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) {
2119 let Some(peer_id) = peer_id else { return };
2120
2121 match error {
2122 DialError::LocalPeerId { .. }
2123 | DialError::WrongPeerId { .. }
2124 | DialError::Aborted
2125 | DialError::Denied { .. }
2126 | DialError::Transport(_)
2127 | DialError::NoAddresses => {
2128 if let DialError::Transport(addresses) = error {
2129 for (addr, _) in addresses {
2130 self.address_failed(peer_id, addr)
2131 }
2132 }
2133
2134 for query in self.queries.iter_mut() {
2135 query.on_failure(&peer_id);
2136 }
2137 }
2138 DialError::DialPeerConditionFalse(
2139 dial_opts::PeerCondition::Disconnected
2140 | dial_opts::PeerCondition::NotDialing
2141 | dial_opts::PeerCondition::DisconnectedAndNotDialing,
2142 ) => {
2143 // We might (still) be connected, or about to be connected, thus do not report the
2144 // failure to the queries.
2145 }
2146 DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => {
2147 unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse.");
2148 }
2149 }
2150 }
2151
2152 fn on_connection_closed(
2153 &mut self,
2154 ConnectionClosed {
2155 peer_id,
2156 remaining_established,
2157 connection_id,
2158 ..
2159 }: ConnectionClosed,
2160 ) {
2161 self.connections.remove(&connection_id);
2162
2163 if remaining_established == 0 {
2164 for query in self.queries.iter_mut() {
2165 query.on_failure(&peer_id);
2166 }
2167 self.connection_updated(peer_id, None, NodeStatus::Disconnected);
2168 self.connected_peers.remove(&peer_id);
2169 }
2170 }
2171
2172 /// Preloads a new [`Handler`] with requests that are waiting
2173 /// to be sent to the newly connected peer.
2174 fn preload_new_handler(
2175 &mut self,
2176 handler: &mut Handler,
2177 connection_id: ConnectionId,
2178 peer: PeerId,
2179 ) {
2180 self.connections.insert(connection_id, peer);
2181 // Queue events for sending pending RPCs to the connected peer.
2182 // There can be only one pending RPC for a particular peer and query per definition.
2183 for (_peer_id, event) in self.queries.iter_mut().filter_map(|q| {
2184 q.pending_rpcs
2185 .iter()
2186 .position(|(p, _)| p == &peer)
2187 .map(|p| q.pending_rpcs.remove(p))
2188 }) {
2189 handler.on_behaviour_event(event)
2190 }
2191 }
2192}
2193
/// Exponentially decrease the given duration (base 2).
///
/// Divides the whole-second part of `ttl` by `2^exp`, saturating at zero when
/// the shift exceeds the bit width of `u64`. Sub-second precision is dropped.
fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
    let secs = match ttl.as_secs().checked_shr(exp) {
        Some(shifted) => shifted,
        // `exp >= 64`: the duration has fully decayed.
        None => 0,
    };
    Duration::from_secs(secs)
}
2198
impl<TStore> NetworkBehaviour for Behaviour<TStore>
where
    TStore: RecordStore + Send + 'static,
{
    type ConnectionHandler = Handler;
    type ToSwarm = Event;

    // Builds the Kademlia handler for a new inbound connection and preloads it
    // with any RPCs already pending for this peer.
    fn handle_established_inbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        local_addr: &Multiaddr,
        remote_addr: &Multiaddr,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        let connected_point = ConnectedPoint::Listener {
            local_addr: local_addr.clone(),
            send_back_addr: remote_addr.clone(),
        };

        let mut handler = Handler::new(
            self.protocol_config.clone(),
            connected_point,
            peer,
            self.mode,
        );
        self.preload_new_handler(&mut handler, connection_id, peer);

        Ok(handler)
    }

    // Builds the Kademlia handler for a new outbound connection and preloads
    // it with any RPCs already pending for this peer.
    fn handle_established_outbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        addr: &Multiaddr,
        role_override: Endpoint,
        port_use: PortUse,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        let connected_point = ConnectedPoint::Dialer {
            address: addr.clone(),
            role_override,
            port_use,
        };

        let mut handler = Handler::new(
            self.protocol_config.clone(),
            connected_point,
            peer,
            self.mode,
        );
        self.preload_new_handler(&mut handler, connection_id, peer);

        Ok(handler)
    }

    // Supplies candidate dial addresses for a pending outbound connection:
    // routing-table addresses first, then addresses discovered by ongoing
    // queries.
    fn handle_pending_outbound_connection(
        &mut self,
        _connection_id: ConnectionId,
        maybe_peer: Option<PeerId>,
        _addresses: &[Multiaddr],
        _effective_role: Endpoint,
    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
        let peer_id = match maybe_peer {
            None => return Ok(vec![]),
            Some(peer) => peer,
        };

        // We should order addresses from decreasing likelihood of connectivity, so start with
        // the addresses of that peer in the k-buckets.
        let key = kbucket::Key::from(peer_id);
        let mut peer_addrs =
            if let Some(kbucket::Entry::Present(mut entry, _)) = self.kbuckets.entry(&key) {
                let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
                debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
                addrs
            } else {
                Vec::new()
            };

        // We add to that a temporary list of addresses from the ongoing queries.
        for query in self.queries.iter() {
            if let Some(addrs) = query.peers.addresses.get(&peer_id) {
                peer_addrs.extend(addrs.iter().cloned())
            }
        }

        Ok(peer_addrs)
    }

    // Dispatches an event produced by a connection handler: inbound requests
    // are answered from local state and surfaced as `InboundRequest` events;
    // responses feed the corresponding ongoing query.
    fn on_connection_handler_event(
        &mut self,
        source: PeerId,
        connection: ConnectionId,
        event: THandlerOutEvent<Self>,
    ) {
        match event {
            HandlerEvent::ProtocolConfirmed { endpoint } => {
                debug_assert!(self.connected_peers.contains(&source));
                // The remote's address can only be put into the routing table,
                // and thus shared with other nodes, if the local node is the dialer,
                // since the remote address on an inbound connection may be specific
                // to that connection (e.g. typically the TCP port numbers).
                let address = match endpoint {
                    ConnectedPoint::Dialer { address, .. } => Some(address),
                    ConnectedPoint::Listener { .. } => None,
                };

                self.connection_updated(source, address, NodeStatus::Connected);
            }

            HandlerEvent::ProtocolNotSupported { endpoint } => {
                let address = match endpoint {
                    ConnectedPoint::Dialer { address, .. } => Some(address),
                    ConnectedPoint::Listener { .. } => None,
                };
                self.connection_updated(source, address, NodeStatus::Disconnected);
            }

            // Inbound FIND_NODE: answer with the closest locally known peers.
            HandlerEvent::FindNodeReq { key, request_id } => {
                let closer_peers = self
                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
                    .collect::<Vec<_>>();

                self.queued_events
                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
                        request: InboundRequest::FindNode {
                            num_closer_peers: closer_peers.len(),
                        },
                    }));

                self.queued_events.push_back(ToSwarm::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: HandlerIn::FindNodeRes {
                        closer_peers,
                        request_id,
                    },
                });
            }

            HandlerEvent::FindNodeRes {
                closer_peers,
                query_id,
            } => {
                self.discovered(&query_id, &source, closer_peers.iter());
            }

            // Inbound GET_PROVIDERS: answer with known providers plus the
            // closest locally known peers.
            HandlerEvent::GetProvidersReq { key, request_id } => {
                let provider_peers = self.provider_peers(&key, &source);
                let closer_peers = self
                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
                    .collect::<Vec<_>>();

                self.queued_events
                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
                        request: InboundRequest::GetProvider {
                            num_closer_peers: closer_peers.len(),
                            num_provider_peers: provider_peers.len(),
                        },
                    }));

                self.queued_events.push_back(ToSwarm::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: HandlerIn::GetProvidersRes {
                        closer_peers,
                        provider_peers,
                        request_id,
                    },
                });
            }

            HandlerEvent::GetProvidersRes {
                closer_peers,
                provider_peers,
                query_id,
            } => {
                let peers = closer_peers.iter().chain(provider_peers.iter());
                self.discovered(&query_id, &source, peers);
                if let Some(query) = self.queries.get_mut(&query_id) {
                    let stats = query.stats().clone();
                    if let QueryInfo::GetProviders {
                        ref key,
                        ref mut providers_found,
                        ref mut step,
                        ..
                    } = query.info
                    {
                        *providers_found += provider_peers.len();
                        let providers = provider_peers.iter().map(|p| p.node_id).collect();

                        self.queued_events.push_back(ToSwarm::GenerateEvent(
                            Event::OutboundQueryProgressed {
                                id: query_id,
                                result: QueryResult::GetProviders(Ok(
                                    GetProvidersOk::FoundProviders {
                                        key: key.clone(),
                                        providers,
                                    },
                                )),
                                step: step.clone(),
                                stats,
                            },
                        ));
                        *step = step.next();
                    }
                }
            }
            HandlerEvent::QueryError { query_id, error } => {
                tracing::debug!(
                    peer=%source,
                    query=?query_id,
                    "Request to peer in query failed with {:?}",
                    error
                );
                // If the query to which the error relates is still active,
                // signal the failure w.r.t. `source`.
                if let Some(query) = self.queries.get_mut(&query_id) {
                    query.on_failure(&source)
                }
            }

            HandlerEvent::AddProvider { key, provider } => {
                // Only accept a provider record from a legitimate peer.
                if provider.node_id != source {
                    return;
                }

                self.provider_received(key, provider);
            }

            // Inbound GET_VALUE: answer with the local record (if any,
            // pruning it if expired) plus the closest locally known peers.
            HandlerEvent::GetRecord { key, request_id } => {
                // Lookup the record locally.
                let record = match self.store.get(&key) {
                    Some(record) => {
                        if record.is_expired(Instant::now()) {
                            self.store.remove(&key);
                            None
                        } else {
                            Some(record.into_owned())
                        }
                    }
                    None => None,
                };

                let closer_peers = self
                    .find_closest_local_peers(&kbucket::Key::new(key), &source)
                    .collect::<Vec<_>>();

                self.queued_events
                    .push_back(ToSwarm::GenerateEvent(Event::InboundRequest {
                        request: InboundRequest::GetRecord {
                            num_closer_peers: closer_peers.len(),
                            present_locally: record.is_some(),
                        },
                    }));

                self.queued_events.push_back(ToSwarm::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: HandlerIn::GetRecordRes {
                        record,
                        closer_peers,
                        request_id,
                    },
                });
            }

            HandlerEvent::GetRecordRes {
                record,
                closer_peers,
                query_id,
            } => {
                if let Some(query) = self.queries.get_mut(&query_id) {
                    let stats = query.stats().clone();
                    if let QueryInfo::GetRecord {
                        key,
                        ref mut step,
                        ref mut found_a_record,
                        cache_candidates,
                    } = &mut query.info
                    {
                        if let Some(record) = record {
                            *found_a_record = true;
                            let record = PeerRecord {
                                peer: Some(source),
                                record,
                            };

                            self.queued_events.push_back(ToSwarm::GenerateEvent(
                                Event::OutboundQueryProgressed {
                                    id: query_id,
                                    result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(
                                        record,
                                    ))),
                                    step: step.clone(),
                                    stats,
                                },
                            ));

                            *step = step.next();
                        } else {
                            tracing::trace!(record=?key, %source, "Record not found at source");
                            // Peers close to the key that did not have the
                            // record are remembered as caching candidates.
                            if let Caching::Enabled { max_peers } = self.caching {
                                let source_key = kbucket::Key::from(source);
                                let target_key = kbucket::Key::from(key.clone());
                                let distance = source_key.distance(&target_key);
                                cache_candidates.insert(distance, source);
                                if cache_candidates.len() > max_peers as usize {
                                    // TODO: `pop_last()` would be nice once stabilised.
                                    // See https://github.com/rust-lang/rust/issues/62924.
                                    let last =
                                        *cache_candidates.keys().next_back().expect("len > 0");
                                    cache_candidates.remove(&last);
                                }
                            }
                        }
                    }
                }

                self.discovered(&query_id, &source, closer_peers.iter());
            }

            HandlerEvent::PutRecord { record, request_id } => {
                self.record_received(source, connection, request_id, record);
            }

            HandlerEvent::PutRecordRes { query_id, .. } => {
                if let Some(query) = self.queries.get_mut(&query_id) {
                    query.on_success(&source, vec![]);
                    if let QueryInfo::PutRecord {
                        phase: PutRecordPhase::PutRecord { success, .. },
                        quorum,
                        ..
                    } = &mut query.info
                    {
                        success.push(source);

                        let quorum = quorum.get();
                        if success.len() >= quorum {
                            let peers = success.clone();
                            let finished = query.try_finish(peers.iter());
                            if !finished {
                                tracing::debug!(
                                    peer=%source,
                                    query=?query_id,
                                    "PutRecord query reached quorum ({}/{}) with response \
                                     from peer but could not yet finish.",
                                    peers.len(),
                                    quorum,
                                );
                            }
                        }
                    }
                }
            }
        };
    }

    // Drives background jobs (provider republish, record replication,
    // bootstrap), then drains queued events, applied pending routing-table
    // entries and finished/timed-out/waiting queries until nothing is ready.
    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
        let now = Instant::now();

        // Calculate the available capacity for queries triggered by background jobs.
        let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());

        // Run the periodic provider announcement job.
        if let Some(mut job) = self.add_provider_job.take() {
            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
            for i in 0..num {
                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                    self.start_add_provider(r.key, AddProviderContext::Republish)
                } else {
                    jobs_query_capacity -= i;
                    break;
                }
            }
            self.add_provider_job = Some(job);
        }

        // Run the periodic record replication / publication job.
        if let Some(mut job) = self.put_record_job.take() {
            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
            for _ in 0..num {
                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                    // Records published by the local node are republished;
                    // everything else is replication on behalf of others.
                    let context =
                        if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
                            PutRecordContext::Republish
                        } else {
                            PutRecordContext::Replicate
                        };
                    self.start_put_record(r, Quorum::All, context)
                } else {
                    break;
                }
            }
            self.put_record_job = Some(job);
        }

        // Poll bootstrap periodically and automatically.
        if let Poll::Ready(()) = self.bootstrap_status.poll_next_bootstrap(cx) {
            if let Err(e) = self.bootstrap() {
                tracing::warn!("Failed to trigger bootstrap: {e}");
            }
        }

        loop {
            // Drain queued events first.
            if let Some(event) = self.queued_events.pop_front() {
                return Poll::Ready(event);
            }

            // Drain applied pending entries from the routing table.
            if let Some(entry) = self.kbuckets.take_applied_pending() {
                let kbucket::Node { key, value } = entry.inserted;
                let peer_id = key.into_preimage();
                self.queued_events
                    .push_back(ToSwarm::NewExternalAddrOfPeer {
                        peer_id,
                        address: value.first().clone(),
                    });
                let event = Event::RoutingUpdated {
                    bucket_range: self
                        .kbuckets
                        .bucket(&key)
                        .map(|b| b.range())
                        .expect("Self to never be applied from pending."),
                    peer: peer_id,
                    is_new_peer: true,
                    addresses: value,
                    old_peer: entry.evicted.map(|n| n.key.into_preimage()),
                };
                return Poll::Ready(ToSwarm::GenerateEvent(event));
            }

            // Look for a finished query.
            loop {
                match self.queries.poll(now) {
                    QueryPoolState::Finished(q) => {
                        if let Some(event) = self.query_finished(q) {
                            return Poll::Ready(ToSwarm::GenerateEvent(event));
                        }
                    }
                    QueryPoolState::Timeout(q) => {
                        if let Some(event) = self.query_timeout(q) {
                            return Poll::Ready(ToSwarm::GenerateEvent(event));
                        }
                    }
                    QueryPoolState::Waiting(Some((query, peer_id))) => {
                        let event = query.info.to_request(query.id());
                        // TODO: AddProvider requests yield no response, so the query completes
                        // as soon as all requests have been sent. However, the handler should
                        // better emit an event when the request has been sent (and report
                        // an error if sending fails), instead of immediately reporting
                        // "success" somewhat prematurely here.
                        if let QueryInfo::AddProvider {
                            phase: AddProviderPhase::AddProvider { .. },
                            ..
                        } = &query.info
                        {
                            query.on_success(&peer_id, vec![])
                        }

                        // Send the request right away when connected; otherwise
                        // park it as a pending RPC and dial the peer.
                        if self.connected_peers.contains(&peer_id) {
                            self.queued_events.push_back(ToSwarm::NotifyHandler {
                                peer_id,
                                event,
                                handler: NotifyHandler::Any,
                            });
                        } else if &peer_id != self.kbuckets.local_key().preimage() {
                            query.pending_rpcs.push((peer_id, event));
                            self.queued_events.push_back(ToSwarm::Dial {
                                opts: DialOpts::peer_id(peer_id).build(),
                            });
                        }
                    }
                    QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
                }
            }

            // No immediate event was produced as a result of a finished query.
            // If no new events have been queued either, signal `NotReady` to
            // be polled again later.
            if self.queued_events.is_empty() {
                self.no_events_waker = Some(cx.waker().clone());

                return Poll::Pending;
            }
        }
    }

    // Forwards swarm lifecycle events to the dedicated handlers above and
    // keeps listen/external address tracking (and thus the mode) up to date.
    fn on_swarm_event(&mut self, event: FromSwarm) {
        self.listen_addresses.on_swarm_event(&event);
        let external_addresses_changed = self.external_addresses.on_swarm_event(&event);

        if self.auto_mode && external_addresses_changed {
            self.determine_mode_from_external_addresses();
        }

        match event {
            FromSwarm::ConnectionEstablished(connection_established) => {
                self.on_connection_established(connection_established)
            }
            FromSwarm::ConnectionClosed(connection_closed) => {
                self.on_connection_closed(connection_closed)
            }
            FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
            FromSwarm::NewListenAddr(_) if self.connected_peers.is_empty() => {
                // A new listen addr was just discovered and we have no connected peers,
                // it can mean that our network interfaces were not up but they are now
                // so it might be a good idea to trigger a bootstrap.
                self.bootstrap_status.trigger();
            }
            _ => {}
        }
    }
}
2720
/// Peer Info combines a Peer ID with a set of multiaddrs that the peer is listening on.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerInfo {
    /// The identity of the peer.
    pub peer_id: PeerId,
    /// The addresses the peer is listening on.
    pub addrs: Vec<Multiaddr>,
}
2727
/// A quorum w.r.t. the configured replication factor specifies the minimum
/// number of distinct nodes that must be successfully contacted in order
/// for a query to succeed.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Quorum {
    /// A single successful response suffices.
    One,
    /// More than half of the peers must respond.
    Majority,
    /// Every peer must respond.
    All,
    /// At least `n` peers must respond, capped at the total.
    N(NonZeroUsize),
}

impl Quorum {
    /// Evaluate the quorum w.r.t a given total (number of peers).
    fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
        match *self {
            Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
            Quorum::Majority => NonZeroUsize::new(total.get() / 2 + 1).expect("n + 1 != 0"),
            Quorum::All => total,
            // `Ord::min` caps the requested count at the available total.
            Quorum::N(n) => total.min(n),
        }
    }
}
2750
/// A record either received from the given peer or retrieved from the local
/// record store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerRecord {
    /// The peer from whom the record was received. `None` if the record was
    /// retrieved from local storage.
    pub peer: Option<PeerId>,
    /// The record itself.
    pub record: Record,
}
2760
2761//////////////////////////////////////////////////////////////////////////////
2762// Events
2763
/// The events produced by the `Kademlia` behaviour.
///
/// See [`NetworkBehaviour::poll`].
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
    /// An inbound request has been received and handled.
    // Note on the difference between 'request' and 'query': A request is a
    // single request-response style exchange with a single remote peer. A query
    // is made of multiple requests across multiple remote peers.
    InboundRequest { request: InboundRequest },

    /// An outbound query has made progress.
    OutboundQueryProgressed {
        /// The ID of the query that finished.
        id: QueryId,
        /// The intermediate result of the query.
        result: QueryResult,
        /// Execution statistics from the query.
        stats: QueryStats,
        /// Indicates which event this is, if there are multiple responses for a single query.
        step: ProgressStep,
    },

    /// The routing table has been updated with a new peer and / or
    /// address, thereby possibly evicting another peer.
    RoutingUpdated {
        /// The ID of the peer that was added or updated.
        peer: PeerId,
        /// Whether this is a new peer and was thus just added to the routing
        /// table, or whether it is an existing peer whose addresses changed.
        is_new_peer: bool,
        /// The full list of known addresses of `peer`.
        addresses: Addresses,
        /// Returns the minimum inclusive and maximum inclusive distance for
        /// the bucket of the peer.
        bucket_range: (Distance, Distance),
        /// The ID of the peer that was evicted from the routing table to make
        /// room for the new peer, if any.
        old_peer: Option<PeerId>,
    },

    /// A peer has connected for whom no listen address is known.
    ///
    /// If the peer is to be added to the routing table, a known
    /// listen address for the peer must be provided via [`Behaviour::add_address`].
    UnroutablePeer { peer: PeerId },

    /// A connection to a peer has been established for whom a listen address
    /// is known but the peer has not been added to the routing table either
    /// because [`BucketInserts::Manual`] is configured or because
    /// the corresponding bucket is full.
    ///
    /// If the peer is to be included in the routing table, it must
    /// be explicitly added via [`Behaviour::add_address`], possibly after
    /// removing another peer.
    ///
    /// See [`Behaviour::kbucket`] for insight into the contents of
    /// the k-bucket of `peer`.
    RoutablePeer { peer: PeerId, address: Multiaddr },

    /// A connection to a peer has been established for whom a listen address
    /// is known but the peer is only pending insertion into the routing table
    /// if the least-recently disconnected peer is unresponsive, i.e. the peer
    /// may not make it into the routing table.
    ///
    /// If the peer is to be unconditionally included in the routing table,
    /// it should be explicitly added via [`Behaviour::add_address`] after
    /// removing another peer.
    ///
    /// See [`Behaviour::kbucket`] for insight into the contents of
    /// the k-bucket of `peer`.
    PendingRoutablePeer { peer: PeerId, address: Multiaddr },

    /// This peer's mode has been updated automatically.
    ///
    /// This happens in response to an external
    /// address being added or removed.
    ModeChanged { new_mode: Mode },
}
2844
/// Information about progress events.
#[derive(Debug, Clone)]
pub struct ProgressStep {
    /// The index into the event
    pub count: NonZeroUsize,
    /// Is this the final event?
    pub last: bool,
}

impl ProgressStep {
    /// The initial step of a query: count `1`, not final.
    fn first() -> Self {
        let count = NonZeroUsize::new(1).expect("1 to be greater than 0.");
        ProgressStep { count, last: false }
    }

    /// The sole step of a query that emits exactly one event.
    fn first_and_last() -> Self {
        ProgressStep {
            last: true,
            ..ProgressStep::first()
        }
    }

    /// The step following `self`. Must not be called on a final step.
    fn next(&self) -> Self {
        assert!(!self.last);
        let count = self
            .count
            .checked_add(1)
            .expect("Adding 1 not to result in 0.");

        Self { count, last: false }
    }
}
2875
/// Information about a received and handled inbound request.
#[derive(Debug, Clone)]
pub enum InboundRequest {
    /// Request for the list of nodes whose IDs are the closest to `key`.
    FindNode { num_closer_peers: usize },
    /// Same as `FindNode`, but should also return the entries of the local
    /// providers list for this key.
    GetProvider {
        num_closer_peers: usize,
        num_provider_peers: usize,
    },
    /// A peer sent an add provider request.
    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is
    /// included.
    ///
    /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details.
    AddProvider { record: Option<ProviderRecord> },
    /// Request to retrieve a record.
    GetRecord {
        num_closer_peers: usize,
        present_locally: bool,
    },
    /// A peer sent a put record request.
    /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`Record`] is included.
    ///
    /// See [`StoreInserts`] and [`Config::set_record_filtering`].
    PutRecord {
        /// The peer that sent the request.
        source: PeerId,
        /// The connection on which the request arrived.
        connection: ConnectionId,
        record: Option<Record>,
    },
}
2908
/// The results of Kademlia queries.
#[derive(Debug, Clone)]
pub enum QueryResult {
    /// The result of [`Behaviour::bootstrap`].
    Bootstrap(BootstrapResult),

    /// The result of [`Behaviour::get_closest_peers`].
    GetClosestPeers(GetClosestPeersResult),

    /// The result of [`Behaviour::get_providers`].
    GetProviders(GetProvidersResult),

    /// The result of [`Behaviour::start_providing`].
    StartProviding(AddProviderResult),

    /// The result of an (automatic) republishing of a provider record.
    RepublishProvider(AddProviderResult),

    /// The result of [`Behaviour::get_record`].
    GetRecord(GetRecordResult),

    /// The result of [`Behaviour::put_record`].
    PutRecord(PutRecordResult),

    /// The result of an (automatic) republishing of a (value-)record.
    RepublishRecord(PutRecordResult),
}
2936
/// The result of [`Behaviour::get_record`].
pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;

/// The successful result of [`Behaviour::get_record`].
#[derive(Debug, Clone)]
pub enum GetRecordOk {
    /// A record found during the query.
    FoundRecord(PeerRecord),
    /// The query finished without yielding further records.
    FinishedWithNoAdditionalRecord {
        /// If caching is enabled, these are the peers closest
        /// _to the record key_ (not the local node) that were queried but
        /// did not return the record, sorted by distance to the record key
        /// from closest to farthest. How many of these are tracked is configured
        /// by [`Config::set_caching`].
        ///
        /// Writing back the cache at these peers is a manual operation.
        /// I.e. you may wish to use these candidates with [`Behaviour::put_record_to`]
        /// after selecting one of the returned records.
        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
    },
}
2957
/// The error result of [`Behaviour::get_record`].
#[derive(Debug, Clone, Error)]
pub enum GetRecordError {
    /// No record with the given key was returned by any queried peer.
    #[error("the record was not found")]
    NotFound {
        /// The key that was looked up.
        key: record::Key,
        /// The peers closest to the key.
        closest_peers: Vec<PeerId>,
    },
    /// Fewer peers than the required quorum returned the record.
    #[error("the quorum failed; needed {quorum} peers")]
    QuorumFailed {
        /// The key that was looked up.
        key: record::Key,
        /// The records obtained before the quorum failed.
        records: Vec<PeerRecord>,
        /// The quorum that was required.
        quorum: NonZeroUsize,
    },
    /// The query did not finish within the configured timeout.
    #[error("the request timed out")]
    Timeout {
        /// The key that was looked up.
        key: record::Key,
    },
}
2975
2976impl GetRecordError {
2977 /// Gets the key of the record for which the operation failed.
2978 pub fn key(&self) -> &record::Key {
2979 match self {
2980 GetRecordError::QuorumFailed { key, .. } => key,
2981 GetRecordError::Timeout { key, .. } => key,
2982 GetRecordError::NotFound { key, .. } => key,
2983 }
2984 }
2985
2986 /// Extracts the key of the record for which the operation failed,
2987 /// consuming the error.
2988 pub fn into_key(self) -> record::Key {
2989 match self {
2990 GetRecordError::QuorumFailed { key, .. } => key,
2991 GetRecordError::Timeout { key, .. } => key,
2992 GetRecordError::NotFound { key, .. } => key,
2993 }
2994 }
2995}
2996
/// The result of [`Behaviour::put_record`].
pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;

/// The successful result of [`Behaviour::put_record`].
#[derive(Debug, Clone)]
pub struct PutRecordOk {
    /// The key of the record that was stored.
    pub key: record::Key,
}
3005
/// The error result of [`Behaviour::put_record`].
#[derive(Debug, Clone, Error)]
pub enum PutRecordError {
    /// The record was stored on fewer peers than the required quorum.
    #[error("the quorum failed; needed {quorum} peers")]
    QuorumFailed {
        /// The key of the record.
        key: record::Key,
        /// [`PeerId`]s of the peers the record was successfully stored on.
        success: Vec<PeerId>,
        /// The quorum that was required.
        quorum: NonZeroUsize,
    },
    /// The query did not finish within the configured timeout.
    #[error("the request timed out")]
    Timeout {
        /// The key of the record.
        key: record::Key,
        /// [`PeerId`]s of the peers the record was successfully stored on.
        success: Vec<PeerId>,
        /// The quorum that was required.
        quorum: NonZeroUsize,
    },
}
3024
3025impl PutRecordError {
3026 /// Gets the key of the record for which the operation failed.
3027 pub fn key(&self) -> &record::Key {
3028 match self {
3029 PutRecordError::QuorumFailed { key, .. } => key,
3030 PutRecordError::Timeout { key, .. } => key,
3031 }
3032 }
3033
3034 /// Extracts the key of the record for which the operation failed,
3035 /// consuming the error.
3036 pub fn into_key(self) -> record::Key {
3037 match self {
3038 PutRecordError::QuorumFailed { key, .. } => key,
3039 PutRecordError::Timeout { key, .. } => key,
3040 }
3041 }
3042}
3043
/// The result of [`Behaviour::bootstrap`].
pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;

/// The successful result of [`Behaviour::bootstrap`].
#[derive(Debug, Clone)]
pub struct BootstrapOk {
    /// The target peer of the completed bootstrap lookup.
    pub peer: PeerId,
    /// The number of remaining bootstrap lookups, one per bucket that
    /// still needs refreshing (cf. [`QueryInfo::Bootstrap`]).
    pub num_remaining: u32,
}
3053
/// The error result of [`Behaviour::bootstrap`].
#[derive(Debug, Clone, Error)]
pub enum BootstrapError {
    /// The bootstrap lookup did not finish within the configured timeout.
    #[error("the request timed out")]
    Timeout {
        /// The target peer of the timed-out lookup.
        peer: PeerId,
        /// The number of remaining lookups, if known at the time of the timeout.
        num_remaining: Option<u32>,
    },
}
3063
/// The result of [`Behaviour::get_closest_peers`].
pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;

/// The successful result of [`Behaviour::get_closest_peers`].
#[derive(Debug, Clone)]
pub struct GetClosestPeersOk {
    /// The key whose closest peers were searched for (the preimage).
    pub key: Vec<u8>,
    /// The peers found to be closest to the key.
    pub peers: Vec<PeerInfo>,
}
3073
/// The error result of [`Behaviour::get_closest_peers`].
#[derive(Debug, Clone, Error)]
pub enum GetClosestPeersError {
    /// The query did not finish within the configured timeout.
    #[error("the request timed out")]
    Timeout {
        /// The key whose closest peers were searched for.
        key: Vec<u8>,
        /// The peers found before the timeout occurred.
        peers: Vec<PeerInfo>,
    },
}
3080
3081impl GetClosestPeersError {
3082 /// Gets the key for which the operation failed.
3083 pub fn key(&self) -> &Vec<u8> {
3084 match self {
3085 GetClosestPeersError::Timeout { key, .. } => key,
3086 }
3087 }
3088
3089 /// Extracts the key for which the operation failed,
3090 /// consuming the error.
3091 pub fn into_key(self) -> Vec<u8> {
3092 match self {
3093 GetClosestPeersError::Timeout { key, .. } => key,
3094 }
3095 }
3096}
3097
/// The result of [`Behaviour::get_providers`].
pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;

/// The successful result of [`Behaviour::get_providers`].
#[derive(Debug, Clone)]
pub enum GetProvidersOk {
    /// Providers for the key were discovered.
    FoundProviders {
        /// The key for which providers were found.
        key: record::Key,
        /// The new set of providers discovered.
        providers: HashSet<PeerId>,
    },
    /// The query finished without discovering additional providers.
    FinishedWithNoAdditionalRecord {
        /// The peers closest to the key.
        closest_peers: Vec<PeerId>,
    },
}
3113
/// The error result of [`Behaviour::get_providers`].
#[derive(Debug, Clone, Error)]
pub enum GetProvidersError {
    /// The query did not finish within the configured timeout.
    #[error("the request timed out")]
    Timeout {
        /// The key for which providers were sought.
        key: record::Key,
        /// The peers closest to the key, as seen before the timeout.
        closest_peers: Vec<PeerId>,
    },
}
3123
3124impl GetProvidersError {
3125 /// Gets the key for which the operation failed.
3126 pub fn key(&self) -> &record::Key {
3127 match self {
3128 GetProvidersError::Timeout { key, .. } => key,
3129 }
3130 }
3131
3132 /// Extracts the key for which the operation failed,
3133 /// consuming the error.
3134 pub fn into_key(self) -> record::Key {
3135 match self {
3136 GetProvidersError::Timeout { key, .. } => key,
3137 }
3138 }
3139}
3140
/// The result of publishing a provider record.
pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;

/// The successful result of publishing a provider record.
#[derive(Debug, Clone)]
pub struct AddProviderOk {
    /// The key of the published provider record.
    pub key: record::Key,
}
3149
/// The possible errors when publishing a provider record.
#[derive(Debug, Clone, Error)]
pub enum AddProviderError {
    /// The query did not finish within the configured timeout.
    #[error("the request timed out")]
    Timeout {
        /// The key of the provider record.
        key: record::Key,
    },
}
3156
3157impl AddProviderError {
3158 /// Gets the key for which the operation failed.
3159 pub fn key(&self) -> &record::Key {
3160 match self {
3161 AddProviderError::Timeout { key, .. } => key,
3162 }
3163 }
3164
3165 /// Extracts the key for which the operation failed,
3166 pub fn into_key(self) -> record::Key {
3167 match self {
3168 AddProviderError::Timeout { key, .. } => key,
3169 }
3170 }
3171}
3172
3173impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
3174 fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
3175 KadPeer {
3176 node_id: e.node.key.into_preimage(),
3177 multiaddrs: e.node.value.into_vec(),
3178 connection_ty: match e.status {
3179 NodeStatus::Connected => ConnectionType::Connected,
3180 NodeStatus::Disconnected => ConnectionType::NotConnected,
3181 },
3182 }
3183 }
3184}
3185
/// The context of a [`QueryInfo::AddProvider`] query.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AddProviderContext {
    /// The context is a [`Behaviour::start_providing`] operation.
    Publish,
    /// The context is periodic republishing of provider announcements
    /// initiated earlier via [`Behaviour::start_providing`].
    Republish,
}
3195
/// The context of a [`QueryInfo::PutRecord`] query.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PutRecordContext {
    /// The context is a [`Behaviour::put_record`] operation.
    Publish,
    /// The context is periodic republishing of records stored
    /// earlier via [`Behaviour::put_record`].
    Republish,
    /// The context is periodic replication (i.e. without extending
    /// the record TTL) of stored records received earlier from another peer.
    Replicate,
    /// The context is a custom store operation targeting specific
    /// peers initiated by [`Behaviour::put_record_to`].
    Custom,
}
3211
/// Information about a running query.
#[derive(Debug, Clone)]
pub enum QueryInfo {
    /// A query initiated by [`Behaviour::bootstrap`].
    Bootstrap {
        /// The targeted peer ID.
        peer: PeerId,
        /// The remaining random peer IDs to query, one per
        /// bucket that still needs refreshing.
        ///
        /// This is `None` if the initial self-lookup has not
        /// yet completed and `Some` with an exhausted iterator
        /// if bootstrapping is complete.
        remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>,
        /// Current index of events.
        step: ProgressStep,
    },

    /// A (repeated) query initiated by [`Behaviour::get_closest_peers`].
    GetClosestPeers {
        /// The key being queried (the preimage).
        key: Vec<u8>,
        /// Current index of events.
        step: ProgressStep,
        /// If required, `num_results` specifies the expected number of
        /// responding peers.
        num_results: Option<NonZeroUsize>,
    },

    /// A (repeated) query initiated by [`Behaviour::get_providers`].
    GetProviders {
        /// The key for which to search for providers.
        key: record::Key,
        /// The number of providers found so far.
        providers_found: usize,
        /// Current index of events.
        step: ProgressStep,
    },

    /// A (repeated) query initiated by [`Behaviour::start_providing`].
    AddProvider {
        /// The record key.
        key: record::Key,
        /// The current phase of the query.
        phase: AddProviderPhase,
        /// The execution context of the query.
        context: AddProviderContext,
    },

    /// A (repeated) query initiated by [`Behaviour::put_record`].
    PutRecord {
        /// The record being published.
        record: Record,
        /// The expected quorum of responses w.r.t. the replication factor.
        quorum: NonZeroUsize,
        /// The current phase of the query.
        phase: PutRecordPhase,
        /// The execution context of the query.
        context: PutRecordContext,
    },

    /// A (repeated) query initiated by [`Behaviour::get_record`].
    GetRecord {
        /// The key to look for.
        key: record::Key,
        /// Current index of events.
        step: ProgressStep,
        /// Did we find at least one record?
        found_a_record: bool,
        /// The peers closest to the `key` that were queried but did not return a record,
        /// i.e. the peers that are candidates for caching the record.
        cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
    },
}
3283
3284impl QueryInfo {
3285 /// Creates an event for a handler to issue an outgoing request in the
3286 /// context of a query.
3287 fn to_request(&self, query_id: QueryId) -> HandlerIn {
3288 match &self {
3289 QueryInfo::Bootstrap { peer, .. } => HandlerIn::FindNodeReq {
3290 key: peer.to_bytes(),
3291 query_id,
3292 },
3293 QueryInfo::GetClosestPeers { key, .. } => HandlerIn::FindNodeReq {
3294 key: key.clone(),
3295 query_id,
3296 },
3297 QueryInfo::GetProviders { key, .. } => HandlerIn::GetProvidersReq {
3298 key: key.clone(),
3299 query_id,
3300 },
3301 QueryInfo::AddProvider { key, phase, .. } => match phase {
3302 AddProviderPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3303 key: key.to_vec(),
3304 query_id,
3305 },
3306 AddProviderPhase::AddProvider {
3307 provider_id,
3308 external_addresses,
3309 ..
3310 } => HandlerIn::AddProvider {
3311 key: key.clone(),
3312 provider: crate::protocol::KadPeer {
3313 node_id: *provider_id,
3314 multiaddrs: external_addresses.clone(),
3315 connection_ty: crate::protocol::ConnectionType::Connected,
3316 },
3317 query_id,
3318 },
3319 },
3320 QueryInfo::GetRecord { key, .. } => HandlerIn::GetRecord {
3321 key: key.clone(),
3322 query_id,
3323 },
3324 QueryInfo::PutRecord { record, phase, .. } => match phase {
3325 PutRecordPhase::GetClosestPeers => HandlerIn::FindNodeReq {
3326 key: record.key.to_vec(),
3327 query_id,
3328 },
3329 PutRecordPhase::PutRecord { .. } => HandlerIn::PutRecord {
3330 record: record.clone(),
3331 query_id,
3332 },
3333 },
3334 }
3335 }
3336}
3337
/// The phases of a [`QueryInfo::AddProvider`] query.
#[derive(Debug, Clone)]
pub enum AddProviderPhase {
    /// The initial phase: the query is searching for the closest
    /// nodes to the record key.
    GetClosestPeers,

    /// The query advertises the local node as a provider for the key to
    /// the closest nodes to the key.
    AddProvider {
        /// The local peer ID that is advertised as a provider.
        provider_id: PeerId,
        /// The external addresses of the provider being advertised.
        external_addresses: Vec<Multiaddr>,
        /// Query statistics from the finished `GetClosestPeers` phase.
        get_closest_peers_stats: QueryStats,
    },
}
3355
/// The phases of a [`QueryInfo::PutRecord`] query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PutRecordPhase {
    /// The initial phase: the query is searching for the closest
    /// nodes to the record key.
    GetClosestPeers,

    /// The query is replicating the record to the closest nodes to the key.
    PutRecord {
        /// A list of peers the given record has been successfully replicated to.
        success: Vec<PeerId>,
        /// Query statistics from the finished `GetClosestPeers` phase.
        get_closest_peers_stats: QueryStats,
    },
}
3370
/// A mutable reference to a running query.
pub struct QueryMut<'a> {
    // The underlying query state owned by the behaviour's query pool.
    query: &'a mut Query,
}

impl QueryMut<'_> {
    /// Gets the unique ID of the query.
    pub fn id(&self) -> QueryId {
        self.query.id()
    }

    /// Gets information about the type and state of the query.
    pub fn info(&self) -> &QueryInfo {
        &self.query.info
    }

    /// Gets execution statistics about the query.
    ///
    /// For a multi-phase query such as `put_record`, these are the
    /// statistics of the current phase.
    pub fn stats(&self) -> &QueryStats {
        self.query.stats()
    }

    /// Finishes the query asap, without waiting for the
    /// regular termination conditions.
    pub fn finish(&mut self) {
        self.query.finish()
    }
}
3400
/// An immutable reference to a running query.
pub struct QueryRef<'a> {
    // The underlying query state owned by the behaviour's query pool.
    query: &'a Query,
}

impl QueryRef<'_> {
    /// Gets the unique ID of the query.
    pub fn id(&self) -> QueryId {
        self.query.id()
    }

    /// Gets information about the type and state of the query.
    pub fn info(&self) -> &QueryInfo {
        &self.query.info
    }

    /// Gets execution statistics about the query.
    ///
    /// For a multi-phase query such as `put_record`, these are the
    /// statistics of the current phase.
    pub fn stats(&self) -> &QueryStats {
        self.query.stats()
    }
}
3424
/// An operation failed due to no known peers in the routing table.
#[derive(Debug, Clone)]
pub struct NoKnownPeers();

impl fmt::Display for NoKnownPeers {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("No known peers.")
    }
}

impl std::error::Error for NoKnownPeers {}
3436
/// The possible outcomes of [`Behaviour::add_address`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RoutingUpdate {
    /// The given peer and address have been added to the routing
    /// table.
    Success,
    /// The peer and address are pending insertion into
    /// the routing table, if a disconnected peer fails
    /// to respond. If the given peer and address end up
    /// in the routing table, [`Event::RoutingUpdated`]
    /// is eventually emitted.
    Pending,
    /// The routing table update failed, either because the
    /// corresponding bucket for the peer is full and the
    /// pending slot(s) are occupied, or because the given
    /// peer ID is deemed invalid (e.g. refers to the local
    /// peer ID).
    Failed,
}
3456
/// The mode the behaviour is operating in.
///
/// The mode may be updated automatically in response to external
/// addresses being added or removed (cf. [`Event::ModeChanged`]).
#[derive(PartialEq, Copy, Clone, Debug)]
pub enum Mode {
    Client,
    Server,
}

impl fmt::Display for Mode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Mode::Client => "client",
            Mode::Server => "server",
        };
        f.write_str(label)
    }
}
3471
/// Joins the string representations of the given items with `", "`.
///
/// (The parameter name suggests it is used for lists of confirmed
/// external addresses, e.g. in log messages.)
fn to_comma_separated_list<T>(confirmed_external_addresses: &[T]) -> String
where
    T: ToString,
{
    let mut list = String::new();
    for (i, addr) in confirmed_external_addresses.iter().enumerate() {
        if i > 0 {
            list.push_str(", ");
        }
        list.push_str(&addr.to_string());
    }
    list
}
3481}