use crate::target_arch::Instant;
use libp2p::{kad::KBucketKey, PeerId};
use rand::{thread_rng, Rng};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use sn_protocol::NetworkAddress;
use std::collections::{btree_map::Entry, BTreeMap};
/// Number of random peer ids generated when a `NetworkDiscovery` is first created.
const INITIAL_GENERATION_ATTEMPTS: usize = 10_000;
/// Number of random peer ids generated on each candidate refresh.
const GENERATION_ATTEMPTS: usize = 1_000;
/// Maximum number of candidates kept per ilog2 bucket when candidate lists are merged.
const MAX_PEERS_PER_BUCKET: usize = 5;
/// Keeps a pool of candidate addresses for network discovery, grouped by the ilog2 of their
/// distance to our own key.
#[derive(Debug, Clone)]
pub(crate) struct NetworkDiscovery {
/// Kademlia key derived from our own `PeerId`; every distance is measured against it.
self_key: KBucketKey<PeerId>,
/// Candidate addresses, keyed by the ilog2 of their distance to `self_key`.
candidates: BTreeMap<u32, Vec<NetworkAddress>>,
}
impl NetworkDiscovery {
pub(crate) fn new(self_peer_id: &PeerId) -> Self {
let start = Instant::now();
let self_key = KBucketKey::from(*self_peer_id);
let candidates = Self::generate_candidates(&self_key, INITIAL_GENERATION_ATTEMPTS);
info!(
"Time to generate NetworkDiscoveryCandidates: {:?}",
start.elapsed()
);
let buckets_covered = candidates
.iter()
.map(|(ilog2, candidates)| (*ilog2, candidates.len()))
.collect::<Vec<_>>();
info!("The generated network discovery candidates currently cover these ilog2 buckets: {buckets_covered:?}");
Self {
self_key,
candidates,
}
}
pub(crate) fn handle_get_closest_query(&mut self, closest_peers: Vec<PeerId>) {
let now = Instant::now();
let candidates_map: BTreeMap<u32, Vec<NetworkAddress>> = closest_peers
.into_iter()
.filter_map(|peer| {
let peer = NetworkAddress::from_peer(peer);
let peer_key = peer.as_kbucket_key();
peer_key
.distance(&self.self_key)
.ilog2()
.map(|ilog2| (ilog2, peer))
})
.fold(BTreeMap::new(), |mut acc, (ilog2, peer)| {
acc.entry(ilog2).or_default().push(peer);
acc
});
for (ilog2, candidates) in candidates_map {
self.insert_candidates(ilog2, candidates);
}
trace!(
"It took {:?} to NetworkDiscovery::handle get closest query",
now.elapsed()
);
}
pub(crate) fn candidates(&mut self) -> Vec<&NetworkAddress> {
self.try_refresh_candidates();
let mut rng = thread_rng();
let mut op = Vec::with_capacity(self.candidates.len());
let candidates = self.candidates.values().filter_map(|candidates| {
let random_index = rng.gen::<usize>() % candidates.len();
candidates.get(random_index)
});
op.extend(candidates);
op
}
fn try_refresh_candidates(&mut self) {
let candidates_vec = Self::generate_candidates(&self.self_key, GENERATION_ATTEMPTS);
for (ilog2, candidates) in candidates_vec {
self.insert_candidates(ilog2, candidates);
}
}
fn insert_candidates(&mut self, ilog2: u32, new_candidates: Vec<NetworkAddress>) {
match self.candidates.entry(ilog2) {
Entry::Occupied(mut entry) => {
let existing_candidates = entry.get_mut();
let new_candidates: Vec<_> = new_candidates
.into_iter()
.filter(|candidate| !existing_candidates.contains(candidate))
.collect();
existing_candidates.extend(new_candidates);
let excess = existing_candidates
.len()
.saturating_sub(MAX_PEERS_PER_BUCKET);
if excess > 0 {
existing_candidates.drain(..excess);
}
}
Entry::Vacant(entry) => {
entry.insert(new_candidates);
}
}
}
fn generate_candidates(
self_key: &KBucketKey<PeerId>,
num_to_generate: usize,
) -> BTreeMap<u32, Vec<NetworkAddress>> {
(0..num_to_generate)
.into_par_iter()
.filter_map(|_| {
let candidate = NetworkAddress::from_peer(PeerId::random());
let candidate_key = candidate.as_kbucket_key();
let ilog2 = candidate_key.distance(&self_key).ilog2()?;
Some((ilog2, candidate))
})
.fold(
BTreeMap::new,
|mut acc: BTreeMap<u32, Vec<NetworkAddress>>, (ilog2, candidate)| {
acc.entry(ilog2).or_default().push(candidate);
acc
},
)
.reduce(
BTreeMap::new,
|mut acc: BTreeMap<u32, Vec<NetworkAddress>>, map| {
for (ilog2, candidates) in map {
let entry = acc.entry(ilog2).or_default();
for candidate in candidates {
if entry.len() < MAX_PEERS_PER_BUCKET {
entry.push(candidate);
} else {
break;
}
}
}
acc
},
)
}
}