ant_libp2p_kad/
query.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21mod peers;
22
23use std::{num::NonZeroUsize, time::Duration};
24
25use ant_libp2p_core as libp2p_core;
26
27use either::Either;
28use fnv::FnvHashMap;
29use libp2p_core::Multiaddr;
30use libp2p_identity::PeerId;
31use peers::{
32    closest::{disjoint::ClosestDisjointPeersIter, ClosestPeersIter, ClosestPeersIterConfig},
33    fixed::FixedPeersIter,
34    PeersIterState,
35};
36use smallvec::SmallVec;
37use web_time::Instant;
38
39use crate::{
40    behaviour::PeerInfo,
41    handler::HandlerIn,
42    kbucket::{Key, KeyBytes},
43    QueryInfo, ALPHA_VALUE, K_VALUE,
44};
45
/// A `QueryPool` provides an aggregate state machine for driving `Query`s to completion.
///
/// Internally, a `Query` is in turn driven by an underlying `QueryPeerIter`
/// that determines the peer selection strategy, i.e. the order in which the
/// peers involved in the query should be contacted.
pub(crate) struct QueryPool {
    /// Counter from which the next fresh `QueryId` is allocated.
    next_id: usize,
    /// The configuration shared by all queries created through this pool.
    config: QueryConfig,
    /// The queries currently in the pool, keyed by their ID.
    queries: FnvHashMap<QueryId, Query>,
}
56
/// The observable states emitted by [`QueryPool::poll`].
pub(crate) enum QueryPoolState<'a> {
    /// The pool is idle, i.e. there are no queries to process.
    Idle,
    /// At least one query is waiting for results. `Some(request)` indicates
    /// that a new request is now being waited on; it carries the query that
    /// produced the request and the peer yielded by that query's peer iterator.
    Waiting(Option<(&'a mut Query, PeerId)>),
    /// A query has finished. The query has been removed from the pool.
    Finished(Query),
    /// A query has timed out. The query has been removed from the pool.
    Timeout(Query),
}
69
70impl QueryPool {
71    /// Creates a new `QueryPool` with the given configuration.
72    pub(crate) fn new(config: QueryConfig) -> Self {
73        QueryPool {
74            next_id: 0,
75            config,
76            queries: Default::default(),
77        }
78    }
79
80    /// Gets a reference to the `QueryConfig` used by the pool.
81    pub(crate) fn config(&self) -> &QueryConfig {
82        &self.config
83    }
84
85    /// Returns an iterator over the queries in the pool.
86    pub(crate) fn iter(&self) -> impl Iterator<Item = &Query> {
87        self.queries.values()
88    }
89
90    /// Gets the current size of the pool, i.e. the number of running queries.
91    pub(crate) fn size(&self) -> usize {
92        self.queries.len()
93    }
94
95    /// Returns an iterator that allows modifying each query in the pool.
96    pub(crate) fn iter_mut(&mut self) -> impl Iterator<Item = &mut Query> {
97        self.queries.values_mut()
98    }
99
100    /// Adds a query to the pool that contacts a fixed set of peers.
101    pub(crate) fn add_fixed<I>(&mut self, peers: I, info: QueryInfo) -> QueryId
102    where
103        I: IntoIterator<Item = PeerId>,
104    {
105        let id = self.next_query_id();
106        self.continue_fixed(id, peers, info);
107        id
108    }
109
110    /// Continues an earlier query with a fixed set of peers, reusing
111    /// the given query ID, which must be from a query that finished
112    /// earlier.
113    pub(crate) fn continue_fixed<I>(&mut self, id: QueryId, peers: I, info: QueryInfo)
114    where
115        I: IntoIterator<Item = PeerId>,
116    {
117        assert!(!self.queries.contains_key(&id));
118        let parallelism = self.config.replication_factor;
119        let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism));
120        let query = Query::new(id, peer_iter, info);
121        self.queries.insert(id, query);
122    }
123
124    /// Adds a query to the pool that iterates towards the closest peers to the target.
125    pub(crate) fn add_iter_closest<T, I>(&mut self, target: T, peers: I, info: QueryInfo) -> QueryId
126    where
127        T: Into<KeyBytes> + Clone,
128        I: IntoIterator<Item = Key<PeerId>>,
129    {
130        let id = self.next_query_id();
131        self.continue_iter_closest(id, target, peers, info);
132        id
133    }
134
135    /// Adds a query to the pool that iterates towards the closest peers to the target.
136    pub(crate) fn continue_iter_closest<T, I>(
137        &mut self,
138        id: QueryId,
139        target: T,
140        peers: I,
141        info: QueryInfo,
142    ) where
143        T: Into<KeyBytes> + Clone,
144        I: IntoIterator<Item = Key<PeerId>>,
145    {
146        let num_results = match info {
147            QueryInfo::GetClosestPeers {
148                num_results: Some(val),
149                ..
150            } => val,
151            _ => self.config.replication_factor,
152        };
153
154        let cfg = ClosestPeersIterConfig {
155            num_results,
156            parallelism: self.config.parallelism,
157            ..ClosestPeersIterConfig::default()
158        };
159
160        let peer_iter = if self.config.disjoint_query_paths {
161            QueryPeerIter::ClosestDisjoint(ClosestDisjointPeersIter::with_config(
162                cfg, target, peers,
163            ))
164        } else {
165            QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers))
166        };
167
168        let query = Query::new(id, peer_iter, info);
169        self.queries.insert(id, query);
170    }
171
172    fn next_query_id(&mut self) -> QueryId {
173        let id = QueryId(self.next_id);
174        self.next_id = self.next_id.wrapping_add(1);
175        id
176    }
177
178    /// Returns a reference to a query with the given ID, if it is in the pool.
179    pub(crate) fn get(&self, id: &QueryId) -> Option<&Query> {
180        self.queries.get(id)
181    }
182
183    /// Returns a mutablereference to a query with the given ID, if it is in the pool.
184    pub(crate) fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query> {
185        self.queries.get_mut(id)
186    }
187
188    /// Polls the pool to advance the queries.
189    pub(crate) fn poll(&mut self, now: Instant) -> QueryPoolState<'_> {
190        let mut finished = None;
191        let mut timeout = None;
192        let mut waiting = None;
193
194        for (&query_id, query) in self.queries.iter_mut() {
195            query.stats.start = query.stats.start.or(Some(now));
196            match query.next(now) {
197                PeersIterState::Finished => {
198                    finished = Some(query_id);
199                    break;
200                }
201                PeersIterState::Waiting(Some(peer_id)) => {
202                    let peer = peer_id.into_owned();
203                    waiting = Some((query_id, peer));
204                    break;
205                }
206                PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
207                    let elapsed = now - query.stats.start.unwrap_or(now);
208                    if elapsed >= self.config.timeout {
209                        timeout = Some(query_id);
210                        break;
211                    }
212                }
213            }
214        }
215
216        if let Some((query_id, peer_id)) = waiting {
217            let query = self.queries.get_mut(&query_id).expect("s.a.");
218            return QueryPoolState::Waiting(Some((query, peer_id)));
219        }
220
221        if let Some(query_id) = finished {
222            let mut query = self.queries.remove(&query_id).expect("s.a.");
223            query.stats.end = Some(now);
224            return QueryPoolState::Finished(query);
225        }
226
227        if let Some(query_id) = timeout {
228            let mut query = self.queries.remove(&query_id).expect("s.a.");
229            query.stats.end = Some(now);
230            return QueryPoolState::Timeout(query);
231        }
232
233        if self.queries.is_empty() {
234            QueryPoolState::Idle
235        } else {
236            QueryPoolState::Waiting(None)
237        }
238    }
239}
240
/// Unique identifier for an active query.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub struct QueryId(usize);

