1mod peers;
22
23use peers::PeersIterState;
24use peers::closest::{ClosestPeersIterConfig, ClosestPeersIter, disjoint::ClosestDisjointPeersIter};
25use peers::fixed::FixedPeersIter;
26
27use crate::{ALPHA_VALUE, K_VALUE};
28use crate::kbucket::{Key, KeyBytes};
29use either::Either;
30use fnv::FnvHashMap;
31use mwc_libp2p_core::PeerId;
32use std::{time::Duration, num::NonZeroUsize};
33use wasm_timer::Instant;
34
35pub struct QueryPool<TInner> {
41 next_id: usize,
42 config: QueryConfig,
43 queries: FnvHashMap<QueryId, Query<TInner>>,
44}
45
46pub enum QueryPoolState<'a, TInner> {
48 Idle,
50 Waiting(Option<(&'a mut Query<TInner>, PeerId)>),
53 Finished(Query<TInner>),
55 Timeout(Query<TInner>)
57}
58
59impl<TInner> QueryPool<TInner> {
60 pub fn new(config: QueryConfig) -> Self {
62 QueryPool {
63 next_id: 0,
64 config,
65 queries: Default::default()
66 }
67 }
68
69 pub fn config(&self) -> &QueryConfig {
71 &self.config
72 }
73
74 pub fn iter(&self) -> impl Iterator<Item = &Query<TInner>> {
76 self.queries.values()
77 }
78
79 pub fn size(&self) -> usize {
81 self.queries.len()
82 }
83
84 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut Query<TInner>> {
86 self.queries.values_mut()
87 }
88
89 pub fn add_fixed<I>(&mut self, peers: I, inner: TInner) -> QueryId
91 where
92 I: IntoIterator<Item = PeerId>
93 {
94 let id = self.next_query_id();
95 self.continue_fixed(id, peers, inner);
96 id
97 }
98
99 pub fn continue_fixed<I>(&mut self, id: QueryId, peers: I, inner: TInner)
103 where
104 I: IntoIterator<Item = PeerId>
105 {
106 assert!(!self.queries.contains_key(&id));
107 let parallelism = self.config.replication_factor;
108 let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism));
109 let query = Query::new(id, peer_iter, inner);
110 self.queries.insert(id, query);
111 }
112
113 pub fn add_iter_closest<T, I>(&mut self, target: T, peers: I, inner: TInner) -> QueryId
115 where
116 T: Into<KeyBytes> + Clone,
117 I: IntoIterator<Item = Key<PeerId>>
118 {
119 let id = self.next_query_id();
120 self.continue_iter_closest(id, target, peers, inner);
121 id
122 }
123
124 pub fn continue_iter_closest<T, I>(&mut self, id: QueryId, target: T, peers: I, inner: TInner)
126 where
127 T: Into<KeyBytes> + Clone,
128 I: IntoIterator<Item = Key<PeerId>>
129 {
130 let cfg = ClosestPeersIterConfig {
131 num_results: self.config.replication_factor,
132 parallelism: self.config.parallelism,
133 .. ClosestPeersIterConfig::default()
134 };
135
136 let peer_iter = if self.config.disjoint_query_paths {
137 QueryPeerIter::ClosestDisjoint(
138 ClosestDisjointPeersIter::with_config(cfg, target, peers),
139 )
140 } else {
141 QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers))
142 };
143
144 let query = Query::new(id, peer_iter, inner);
145 self.queries.insert(id, query);
146 }
147
148 fn next_query_id(&mut self) -> QueryId {
149 let id = QueryId(self.next_id);
150 self.next_id = self.next_id.wrapping_add(1);
151 id
152 }
153
154 pub fn get(&self, id: &QueryId) -> Option<&Query<TInner>> {
156 self.queries.get(id)
157 }
158
159 pub fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query<TInner>> {
161 self.queries.get_mut(id)
162 }
163
164 pub fn poll(&mut self, now: Instant) -> QueryPoolState<'_, TInner> {
166 let mut finished = None;
167 let mut timeout = None;
168 let mut waiting = None;
169
170 for (&query_id, query) in self.queries.iter_mut() {
171 query.stats.start = query.stats.start.or(Some(now));
172 match query.next(now) {
173 PeersIterState::Finished => {
174 finished = Some(query_id);
175 break
176 }
177 PeersIterState::Waiting(Some(peer_id)) => {
178 let peer = peer_id.into_owned();
179 waiting = Some((query_id, peer));
180 break
181 }
182 PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
183 let elapsed = now - query.stats.start.unwrap_or(now);
184 if elapsed >= self.config.timeout {
185 timeout = Some(query_id);
186 break
187 }
188 }
189 }
190 }
191
192 if let Some((query_id, peer_id)) = waiting {
193 let query = self.queries.get_mut(&query_id).expect("s.a.");
194 return QueryPoolState::Waiting(Some((query, peer_id)))
195 }
196
197 if let Some(query_id) = finished {
198 let mut query = self.queries.remove(&query_id).expect("s.a.");
199 query.stats.end = Some(now);
200 return QueryPoolState::Finished(query)
201 }
202
203 if let Some(query_id) = timeout {
204 let mut query = self.queries.remove(&query_id).expect("s.a.");
205 query.stats.end = Some(now);
206 return QueryPoolState::Timeout(query)
207 }
208
209 if self.queries.is_empty() {
210 QueryPoolState::Idle
211 } else {
212 QueryPoolState::Waiting(None)
213 }
214 }
215}
216
/// Identifier under which a query is stored in a `QueryPool`.
///
/// The wrapped value is private to this module; ids are compared, hashed and
/// copied by value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct QueryId(usize);
220
/// Settings a `QueryPool` applies to the queries it creates and polls.
#[derive(Clone, Debug)]
pub struct QueryConfig {
    /// Maximum running time of a query, measured from the first time it is
    /// polled. The pool checks it only while the query is waiting without a
    /// new peer to hand out.
    pub timeout: Duration,

