cs_mwc_libp2p_kad/
query.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21mod peers;
22
23use peers::PeersIterState;
24use peers::closest::{ClosestPeersIterConfig, ClosestPeersIter, disjoint::ClosestDisjointPeersIter};
25use peers::fixed::FixedPeersIter;
26
27use crate::{ALPHA_VALUE, K_VALUE};
28use crate::kbucket::{Key, KeyBytes};
29use either::Either;
30use fnv::FnvHashMap;
31use mwc_libp2p_core::PeerId;
32use std::{time::Duration, num::NonZeroUsize};
33use wasm_timer::Instant;
34
/// A `QueryPool` provides an aggregate state machine for driving `Query`s to completion.
///
/// Internally, a `Query` is in turn driven by an underlying `QueryPeerIter`
/// that determines the peer selection strategy, i.e. the order in which the
/// peers involved in the query should be contacted.
pub struct QueryPool<TInner> {
    /// Counter from which the ID of the next newly added query is taken.
    next_id: usize,
    /// The configuration read by the pool when creating and polling queries.
    config: QueryConfig,
    /// The queries currently in the pool, keyed by their ID.
    queries: FnvHashMap<QueryId, Query<TInner>>,
}
45
/// The observable states emitted by [`QueryPool::poll`].
pub enum QueryPoolState<'a, TInner> {
    /// The pool is idle, i.e. there are no queries to process.
    Idle,
    /// At least one query is waiting for results. `Some((query, peer))`
    /// indicates that `query` has just yielded `peer` as the next peer to
    /// contact, i.e. that a new request is now being waited on.
    Waiting(Option<(&'a mut Query<TInner>, PeerId)>),
    /// A query has finished. The query has been removed from the pool.
    Finished(Query<TInner>),
    /// A query has timed out. The query has been removed from the pool.
    Timeout(Query<TInner>)
}
58
59impl<TInner> QueryPool<TInner> {
60    /// Creates a new `QueryPool` with the given configuration.
61    pub fn new(config: QueryConfig) -> Self {
62        QueryPool {
63            next_id: 0,
64            config,
65            queries: Default::default()
66        }
67    }
68
69    /// Gets a reference to the `QueryConfig` used by the pool.
70    pub fn config(&self) -> &QueryConfig {
71        &self.config
72    }
73
74    /// Returns an iterator over the queries in the pool.
75    pub fn iter(&self) -> impl Iterator<Item = &Query<TInner>> {
76        self.queries.values()
77    }
78
79    /// Gets the current size of the pool, i.e. the number of running queries.
80    pub fn size(&self) -> usize {
81        self.queries.len()
82    }
83
84    /// Returns an iterator that allows modifying each query in the pool.
85    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut Query<TInner>> {
86        self.queries.values_mut()
87    }
88
89    /// Adds a query to the pool that contacts a fixed set of peers.
90    pub fn add_fixed<I>(&mut self, peers: I, inner: TInner) -> QueryId
91    where
92        I: IntoIterator<Item = PeerId>
93    {
94        let id = self.next_query_id();
95        self.continue_fixed(id, peers, inner);
96        id
97    }
98
99    /// Continues an earlier query with a fixed set of peers, reusing
100    /// the given query ID, which must be from a query that finished
101    /// earlier.
102    pub fn continue_fixed<I>(&mut self, id: QueryId, peers: I, inner: TInner)
103    where
104        I: IntoIterator<Item = PeerId>
105    {
106        assert!(!self.queries.contains_key(&id));
107        let parallelism = self.config.replication_factor;
108        let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism));
109        let query = Query::new(id, peer_iter, inner);
110        self.queries.insert(id, query);
111    }
112
113    /// Adds a query to the pool that iterates towards the closest peers to the target.
114    pub fn add_iter_closest<T, I>(&mut self, target: T, peers: I, inner: TInner) -> QueryId
115    where
116        T: Into<KeyBytes> + Clone,
117        I: IntoIterator<Item = Key<PeerId>>
118    {
119        let id = self.next_query_id();
120        self.continue_iter_closest(id, target, peers, inner);
121        id
122    }
123
124    /// Adds a query to the pool that iterates towards the closest peers to the target.
125    pub fn continue_iter_closest<T, I>(&mut self, id: QueryId, target: T, peers: I, inner: TInner)
126    where
127        T: Into<KeyBytes> + Clone,
128        I: IntoIterator<Item = Key<PeerId>>
129    {
130        let cfg = ClosestPeersIterConfig {
131            num_results: self.config.replication_factor,
132            parallelism: self.config.parallelism,
133            .. ClosestPeersIterConfig::default()
134        };
135
136        let peer_iter = if self.config.disjoint_query_paths {
137            QueryPeerIter::ClosestDisjoint(
138                ClosestDisjointPeersIter::with_config(cfg, target, peers),
139            )
140        } else {
141            QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers))
142        };
143
144        let query = Query::new(id, peer_iter, inner);
145        self.queries.insert(id, query);
146    }
147
148    fn next_query_id(&mut self) -> QueryId {
149        let id = QueryId(self.next_id);
150        self.next_id = self.next_id.wrapping_add(1);
151        id
152    }
153
154    /// Returns a reference to a query with the given ID, if it is in the pool.
155    pub fn get(&self, id: &QueryId) -> Option<&Query<TInner>> {
156        self.queries.get(id)
157    }
158
159    /// Returns a mutablereference to a query with the given ID, if it is in the pool.
160    pub fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query<TInner>> {
161        self.queries.get_mut(id)
162    }
163
164    /// Polls the pool to advance the queries.
165    pub fn poll(&mut self, now: Instant) -> QueryPoolState<'_, TInner> {
166        let mut finished = None;
167        let mut timeout = None;
168        let mut waiting = None;
169
170        for (&query_id, query) in self.queries.iter_mut() {
171            query.stats.start = query.stats.start.or(Some(now));
172            match query.next(now) {
173                PeersIterState::Finished => {
174                    finished = Some(query_id);
175                    break
176                }
177                PeersIterState::Waiting(Some(peer_id)) => {
178                    let peer = peer_id.into_owned();
179                    waiting = Some((query_id, peer));
180                    break
181                }
182                PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
183                    let elapsed = now - query.stats.start.unwrap_or(now);
184                    if elapsed >= self.config.timeout {
185                        timeout = Some(query_id);
186                        break
187                    }
188                }
189            }
190        }
191
192        if let Some((query_id, peer_id)) = waiting {
193            let query = self.queries.get_mut(&query_id).expect("s.a.");
194            return QueryPoolState::Waiting(Some((query, peer_id)))
195        }
196
197        if let Some(query_id) = finished {
198            let mut query = self.queries.remove(&query_id).expect("s.a.");
199            query.stats.end = Some(now);
200            return QueryPoolState::Finished(query)
201        }
202
203        if let Some(query_id) = timeout {
204            let mut query = self.queries.remove(&query_id).expect("s.a.");
205            query.stats.end = Some(now);
206            return QueryPoolState::Timeout(query)
207        }
208
209        if self.queries.is_empty() {
210            QueryPoolState::Idle
211        } else {
212            QueryPoolState::Waiting(None)
213        }
214    }
215}
216
/// Unique identifier for an active query.
///
/// IDs are handed out by the `QueryPool` when a query is added and can be
/// used to look the query up again via `QueryPool::get` and
/// `QueryPool::get_mut`.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub struct QueryId(usize);
220
/// The configuration for queries in a `QueryPool`.
#[derive(Debug, Clone)]
pub struct QueryConfig {
    /// Timeout of a single query.
    ///
    /// The pool measures the elapsed time from the first poll of a query
    /// and reports the query as timed out once this duration is reached
    /// while the query is waiting for results.
    ///
    /// See [`crate::behaviour::KademliaConfig::set_query_timeout`] for details.
    pub timeout: Duration,

