mod peers;
use std::{num::NonZeroUsize, time::Duration};
use either::Either;
use fnv::FnvHashMap;
use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
use peers::{
closest::{disjoint::ClosestDisjointPeersIter, ClosestPeersIter, ClosestPeersIterConfig},
fixed::FixedPeersIter,
PeersIterState,
};
use smallvec::SmallVec;
use web_time::Instant;
use crate::{
behaviour::PeerInfo,
handler::HandlerIn,
kbucket::{Key, KeyBytes},
QueryInfo, ALPHA_VALUE, K_VALUE,
};
/// A collection of [`Query`] values keyed by their [`QueryId`].
///
/// Queries are added through the `add_*` / `continue_*` methods and advanced
/// through `poll`, which also removes finished and timed-out queries.
pub(crate) struct QueryPool {
/// Raw value from which the next [`QueryId`] is created; incremented with
/// wrap-around every time an identifier is handed out.
next_id: usize,
/// Configuration consulted when constructing and polling queries.
config: QueryConfig,
/// The queries currently held by the pool.
queries: FnvHashMap<QueryId, Query>,
}
/// The result of a single call to `QueryPool::poll`.
pub(crate) enum QueryPoolState<'a> {
/// The pool holds no queries.
Idle,
/// The pool holds at least one query.
///
/// `Some((query, peer))` is returned when `query`'s peer iterator yielded
/// `peer` during this poll; `None` is returned when no query yielded a
/// peer, finished or timed out.
Waiting(Option<(&'a mut Query, PeerId)>),
/// A query's peer iterator reported that it is finished. The query has
/// been removed from the pool and its end time has been recorded.
Finished(Query),
/// A query that yielded no peer has been in the pool for at least the
/// configured timeout. The query has been removed from the pool and its
/// end time has been recorded.
Timeout(Query),
}
impl QueryPool {
pub(crate) fn new(config: QueryConfig) -> Self {
QueryPool {
next_id: 0,
config,
queries: Default::default(),
}
}
pub(crate) fn config(&self) -> &QueryConfig {
&self.config
}
pub(crate) fn iter(&self) -> impl Iterator<Item = &Query> {
self.queries.values()
}
pub(crate) fn size(&self) -> usize {
self.queries.len()
}
pub(crate) fn iter_mut(&mut self) -> impl Iterator<Item = &mut Query> {
self.queries.values_mut()
}
pub(crate) fn add_fixed<I>(&mut self, peers: I, info: QueryInfo) -> QueryId
where
I: IntoIterator<Item = PeerId>,
{
let id = self.next_query_id();
self.continue_fixed(id, peers, info);
id
}
pub(crate) fn continue_fixed<I>(&mut self, id: QueryId, peers: I, info: QueryInfo)
where
I: IntoIterator<Item = PeerId>,
{
assert!(!self.queries.contains_key(&id));
let parallelism = self.config.replication_factor;
let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism));
let query = Query::new(id, peer_iter, info);
self.queries.insert(id, query);
}
pub(crate) fn add_iter_closest<T, I>(&mut self, target: T, peers: I, info: QueryInfo) -> QueryId
where
T: Into<KeyBytes> + Clone,
I: IntoIterator<Item = Key<PeerId>>,
{
let id = self.next_query_id();
self.continue_iter_closest(id, target, peers, info);
id
}
pub(crate) fn continue_iter_closest<T, I>(
&mut self,
id: QueryId,
target: T,
peers: I,
info: QueryInfo,
) where
T: Into<KeyBytes> + Clone,
I: IntoIterator<Item = Key<PeerId>>,
{
let num_results = match info {
QueryInfo::GetClosestPeers {
num_results: val, ..
} => val,
QueryInfo::Bootstrap { .. } => K_VALUE,
_ => self.config.replication_factor,
};
let cfg = ClosestPeersIterConfig {
num_results,
parallelism: self.config.parallelism,
..ClosestPeersIterConfig::default()
};
let peer_iter = if self.config.disjoint_query_paths {
QueryPeerIter::ClosestDisjoint(ClosestDisjointPeersIter::with_config(
cfg, target, peers,
))
} else {
QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers))
};
let query = Query::new(id, peer_iter, info);
self.queries.insert(id, query);
}
fn next_query_id(&mut self) -> QueryId {
let id = QueryId(self.next_id);
self.next_id = self.next_id.wrapping_add(1);
id
}
pub(crate) fn get(&self, id: &QueryId) -> Option<&Query> {
self.queries.get(id)
}
pub(crate) fn get_mut(&mut self, id: &QueryId) -> Option<&mut Query> {
self.queries.get_mut(id)
}
pub(crate) fn poll(&mut self, now: Instant) -> QueryPoolState<'_> {
let mut finished = None;
let mut timeout = None;
let mut waiting = None;
for (&query_id, query) in self.queries.iter_mut() {
query.stats.start = query.stats.start.or(Some(now));
match query.next(now) {
PeersIterState::Finished => {
finished = Some(query_id);
break;
}
PeersIterState::Waiting(Some(peer_id)) => {
let peer = peer_id.into_owned();
waiting = Some((query_id, peer));
break;
}
PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
let elapsed = now - query.stats.start.unwrap_or(now);
if elapsed >= self.config.timeout {
timeout = Some(query_id);
break;
}
}
}
}
if let Some((query_id, peer_id)) = waiting {
let query = self.queries.get_mut(&query_id).expect("s.a.");
return QueryPoolState::Waiting(Some((query, peer_id)));
}
if let Some(query_id) = finished {
let mut query = self.queries.remove(&query_id).expect("s.a.");
query.stats.end = Some(now);
return QueryPoolState::Finished(query);
}
if let Some(query_id) = timeout {
let mut query = self.queries.remove(&query_id).expect("s.a.");
query.stats.end = Some(now);
return QueryPoolState::Timeout(query);
}
if self.queries.is_empty() {
QueryPoolState::Idle
} else {
QueryPoolState::Waiting(None)
}
}
}
/// Identifier of a query held by a `QueryPool`.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub struct QueryId(usize);