    /// Used as the number of results for closest-peers iterators and as the
    /// parallelism of fixed-peer iterators.
    pub replication_factor: NonZeroUsize,

    /// Parallelism handed to closest-peers iterators.
    pub parallelism: NonZeroUsize,

    /// When `true`, closest-peers queries are created with the disjoint-paths
    /// iterator instead of the plain closest-peers iterator.
    pub disjoint_query_paths: bool,
}
244
245impl Default for QueryConfig {
246 fn default() -> Self {
247 QueryConfig {
248 timeout: Duration::from_secs(60),
249 replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"),
250 parallelism: ALPHA_VALUE,
251 disjoint_query_paths: false,
252 }
253 }
254}
255
256pub struct Query<TInner> {
258 id: QueryId,
260 peer_iter: QueryPeerIter,
262 stats: QueryStats,
264 pub inner: TInner,
266}
267
268enum QueryPeerIter {
270 Closest(ClosestPeersIter),
271 ClosestDisjoint(ClosestDisjointPeersIter),
272 Fixed(FixedPeersIter)
273}
274
275impl<TInner> Query<TInner> {
276 fn new(id: QueryId, peer_iter: QueryPeerIter, inner: TInner) -> Self {
278 Query { id, inner, peer_iter, stats: QueryStats::empty() }
279 }
280
281 pub fn id(&self) -> QueryId {
283 self.id
284 }
285
286 pub fn stats(&self) -> &QueryStats {
288 &self.stats
289 }
290
291 pub fn on_failure(&mut self, peer: &PeerId) {
293 let updated = match &mut self.peer_iter {
294 QueryPeerIter::Closest(iter) => iter.on_failure(peer),
295 QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
296 QueryPeerIter::Fixed(iter) => iter.on_failure(peer)
297 };
298 if updated {
299 self.stats.failure += 1;
300 }
301 }
302
303 pub fn on_success<I>(&mut self, peer: &PeerId, new_peers: I)
307 where
308 I: IntoIterator<Item = PeerId>
309 {
310 let updated = match &mut self.peer_iter {
311 QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers),
312 QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, new_peers),
313 QueryPeerIter::Fixed(iter) => iter.on_success(peer)
314 };
315 if updated {
316 self.stats.success += 1;
317 }
318 }
319
320 pub fn is_waiting(&self, peer: &PeerId) -> bool {
322 match &self.peer_iter {
323 QueryPeerIter::Closest(iter) => iter.is_waiting(peer),
324 QueryPeerIter::ClosestDisjoint(iter) => iter.is_waiting(peer),
325 QueryPeerIter::Fixed(iter) => iter.is_waiting(peer)
326 }
327 }
328
329 fn next(&mut self, now: Instant) -> PeersIterState<'_> {
331 let state = match &mut self.peer_iter {
332 QueryPeerIter::Closest(iter) => iter.next(now),
333 QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
334 QueryPeerIter::Fixed(iter) => iter.next()
335 };
336
337 if let PeersIterState::Waiting(Some(_)) = state {
338 self.stats.requests += 1;
339 }
340
341 state
342 }
343
344 pub fn try_finish<'a, I>(&mut self, peers: I) -> bool
362 where
363 I: IntoIterator<Item = &'a PeerId>
364 {
365 match &mut self.peer_iter {
366 QueryPeerIter::Closest(iter) => { iter.finish(); true },
367 QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers),
368 QueryPeerIter::Fixed(iter) => { iter.finish(); true }
369 }
370 }
371
372 pub fn finish(&mut self) {
377 match &mut self.peer_iter {
378 QueryPeerIter::Closest(iter) => iter.finish(),
379 QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
380 QueryPeerIter::Fixed(iter) => iter.finish()
381 }
382 }
383
384 pub fn is_finished(&self) -> bool {
389 match &self.peer_iter {
390 QueryPeerIter::Closest(iter) => iter.is_finished(),
391 QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
392 QueryPeerIter::Fixed(iter) => iter.is_finished()
393 }
394 }
395
396 pub fn into_result(self) -> QueryResult<TInner, impl Iterator<Item = PeerId>> {
398 let peers = match self.peer_iter {
399 QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
400 QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
401 QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result())
402 };
403 QueryResult { peers, inner: self.inner, stats: self.stats }
404 }
405}
406
407pub struct QueryResult<TInner, TPeers> {
409 pub inner: TInner,
411 pub peers: TPeers,
413 pub stats: QueryStats
415}
416
/// Counters and timestamps describing the execution of a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryStats {
    /// Number of requests counted for the query.
    requests: u32,
    /// Number of successes counted for the query.
    success: u32,
    /// Number of failures counted for the query.
    failure: u32,
    /// When the query started, if it has.
    start: Option<Instant>,
    /// When the query ended, if it has.
    end: Option<Instant>,
}

