1mod peers;
22
23use std::{num::NonZeroUsize, time::Duration};
24
25use ant_libp2p_core as libp2p_core;
26
27use either::Either;
28use fnv::FnvHashMap;
29use libp2p_core::Multiaddr;
30use libp2p_identity::PeerId;
31use peers::{
32 closest::{disjoint::ClosestDisjointPeersIter, ClosestPeersIter, ClosestPeersIterConfig},
33 fixed::FixedPeersIter,
34 PeersIterState,
35};
36use smallvec::SmallVec;
37use web_time::Instant;
38
39use crate::{
40 behaviour::PeerInfo,
41 handler::HandlerIn,
42 kbucket::{Key, KeyBytes},
43 QueryInfo, ALPHA_VALUE, K_VALUE,
44};
45
46pub(crate) struct QueryPool {
52 next_id: usize,
53 config: QueryConfig,
54 queries: FnvHashMap<QueryId, Query>,
55}
56
57pub(crate) enum QueryPoolState<'a> {
59 Idle,
61 Waiting(Option<(&'a mut Query, PeerId)>),
64 Finished(Query),
66 Timeout(Query),
68}
69
70impl QueryPool {
71 pub(crate) fn new(config: QueryConfig) -> Self {
73 QueryPool {
74 next_id: 0,
75 config,
76 queries: Default::default(),
77 }
78 }
79
80 pub(crate) fn config(&self) -> &QueryConfig {
82 &self.config
83 }
84
85 pub(crate) fn iter(&self) -> impl Iterator<Item = &Query> {
87 self.queries.values()
88 }
89
90 pub(crate) fn size(&self) -> usize {
92 self.queries.len()
93 }
94
95 pub(crate) fn iter_mut(&mut self) -> impl Iterator<Item = &mut Query> {
97 self.queries.values_mut()
98 }
99
100 pub(crate) fn add_fixed<I>(&mut self, peers: I, info: QueryInfo) -> QueryId
102 where
103 I: IntoIterator<Item = PeerId>,
104 {
105 let id = self.next_query_id();
106 self.continue_fixed(id, peers, info);
107 id
108 }
109
110 pub(crate) fn continue_fixed<I>(&mut self, id: QueryId, peers: I, info: QueryInfo)
114 where
115 I: IntoIterator<Item = PeerId>,
116 {
117 assert!(!self.queries.contains_key(&id));
118 let parallelism = self.config.replication_factor;
119 let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism));
120 let query = Query::new(id, peer_iter, info);
121 self.queries.insert(id, query);
122 }
123
124 pub(crate) fn add_iter_closest<T, I>(&mut self, target: T, peers: I, info: QueryInfo) -> QueryId
126 where
127 T: Into<KeyBytes> + Clone,
128 I: IntoIterator<Item = Key<PeerId>>,
129 {
130 let id = self.next_query_id();
131 self.continue_iter_closest(id, target, peers, info);
132 id
133 }
134
135 pub(crate) fn continue_iter_closest<T, I>(
137 &mut self,
138 id: QueryId,
139 target: T,
140 peers: I,
141 info: QueryInfo,
142 ) where
143 T: Into<KeyBytes> + Clone,
144 I: IntoIterator<Item = Key<PeerId>>,
145 {
146 let num_results = match info {
147 QueryInfo::GetClosestPeers {
148 num_results: Some(val),
149 ..
150 } => val,
151 _ => self.config.replication_factor,
152 };
153
154 let cfg = ClosestPeersIterConfig {
155 num_results,
156 parallelism: self.config.parallelism,
157 ..ClosestPeersIterConfig::default()
158 };
159
160 let peer_iter = if self.config.disjoint_query_paths {
161 QueryPeerIter::ClosestDisjoint(ClosestDisjointPeersIter::with_config(
162 cfg, target, peers,
163 ))
164 } else {
165 QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers))
166 };
167
168 let query = Query::new(id, peer_iter, info);
169 self.queries.insert(id, query);
170 }
171
172 fn next_query_id(&mut self) -> QueryId {
173 let id = QueryId(self.next_id);
174 self.next_id = self.next_id.wrapping_add(1);
175 id
176 }
177
178 pub(crate) fn get(&self, id: &QueryId) -> Option<&Query> {
180 self.queries.get(id)
181 }
182
183 pub(crate) fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query> {
185 self.queries.get_mut(id)
186 }
187
188 pub(crate) fn poll(&mut self, now: Instant) -> QueryPoolState<'_> {
190 let mut finished = None;
191 let mut timeout = None;
192 let mut waiting = None;
193
194 for (&query_id, query) in self.queries.iter_mut() {
195 query.stats.start = query.stats.start.or(Some(now));
196 match query.next(now) {
197 PeersIterState::Finished => {
198 finished = Some(query_id);
199 break;
200 }
201 PeersIterState::Waiting(Some(peer_id)) => {
202 let peer = peer_id.into_owned();
203 waiting = Some((query_id, peer));
204 break;
205 }
206 PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
207 let elapsed = now - query.stats.start.unwrap_or(now);
208 if elapsed >= self.config.timeout {
209 timeout = Some(query_id);
210 break;
211 }
212 }
213 }
214 }
215
216 if let Some((query_id, peer_id)) = waiting {
217 let query = self.queries.get_mut(&query_id).expect("s.a.");
218 return QueryPoolState::Waiting(Some((query, peer_id)));
219 }
220
221 if let Some(query_id) = finished {
222 let mut query = self.queries.remove(&query_id).expect("s.a.");
223 query.stats.end = Some(now);
224 return QueryPoolState::Finished(query);
225 }
226
227 if let Some(query_id) = timeout {
228 let mut query = self.queries.remove(&query_id).expect("s.a.");
229 query.stats.end = Some(now);
230 return QueryPoolState::Timeout(query);
231 }
232
233 if self.queries.is_empty() {
234 QueryPoolState::Idle
235 } else {
236 QueryPoolState::Waiting(None)
237 }
238 }
239}
240
/// Identifier of a query, wrapping a plain `usize`.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub struct QueryId(usize);

