use super::*;
use crate::{
config::Config,
kbucket::{Distance, Key, MAX_NODES_PER_BUCKET},
};
use std::{
collections::btree_map::{BTreeMap, Entry},
time::{Duration, Instant},
};
/// State of a single iterative lookup for the peers closest to a target key.
///
/// The query keeps every peer it knows about ordered by distance to the target and
/// tracks, per peer, whether it has been contacted and how the request ended.
#[derive(Debug, Clone)]
pub struct FindNodeQuery<TNodeId> {
/// The key the query is looking for; all distances are measured against it.
target_key: Key<TNodeId>,
/// Overall progress of the query (iterating, stalled or finished).
progress: QueryProgress,
/// All peers known to the query, keyed (and therefore iterated) by their distance
/// to `target_key`, closest first.
closest_peers: BTreeMap<Distance, QueryPeer<TNodeId>>,
/// Number of peers currently in the `Waiting` state, i.e. with an outstanding request.
num_waiting: usize,
/// Tuning parameters of the query.
config: FindNodeQueryConfig,
}
/// Configuration of a `FindNodeQuery`.
#[derive(Debug, Clone)]
pub struct FindNodeQueryConfig {
/// Maximum number of peers the query waits on concurrently while it is iterating.
/// It is also the number of consecutive successful responses without progress after
/// which the query is considered stalled.
pub parallelism: usize,
/// Number of closest peers the query tries to obtain. It bounds the initial peer set,
/// the returned result, and the number of concurrent requests while stalled.
pub num_results: usize,
/// How long to wait for a contacted peer before it is marked unresponsive.
pub peer_timeout: Duration,
}
impl FindNodeQueryConfig {
pub fn new_from_config(config: &Config) -> Self {
Self {
parallelism: config.query_parallelism,
num_results: MAX_NODES_PER_BUCKET,
peer_timeout: config.query_peer_timeout,
}
}
}
impl<TNodeId> FindNodeQuery<TNodeId>
where
TNodeId: Into<Key<TNodeId>> + Eq + Clone,
{
pub fn with_config<I>(
config: FindNodeQueryConfig,
target_key: Key<TNodeId>,
known_closest_peers: I,
) -> Self
where
I: IntoIterator<Item = Key<TNodeId>>,
{
let closest_peers = known_closest_peers
.into_iter()
.map(|key| {
let key: Key<TNodeId> = key;
let distance = key.distance(&target_key);
let state = QueryPeerState::NotContacted;
(distance, QueryPeer::new(key, state))
})
.take(config.num_results)
.collect();
let progress = QueryProgress::Iterating { no_progress: 0 };
FindNodeQuery {
config,
target_key,
progress,
closest_peers,
num_waiting: 0,
}
}
pub fn on_success(&mut self, node_id: &TNodeId, closer_peers: Vec<TNodeId>) {
if let QueryProgress::Finished = self.progress {
return;
}
let key: Key<TNodeId> = node_id.clone().into();
let distance = key.distance(&self.target_key);
match self.closest_peers.entry(distance) {
Entry::Vacant(..) => return,
Entry::Occupied(mut e) => match e.get().state {
QueryPeerState::Waiting(..) => {
debug_assert!(self.num_waiting > 0);
self.num_waiting -= 1;
let peer = e.get_mut();
peer.peers_returned += closer_peers.len();
peer.state = QueryPeerState::Succeeded;
}
QueryPeerState::Unresponsive => {
let peer = e.get_mut();
peer.peers_returned += closer_peers.len();
peer.state = QueryPeerState::Succeeded;
}
QueryPeerState::NotContacted
| QueryPeerState::Failed
| QueryPeerState::Succeeded => return,
},
}
let mut progress = false;
let num_closest = self.closest_peers.len();
for peer in closer_peers {
let key: Key<TNodeId> = peer.into();
let distance = self.target_key.distance(&key);
let peer = QueryPeer::new(key, QueryPeerState::NotContacted);
self.closest_peers.entry(distance).or_insert(peer);
progress = self.closest_peers.keys().next() == Some(&distance)
|| num_closest < self.config.num_results;
}
self.progress = match self.progress {
QueryProgress::Iterating { no_progress } => {
let no_progress = if progress { 0 } else { no_progress + 1 };
if no_progress >= self.config.parallelism {
QueryProgress::Stalled
} else {
QueryProgress::Iterating { no_progress }
}
}
QueryProgress::Stalled => {
if progress {
QueryProgress::Iterating { no_progress: 0 }
} else {
QueryProgress::Stalled
}
}
QueryProgress::Finished => QueryProgress::Finished,
}
}
pub fn on_failure(&mut self, peer: &TNodeId) {
if let QueryProgress::Finished = self.progress {
return;
}
let key: Key<TNodeId> = peer.clone().into();
let distance = key.distance(&self.target_key);
match self.closest_peers.entry(distance) {
Entry::Vacant(_) => {}
Entry::Occupied(mut e) => match e.get().state {
QueryPeerState::Waiting(..) => {
debug_assert!(self.num_waiting > 0);
self.num_waiting -= 1;
e.get_mut().state = QueryPeerState::Failed
}
QueryPeerState::Unresponsive => e.get_mut().state = QueryPeerState::Failed,
_ => {}
},
}
}
pub fn next(&mut self, now: Instant) -> QueryState<TNodeId> {
if let QueryProgress::Finished = self.progress {
return QueryState::Finished;
}
let mut result_counter = Some(0);
let at_capacity = self.at_capacity();
for peer in self.closest_peers.values_mut() {
match peer.state {
QueryPeerState::NotContacted => {
if !at_capacity {
let timeout = now + self.config.peer_timeout;
peer.state = QueryPeerState::Waiting(timeout);
self.num_waiting += 1;
let peer = peer.key.preimage().clone();
return QueryState::Waiting(Some(peer));
} else {
return QueryState::WaitingAtCapacity;
}
}
QueryPeerState::Waiting(timeout) => {
if now >= timeout {
debug_assert!(self.num_waiting > 0);
self.num_waiting -= 1;
peer.state = QueryPeerState::Unresponsive;
} else if at_capacity {
return QueryState::WaitingAtCapacity;
} else {
result_counter = None;
}
}
QueryPeerState::Succeeded => {
if let Some(ref mut cnt) = result_counter {
*cnt += 1;
if *cnt >= self.config.num_results {
self.progress = QueryProgress::Finished;
return QueryState::Finished;
}
}
}
QueryPeerState::Failed | QueryPeerState::Unresponsive => {
}
}
}
if self.num_waiting > 0 {
QueryState::Waiting(None)
} else {
self.progress = QueryProgress::Finished;
QueryState::Finished
}
}
pub fn into_result(self) -> Vec<TNodeId> {
self.closest_peers
.into_iter()
.filter_map(|(_, peer)| {
if let QueryPeerState::Succeeded = peer.state {
Some(peer.key.into_preimage())
} else {
None
}
})
.take(self.config.num_results)
.collect()
}
fn at_capacity(&self) -> bool {
match self.progress {
QueryProgress::Stalled => self.num_waiting >= self.config.num_results,
QueryProgress::Iterating { .. } => self.num_waiting >= self.config.parallelism,
QueryProgress::Finished => true,
}
}
}
/// Overall progress of a `FindNodeQuery`.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum QueryProgress {
/// The query is iterating towards the target with at most `parallelism` requests
/// outstanding.
Iterating {
/// Number of consecutive successful responses that did not make progress.
/// Once it reaches `parallelism` the query becomes `Stalled`.
no_progress: usize,
},
/// The query made no progress for `parallelism` consecutive successful responses.
/// While stalled, up to `num_results` requests may be outstanding; a response that
/// makes progress switches the query back to `Iterating`.
Stalled,
/// The query is done; further events are ignored.
Finished,
}
/// A peer known to a query, together with its request state.
#[derive(Debug, Clone)]
struct QueryPeer<TNodeId> {
/// The key of the peer; its distance to the target determines the peer's position.
key: Key<TNodeId>,
/// Total number of peers this peer reported in its successful response.
peers_returned: usize,
/// Current request state of the peer.
state: QueryPeerState,
}
impl<TNodeId> QueryPeer<TNodeId> {
pub fn new(key: Key<TNodeId>, state: QueryPeerState) -> Self {
QueryPeer {
key,
peers_returned: 0,
state,
}
}
}
/// Request state of a single peer within a query.
#[derive(Debug, Copy, Clone)]
enum QueryPeerState {
/// The peer has not been contacted yet.
NotContacted,
/// A request is outstanding; the `Instant` is the deadline after which the peer is
/// treated as unresponsive.
Waiting(Instant),
/// The peer did not answer before its deadline. A late success or failure is still
/// accepted and moves it to `Succeeded` or `Failed`.
Unresponsive,
/// The request to the peer failed.
Failed,
/// The peer answered successfully.
Succeeded,
}
// Property-based and unit tests for `FindNodeQuery`.
#[cfg(test)]
mod tests {
use super::*;
use enr::NodeId;
use quickcheck::*;
use rand_07::{thread_rng, Rng};
use std::time::Duration;
type TestQuery = FindNodeQuery<NodeId>;
// Yields `n` random node ids.
fn random_nodes(n: usize) -> impl Iterator<Item = NodeId> + Clone {
(0..n).map(|_| NodeId::random())
}
// Builds a query with a random target, random known peers and a random configuration.
fn random_query<G: Rng>(g: &mut G) -> TestQuery {
let known_closest_peers = random_nodes(g.gen_range(1, 60)).map(Key::from);
let target = NodeId::random();
let config = FindNodeQueryConfig {
parallelism: g.gen_range(1, 10),
num_results: g.gen_range(1, 25),
peer_timeout: Duration::from_secs(g.gen_range(10, 30)),
};
FindNodeQuery::with_config(config, target.into(), known_closest_peers)
}
// Whether `peers` is ordered by strictly increasing distance to `target`.
fn sorted(target: &Key<NodeId>, peers: &[Key<NodeId>]) -> bool {
peers
.windows(2)
.all(|w| w[0].distance(target) < w[1].distance(target))
}
// Lets quickcheck generate random queries as test inputs.
impl Arbitrary for TestQuery {
fn arbitrary<G: Gen>(g: &mut G) -> TestQuery {
random_query(g)
}
}
// A freshly created query has only not-contacted peers, sorted by distance, nothing
// outstanding and an empty result.
#[test]
fn new_query() {
let query = random_query(&mut thread_rng());
let target = query.target_key.clone();
let (keys, states): (Vec<_>, Vec<_>) = query
.closest_peers
.values()
.map(|e| (e.key.clone(), &e.state))
.unzip();
let none_contacted = states
.iter()
.all(|s| matches!(s, QueryPeerState::NotContacted));
assert!(none_contacted, "Unexpected peer state in new query.");
assert!(
sorted(&target, &keys),
"Closest peers in new query not sorted by distance to target."
);
assert_eq!(
query.num_waiting, 0,
"Unexpected peers in progress in new query."
);
assert!(
query.into_result().is_empty(),
"Unexpected closest peers in new query"
);
}
// Drives a query to completion in rounds of at most `max_parallelism` requests and
// checks the order in which peers are handed out, the waiting counter, and the result.
#[test]
fn termination_and_parallelism() {
fn prop(mut query: TestQuery) {
let now = Instant::now();
let mut rng = thread_rng();
// Peers the query is expected to hand out next, in distance order.
let mut expected = query
.closest_peers
.values()
.map(|e| e.key.clone())
.collect::<Vec<_>>();
let num_known = expected.len();
let max_parallelism = usize::min(query.config.parallelism, num_known);
let target = query.target_key.clone();
let mut remaining;
let mut num_failures = 0;
'finished: loop {
if expected.is_empty() {
break;
}
// Split off the peers of this round; the rest is kept in `remaining`.
else if expected.len() < max_parallelism {
remaining = Vec::new();
} else {
remaining = expected.split_off(max_parallelism);
}
// Each call must hand out the next expected peer.
for k in expected.iter() {
match query.next(now) {
QueryState::Finished => break 'finished,
QueryState::Waiting(Some(p)) => assert_eq!(&p, k.preimage()),
QueryState::Waiting(None) => panic!("Expected another peer."),
QueryState::WaitingAtCapacity => panic!("Unexpectedly reached capacity."),
}
}
let num_waiting = query.num_waiting;
assert_eq!(num_waiting, expected.len());
if query.at_capacity() {
assert_eq!(query.next(now), QueryState::WaitingAtCapacity)
}
// Resolve every outstanding request: succeed with random closer peers with
// probability 0.75, fail otherwise.
for (i, k) in expected.iter().enumerate() {
if rng.gen_bool(0.75) {
let num_closer = rng.gen_range(0, query.config.num_results + 1);
let closer_peers = random_nodes(num_closer).collect::<Vec<_>>();
remaining.extend(closer_peers.iter().map(|x| Key::from(*x)));
query.on_success(k.preimage(), closer_peers);
} else {
num_failures += 1;
query.on_failure(k.preimage());
}
assert_eq!(query.num_waiting, num_waiting - (i + 1));
}
// The next round is expected to follow distance order again.
remaining.sort_by_key(|k| target.distance(k));
expected = remaining
}
assert_eq!(query.next(now), QueryState::Finished);
assert_eq!(query.progress, QueryProgress::Finished);
let all_contacted = query.closest_peers.values().all(|e| {
!matches!(
e.state,
QueryPeerState::NotContacted | QueryPeerState::Waiting { .. }
)
});
let target_key = query.target_key.clone();
let num_results = query.config.num_results;
let result = query.into_result();
let closest = result.into_iter().map(Key::from).collect::<Vec<_>>();
assert!(sorted(&target_key, &closest));
// Fewer results than requested are only acceptable if too few peers were known
// or some requests failed, and then every peer must have been contacted.
if closest.len() < num_results {
assert!(num_known < num_results || num_failures > 0);
assert!(all_contacted, "Not all peers have been contacted.");
} else {
assert_eq!(num_results, closest.len(), "Too many results.");
}
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
}
// Reporting the same closer peer several times must leave exactly one entry for it.
#[test]
fn no_duplicates() {
fn prop(mut query: TestQuery) -> bool {
let now = Instant::now();
let closer: Vec<NodeId> = random_nodes(1).collect();
let peer1 = if let QueryState::Waiting(Some(p)) = query.next(now) {
p
} else {
panic!("No peer.");
};
// The second success for the same peer is expected to be ignored.
query.on_success(&peer1, closer.clone());
query.on_success(&peer1, closer.clone());
match query.next(now) {
QueryState::Waiting(Some(p)) => {
let peer2 = p;
query.on_success(&peer2, closer.clone())
}
QueryState::Finished => {}
_ => panic!("Unexpectedly query state."),
};
let n = query
.closest_peers
.values()
.filter(|e| e.key.preimage() == &closer[0])
.count();
assert_eq!(n, 1);
true
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
}
// A peer that misses its deadline becomes unresponsive, but a late success is still
// accepted unless the query has finished in the meantime.
#[test]
fn timeout() {
fn prop(mut query: TestQuery) -> bool {
let mut now = Instant::now();
// The closest known peer is the first one the query hands out.
let peer = query
.closest_peers
.values()
.next()
.unwrap()
.key
.clone()
.into_preimage();
match query.next(now) {
QueryState::Waiting(Some(id)) => assert_eq!(id, peer),
_ => panic!(),
}
// Advance the clock to the deadline and let the query notice the timeout.
now += query.config.peer_timeout;
let _ = query.next(now);
match &query.closest_peers.values().next().unwrap() {
QueryPeer {
key,
state: QueryPeerState::Unresponsive,
..
} => {
assert_eq!(key.preimage(), &peer);
}
QueryPeer { state, .. } => panic!("Unexpected peer state: {:?}", state),
}
let finished = query.progress == QueryProgress::Finished;
query.on_success(&peer, Vec::<NodeId>::new());
let closest = query.into_result();
// A finished query ignores the late success; otherwise the peer is a result.
if finished {
assert_eq!(Vec::<NodeId>::new(), closest);
} else {
assert_eq!(vec![peer], closest)
}
true
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
}
}