1mod peers;
22
23use peers::PeersIterState;
24use peers::closest::{ClosestPeersIterConfig, ClosestPeersIter, disjoint::ClosestDisjointPeersIter};
25use peers::fixed::FixedPeersIter;
26
27use crate::{ALPHA_VALUE, K_VALUE};
28use crate::kbucket::{Key, KeyBytes};
29use either::Either;
30use fnv::FnvHashMap;
31use libp2p_core::PeerId;
32use std::{time::Duration, num::NonZeroUsize};
33use wasm_timer::Instant;
34
35pub struct WeightedPeer {
37 pub peer_id: Key<PeerId>,
39 pub weight: u32
41}
42
43pub struct QueryPool<TInner> {
49 next_id: usize,
50 config: QueryConfig,
51 queries: FnvHashMap<QueryId, Query<TInner>>,
52}
53
54pub enum QueryPoolState<'a, TInner> {
56 Idle,
58 Waiting(Option<(&'a mut Query<TInner>, PeerId)>),
61 Finished(Query<TInner>),
63 Timeout(Query<TInner>)
65}
66
67impl<TInner> QueryPool<TInner> {
68 pub fn new(config: QueryConfig) -> Self {
70 QueryPool {
71 next_id: 0,
72 config,
73 queries: Default::default()
74 }
75 }
76
77 pub fn config(&self) -> &QueryConfig {
79 &self.config
80 }
81
82 pub fn iter(&self) -> impl Iterator<Item = &Query<TInner>> {
84 self.queries.values()
85 }
86
87 pub fn size(&self) -> usize {
89 self.queries.len()
90 }
91
92 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut Query<TInner>> {
94 self.queries.values_mut()
95 }
96
97 pub fn add_fixed<I>(&mut self, peers: I, inner: TInner) -> QueryId
99 where
100 I: IntoIterator<Item = WeightedPeer>
101 {
102 let id = self.next_query_id();
103 self.continue_fixed(id, peers, inner);
104 id
105 }
106
107 pub fn continue_fixed<I>(&mut self, id: QueryId, peers: I, inner: TInner)
111 where
112 I: IntoIterator<Item = WeightedPeer>
113 {
114 assert!(!self.queries.contains_key(&id));
115 let parallelism = self.config.replication_factor;
117
118 let (swamp, weighted) = peers.into_iter().partition::<Vec<_>, _>(|p| p.weight == 0);
119 let swamp = swamp.into_iter().map(|p| p.peer_id.into_preimage());
120 let weighted = weighted.into_iter().map(|p| p.peer_id.into_preimage());
121
122 let weighted_iter = QueryPeerIter::Fixed(FixedPeersIter::new(weighted, parallelism));
123 let swamp_iter = QueryPeerIter::Fixed(FixedPeersIter::new(swamp, parallelism));
124 let query = Query::new(id, weighted_iter, swamp_iter, inner);
125 self.queries.insert(id, query);
126 }
127
128 pub fn add_iter_closest<T, I>(&mut self, target: T, peers: I, inner: TInner) -> QueryId
130 where
131 T: Into<KeyBytes> + Clone,
132 I: IntoIterator<Item = WeightedPeer>
133 {
134 let id = self.next_query_id();
135 self.continue_iter_closest(id, target, peers, inner);
136 id
137 }
138
139 pub fn continue_iter_closest<T, I>(&mut self, id: QueryId, target: T, peers: I, inner: TInner)
141 where
142 T: Into<KeyBytes> + Clone,
143 I: IntoIterator<Item = WeightedPeer>
144 {
145 let cfg = ClosestPeersIterConfig {
146 num_results: self.config.replication_factor,
147 parallelism: self.config.parallelism,
148 .. ClosestPeersIterConfig::default()
149 };
150
151 let (swamp, weighted) = peers.into_iter().partition::<Vec<_>, _>(|p| p.weight == 0);
152 let swamp = swamp.into_iter().map(|p| p.peer_id);
153 let weighted = weighted.into_iter().map(|p| p.peer_id);
154
155 let weighted_iter = if self.config.disjoint_query_paths {
156 QueryPeerIter::ClosestDisjoint(
157 ClosestDisjointPeersIter::with_config(cfg.clone(), target.clone(), weighted),
158 )
159 } else {
160 QueryPeerIter::Closest(
161 ClosestPeersIter::with_config(cfg.clone(), target.clone(), weighted)
162 )
163 };
164
165 let swamp_iter = if self.config.disjoint_query_paths {
166 QueryPeerIter::ClosestDisjoint(
167 ClosestDisjointPeersIter::with_config(cfg, target, swamp),
168 )
169 } else {
170 QueryPeerIter::Closest(
171 ClosestPeersIter::with_config(cfg, target, swamp)
172 )
173 };
174
175 let query = Query::new(id, weighted_iter, swamp_iter, inner);
176 self.queries.insert(id, query);
177 }
178
179 fn next_query_id(&mut self) -> QueryId {
180 let id = QueryId(self.next_id);
181 self.next_id = self.next_id.wrapping_add(1);
182 id
183 }
184
185 pub fn get(&self, id: &QueryId) -> Option<&Query<TInner>> {
187 self.queries.get(id)
188 }
189
190 pub fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query<TInner>> {
192 self.queries.get_mut(id)
193 }
194
195 pub fn poll(&mut self, now: Instant) -> QueryPoolState<'_, TInner> {
197 let mut finished = None;
198 let mut timeout = None;
199 let mut waiting = None;
200
201 for (&query_id, query) in self.queries.iter_mut() {
202 query.stats.start = query.stats.start.or(Some(now));
203 match query.next(now) {
204 PeersIterState::Finished => {
205 finished = Some(query_id);
206 break
207 }
208 PeersIterState::Waiting(Some(peer_id)) => {
209 let peer = peer_id.into_owned();
210 waiting = Some((query_id, peer));
211 break
212 }
213 PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
214 let elapsed = now - query.stats.start.unwrap_or(now);
215 if elapsed >= self.config.timeout {
216 timeout = Some(query_id);
217 break
218 }
219 }
220 }
221 }
222
223 if let Some((query_id, peer_id)) = waiting {
224 let query = self.queries.get_mut(&query_id).expect("s.a.");
225 return QueryPoolState::Waiting(Some((query, peer_id)))
226 }
227
228 if let Some(query_id) = finished {
229 let mut query = self.queries.remove(&query_id).expect("s.a.");
230 query.stats.end = Some(now);
231 return QueryPoolState::Finished(query)
232 }
233
234 if let Some(query_id) = timeout {
235 let mut query = self.queries.remove(&query_id).expect("s.a.");
236 query.stats.end = Some(now);
237 return QueryPoolState::Timeout(query)
238 }
239
240 if self.queries.is_empty() {
241 QueryPoolState::Idle
242 } else {
243 QueryPoolState::Waiting(None)
244 }
245 }
246}
247
248#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
250pub struct QueryId(usize);
251
252#[derive(Debug, Clone)]
254pub struct QueryConfig {
255 pub timeout: Duration,
259
260 pub replication_factor: NonZeroUsize,
264
265 pub parallelism: NonZeroUsize,
269
270 pub disjoint_query_paths: bool,
274}
275
276impl Default for QueryConfig {
277 fn default() -> Self {
278 QueryConfig {
279 timeout: Duration::from_secs(60),
280 replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"),
281 parallelism: ALPHA_VALUE,
282 disjoint_query_paths: false,
283 }
284 }
285}
286
287pub struct Query<TInner> {
289 id: QueryId,
291 weighted_iter: QueryPeerIter,
293 swamp_iter: QueryPeerIter,
295 stats: QueryStats,
297 pub inner: TInner,
299}
300
301enum QueryPeerIter {
303 Closest(ClosestPeersIter),
304 ClosestDisjoint(ClosestDisjointPeersIter),
305 Fixed(FixedPeersIter)
306}
307
308impl<TInner> Query<TInner> {
309 fn new(id: QueryId, weighted_iter: QueryPeerIter, swamp_iter: QueryPeerIter, inner: TInner) -> Self {
311 Query { id, inner, weighted_iter, swamp_iter, stats: QueryStats::empty() }
312 }
313
314 pub fn id(&self) -> QueryId {
316 self.id
317 }
318
319 pub fn stats(&self) -> &QueryStats {
321 &self.stats
322 }
323
324 pub fn on_failure(&mut self, peer: &PeerId) {
326 let updated_swamp = match &mut self.swamp_iter {
327 QueryPeerIter::Closest(iter) => iter.on_failure(peer),
328 QueryPeerIter::Fixed(iter) => iter.on_failure(peer),
329 QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
330 };
331
332 let updated_weighted = match &mut self.weighted_iter {
333 QueryPeerIter::Closest(iter) => iter.on_failure(peer),
334 QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
335 QueryPeerIter::Fixed(iter) => iter.on_failure(peer),
336 };
337
338 if updated_swamp || updated_weighted {
339 self.stats.failure += 1;
340 }
341 }
342
343 pub fn on_success<I>(&mut self, peer: &PeerId, new_peers: I)
347 where
348 I: IntoIterator<Item = WeightedPeer>
349 {
350 let (swamp, weighted) = new_peers.into_iter().partition::<Vec<_>, _>(|p| p.weight == 0);
351 let swamp = swamp.into_iter().map(|p| p.peer_id);
352 let weighted = weighted.into_iter().map(|p| p.peer_id);
353
354
355 let updated_swamp = match &mut self.swamp_iter {
356 QueryPeerIter::Closest(iter) => iter.on_success(peer, swamp),
357 QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, swamp),
358 QueryPeerIter::Fixed(iter) => iter.on_success(peer),
359 };
360
361 let updated_weighted = match &mut self.weighted_iter {
362 QueryPeerIter::Closest(iter) => iter.on_success(peer, weighted),
363 QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, weighted),
364 QueryPeerIter::Fixed(iter) => iter.on_success(peer),
365 };
366
367 if updated_swamp || updated_weighted {
368 self.stats.success += 1;
369 }
370 }
371
372 pub fn is_waiting(&self, peer: &PeerId) -> bool {
374 let weighted_waiting = match &self.weighted_iter {
375 QueryPeerIter::Closest(iter) => iter.is_waiting(peer),
376 QueryPeerIter::ClosestDisjoint(iter) => iter.is_waiting(peer),
377 QueryPeerIter::Fixed(iter) => iter.is_waiting(peer)
378 };
379
380 let swamp_waiting = match &self.swamp_iter {
381 QueryPeerIter::Closest(iter) => iter.is_waiting(peer),
382 QueryPeerIter::ClosestDisjoint(iter) => iter.is_waiting(peer),
383 QueryPeerIter::Fixed(iter) => iter.is_waiting(peer)
384 };
385
386 debug_assert_ne!(weighted_waiting, swamp_waiting);
387
388 weighted_waiting || swamp_waiting
389 }
390
391 fn next(&mut self, now: Instant) -> PeersIterState<'_> {
393 use PeersIterState::*;
394
395 let weighted_state = match &mut self.weighted_iter {
397 QueryPeerIter::Closest(iter) => iter.next(now),
398 QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
399 QueryPeerIter::Fixed(iter) => iter.next()
400 };
401
402 if let Waiting(Some(_)) = weighted_state {
404 self.stats.requests += 1;
405 return weighted_state;
406 }
407
408 let swamp_state = match &mut self.swamp_iter {
410 QueryPeerIter::Closest(iter) => iter.next(now),
411 QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
412 QueryPeerIter::Fixed(iter) => iter.next()
413 };
414
415 if let Waiting(Some(_)) = swamp_state {
417 self.stats.requests += 1;
418 return swamp_state;
419 }
420
421 match (weighted_state, swamp_state) {
423 (Finished, swamp) => swamp,
425 (weighted, _) => weighted
427 }
428 }
429
430 pub fn try_finish<'a, I>(&mut self, peers: I) -> bool
448 where
449 I: IntoIterator<Item = &'a PeerId> + Clone
450 {
451 let weighted = match &mut self.weighted_iter {
452 QueryPeerIter::Closest(iter) => { iter.finish(); true },
453 QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers.clone()),
454 QueryPeerIter::Fixed(iter) => { iter.finish(); true },
455 };
456
457 let swamp = match &mut self.swamp_iter {
458 QueryPeerIter::Closest(iter) => { iter.finish(); true },
459 QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers),
460 QueryPeerIter::Fixed(iter) => { iter.finish(); true },
461 };
462
463 weighted && swamp
464 }
465
466 pub fn finish(&mut self) {
471 match &mut self.weighted_iter {
472 QueryPeerIter::Closest(iter) => iter.finish(),
473 QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
474 QueryPeerIter::Fixed(iter) => iter.finish()
475 };
476
477 match &mut self.swamp_iter {
478 QueryPeerIter::Closest(iter) => iter.finish(),
479 QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
480 QueryPeerIter::Fixed(iter) => iter.finish()
481 };
482 }
483
484 pub fn is_finished(&self) -> bool {
489 let weighted_finished = match &self.weighted_iter {
490 QueryPeerIter::Closest(iter) => iter.is_finished(),
491 QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
492 QueryPeerIter::Fixed(iter) => iter.is_finished()
493 };
494
495 let swamp_finished = match &self.swamp_iter {
496 QueryPeerIter::Closest(iter) => iter.is_finished(),
497 QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
498 QueryPeerIter::Fixed(iter) => iter.is_finished()
499 };
500
501 weighted_finished && swamp_finished
502 }
503
504 pub fn into_result(self) -> QueryResult<TInner, impl Iterator<Item = PeerId>> {
506 fn into_result_(iter: QueryPeerIter) -> impl Iterator<Item = PeerId> {
507 match iter {
508 QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
509 QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
510 QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result())
511 }
512 }
513
514 let weighted = into_result_(self.weighted_iter);
515 let swamp = into_result_(self.swamp_iter);
516 let peers = weighted.chain(swamp);
517 QueryResult { peers, inner: self.inner, stats: self.stats }
518 }
519}
520
521pub struct QueryResult<TInner, TPeers> {
523 pub inner: TInner,
525 pub peers: TPeers,
527 pub stats: QueryStats
529}
530
531#[derive(Clone, Debug, PartialEq, Eq)]
533pub struct QueryStats {
534 requests: u32,
535 success: u32,
536 failure: u32,
537 start: Option<Instant>,
538 end: Option<Instant>
539}
540
541impl QueryStats {
542 pub fn empty() -> Self {
543 QueryStats {
544 requests: 0,
545 success: 0,
546 failure: 0,
547 start: None,
548 end: None,
549 }
550 }
551
552 pub fn num_requests(&self) -> u32 {
554 self.requests
555 }
556
557 pub fn num_successes(&self) -> u32 {
559 self.success
560 }
561
562 pub fn num_failures(&self) -> u32 {
564 self.failure
565 }
566
567 pub fn num_pending(&self) -> u32 {
572 self.requests - (self.success + self.failure)
573 }
574
575 pub fn duration(&self) -> Option<Duration> {
583 if let Some(s) = self.start {
584 if let Some(e) = self.end {
585 Some(e - s)
586 } else {
587 Some(Instant::now() - s)
588 }
589 } else {
590 None
591 }
592 }
593
594 pub fn merge(self, other: QueryStats) -> Self {
601 QueryStats {
602 requests: self.requests + other.requests,
603 success: self.success + other.success,
604 failure: self.failure + other.failure,
605 start: match (self.start, other.start) {
606 (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
607 (a, b) => a.or(b)
608 },
609 end: std::cmp::max(self.end, other.end)
610 }
611 }
612}