impl std::fmt::Display for QueryId {
    /// Writes the wrapped integer in decimal.
    ///
    /// The value is formatted through a fresh set of format arguments, so width, fill
    /// and alignment flags given by the caller are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        f.write_fmt(format_args!("{}", raw))
    }
}
250
251#[derive(Debug, Clone)]
253pub(crate) struct QueryConfig {
254 pub(crate) timeout: Duration,
258 pub(crate) replication_factor: NonZeroUsize,
262 pub(crate) parallelism: NonZeroUsize,
266 pub(crate) disjoint_query_paths: bool,
270}
271
272impl Default for QueryConfig {
273 fn default() -> Self {
274 QueryConfig {
275 timeout: Duration::from_secs(60),
276 replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"),
277 parallelism: ALPHA_VALUE,
278 disjoint_query_paths: false,
279 }
280 }
281}
282
283pub(crate) struct Query {
285 id: QueryId,
287 pub(crate) peers: QueryPeers,
289 pub(crate) stats: QueryStats,
291 pub(crate) info: QueryInfo,
293 pub(crate) pending_rpcs: SmallVec<[(PeerId, HandlerIn); K_VALUE.get()]>,
298}
299
300pub(crate) struct QueryPeers {
302 pub(crate) addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
304 peer_iter: QueryPeerIter,
306}
307
308impl QueryPeers {
309 pub(crate) fn into_peerids_iter(self) -> impl Iterator<Item = PeerId> {
311 match self.peer_iter {
312 QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
313 QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
314 QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()),
315 }
316 }
317
318 pub(crate) fn into_peerinfos_iter(mut self) -> impl Iterator<Item = PeerInfo> {
321 match self.peer_iter {
322 QueryPeerIter::Closest(iter) => Either::Left(Either::Left(iter.into_result())),
323 QueryPeerIter::ClosestDisjoint(iter) => Either::Left(Either::Right(iter.into_result())),
324 QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()),
325 }
326 .map(move |peer_id| {
327 let addrs = self.addresses.remove(&peer_id).unwrap_or_default().to_vec();
328 PeerInfo { peer_id, addrs }
329 })
330 }
331}
332
333enum QueryPeerIter {
335 Closest(ClosestPeersIter),
336 ClosestDisjoint(ClosestDisjointPeersIter),
337 Fixed(FixedPeersIter),
338}
339
340impl Query {
341 fn new(id: QueryId, peer_iter: QueryPeerIter, info: QueryInfo) -> Self {
343 Query {
344 id,
345 info,
346 peers: QueryPeers {
347 addresses: Default::default(),
348 peer_iter,
349 },
350 pending_rpcs: SmallVec::default(),
351 stats: QueryStats::empty(),
352 }
353 }
354
355 pub(crate) fn id(&self) -> QueryId {
357 self.id
358 }
359
360 pub(crate) fn stats(&self) -> &QueryStats {
362 &self.stats
363 }
364
365 pub(crate) fn on_failure(&mut self, peer: &PeerId) {
367 let updated = match &mut self.peers.peer_iter {
368 QueryPeerIter::Closest(iter) => iter.on_failure(peer),
369 QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
370 QueryPeerIter::Fixed(iter) => iter.on_failure(peer),
371 };
372 if updated {
373 self.stats.failure += 1;
374 }
375 }
376
377 pub(crate) fn on_success<I>(&mut self, peer: &PeerId, new_peers: I)
381 where
382 I: IntoIterator<Item = PeerId>,
383 {
384 let updated = match &mut self.peers.peer_iter {
385 QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers),
386 QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, new_peers),
387 QueryPeerIter::Fixed(iter) => iter.on_success(peer),
388 };
389 if updated {
390 self.stats.success += 1;
391 }
392 }
393
394 fn next(&mut self, now: Instant) -> PeersIterState<'_> {
396 let state = match &mut self.peers.peer_iter {
397 QueryPeerIter::Closest(iter) => iter.next(now),
398 QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
399 QueryPeerIter::Fixed(iter) => iter.next(),
400 };
401
402 if let PeersIterState::Waiting(Some(_)) = state {
403 self.stats.requests += 1;
404 }
405
406 state
407 }
408
409 pub(crate) fn try_finish<'a, I>(&mut self, peers: I) -> bool
427 where
428 I: IntoIterator<Item = &'a PeerId>,
429 {
430 match &mut self.peers.peer_iter {
431 QueryPeerIter::Closest(iter) => {
432 iter.finish();
433 true
434 }
435 QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers),
436 QueryPeerIter::Fixed(iter) => {
437 iter.finish();
438 true
439 }
440 }
441 }
442
443 pub(crate) fn finish(&mut self) {
448 match &mut self.peers.peer_iter {
449 QueryPeerIter::Closest(iter) => iter.finish(),
450 QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
451 QueryPeerIter::Fixed(iter) => iter.finish(),
452 }
453 }
454
455 pub(crate) fn is_finished(&self) -> bool {
460 match &self.peers.peer_iter {
461 QueryPeerIter::Closest(iter) => iter.is_finished(),
462 QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
463 QueryPeerIter::Fixed(iter) => iter.is_finished(),
464 }
465 }
466}
467
/// Execution statistics of a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryStats {
    /// Number of requests counted.
    requests: u32,
    /// Number of successes counted.
    success: u32,
    /// Number of failures counted.
    failure: u32,
    /// When the query started, if it has.
    start: Option<Instant>,
    /// When the query ended, if it has.
    end: Option<Instant>,
}