    /// The replication factor to use.
    ///
    /// The pool uses it as the number of results for iterative
    /// closest-peer queries and hands it to the fixed-peer iterator
    /// on construction.
    ///
    /// See [`crate::behaviour::KademliaConfig::set_replication_factor`] for details.
    pub replication_factor: NonZeroUsize,

    /// Allowed level of parallelism for iterative queries.
    ///
    /// See [`crate::behaviour::KademliaConfig::set_parallelism`] for details.
    pub parallelism: NonZeroUsize,

    /// Whether to use disjoint paths on iterative lookups.
    ///
    /// When `true`, iterative queries are driven by a
    /// `ClosestDisjointPeersIter` instead of a `ClosestPeersIter`.
    ///
    /// See [`crate::behaviour::KademliaConfig::disjoint_query_paths`] for details.
    pub disjoint_query_paths: bool,
}
244
245impl Default for QueryConfig {
246    fn default() -> Self {
247        QueryConfig {
248            timeout: Duration::from_secs(60),
249            replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"),
250            parallelism: ALPHA_VALUE,
251            disjoint_query_paths: false,
252        }
253    }
254}
255
/// A query in a `QueryPool`.
///
/// Queries are created by the pool; callers interact with them through the
/// references and owned values handed out by the pool.
pub struct Query<TInner> {
    /// The unique ID of the query.
    id: QueryId,
    /// The peer iterator that drives the query state.
    peer_iter: QueryPeerIter,
    /// Execution statistics of the query.
    stats: QueryStats,
    /// The opaque inner query state.
    ///
    /// The query itself never inspects this value; it is handed back
    /// unchanged in the `QueryResult`.
    pub inner: TInner,
}
267
/// The peer selection strategies that can be used by queries.
enum QueryPeerIter {
    /// Iteration towards the peers closest to a target; used for iterative
    /// queries when disjoint query paths are disabled.
    Closest(ClosestPeersIter),
    /// Iteration towards the peers closest to a target; used for iterative
    /// queries when disjoint query paths are enabled.
    ClosestDisjoint(ClosestDisjointPeersIter),
    /// Iteration over a fixed set of peers.
    Fixed(FixedPeersIter)
}
274
275impl<TInner> Query<TInner> {
276    /// Creates a new query without starting it.
277    fn new(id: QueryId, peer_iter: QueryPeerIter, inner: TInner) -> Self {
278        Query { id, inner, peer_iter, stats: QueryStats::empty() }
279    }
280
281    /// Gets the unique ID of the query.
282    pub fn id(&self) -> QueryId {
283        self.id
284    }
285
286    /// Gets the current execution statistics of the query.
287    pub fn stats(&self) -> &QueryStats {
288        &self.stats
289    }
290
291    /// Informs the query that the attempt to contact `peer` failed.
292    pub fn on_failure(&mut self, peer: &PeerId) {
293        let updated = match &mut self.peer_iter {
294            QueryPeerIter::Closest(iter) => iter.on_failure(peer),
295            QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
296            QueryPeerIter::Fixed(iter) => iter.on_failure(peer)
297        };
298        if updated {
299            self.stats.failure += 1;
300        }
301    }
302
303    /// Informs the query that the attempt to contact `peer` succeeded,
304    /// possibly resulting in new peers that should be incorporated into
305    /// the query, if applicable.
306    pub fn on_success<I>(&mut self, peer: &PeerId, new_peers: I)
307    where
308        I: IntoIterator<Item = PeerId>
309    {
310        let updated = match &mut self.peer_iter {
311            QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers),
312            QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, new_peers),
313            QueryPeerIter::Fixed(iter) => iter.on_success(peer)
314        };
315        if updated {
316            self.stats.success += 1;
317        }
318    }
319
320    /// Checks whether the query is currently waiting for a result from `peer`.
321    pub fn is_waiting(&self, peer: &PeerId) -> bool {
322        match &self.peer_iter {
323            QueryPeerIter::Closest(iter) => iter.is_waiting(peer),
324            QueryPeerIter::ClosestDisjoint(iter) => iter.is_waiting(peer),
325            QueryPeerIter::Fixed(iter) => iter.is_waiting(peer)
326        }
327    }
328
329    /// Advances the state of the underlying peer iterator.
330    fn next(&mut self, now: Instant) -> PeersIterState<'_> {
331        let state = match &mut self.peer_iter {
332            QueryPeerIter::Closest(iter) => iter.next(now),
333            QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
334            QueryPeerIter::Fixed(iter) => iter.next()
335        };
336
337        if let PeersIterState::Waiting(Some(_)) = state {
338            self.stats.requests += 1;
339        }
340
341        state
342    }
343
344    /// Tries to (gracefully) finish the query prematurely, providing the peers
345    /// that are no longer of interest for further progress of the query.
346    ///
347    /// A query may require that in order to finish gracefully a certain subset
348    /// of peers must be contacted. E.g. in the case of disjoint query paths a
349    /// query may only finish gracefully if every path contacted a peer whose
350    /// response permits termination of the query. The given peers are those for
351    /// which this is considered to be the case, i.e. for which a termination
352    /// condition is satisfied.
353    ///
354    /// Returns `true` if the query did indeed finish, `false` otherwise. In the
355    /// latter case, a new attempt at finishing the query may be made with new
356    /// `peers`.
357    ///
358    /// A finished query immediately stops yielding new peers to contact and
359    /// will be reported by [`QueryPool::poll`] via
360    /// [`QueryPoolState::Finished`].
361    pub fn try_finish<'a, I>(&mut self, peers: I) -> bool
362    where
363        I: IntoIterator<Item = &'a PeerId>
364    {
365        match &mut self.peer_iter {
366            QueryPeerIter::Closest(iter) => { iter.finish(); true },
367            QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers),
368            QueryPeerIter::Fixed(iter) => { iter.finish(); true }
369        }
370    }
371
372    /// Finishes the query prematurely.
373    ///
374    /// A finished query immediately stops yielding new peers to contact and will be
375    /// reported by [`QueryPool::poll`] via [`QueryPoolState::Finished`].
376    pub fn finish(&mut self) {
377        match &mut self.peer_iter {
378            QueryPeerIter::Closest(iter) => iter.finish(),
379            QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
380            QueryPeerIter::Fixed(iter) => iter.finish()
381        }
382    }
383
384    /// Checks whether the query has finished.
385    ///
386    /// A finished query is eventually reported by `QueryPool::next()` and
387    /// removed from the pool.
388    pub fn is_finished(&self) -> bool {
389        match &self.peer_iter {
390            QueryPeerIter::Closest(iter) => iter.is_finished(),
391            QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
392            QueryPeerIter::Fixed(iter) => iter.is_finished()
393        }
394    }
395
396    /// Consumes the query, producing the final `QueryResult`.
397    pub fn into_result(self) -> QueryResult<TInner, impl Iterator<Item = PeerId>> {
398        let peers = match self.peer_iter {
399            QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
400            QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
401            QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result())
402        };
403        QueryResult { peers, inner: self.inner, stats: self.stats }
404    }
405}
406
/// The result of a `Query`, as produced by `Query::into_result`.
pub struct QueryResult<TInner, TPeers> {
    /// The opaque inner query state.
    pub inner: TInner,
    /// The successfully contacted peers.
    pub peers: TPeers,
    /// The collected query statistics.
    pub stats: QueryStats
}
416
/// Execution statistics of a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryStats {
    /// Number of requests initiated by the query.
    requests: u32,
    /// Number of requests that were reported as successful.
    success: u32,
    /// Number of requests that were reported as failed.
    failure: u32,
    /// The instant at which the query started, once known.
    start: Option<Instant>,
    /// The instant at which the query ended, once known.
    end: Option<Instant>
}