impl std::fmt::Display for QueryId {
    /// Writes the numeric value of the ID using its plain decimal form.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let QueryId(raw) = *self;
        f.write_fmt(format_args!("{}", raw))
    }
}
250
251/// The configuration for queries in a `QueryPool`.
252#[derive(Debug, Clone)]
253pub(crate) struct QueryConfig {
254    /// Timeout of a single query.
255    ///
256    /// See [`crate::behaviour::Config::set_query_timeout`] for details.
257    pub(crate) timeout: Duration,
258    /// The replication factor to use.
259    ///
260    /// See [`crate::behaviour::Config::set_replication_factor`] for details.
261    pub(crate) replication_factor: NonZeroUsize,
262    /// Allowed level of parallelism for iterative queries.
263    ///
264    /// See [`crate::behaviour::Config::set_parallelism`] for details.
265    pub(crate) parallelism: NonZeroUsize,
266    /// Whether to use disjoint paths on iterative lookups.
267    ///
268    /// See [`crate::behaviour::Config::disjoint_query_paths`] for details.
269    pub(crate) disjoint_query_paths: bool,
270}
271
272impl Default for QueryConfig {
273    fn default() -> Self {
274        QueryConfig {
275            timeout: Duration::from_secs(60),
276            replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"),
277            parallelism: ALPHA_VALUE,
278            disjoint_query_paths: false,
279        }
280    }
281}
282
/// A query in a `QueryPool`.
pub(crate) struct Query {
    /// The unique ID of the query.
    id: QueryId,
    /// The peers of the query: the peer iterator that drives the query state
    /// together with the addresses of peers discovered so far.
    pub(crate) peers: QueryPeers,
    /// Execution statistics of the query.
    pub(crate) stats: QueryStats,
    /// The query-specific state.
    pub(crate) info: QueryInfo,
    /// A list of pending requests to peers.
    ///
    /// A request is pending if the targeted peer is not currently connected
    /// and these requests are sent as soon as a connection to the peer is established.
    pub(crate) pending_rpcs: SmallVec<[(PeerId, HandlerIn); K_VALUE.get()]>,
}
299
300/// The peer iterator that drives the query state,
301pub(crate) struct QueryPeers {
302    /// Addresses of peers discovered during a query.
303    pub(crate) addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
304    /// The peer iterator that drives the query state.
305    peer_iter: QueryPeerIter,
306}
307
308impl QueryPeers {
309    /// Consumes the peers iterator, producing a final `Iterator` over the discovered `PeerId`s.
310    pub(crate) fn into_peerids_iter(self) -> impl Iterator<Item = PeerId> {
311        match self.peer_iter {
312            QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
313            QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
314            QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()),
315        }
316    }
317
318    /// Consumes the peers iterator, producing a final `Iterator` over the discovered `PeerId`s
319    /// with their matching `Multiaddr`s.
320    pub(crate) fn into_peerinfos_iter(mut self) -> impl Iterator<Item = PeerInfo> {
321        match self.peer_iter {
322            QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
323            QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
324            QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()),
325        }
326        .map(move |peer_id| {
327            let addrs = self.addresses.remove(&peer_id).unwrap_or_default().to_vec();
328            PeerInfo { peer_id, addrs }
329        })
330    }
331}
332
/// The peer selection strategies that can be used by queries.
enum QueryPeerIter {
    /// Iterates towards the closest peers to a target; used when
    /// `QueryConfig::disjoint_query_paths` is disabled.
    Closest(ClosestPeersIter),
    /// Iterates towards the closest peers to a target; used when
    /// `QueryConfig::disjoint_query_paths` is enabled.
    ClosestDisjoint(ClosestDisjointPeersIter),
    /// Contacts a fixed set of peers.
    Fixed(FixedPeersIter),
}
339
340impl Query {
341    /// Creates a new query without starting it.
342    fn new(id: QueryId, peer_iter: QueryPeerIter, info: QueryInfo) -> Self {
343        Query {
344            id,
345            info,
346            peers: QueryPeers {
347                addresses: Default::default(),
348                peer_iter,
349            },
350            pending_rpcs: SmallVec::default(),
351            stats: QueryStats::empty(),
352        }
353    }
354
355    /// Gets the unique ID of the query.
356    pub(crate) fn id(&self) -> QueryId {
357        self.id
358    }
359
360    /// Gets the current execution statistics of the query.
361    pub(crate) fn stats(&self) -> &QueryStats {
362        &self.stats
363    }
364
365    /// Informs the query that the attempt to contact `peer` failed.
366    pub(crate) fn on_failure(&mut self, peer: &PeerId) {
367        let updated = match &mut self.peers.peer_iter {
368            QueryPeerIter::Closest(iter) => iter.on_failure(peer),
369            QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
370            QueryPeerIter::Fixed(iter) => iter.on_failure(peer),
371        };
372        if updated {
373            self.stats.failure += 1;
374        }
375    }
376
377    /// Informs the query that the attempt to contact `peer` succeeded,
378    /// possibly resulting in new peers that should be incorporated into
379    /// the query, if applicable.
380    pub(crate) fn on_success<I>(&mut self, peer: &PeerId, new_peers: I)
381    where
382        I: IntoIterator<Item = PeerId>,
383    {
384        let updated = match &mut self.peers.peer_iter {
385            QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers),
386            QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, new_peers),
387            QueryPeerIter::Fixed(iter) => iter.on_success(peer),
388        };
389        if updated {
390            self.stats.success += 1;
391        }
392    }
393
394    /// Advances the state of the underlying peer iterator.
395    fn next(&mut self, now: Instant) -> PeersIterState<'_> {
396        let state = match &mut self.peers.peer_iter {
397            QueryPeerIter::Closest(iter) => iter.next(now),
398            QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
399            QueryPeerIter::Fixed(iter) => iter.next(),
400        };
401
402        if let PeersIterState::Waiting(Some(_)) = state {
403            self.stats.requests += 1;
404        }
405
406        state
407    }
408
409    /// Tries to (gracefully) finish the query prematurely, providing the peers
410    /// that are no longer of interest for further progress of the query.
411    ///
412    /// A query may require that in order to finish gracefully a certain subset
413    /// of peers must be contacted. E.g. in the case of disjoint query paths a
414    /// query may only finish gracefully if every path contacted a peer whose
415    /// response permits termination of the query. The given peers are those for
416    /// which this is considered to be the case, i.e. for which a termination
417    /// condition is satisfied.
418    ///
419    /// Returns `true` if the query did indeed finish, `false` otherwise. In the
420    /// latter case, a new attempt at finishing the query may be made with new
421    /// `peers`.
422    ///
423    /// A finished query immediately stops yielding new peers to contact and
424    /// will be reported by [`QueryPool::poll`] via
425    /// [`QueryPoolState::Finished`].
426    pub(crate) fn try_finish<'a, I>(&mut self, peers: I) -> bool
427    where
428        I: IntoIterator<Item = &'a PeerId>,
429    {
430        match &mut self.peers.peer_iter {
431            QueryPeerIter::Closest(iter) => {
432                iter.finish();
433                true
434            }
435            QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers),
436            QueryPeerIter::Fixed(iter) => {
437                iter.finish();
438                true
439            }
440        }
441    }
442
443    /// Finishes the query prematurely.
444    ///
445    /// A finished query immediately stops yielding new peers to contact and will be
446    /// reported by [`QueryPool::poll`] via [`QueryPoolState::Finished`].
447    pub(crate) fn finish(&mut self) {
448        match &mut self.peers.peer_iter {
449            QueryPeerIter::Closest(iter) => iter.finish(),
450            QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
451            QueryPeerIter::Fixed(iter) => iter.finish(),
452        }
453    }
454
455    /// Checks whether the query has finished.
456    ///
457    /// A finished query is eventually reported by `QueryPool::next()` and
458    /// removed from the pool.
459    pub(crate) fn is_finished(&self) -> bool {
460        match &self.peers.peer_iter {
461            QueryPeerIter::Closest(iter) => iter.is_finished(),
462            QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
463            QueryPeerIter::Fixed(iter) => iter.is_finished(),
464        }
465    }
466}
467
/// Execution statistics of a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryStats {
    /// Number of requests initiated by the query.
    requests: u32,
    /// Number of requests that were answered successfully.
    success: u32,
    /// Number of requests that failed.
    failure: u32,
    /// The instant the query was first polled, if it was polled at all.
    start: Option<Instant>,
    /// The instant the query finished or timed out, if it already did.
    end: Option<Instant>,
}

