use std::{
collections::btree_map::{BTreeMap, Entry},
num::NonZeroUsize,
time::Duration,
};
use web_time::Instant;
use super::*;
use crate::{
kbucket::{Distance, Key, KeyBytes},
ALPHA_VALUE, K_VALUE,
};
pub(crate) mod disjoint;
/// A peer iterator that tracks the peers closest to a target key and decides
/// which of them to contact next.
///
/// Peers are stored in a map ordered by their distance to `target`, so every
/// scan over the map visits peers from closest to furthest.
#[derive(Debug, Clone)]
pub struct ClosestPeersIter {
/// Parallelism, result count and per-peer timeout settings.
config: ClosestPeersIterConfig,
/// The key against which all peer distances are measured.
target: KeyBytes,
/// Internal state of the iterator (iterating, stalled or finished).
state: State,
/// Known peers keyed by their distance to `target`. Two peers with the
/// same distance occupy the same slot; the first one inserted wins.
closest_peers: BTreeMap<Distance, Peer>,
/// Number of peers currently in the `PeerState::Waiting` state.
num_waiting: usize,
}
/// Configuration for a `ClosestPeersIter`.
#[derive(Debug, Clone)]
pub struct ClosestPeersIterConfig {
/// Maximum number of peers the iterator waits on at the same time while it
/// is making progress. It is also the number of consecutive successful
/// responses without progress after which the iterator is considered
/// stalled.
pub parallelism: NonZeroUsize,
/// Number of closest peers with a successful response after which the
/// iterator finishes; also the upper bound on the peers yielded by
/// `ClosestPeersIter::into_result`.
pub num_results: NonZeroUsize,
/// How long a contacted peer may stay in the waiting state. Once this
/// duration has elapsed the peer is treated as unresponsive and no longer
/// counts towards the parallelism limit.
pub peer_timeout: Duration,
}
impl Default for ClosestPeersIterConfig {
    /// Builds the default configuration: `ALPHA_VALUE` parallel requests,
    /// `K_VALUE` results and a ten second timeout per peer.
    fn default() -> Self {
        let peer_timeout = Duration::from_secs(10);
        Self {
            parallelism: ALPHA_VALUE,
            num_results: K_VALUE,
            peer_timeout,
        }
    }
}
impl ClosestPeersIter {
/// Creates a new iterator for `target` using the default configuration.
///
/// `known_closest_peers` seeds the iterator with the peers it starts from;
/// see `with_config` for how the seed is processed.
pub fn new<I>(target: KeyBytes, known_closest_peers: I) -> Self
where
    I: IntoIterator<Item = Key<PeerId>>,
{
    let config = ClosestPeersIterConfig::default();
    Self::with_config(config, target, known_closest_peers)
}
pub fn with_config<I, T>(
config: ClosestPeersIterConfig,
target: T,
known_closest_peers: I,
) -> Self
where
I: IntoIterator<Item = Key<PeerId>>,
T: Into<KeyBytes>,
{
let target = target.into();
let closest_peers = BTreeMap::from_iter(
known_closest_peers
.into_iter()
.map(|key| {
let distance = key.distance(&target);
let state = PeerState::NotContacted;
(distance, Peer { key, state })
})
.take(K_VALUE.into()),
);
let state = State::Iterating { no_progress: 0 };
ClosestPeersIter {
config,
target,
state,
closest_peers,
num_waiting: 0,
}
}
/// Reports a successful response from `peer`, together with the peers it
/// reported as being closer to the target.
///
/// Returns `false` without changing anything if the iterator is already
/// finished, if `peer` is not known to the iterator, or if `peer` is not
/// currently waiting or unresponsive. Otherwise the peer is marked as
/// succeeded, the reported peers are merged into the set of known peers,
/// the iterating/stalled state is updated and `true` is returned.
pub fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) -> bool
where
I: IntoIterator<Item = PeerId>,
{
// A finished iterator ignores all further results.
if let State::Finished = self.state {
return false;
}
let key = Key::from(*peer);
let distance = key.distance(&self.target);
// Mark the peer as succeeded. Only peers that are being waited on, or
// that timed out earlier, may deliver a result.
match self.closest_peers.entry(distance) {
Entry::Vacant(..) => return false,
Entry::Occupied(mut e) => match e.get().state {
PeerState::Waiting(..) => {
debug_assert!(self.num_waiting > 0);
self.num_waiting -= 1;
e.get_mut().state = PeerState::Succeeded;
}
PeerState::Unresponsive => {
// The peer was already removed from `num_waiting` when it timed out.
e.get_mut().state = PeerState::Succeeded;
}
PeerState::NotContacted | PeerState::Failed | PeerState::Succeeded => return false,
},
}
// `cur_range` is the distance of the `num_results`-th closest known peer,
// or of the furthest known peer if fewer than `num_results` are known.
let mut cur_range = distance;
let num_results = self.config.num_results.get();
let furthest_peer = self
.closest_peers
.iter()
.enumerate()
.nth(num_results - 1)
.map(|(_, peer)| peer)
.or_else(|| self.closest_peers.iter().last());
if let Some((dist, _)) = furthest_peer {
cur_range = *dist;
}
// Merge the reported peers. The iterator makes progress if it knew fewer
// than `num_results` peers before the merge, or if any newly inserted
// peer is closer to the target than `cur_range`.
let mut progress = self.closest_peers.len() < self.config.num_results.get();
for peer in closer_peers {
let key = peer.into();
let distance = self.target.distance(&key);
let peer = Peer {
key,
state: PeerState::NotContacted,
};
// An existing entry at the same distance is kept untouched.
let is_first_insert = match self.closest_peers.entry(distance) {
Entry::Occupied(_) => false,
Entry::Vacant(entry) => {
entry.insert(peer);
true
}
};
progress = (is_first_insert && distance < cur_range) || progress;
}
// Update the iterator state: after `parallelism` consecutive results
// without progress the iterator stalls; any progress resumes iterating.
self.state = match self.state {
State::Iterating { no_progress } => {
let no_progress = if progress { 0 } else { no_progress + 1 };
if no_progress >= self.config.parallelism.get() {
State::Stalled
} else {
State::Iterating { no_progress }
}
}
State::Stalled => {
if progress {
State::Iterating { no_progress: 0 }
} else {
State::Stalled
}
}
State::Finished => State::Finished,
};
true
}
/// Reports a failed request to `peer`.
///
/// Returns `true` if the peer was waiting or unresponsive and is now marked
/// as failed. Returns `false`, leaving the iterator untouched, if the
/// iterator is finished, the peer is unknown, or the peer is in any other
/// state.
pub fn on_failure(&mut self, peer: &PeerId) -> bool {
    if matches!(self.state, State::Finished) {
        return false;
    }

    let distance = Key::from(*peer).distance(&self.target);
    let entry = match self.closest_peers.get_mut(&distance) {
        Some(entry) => entry,
        None => return false,
    };

    match entry.state {
        PeerState::Waiting(_) => {
            // The request no longer counts towards the parallelism limit.
            debug_assert!(self.num_waiting > 0);
            self.num_waiting -= 1;
        }
        // Already removed from `num_waiting` when the peer timed out.
        PeerState::Unresponsive => {}
        PeerState::NotContacted | PeerState::Failed | PeerState::Succeeded => return false,
    }

    entry.state = PeerState::Failed;
    true
}
/// Returns the peers the iterator is currently waiting on, ordered from
/// closest to furthest from the target.
pub fn waiting(&self) -> impl Iterator<Item = &PeerId> {
    self.closest_peers
        .values()
        .filter(|peer| matches!(peer.state, PeerState::Waiting(..)))
        .map(|peer| peer.key.preimage())
}
/// Returns the number of peers the iterator is currently waiting on.
///
/// Peers whose timeout has been observed by `next` are no longer counted.
pub fn num_waiting(&self) -> usize {
self.num_waiting
}
/// Returns `true` if the iterator is currently waiting on a response from
/// `peer`.
pub fn is_waiting(&self, peer: &PeerId) -> bool {
    self.waiting().any(|candidate| candidate == peer)
}
/// Advances the iterator at time `now` and reports what it is doing.
///
/// Peers are scanned from closest to furthest. The result is:
///
/// * `Finished` if the iterator was already finished, if the `num_results`
///   closest peers scanned so far have all succeeded with no pending request
///   in between, or if no request is pending and no peer is left to contact.
/// * `Waiting(Some(peer))` if `peer` has just been moved to the waiting state
///   with a deadline of `now + peer_timeout`.
/// * `WaitingAtCapacity` if the parallelism limit is reached.
/// * `Waiting(None)` if requests are pending but there is no further peer to
///   contact.
///
/// As a side effect, waiting peers whose deadline is at or before `now` are
/// marked as unresponsive.
pub fn next(&mut self, now: Instant) -> PeersIterState<'_> {
if let State::Finished = self.state {
return PeersIterState::Finished;
}
// Counts successful peers seen so far in the scan. It is set to `None`
// once a still-pending peer is encountered, because the iterator must not
// finish while a closer peer may still deliver a result.
let mut result_counter = Some(0);
// Evaluated once up front; timeouts detected during the scan below do not
// change this decision for the current call.
let at_capacity = self.at_capacity();
for peer in self.closest_peers.values_mut() {
match peer.state {
PeerState::Waiting(timeout) => {
if now >= timeout {
// Timed-out peers stop counting towards the parallelism limit,
// but `on_success` / `on_failure` still accept their result.
debug_assert!(self.num_waiting > 0);
self.num_waiting -= 1;
peer.state = PeerState::Unresponsive
} else if at_capacity {
// Still waiting on this peer and no room for another request.
return PeersIterState::WaitingAtCapacity;
} else {
// Still waiting on this peer; results from peers further away
// must not finish the iterator.
result_counter = None;
}
}
PeerState::Succeeded => {
if let Some(ref mut cnt) = result_counter {
*cnt += 1;
// Enough of the closest peers have answered: done.
if *cnt >= self.config.num_results.get() {
self.state = State::Finished;
return PeersIterState::Finished;
}
}
}
PeerState::NotContacted => {
if !at_capacity {
// Hand out this peer and start its timeout.
let timeout = now + self.config.peer_timeout;
peer.state = PeerState::Waiting(timeout);
self.num_waiting += 1;
return PeersIterState::Waiting(Some(Cow::Borrowed(peer.key.preimage())));
} else {
return PeersIterState::WaitingAtCapacity;
}
}
PeerState::Unresponsive | PeerState::Failed => {
// Skip over unresponsive or failed peers.
}
}
}
if self.num_waiting > 0 {
// Requests are pending, capacity is left, but there is nobody new to contact.
PeersIterState::Waiting(None)
} else {
// Every known peer has been dealt with and nothing is pending.
self.state = State::Finished;
PeersIterState::Finished
}
}
/// Immediately transitions the iterator to the finished state.
///
/// Afterwards `next` returns `PeersIterState::Finished` and both
/// `on_success` and `on_failure` return `false`.
pub fn finish(&mut self) {
self.state = State::Finished
}
/// Returns `true` once the iterator has reached the finished state.
pub fn is_finished(&self) -> bool {
    matches!(self.state, State::Finished)
}
/// Consumes the iterator and yields the peers that responded successfully,
/// ordered from closest to furthest, limited to `num_results` entries.
pub fn into_result(self) -> impl Iterator<Item = PeerId> {
    let limit = self.config.num_results.get();
    self.closest_peers
        .into_values()
        .filter(|peer| matches!(peer.state, PeerState::Succeeded))
        .map(|peer| peer.key.into_preimage())
        .take(limit)
}
/// Returns `true` if no further request may be started right now.
///
/// While iterating the limit is `parallelism`; once stalled it is raised to
/// `max(num_results, parallelism)`. A finished iterator is always at
/// capacity.
fn at_capacity(&self) -> bool {
    let limit = match self.state {
        State::Finished => return true,
        State::Iterating { .. } => self.config.parallelism.get(),
        State::Stalled => self
            .config
            .num_results
            .get()
            .max(self.config.parallelism.get()),
    };
    self.num_waiting >= limit
}
}
/// Internal state of a `ClosestPeersIter`.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum State {
/// The iterator is making progress. At most `parallelism` peers are waited
/// on at a time.
Iterating {
/// Number of consecutive successful responses that did not make
/// progress. Reset to zero whenever a response makes progress; once it
/// reaches `parallelism` the iterator becomes `Stalled`.
no_progress: usize,
},
/// The iterator has stopped making progress. The capacity limit is raised
/// to `max(num_results, parallelism)`; a response that makes progress
/// switches the iterator back to `Iterating`.
Stalled,
/// The iterator is done. No more peers are handed out and further results
/// are ignored.
Finished,
}
/// A peer known to the iterator, together with its request state.
#[derive(Debug, Clone)]
struct Peer {
/// Key of the peer; its preimage is the peer's `PeerId`.
key: Key<PeerId>,
/// Where the peer currently stands in the request lifecycle.
state: PeerState,
}
/// Request state of a single peer inside a `ClosestPeersIter`.
#[derive(Debug, Copy, Clone)]
enum PeerState {
/// The peer has not been handed out by the iterator yet.
NotContacted,
/// The iterator is waiting for a result from the peer. The `Instant` is the
/// deadline after which the peer is considered unresponsive.
Waiting(Instant),
/// The deadline passed without a result. The peer no longer counts towards
/// `num_waiting`, but a late success or failure is still accepted.
Unresponsive,
/// A failure was reported for the peer. This state is final.
Failed,
/// A successful response was reported for the peer. This state is final.
Succeeded,
}
#[cfg(test)]
mod tests {
use std::iter;
use libp2p_core::multihash::Multihash;
use quickcheck::*;
use rand::{rngs::StdRng, Rng, SeedableRng};
use super::*;
use crate::SHA_256_MH;
/// Generates `n` peer ids from random 32-byte digests drawn from `g`.
fn random_peers<R: Rng>(n: usize, g: &mut R) -> Vec<PeerId> {
    let mut peers = Vec::with_capacity(n);
    for _ in 0..n {
        let digest = g.gen::<[u8; 32]>();
        let multihash = Multihash::wrap(SHA_256_MH, &digest).unwrap();
        peers.push(PeerId::from_multihash(multihash).unwrap());
    }
    peers
}
/// Returns `true` if `peers` is strictly ordered by increasing distance to
/// `target`. Slices with fewer than two entries are trivially sorted.
fn sorted<T: AsRef<KeyBytes>>(target: &T, peers: &[Key<PeerId>]) -> bool {
    peers
        .iter()
        .zip(peers.iter().skip(1))
        .all(|(nearer, farther)| nearer.distance(&target) < farther.distance(&target))
}
/// Newtype that lets quickcheck generate arbitrary `PeerId`s.
#[derive(Clone, Debug)]
struct ArbitraryPeerId(PeerId);
impl Arbitrary for ArbitraryPeerId {
    /// Builds a peer id from 32 arbitrary bytes used as a SHA-256 digest.
    fn arbitrary(g: &mut Gen) -> ArbitraryPeerId {
        let mut digest = [0u8; 32];
        for byte in digest.iter_mut() {
            *byte = u8::arbitrary(g);
        }
        let multihash = Multihash::wrap(SHA_256_MH, &digest).unwrap();
        ArbitraryPeerId(PeerId::from_multihash(multihash).unwrap())
    }
}
impl Arbitrary for ClosestPeersIter {
    /// Generates an iterator seeded with 1 to 59 arbitrary peers, an
    /// arbitrary target and a randomized configuration.
    fn arbitrary(g: &mut Gen) -> ClosestPeersIter {
        let num_known = g.gen_range(1..60u8);
        let mut known_closest_peers = Vec::with_capacity(usize::from(num_known));
        for _ in 0..num_known {
            known_closest_peers.push(Key::from(ArbitraryPeerId::arbitrary(g).0));
        }

        let target = Key::from(ArbitraryPeerId::arbitrary(g).0);

        // Drawn in the same order as the fields are declared.
        let parallelism = NonZeroUsize::new(g.gen_range(1..10)).unwrap();
        let num_results = NonZeroUsize::new(g.gen_range(1..25)).unwrap();
        let peer_timeout = Duration::from_secs(g.gen_range(10..30));
        let config = ClosestPeersIterConfig {
            parallelism,
            num_results,
            peer_timeout,
        };

        ClosestPeersIter::with_config(config, target, known_closest_peers)
    }
}
/// Arbitrary 32-byte seed for a deterministic `StdRng`.
#[derive(Clone, Debug)]
struct Seed([u8; 32]);
impl Arbitrary for Seed {
    /// Fills every byte of the seed with an arbitrary value.
    fn arbitrary(g: &mut Gen) -> Seed {
        let mut bytes = [0u8; 32];
        for byte in bytes.iter_mut() {
            *byte = u8::arbitrary(g);
        }
        Seed(bytes)
    }
}
/// A freshly created iterator holds only uncontacted peers, sorted by
/// distance to the target, with nothing pending and no results.
#[test]
fn new_iter() {
    fn prop(iter: ClosestPeersIter) {
        let target = iter.target;
        let keys = iter
            .closest_peers
            .values()
            .map(|e| e.key)
            .collect::<Vec<_>>();

        let none_contacted = iter
            .closest_peers
            .values()
            .all(|e| matches!(e.state, PeerState::NotContacted));
        assert!(none_contacted, "Unexpected peer state in new iterator.");

        assert!(
            sorted(&target, &keys),
            "Closest peers in new iterator not sorted by distance to target."
        );

        assert_eq!(
            iter.num_waiting(),
            0,
            "Unexpected peers in progress in new iterator."
        );

        let num_results = iter.into_result().count();
        assert_eq!(num_results, 0, "Unexpected closest peers in new iterator");
    }
    QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
}
/// Drives an arbitrary iterator through rounds of requests with randomly
/// chosen successes and failures, checking that it hands out peers in order
/// of distance, respects the parallelism bound and eventually finishes with
/// a sorted result of at most `num_results` peers.
#[test]
fn termination_and_parallelism() {
fn prop(mut iter: ClosestPeersIter, seed: Seed) {
let now = Instant::now();
let mut rng = StdRng::from_seed(seed.0);
// Peers the iterator is expected to hand out next, closest first.
let mut expected = iter
.closest_peers
.values()
.map(|e| e.key)
.collect::<Vec<_>>();
let num_known = expected.len();
let max_parallelism = usize::min(iter.config.parallelism.get(), num_known);
let target = iter.target;
let mut remaining;
let mut num_failures = 0;
'finished: loop {
if expected.is_empty() {
break;
}
// Split off the next up to `max_parallelism` expected peers.
else if expected.len() < max_parallelism {
remaining = Vec::new();
} else {
remaining = expected.split_off(max_parallelism);
}
// Advance the iterator once per expected peer of this round.
for k in expected.iter() {
match iter.next(now) {
PeersIterState::Finished => break 'finished,
PeersIterState::Waiting(Some(p)) => assert_eq!(&*p, k.preimage()),
PeersIterState::Waiting(None) => panic!("Expected another peer."),
PeersIterState::WaitingAtCapacity => {
panic!("Unexpectedly reached capacity.")
}
}
}
// Every peer handed out in this round must be pending.
let num_waiting = iter.num_waiting();
assert_eq!(num_waiting, expected.len());
// Check the bounded parallelism.
if iter.at_capacity() {
assert_eq!(iter.next(now), PeersIterState::WaitingAtCapacity)
}
// Report a result for each pending peer: success with a random number
// of newly discovered peers in 75% of the cases, failure otherwise.
for (i, k) in expected.iter().enumerate() {
if rng.gen_bool(0.75) {
let num_closer = rng.gen_range(0..iter.config.num_results.get() + 1);
let closer_peers = random_peers(num_closer, &mut rng);
remaining.extend(closer_peers.iter().cloned().map(Key::from));
iter.on_success(k.preimage(), closer_peers);
} else {
num_failures += 1;
iter.on_failure(k.preimage());
}
assert_eq!(iter.num_waiting(), num_waiting - (i + 1));
}
// Re-sort the remaining expected peers for the next round.
remaining.sort_by_key(|k| target.distance(&k));
expected = remaining
}
// The iterator must be finished.
assert_eq!(iter.next(now), PeersIterState::Finished);
assert_eq!(iter.state, State::Finished);
// Determine whether any peer is left uncontacted or still pending.
let all_contacted = iter
.closest_peers
.values()
.all(|e| !matches!(e.state, PeerState::NotContacted | PeerState::Waiting { .. }));
let target = iter.target;
let num_results = iter.config.num_results;
let result = iter.into_result();
let closest = result.map(Key::from).collect::<Vec<_>>();
assert!(sorted(&target, &closest));
if closest.len() < num_results.get() {
// Fewer results than requested is only acceptable if too few peers
// were known up front or some requests failed, and only once every
// known peer has been contacted.
assert!(num_known < num_results.get() || num_failures > 0);
assert!(all_contacted, "Not all peers have been contacted.");
} else {
assert_eq!(num_results.get(), closest.len(), "Too many results.");
}
}
QuickCheck::new()
.tests(10)
.quickcheck(prop as fn(_, _) -> _)
}
/// Reporting the same closer peer several times, from the same peer and from
/// a second peer, must leave exactly one entry for it in the iterator.
#[test]
fn no_duplicates() {
fn prop(mut iter: ClosestPeersIter, closer: ArbitraryPeerId) -> bool {
let now = Instant::now();
let closer = vec![closer.0];
// A new iterator must hand out a first peer.
let peer1 = match iter.next(now) {
PeersIterState::Waiting(Some(p)) => p.into_owned(),
_ => panic!("No peer."),
};
// Deliver the same result for `peer1` twice; the return values are ignored.
iter.on_success(&peer1, closer.clone());
iter.on_success(&peer1, closer.clone());
// If a second peer is handed out, let it report the same closer peer.
match iter.next(now) {
PeersIterState::Waiting(Some(p)) => {
let peer2 = p.into_owned();
assert!(iter.on_success(&peer2, closer.clone()))
}
PeersIterState::Finished => {}
_ => panic!("Unexpectedly iter state."),
};
// The closer peer must be present exactly once.
let n = iter
.closest_peers
.values()
.filter(|e| e.key.preimage() == &closer[0])
.count();
assert_eq!(n, 1);
true
}
QuickCheck::new()
.tests(10)
.quickcheck(prop as fn(_, _) -> _)
}
/// A peer whose timeout elapses becomes unresponsive, and a late success from
/// it is still accepted as long as the iterator has not finished.
#[test]
fn timeout() {
fn prop(mut iter: ClosestPeersIter) -> bool {
let mut now = Instant::now();
// The closest known peer is the one the iterator must hand out first.
let peer = iter
.closest_peers
.values()
.next()
.unwrap()
.key
.into_preimage();
match iter.next(now) {
PeersIterState::Waiting(Some(id)) => assert_eq!(&*id, &peer),
_ => panic!(),
}
// Move the clock to the peer's deadline and let the iterator observe it.
now += iter.config.peer_timeout;
let _ = iter.next(now);
match &iter.closest_peers.values().next().unwrap() {
Peer {
key,
state: PeerState::Unresponsive,
} => {
assert_eq!(key.preimage(), &peer);
}
Peer { state, .. } => panic!("Unexpected peer state: {state:?}"),
}
// A late success only counts if the iterator has not finished meanwhile.
let finished = iter.is_finished();
iter.on_success(&peer, iter::empty());
let closest = iter.into_result().collect::<Vec<_>>();
if finished {
assert_eq!(Vec::<PeerId>::new(), closest)
} else {
assert_eq!(vec![peer], closest)
}
true
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
}
/// When every request fails, the iterator keeps handing out peers until all
/// known peers (at most `K_VALUE`) have been tried, and then finishes.
#[test]
fn without_success_try_up_to_k_peers() {
    fn prop(mut iter: ClosestPeersIter) {
        let now = Instant::now();
        let attempts = iter.closest_peers.len().min(K_VALUE.get());

        for _ in 0..attempts {
            // Take ownership of the peer id so the borrow of `iter` ends
            // before the failure is reported.
            let peer = match iter.next(now) {
                PeersIterState::Waiting(Some(p)) => p.into_owned(),
                _ => panic!("Expected iterator to yield another peer to query."),
            };
            iter.on_failure(&peer);
        }

        assert_eq!(PeersIterState::Finished, iter.next(now));
    }
    QuickCheck::new().tests(10).quickcheck(prop as fn(_))
}
/// A stalled iterator reaches capacity exactly when
/// `max(parallelism, num_results)` requests are pending.
#[test]
fn stalled_at_capacity() {
    fn prop(mut iter: ClosestPeersIter) {
        iter.state = State::Stalled;
        let limit = iter
            .config
            .parallelism
            .get()
            .max(iter.config.num_results.get());

        // Below the limit there must always be room for another request.
        for waiting in 0..limit {
            iter.num_waiting = waiting;
            assert!(
                !iter.at_capacity(),
                "Iterator should not be at capacity if less than \
                 `max(parallelism, num_results)` requests are waiting.",
            )
        }

        // Exactly at the limit the iterator is at capacity.
        iter.num_waiting = limit;
        assert!(
            iter.at_capacity(),
            "Iterator should be at capacity if `max(parallelism, num_results)` requests are \
             waiting.",
        )
    }
    QuickCheck::new().tests(10).quickcheck(prop as fn(_))
}
}