impl QueryStats {
    /// Creates statistics with all counters at zero and neither a start
    /// nor an end instant.
    pub fn empty() -> Self {
        Self {
            requests: 0,
            success: 0,
            failure: 0,
            start: None,
            end: None,
        }
    }

    /// The total number of requests the query has initiated.
    pub fn num_requests(&self) -> u32 {
        self.requests
    }

    /// The number of requests that succeeded.
    pub fn num_successes(&self) -> u32 {
        self.success
    }

    /// The number of requests that failed.
    pub fn num_failures(&self) -> u32 {
        self.failure
    }

    /// The number of requests that are still pending, i.e. that have
    /// neither succeeded nor failed yet.
    ///
    /// > **Note**: A query can finish while still having pending
    /// > requests, if the termination conditions are already met.
    pub fn num_pending(&self) -> u32 {
        let completed = self.success + self.failure;
        self.requests - completed
    }

    /// The duration of the query.
    ///
    /// For a query that has an end instant, this is the time between
    /// start and end. Otherwise the duration is measured from the start
    /// of the query to the current instant.
    ///
    /// `None` is returned if no start instant has been recorded yet.
    pub fn duration(&self) -> Option<Duration> {
        let started = self.start?;
        // Without a recorded end the query is measured up to "now".
        let stopped = self.end.unwrap_or_else(Instant::now);
        Some(stopped - started)
    }

    /// Combines these stats with those of another query, e.g. to
    /// accumulate statistics over the phases of a multi-phase query.
    ///
    /// The counters are summed up. The resulting start is the earliest
    /// known start and the resulting end is the latest known end of the
    /// two; an instant that is absent on both sides stays absent.
    pub fn merge(self, other: QueryStats) -> Self {
        let start = self.start.into_iter().chain(other.start).min();
        let end = self.end.max(other.end);

        Self {
            requests: self.requests + other.requests,
            success: self.success + other.success,
            failure: self.failure + other.failure,
            start,
            end,
        }
    }
}