libp2p_kad/
query.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21mod peers;
22
23use peers::PeersIterState;
24use peers::closest::{ClosestPeersIterConfig, ClosestPeersIter, disjoint::ClosestDisjointPeersIter};
25use peers::fixed::FixedPeersIter;
26
27use crate::{ALPHA_VALUE, K_VALUE};
28use crate::kbucket::{Key, KeyBytes};
29use either::Either;
30use fnv::FnvHashMap;
31use libp2p_core::PeerId;
32use std::{time::Duration, num::NonZeroUsize};
33use wasm_timer::Instant;
34
35/// Peer along with its weight
36pub struct WeightedPeer {
37    /// Kademlia key & id of the peer
38    pub peer_id: Key<PeerId>,
39    /// Weight, calculated locally
40    pub weight: u32
41}
42
43/// A `QueryPool` provides an aggregate state machine for driving `Query`s to completion.
44///
45/// Internally, a `Query` is in turn driven by an underlying `QueryPeerIter`
46/// that determines the peer selection strategy, i.e. the order in which the
47/// peers involved in the query should be contacted.
48pub struct QueryPool<TInner> {
49    next_id: usize,
50    config: QueryConfig,
51    queries: FnvHashMap<QueryId, Query<TInner>>,
52}
53
54/// The observable states emitted by [`QueryPool::poll`].
55pub enum QueryPoolState<'a, TInner> {
56    /// The pool is idle, i.e. there are no queries to process.
57    Idle,
58    /// At least one query is waiting for results. `Some(request)` indicates
59    /// that a new request is now being waited on.
60    Waiting(Option<(&'a mut Query<TInner>, PeerId)>),
61    /// A query has finished.
62    Finished(Query<TInner>),
63    /// A query has timed out.
64    Timeout(Query<TInner>)
65}
66
67impl<TInner> QueryPool<TInner> {
68    /// Creates a new `QueryPool` with the given configuration.
69    pub fn new(config: QueryConfig) -> Self {
70        QueryPool {
71            next_id: 0,
72            config,
73            queries: Default::default()
74        }
75    }
76
77    /// Gets a reference to the `QueryConfig` used by the pool.
78    pub fn config(&self) -> &QueryConfig {
79        &self.config
80    }
81
82    /// Returns an iterator over the queries in the pool.
83    pub fn iter(&self) -> impl Iterator<Item = &Query<TInner>> {
84        self.queries.values()
85    }
86
87    /// Gets the current size of the pool, i.e. the number of running queries.
88    pub fn size(&self) -> usize {
89        self.queries.len()
90    }
91
92    /// Returns an iterator that allows modifying each query in the pool.
93    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut Query<TInner>> {
94        self.queries.values_mut()
95    }
96
97    /// Adds a query to the pool that contacts a fixed set of peers.
98    pub fn add_fixed<I>(&mut self, peers: I, inner: TInner) -> QueryId
99    where
100        I: IntoIterator<Item = WeightedPeer>
101    {
102        let id = self.next_query_id();
103        self.continue_fixed(id, peers, inner);
104        id
105    }
106
107    /// Continues an earlier query with a fixed set of peers, reusing
108    /// the given query ID, which must be from a query that finished
109    /// earlier.
110    pub fn continue_fixed<I>(&mut self, id: QueryId, peers: I, inner: TInner)
111    where
112        I: IntoIterator<Item = WeightedPeer>
113    {
114        assert!(!self.queries.contains_key(&id));
115        // TODO: why not alpha?
116        let parallelism = self.config.replication_factor;
117
118        let (swamp, weighted) = peers.into_iter().partition::<Vec<_>, _>(|p| p.weight == 0);
119        let swamp = swamp.into_iter().map(|p| p.peer_id.into_preimage());
120        let weighted = weighted.into_iter().map(|p| p.peer_id.into_preimage());
121
122        let weighted_iter = QueryPeerIter::Fixed(FixedPeersIter::new(weighted, parallelism));
123        let swamp_iter = QueryPeerIter::Fixed(FixedPeersIter::new(swamp, parallelism));
124        let query = Query::new(id, weighted_iter, swamp_iter, inner);
125        self.queries.insert(id, query);
126    }
127
128    /// Adds a query to the pool that iterates towards the closest peers to the target.
129    pub fn add_iter_closest<T, I>(&mut self, target: T, peers: I, inner: TInner) -> QueryId
130    where
131        T: Into<KeyBytes> + Clone,
132        I: IntoIterator<Item = WeightedPeer>
133    {
134        let id = self.next_query_id();
135        self.continue_iter_closest(id, target, peers, inner);
136        id
137    }
138
139    /// Adds a query to the pool that iterates towards the closest peers to the target.
140    pub fn continue_iter_closest<T, I>(&mut self, id: QueryId, target: T, peers: I, inner: TInner)
141    where
142        T: Into<KeyBytes> + Clone,
143        I: IntoIterator<Item = WeightedPeer>
144    {
145        let cfg = ClosestPeersIterConfig {
146            num_results: self.config.replication_factor,
147            parallelism: self.config.parallelism,
148            .. ClosestPeersIterConfig::default()
149        };
150
151        let (swamp, weighted) = peers.into_iter().partition::<Vec<_>, _>(|p| p.weight == 0);
152        let swamp = swamp.into_iter().map(|p| p.peer_id);
153        let weighted = weighted.into_iter().map(|p| p.peer_id);
154
155        let weighted_iter = if self.config.disjoint_query_paths {
156            QueryPeerIter::ClosestDisjoint(
157                ClosestDisjointPeersIter::with_config(cfg.clone(), target.clone(), weighted),
158            )
159        } else {
160            QueryPeerIter::Closest(
161                ClosestPeersIter::with_config(cfg.clone(), target.clone(), weighted)
162            )
163        };
164
165        let swamp_iter = if self.config.disjoint_query_paths {
166            QueryPeerIter::ClosestDisjoint(
167                ClosestDisjointPeersIter::with_config(cfg, target, swamp),
168            )
169        } else {
170            QueryPeerIter::Closest(
171                ClosestPeersIter::with_config(cfg, target, swamp)
172            )
173        };
174
175        let query = Query::new(id, weighted_iter, swamp_iter, inner);
176        self.queries.insert(id, query);
177    }
178
179    fn next_query_id(&mut self) -> QueryId {
180        let id = QueryId(self.next_id);
181        self.next_id = self.next_id.wrapping_add(1);
182        id
183    }
184
185    /// Returns a reference to a query with the given ID, if it is in the pool.
186    pub fn get(&self, id: &QueryId) -> Option<&Query<TInner>> {
187        self.queries.get(id)
188    }
189
190    /// Returns a mutablereference to a query with the given ID, if it is in the pool.
191    pub fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query<TInner>> {
192        self.queries.get_mut(id)
193    }
194
195    /// Polls the pool to advance the queries.
196    pub fn poll(&mut self, now: Instant) -> QueryPoolState<'_, TInner> {
197        let mut finished = None;
198        let mut timeout = None;
199        let mut waiting = None;
200
201        for (&query_id, query) in self.queries.iter_mut() {
202            query.stats.start = query.stats.start.or(Some(now));
203            match query.next(now) {
204                PeersIterState::Finished => {
205                    finished = Some(query_id);
206                    break
207                }
208                PeersIterState::Waiting(Some(peer_id)) => {
209                    let peer = peer_id.into_owned();
210                    waiting = Some((query_id, peer));
211                    break
212                }
213                PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
214                    let elapsed = now - query.stats.start.unwrap_or(now);
215                    if elapsed >= self.config.timeout {
216                        timeout = Some(query_id);
217                        break
218                    }
219                }
220            }
221        }
222
223        if let Some((query_id, peer_id)) = waiting {
224            let query = self.queries.get_mut(&query_id).expect("s.a.");
225            return QueryPoolState::Waiting(Some((query, peer_id)))
226        }
227
228        if let Some(query_id) = finished {
229            let mut query = self.queries.remove(&query_id).expect("s.a.");
230            query.stats.end = Some(now);
231            return QueryPoolState::Finished(query)
232        }
233
234        if let Some(query_id) = timeout {
235            let mut query = self.queries.remove(&query_id).expect("s.a.");
236            query.stats.end = Some(now);
237            return QueryPoolState::Timeout(query)
238        }
239
240        if self.queries.is_empty() {
241            QueryPoolState::Idle
242        } else {
243            QueryPoolState::Waiting(None)
244        }
245    }
246}
247
/// An identifier that is unique among the active queries of a pool.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct QueryId(usize);
251
/// The settings a `QueryPool` applies to its queries.
#[derive(Clone, Debug)]
pub struct QueryConfig {
    /// How long a single query may run before it is reported as timed out.
    ///
    /// See [`crate::behaviour::KademliaConfig::set_query_timeout`] for details.
    pub timeout: Duration,