impl QueryStats {
    /// Returns statistics with all counters at zero and no timestamps.
    pub fn empty() -> Self {
        QueryStats {
            requests: 0,
            success: 0,
            failure: 0,
            start: None,
            end: None,
        }
    }

    /// Returns the number of requests counted.
    pub fn num_requests(&self) -> u32 {
        self.requests
    }

    /// Returns the number of successes counted.
    pub fn num_successes(&self) -> u32 {
        self.success
    }

    /// Returns the number of failures counted.
    pub fn num_failures(&self) -> u32 {
        self.failure
    }

    /// Returns the number of requests that have neither succeeded nor failed.
    ///
    /// Computed as `requests - (success + failure)` with saturating
    /// arithmetic, so inconsistent counters yield `0` instead of an
    /// arithmetic overflow.
    pub fn num_pending(&self) -> u32 {
        let settled = self.success.saturating_add(self.failure);
        self.requests.saturating_sub(settled)
    }

    /// Returns how long the query has been running.
    ///
    /// Returns `None` if no start time is recorded. With an end time the
    /// result is `end - start`; otherwise it is the time from the start
    /// until `Instant::now()`.
    pub fn duration(&self) -> Option<Duration> {
        let start = self.start?;
        let end = self.end.unwrap_or_else(Instant::now);
        Some(end - start)
    }

    /// Combines two sets of statistics into one.
    ///
    /// Counters are added (saturating at `u32::MAX`). The start is the
    /// earlier of the two starts and the end is the later of the two ends;
    /// a missing timestamp on one side yields the other side's value.
    pub fn merge(self, other: QueryStats) -> Self {
        let start = match (self.start, other.start) {
            (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
            (a, b) => a.or(b),
        };
        QueryStats {
            requests: self.requests.saturating_add(other.requests),
            success: self.success.saturating_add(other.success),
            failure: self.failure.saturating_add(other.failure),
            start,
            // `None` orders below every `Some`, so `max` prefers a recorded end.
            end: std::cmp::max(self.end, other.end),
        }
    }
}