impl std::fmt::Display for QueryId {
    /// Writes the wrapped number in plain decimal form.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        write!(f, "{raw}")
    }
}
/// Configuration shared by all queries of a `QueryPool`.
#[derive(Debug, Clone)]
pub(crate) struct QueryConfig {
/// Time, measured from the first poll of a query, after which a query
/// that yields no peer is reported as timed out by `QueryPool::poll`.
pub(crate) timeout: Duration,
/// Used as the parallelism of fixed-peer queries and as the default
/// number of results of closest-peer queries.
pub(crate) replication_factor: NonZeroUsize,
/// Parallelism handed to the closest-peers iterators.
pub(crate) parallelism: NonZeroUsize,
/// When `true`, closest-peer queries are built on
/// `ClosestDisjointPeersIter` instead of `ClosestPeersIter`.
pub(crate) disjoint_query_paths: bool,
}
impl Default for QueryConfig {
fn default() -> Self {
QueryConfig {
timeout: Duration::from_secs(60),
replication_factor: NonZeroUsize::new(K_VALUE.get()).expect("K_VALUE > 0"),
parallelism: ALPHA_VALUE,
disjoint_query_paths: false,
}
}
}
/// A single query held by a `QueryPool`.
pub(crate) struct Query {
/// The identifier under which the query is stored in the pool.
id: QueryId,
/// The peer iterator driving the query together with the addresses
/// recorded for peers.
pub(crate) peers: QueryPeers,
/// Request, success and failure counters plus start and end times.
pub(crate) stats: QueryStats,
/// The query-specific information supplied when the query was added.
pub(crate) info: QueryInfo,
/// Pairs of a peer and a handler input, starting out empty.
/// NOTE(review): presumably RPCs queued until they can be delivered to
/// the peer's handler; not used in this file, confirm against the callers.
pub(crate) pending_rpcs: SmallVec<[(PeerId, HandlerIn); K_VALUE.get()]>,
}
/// The peer-related state of a [`Query`].
pub(crate) struct QueryPeers {
/// Addresses recorded per peer; starts out empty and is drained by
/// `into_peerinfos_iter` when the results are collected.
pub(crate) addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
/// The iterator that decides which peers the query visits.
peer_iter: QueryPeerIter,
}
impl QueryPeers {
    /// Consumes the peers state and returns the peer identifiers that the
    /// underlying peer iterator reports as its result.
    pub(crate) fn into_peerids_iter(self) -> impl Iterator<Item = PeerId> {
        match self.peer_iter {
            QueryPeerIter::Fixed(fixed) => Either::Right(fixed.into_result()),
            QueryPeerIter::Closest(closest) => {
                Either::Left(Either::Left(closest.into_result()))
            }
            QueryPeerIter::ClosestDisjoint(disjoint) => {
                Either::Left(Either::Right(disjoint.into_result()))
            }
        }
    }