    /// The replication factor used by queries.
    ///
    /// See [`crate::behaviour::KademliaConfig::set_replication_factor`] for details.
    pub replication_factor: NonZeroUsize,

    /// The level of parallelism permitted for iterative queries.
    ///
    /// See [`crate::behaviour::KademliaConfig::set_parallelism`] for details.
    pub parallelism: NonZeroUsize,

    /// Whether iterative lookups use disjoint paths.
    ///
    /// See [`crate::behaviour::KademliaConfig::disjoint_query_paths`] for details.
    pub disjoint_query_paths: bool,
}
275
276impl Default for QueryConfig {
277    fn default() -> Self {
278        QueryConfig {
279            timeout: Duration::from_secs(60),
280            replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"),
281            parallelism: ALPHA_VALUE,
282            disjoint_query_paths: false,
283        }
284    }
285}
286
287/// A query in a `QueryPool`.
288pub struct Query<TInner> {
289    /// The unique ID of the query.
290    id: QueryId,
291    /// The peer iterator that drives the query state.
292    weighted_iter: QueryPeerIter,
293    /// The peer iterator that drives the query state.
294    swamp_iter: QueryPeerIter,
295    /// Execution statistics of the query.
296    stats: QueryStats,
297    /// The opaque inner query state.
298    pub inner: TInner,
299}
300
301/// The peer selection strategies that can be used by queries.
302enum QueryPeerIter {
303    Closest(ClosestPeersIter),
304    ClosestDisjoint(ClosestDisjointPeersIter),
305    Fixed(FixedPeersIter)
306}
307
308impl<TInner> Query<TInner> {
309    /// Creates a new query without starting it.
310    fn new(id: QueryId, weighted_iter: QueryPeerIter, swamp_iter: QueryPeerIter, inner: TInner) -> Self {
311        Query { id, inner, weighted_iter, swamp_iter, stats: QueryStats::empty() }
312    }
313
314    /// Gets the unique ID of the query.
315    pub fn id(&self) -> QueryId {
316        self.id
317    }
318
319    /// Gets the current execution statistics of the query.
320    pub fn stats(&self) -> &QueryStats {
321        &self.stats
322    }
323
324    /// Informs the query that the attempt to contact `peer` failed.
325    pub fn on_failure(&mut self, peer: &PeerId) {
326        let updated_swamp = match &mut self.swamp_iter {
327            QueryPeerIter::Closest(iter) => iter.on_failure(peer),
328            QueryPeerIter::Fixed(iter) => iter.on_failure(peer),
329            QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
330        };
331
332        let updated_weighted = match &mut self.weighted_iter {
333            QueryPeerIter::Closest(iter) => iter.on_failure(peer),
334            QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
335            QueryPeerIter::Fixed(iter) => iter.on_failure(peer),
336        };
337
338        if updated_swamp || updated_weighted {
339            self.stats.failure += 1;
340        }
341    }
342
343    /// Informs the query that the attempt to contact `peer` succeeded,
344    /// possibly resulting in new peers that should be incorporated into
345    /// the query, if applicable.
346    pub fn on_success<I>(&mut self, peer: &PeerId, new_peers: I)
347    where
348        I: IntoIterator<Item = WeightedPeer>
349    {
350        let (swamp, weighted) = new_peers.into_iter().partition::<Vec<_>, _>(|p| p.weight == 0);
351        let swamp = swamp.into_iter().map(|p| p.peer_id);
352        let weighted = weighted.into_iter().map(|p| p.peer_id);
353
354
355        let updated_swamp = match &mut self.swamp_iter {
356            QueryPeerIter::Closest(iter) => iter.on_success(peer, swamp),
357            QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, swamp),
358            QueryPeerIter::Fixed(iter) => iter.on_success(peer),
359        };
360
361        let updated_weighted = match &mut self.weighted_iter {
362            QueryPeerIter::Closest(iter) => iter.on_success(peer, weighted),
363            QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, weighted),
364            QueryPeerIter::Fixed(iter) => iter.on_success(peer),
365        };
366
367        if updated_swamp || updated_weighted {
368            self.stats.success += 1;
369        }
370    }
371
372    /// Checks whether the query is currently waiting for a result from `peer`.
373    pub fn is_waiting(&self, peer: &PeerId) -> bool {
374        let weighted_waiting = match &self.weighted_iter {
375            QueryPeerIter::Closest(iter) => iter.is_waiting(peer),
376            QueryPeerIter::ClosestDisjoint(iter) => iter.is_waiting(peer),
377            QueryPeerIter::Fixed(iter) => iter.is_waiting(peer)
378        };
379
380        let swamp_waiting = match &self.swamp_iter {
381            QueryPeerIter::Closest(iter) => iter.is_waiting(peer),
382            QueryPeerIter::ClosestDisjoint(iter) => iter.is_waiting(peer),
383            QueryPeerIter::Fixed(iter) => iter.is_waiting(peer)
384        };
385
386        debug_assert_ne!(weighted_waiting, swamp_waiting);
387
388        weighted_waiting || swamp_waiting
389    }
390
391    /// Advances the state of the underlying peer iterator.
392    fn next(&mut self, now: Instant) -> PeersIterState<'_> {
393        use PeersIterState::*;
394
395        // First query weighted iter
396        let weighted_state = match &mut self.weighted_iter {
397            QueryPeerIter::Closest(iter) => iter.next(now),
398            QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
399            QueryPeerIter::Fixed(iter) => iter.next()
400        };
401
402        // If there's a new weighted peer to send rpc to, return it
403        if let Waiting(Some(_)) = weighted_state {
404            self.stats.requests += 1;
405            return weighted_state;
406        }
407
408        // If there was no new weighted peer, check swamp
409        let swamp_state = match &mut self.swamp_iter {
410            QueryPeerIter::Closest(iter) => iter.next(now),
411            QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
412            QueryPeerIter::Fixed(iter) => iter.next()
413        };
414
415        // If there's a new swamp peer to send rpc to, return it
416        if let Waiting(Some(_)) = swamp_state {
417            self.stats.requests += 1;
418            return swamp_state;
419        }
420
421        // Return remaining state: weighted has higher priority
422        match (weighted_state, swamp_state) {
423            // If weighted finished, return swamp state
424            (Finished, swamp) => swamp,
425            // Otherwise, return weighted state first
426            (weighted, _) => weighted
427        }
428    }
429
430    /// Tries to (gracefully) finish the query prematurely, providing the peers
431    /// that are no longer of interest for further progress of the query.
432    ///
433    /// A query may require that in order to finish gracefully a certain subset
434    /// of peers must be contacted. E.g. in the case of disjoint query paths a
435    /// query may only finish gracefully if every path contacted a peer whose
436    /// response permits termination of the query. The given peers are those for
437    /// which this is considered to be the case, i.e. for which a termination
438    /// condition is satisfied.
439    ///
440    /// Returns `true` if the query did indeed finish, `false` otherwise. In the
441    /// latter case, a new attempt at finishing the query may be made with new
442    /// `peers`.
443    ///
444    /// A finished query immediately stops yielding new peers to contact and
445    /// will be reported by [`QueryPool::poll`] via
446    /// [`QueryPoolState::Finished`].
447    pub fn try_finish<'a, I>(&mut self, peers: I) -> bool
448    where
449        I: IntoIterator<Item = &'a PeerId> + Clone
450    {
451        let weighted = match &mut self.weighted_iter {
452            QueryPeerIter::Closest(iter) => { iter.finish(); true },
453            QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers.clone()),
454            QueryPeerIter::Fixed(iter) => { iter.finish(); true },
455        };
456
457        let swamp = match &mut self.swamp_iter {
458            QueryPeerIter::Closest(iter) => { iter.finish(); true },
459            QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers),
460            QueryPeerIter::Fixed(iter) => { iter.finish(); true },
461        };
462
463        weighted && swamp
464    }
465
466    /// Finishes the query prematurely.
467    ///
468    /// A finished query immediately stops yielding new peers to contact and will be
469    /// reported by [`QueryPool::poll`] via [`QueryPoolState::Finished`].
470    pub fn finish(&mut self) {
471        match &mut self.weighted_iter {
472            QueryPeerIter::Closest(iter) => iter.finish(),
473            QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
474            QueryPeerIter::Fixed(iter) => iter.finish()
475        };
476
477        match &mut self.swamp_iter {
478            QueryPeerIter::Closest(iter) => iter.finish(),
479            QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
480            QueryPeerIter::Fixed(iter) => iter.finish()
481        };
482    }
483
484    /// Checks whether the query has finished.
485    ///
486    /// A finished query is eventually reported by `QueryPool::next()` and
487    /// removed from the pool.
488    pub fn is_finished(&self) -> bool {
489        let weighted_finished = match &self.weighted_iter {
490            QueryPeerIter::Closest(iter) => iter.is_finished(),
491            QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
492            QueryPeerIter::Fixed(iter) => iter.is_finished()
493        };
494
495        let swamp_finished = match &self.swamp_iter {
496            QueryPeerIter::Closest(iter) => iter.is_finished(),
497            QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
498            QueryPeerIter::Fixed(iter) => iter.is_finished()
499        };
500
501        weighted_finished && swamp_finished
502    }
503
504    /// Consumes the query, producing the final `QueryResult`.
505    pub fn into_result(self) -> QueryResult<TInner, impl Iterator<Item = PeerId>> {
506        fn into_result_(iter: QueryPeerIter) -> impl Iterator<Item = PeerId> {
507            match iter {
508                QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
509                QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
510                QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result())
511            }
512        }
513
514        let weighted = into_result_(self.weighted_iter);
515        let swamp = into_result_(self.swamp_iter);
516        let peers = weighted.chain(swamp);
517        QueryResult { peers, inner: self.inner, stats: self.stats }
518    }
519}
520
521/// The result of a `Query`.
522pub struct QueryResult<TInner, TPeers> {
523    /// The opaque inner query state.
524    pub inner: TInner,
525    /// The successfully contacted peers.
526    pub peers: TPeers,
527    /// The collected query statistics.
528    pub stats: QueryStats
529}
530
/// Statistics collected during the execution of a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryStats {
    /// The number of requests initiated.
    requests: u32,
    /// The number of requests that succeeded.
    success: u32,
    /// The number of requests that failed.
    failure: u32,
    /// The instant at which the query started, if it did.
    start: Option<Instant>,
    /// The instant at which the query ended, if it did.
    end: Option<Instant>,
}