impl QueryStats {
    /// Creates statistics with all counters at zero and no start or end time.
    pub fn empty() -> Self {
        QueryStats {
            requests: 0,
            success: 0,
            failure: 0,
            start: None,
            end: None,
        }
    }

    /// Returns the number of requests counted.
    pub fn num_requests(&self) -> u32 {
        self.requests
    }

    /// Returns the number of successes counted.
    pub fn num_successes(&self) -> u32 {
        self.success
    }

    /// Returns the number of failures counted.
    pub fn num_failures(&self) -> u32 {
        self.failure
    }

    /// Returns the number of requests that have neither succeeded nor failed.
    ///
    /// The arithmetic saturates: if successes and failures together exceed the
    /// requests, the result is `0` rather than a panic (debug builds) or a wrapped,
    /// near-`u32::MAX` value (release builds).
    pub fn num_pending(&self) -> u32 {
        let settled = self.success.saturating_add(self.failure);
        self.requests.saturating_sub(settled)
    }

    /// Returns how long the query has been running.
    ///
    /// * `None` if no start time is recorded.
    /// * The span from start to end if an end time is recorded.
    /// * The span from start to the current instant otherwise.
    pub fn duration(&self) -> Option<Duration> {
        let started = self.start?;
        Some(match self.end {
            Some(ended) => ended - started,
            None => Instant::now() - started,
        })
    }

    /// Combines two sets of statistics into one.
    ///
    /// Counters are added (saturating at `u32::MAX`), the earlier of the two start
    /// times is kept, and the later of the two end times is kept. A time that is missing
    /// on one side is taken from the other.
    pub fn merge(self, other: QueryStats) -> Self {
        QueryStats {
            requests: self.requests.saturating_add(other.requests),
            success: self.success.saturating_add(other.success),
            failure: self.failure.saturating_add(other.failure),
            start: match (self.start, other.start) {
                (Some(a), Some(b)) => Some(a.min(b)),
                (a, b) => a.or(b),
            },
            // `None` orders below every `Some`, so this keeps whichever end time exists.
            end: self.end.max(other.end),
        }
    }
}