mod peers;
pub use peers::QueryState;
pub(crate) use peers::{
closest::{FindNodeQuery, FindNodeQueryConfig},
predicate::{PredicateQuery, PredicateQueryConfig},
};
use crate::kbucket::{Key, PredicateKey};
use fnv::FnvHashMap;
use std::time::{Duration, Instant};
pub trait TargetKey<TNodeId> {
fn key(&self) -> Key<TNodeId>;
}
pub struct QueryPool<TTarget, TNodeId, TResult> {
next_id: usize,
query_timeout: Duration,
queries: FnvHashMap<QueryId, Query<TTarget, TNodeId, TResult>>,
}
#[allow(clippy::type_complexity)]
pub enum QueryPoolState<'a, TTarget, TNodeId, TResult> {
Idle,
Waiting(Option<(&'a mut Query<TTarget, TNodeId, TResult>, TNodeId)>),
Finished(Query<TTarget, TNodeId, TResult>),
Timeout(Query<TTarget, TNodeId, TResult>),
}
impl<TTarget, TNodeId, TResult> QueryPool<TTarget, TNodeId, TResult>
where
TTarget: TargetKey<TNodeId>,
TNodeId: Into<Key<TNodeId>> + Eq + Clone,
TResult: Into<TNodeId> + Clone,
{
pub fn new(query_timeout: Duration) -> Self {
QueryPool {
next_id: 0,
query_timeout,
queries: Default::default(),
}
}
pub fn iter(&self) -> impl Iterator<Item = &Query<TTarget, TNodeId, TResult>> {
self.queries.values()
}
pub fn add_findnode_query<I>(
&mut self,
config: FindNodeQueryConfig,
target: TTarget,
peers: I,
) -> QueryId
where
I: IntoIterator<Item = Key<TNodeId>>,
{
let target_key = target.key();
let findnode_query = FindNodeQuery::with_config(config, target_key, peers);
let peer_iter = QueryPeerIter::FindNode(findnode_query);
self.add(peer_iter, target)
}
pub(crate) fn add_predicate_query<I>(
&mut self,
config: PredicateQueryConfig,
target: TTarget,
peers: I,
predicate: impl Fn(&TResult) -> bool + Send + 'static,
) -> QueryId
where
I: IntoIterator<Item = PredicateKey<TNodeId>>,
{
let target_key = target.key();
let predicate_query = PredicateQuery::with_config(config, target_key, peers, predicate);
let peer_iter = QueryPeerIter::Predicate(predicate_query);
self.add(peer_iter, target)
}
fn add(&mut self, peer_iter: QueryPeerIter<TNodeId, TResult>, target: TTarget) -> QueryId {
let id = QueryId(self.next_id);
self.next_id = self.next_id.wrapping_add(1);
let query = Query::new(id, peer_iter, target);
self.queries.insert(id, query);
id
}
pub fn get_mut(&mut self, id: QueryId) -> Option<&mut Query<TTarget, TNodeId, TResult>> {
self.queries.get_mut(&id)
}
pub fn poll(&mut self) -> QueryPoolState<'_, TTarget, TNodeId, TResult> {
let now = Instant::now();
let mut finished = None;
let mut waiting = None;
let mut timeout = None;
for (&query_id, query) in self.queries.iter_mut() {
query.started = query.started.or(Some(now));
match query.next(now) {
QueryState::Finished => {
finished = Some(query_id);
break;
}
QueryState::Waiting(Some(return_peer)) => {
waiting = Some((query_id, return_peer));
break;
}
QueryState::Waiting(None) | QueryState::WaitingAtCapacity => {
let elapsed = now - query.started.unwrap_or(now);
if elapsed >= self.query_timeout {
timeout = Some(query_id);
break;
}
}
}
}
if let Some((query_id, return_peer)) = waiting {
let query = self.queries.get_mut(&query_id).expect("s.a.");
return QueryPoolState::Waiting(Some((query, return_peer)));
}
if let Some(query_id) = finished {
let query = self.queries.remove(&query_id).expect("s.a.");
return QueryPoolState::Finished(query);
}
if let Some(query_id) = timeout {
let query = self.queries.remove(&query_id).expect("s.a.");
return QueryPoolState::Timeout(query);
}
if self.queries.is_empty() {
QueryPoolState::Idle
} else {
QueryPoolState::Waiting(None)
}
}
}
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub struct QueryId(pub usize);
impl std::ops::Deref for QueryId {
type Target = usize;
fn deref(&self) -> &Self::Target {
&self.0
}
}
pub struct Query<TTarget, TNodeId, TResult> {
id: QueryId,
peer_iter: QueryPeerIter<TNodeId, TResult>,
started: Option<Instant>,
target: TTarget,
}
enum QueryPeerIter<TNodeId, TResult> {
FindNode(FindNodeQuery<TNodeId>),
Predicate(PredicateQuery<TNodeId, TResult>),
}
impl<TTarget, TNodeId, TResult> Query<TTarget, TNodeId, TResult>
where
TTarget: TargetKey<TNodeId>,
TNodeId: Into<Key<TNodeId>> + Eq + Clone,
TResult: Into<TNodeId> + Clone,
{
fn new(id: QueryId, peer_iter: QueryPeerIter<TNodeId, TResult>, target: TTarget) -> Self {
Query {
id,
peer_iter,
target,
started: None,
}
}
pub fn id(&self) -> QueryId {
self.id
}
pub fn on_failure(&mut self, peer: &TNodeId) {
match &mut self.peer_iter {
QueryPeerIter::FindNode(iter) => iter.on_failure(peer),
QueryPeerIter::Predicate(iter) => iter.on_failure(peer),
}
}
pub fn on_success<'a>(&mut self, peer: &TNodeId, new_peers: &'a [TResult])
where
&'a TResult: Into<TNodeId>,
{
match &mut self.peer_iter {
QueryPeerIter::FindNode(iter) => {
iter.on_success(peer, new_peers.iter().map(|result| result.into()).collect())
}
QueryPeerIter::Predicate(iter) => iter.on_success(peer, new_peers),
}
}
fn next(&mut self, now: Instant) -> QueryState<TNodeId> {
match &mut self.peer_iter {
QueryPeerIter::FindNode(iter) => iter.next(now),
QueryPeerIter::Predicate(iter) => iter.next(now),
}
}
pub fn into_result(self) -> QueryResult<TTarget, impl Iterator<Item = TNodeId>> {
let peers = match self.peer_iter {
QueryPeerIter::FindNode(iter) => iter.into_result(),
QueryPeerIter::Predicate(iter) => iter.into_result(),
};
QueryResult {
target: self.target,
closest_peers: peers.into_iter(),
}
}
pub fn target(&self) -> &TTarget {
&self.target
}
pub fn target_mut(&mut self) -> &mut TTarget {
&mut self.target
}
}
pub struct QueryResult<TTarget, TClosest> {
pub target: TTarget,
pub closest_peers: TClosest,
}