    /// Consumes the peers state and returns the resulting peers together
    /// with the addresses recorded for them.
    ///
    /// A peer without recorded addresses is returned with an empty address
    /// list.
    pub(crate) fn into_peerinfos_iter(mut self) -> impl Iterator<Item = PeerInfo> {
        let peer_ids = match self.peer_iter {
            QueryPeerIter::Fixed(fixed) => Either::Right(fixed.into_result()),
            QueryPeerIter::Closest(closest) => {
                Either::Left(Either::Left(closest.into_result()))
            }
            QueryPeerIter::ClosestDisjoint(disjoint) => {
                Either::Left(Either::Right(disjoint.into_result()))
            }
        };

        peer_ids.map(move |peer_id| {
            let recorded = self.addresses.remove(&peer_id);
            PeerInfo {
                peer_id,
                addrs: recorded.unwrap_or_default().to_vec(),
            }
        })
    }
}
/// The peer iterator backing a [`Query`]; one variant per iteration strategy.
enum QueryPeerIter {
/// Iteration towards the peers closest to a target.
Closest(ClosestPeersIter),
/// Iteration towards the closest peers using `ClosestDisjointPeersIter`
/// (selected when `disjoint_query_paths` is enabled).
ClosestDisjoint(ClosestDisjointPeersIter),
/// Iteration over a fixed set of peers.
Fixed(FixedPeersIter),
}
impl Query {
fn new(id: QueryId, peer_iter: QueryPeerIter, info: QueryInfo) -> Self {
Query {
id,
info,
peers: QueryPeers {
addresses: Default::default(),
peer_iter,
},
pending_rpcs: SmallVec::default(),
stats: QueryStats::empty(),
}
}
pub(crate) fn id(&self) -> QueryId {
self.id
}
pub(crate) fn stats(&self) -> &QueryStats {
&self.stats
}
pub(crate) fn on_failure(&mut self, peer: &PeerId) {
let updated = match &mut self.peers.peer_iter {
QueryPeerIter::Closest(iter) => iter.on_failure(peer),
QueryPeerIter::ClosestDisjoint(iter) => iter.on_failure(peer),
QueryPeerIter::Fixed(iter) => iter.on_failure(peer),
};
if updated {
self.stats.failure += 1;
}
}
pub(crate) fn on_success<I>(&mut self, peer: &PeerId, new_peers: I)
where
I: IntoIterator<Item = PeerId>,
{
let updated = match &mut self.peers.peer_iter {
QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers),
QueryPeerIter::ClosestDisjoint(iter) => iter.on_success(peer, new_peers),
QueryPeerIter::Fixed(iter) => iter.on_success(peer),
};
if updated {
self.stats.success += 1;
}
}
fn next(&mut self, now: Instant) -> PeersIterState<'_> {
let state = match &mut self.peers.peer_iter {
QueryPeerIter::Closest(iter) => iter.next(now),
QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
QueryPeerIter::Fixed(iter) => iter.next(),
};
if let PeersIterState::Waiting(Some(_)) = state {
self.stats.requests += 1;
}
state
}
pub(crate) fn try_finish<'a, I>(&mut self, peers: I) -> bool
where
I: IntoIterator<Item = &'a PeerId>,
{
match &mut self.peers.peer_iter {
QueryPeerIter::Closest(iter) => {
iter.finish();
true
}
QueryPeerIter::ClosestDisjoint(iter) => iter.finish_paths(peers),
QueryPeerIter::Fixed(iter) => {
iter.finish();
true
}
}
}
pub(crate) fn finish(&mut self) {
match &mut self.peers.peer_iter {
QueryPeerIter::Closest(iter) => iter.finish(),
QueryPeerIter::ClosestDisjoint(iter) => iter.finish(),
QueryPeerIter::Fixed(iter) => iter.finish(),
}
}
pub(crate) fn is_finished(&self) -> bool {
match &self.peers.peer_iter {
QueryPeerIter::Closest(iter) => iter.is_finished(),
QueryPeerIter::ClosestDisjoint(iter) => iter.is_finished(),
QueryPeerIter::Fixed(iter) => iter.is_finished(),
}
}
}
/// Execution statistics of a query: request counters and start/end times.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryStats {
    /// Number of requests counted for the query.
    requests: u32,
    /// Number of successes counted for the query.
    success: u32,
    /// Number of failures counted for the query.
    failure: u32,
    /// When the query started, if it has started.
    start: Option<Instant>,
    /// When the query ended, if it has ended.
    end: Option<Instant>,
}

impl QueryStats {
    /// Creates statistics with all counters at zero and no start or end time.
    pub fn empty() -> Self {
        QueryStats {
            requests: 0,
            success: 0,
            failure: 0,
            start: None,
            end: None,
        }
    }

    /// Returns the total number of requests.
    pub fn num_requests(&self) -> u32 {
        self.requests
    }

    /// Returns the number of successful requests.
    pub fn num_successes(&self) -> u32 {
        self.success
    }

    /// Returns the number of failed requests.
    pub fn num_failures(&self) -> u32 {
        self.failure
    }

    /// Returns the number of requests that have neither succeeded nor
    /// failed yet.
    ///
    /// The computation saturates: should the counters ever be inconsistent
    /// (more successes and failures than requests), `0` is returned instead
    /// of panicking in debug builds or wrapping around in release builds.
    pub fn num_pending(&self) -> u32 {
        let completed = self.success.saturating_add(self.failure);
        self.requests.saturating_sub(completed)
    }

    /// Returns how long the query has been running.
    ///
    /// Returns `None` if the query has no start time. If the query has a
    /// start time but no end time, the duration is measured up to the
    /// current instant.
    pub fn duration(&self) -> Option<Duration> {
        let start = self.start?;
        Some(match self.end {
            Some(end) => end - start,
            None => Instant::now() - start,
        })
    }

    /// Combines two sets of statistics into one.
    ///
    /// Counters are added up, the earlier of the two start times is kept
    /// (or the only one present), and the later of the two end times is
    /// kept (`None` only if neither has an end time).
    pub fn merge(self, other: QueryStats) -> Self {
        QueryStats {
            requests: self.requests + other.requests,
            success: self.success + other.success,
            failure: self.failure + other.failure,
            start: match (self.start, other.start) {
                (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
                (a, b) => a.or(b),
            },
            // `None` orders below every `Some`, so `max` picks the later end time.
            end: std::cmp::max(self.end, other.end),
        }
    }
}