impl QueryStats {
    /// Creates statistics with all counters at zero and neither a start nor
    /// an end instant.
    pub fn empty() -> Self {
        QueryStats {
            requests: 0,
            success: 0,
            failure: 0,
            start: None,
            end: None,
        }
    }

    /// Gets the total number of requests initiated by the query.
    pub fn num_requests(&self) -> u32 {
        self.requests
    }

    /// Gets the number of successful requests.
    pub fn num_successes(&self) -> u32 {
        self.success
    }

    /// Gets the number of failed requests.
    pub fn num_failures(&self) -> u32 {
        self.failure
    }

    /// Gets the number of pending requests.
    ///
    /// The result saturates at zero: should the counters ever disagree such
    /// that more requests completed than were initiated, `0` is returned
    /// instead of panicking or wrapping around.
    ///
    /// > **Note**: A query can finish while still having pending
    /// > requests, if the termination conditions are already met.
    pub fn num_pending(&self) -> u32 {
        let completed = self.success.saturating_add(self.failure);
        self.requests.saturating_sub(completed)
    }

    /// Gets the duration of the query.
    ///
    /// If the query has not yet finished, the duration is measured from the
    /// start of the query to the current instant.
    ///
    /// If the query did not yet start, i.e. it has not been polled by the
    /// pool so far, `None` is returned.
    pub fn duration(&self) -> Option<Duration> {
        let start = self.start?;
        Some(match self.end {
            Some(end) => end - start,
            // Still running: measure up to the current instant.
            None => start.elapsed(),
        })
    }

    /// Merges these stats with the given stats of another query,
    /// e.g. to accumulate statistics from a multi-phase query.
    ///
    /// Counters are merged cumulatively while the instants for
    /// start and end of the queries are taken as the minimum and
    /// maximum, respectively.
    pub fn merge(self, other: QueryStats) -> Self {
        QueryStats {
            requests: self.requests + other.requests,
            success: self.success + other.success,
            failure: self.failure + other.failure,
            start: match (self.start, other.start) {
                (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
                // At most one side started; keep whichever instant exists.
                (a, b) => a.or(b),
            },
            // `None` orders before `Some`, so a known end always wins.
            end: std::cmp::max(self.end, other.end),
        }
    }
}