impl QueryStats {
    /// Creates statistics with all counters at zero and neither a start
    /// nor an end instant.
    pub fn empty() -> Self {
        Self {
            start: None,
            end: None,
            requests: 0,
            success: 0,
            failure: 0,
        }
    }

    /// Returns the total number of requests initiated by the query.
    pub fn num_requests(&self) -> u32 {
        self.requests
    }

    /// Returns the number of requests that succeeded.
    pub fn num_successes(&self) -> u32 {
        self.success
    }

    /// Returns the number of requests that failed.
    pub fn num_failures(&self) -> u32 {
        self.failure
    }

    /// Returns the number of requests that are still pending, i.e. that
    /// neither succeeded nor failed so far.
    ///
    /// > **Note**: A query can finish while still having pending
    /// > requests, if the termination conditions are already met.
    pub fn num_pending(&self) -> u32 {
        let completed = self.success + self.failure;
        self.requests - completed
    }

    /// Returns the duration of the query.
    ///
    /// For a query with a recorded end, this is the time between its start
    /// and its end. For a query that has started but not ended, the
    /// duration is measured from its start to the current instant.
    ///
    /// Returns `None` if no start has been recorded for the query.
    pub fn duration(&self) -> Option<Duration> {
        let started = self.start?;
        let stopped = self.end.unwrap_or_else(Instant::now);
        Some(stopped - started)
    }

    /// Combines these statistics with those of another query, e.g. to
    /// accumulate the statistics of a multi-phase query.
    ///
    /// The counters are added up. The start is the earlier of the two
    /// starts and the end the later of the two ends; an instant present on
    /// only one side is taken as is.
    pub fn merge(self, other: QueryStats) -> Self {
        let start = match (self.start, other.start) {
            (Some(ours), Some(theirs)) => Some(std::cmp::min(ours, theirs)),
            (ours, theirs) => ours.or(theirs),
        };
        // `None` orders before any `Some`, so a missing end never wins.
        let end = std::cmp::max(self.end, other.end);

        Self {
            start,
            end,
            requests: self.requests + other.requests,
            success: self.success + other.success,
            failure: self.failure + other.failure,
        }
    }
}