use crate::{
driver::PendingGetClosestType,
time::{interval, Instant, Interval},
Addresses, NetworkEvent, SwarmDriver,
};
use ant_protocol::NetworkAddress;
use libp2p::{
kad::{KBucketKey, K_VALUE},
PeerId,
};
use rand::{rngs::OsRng, Rng};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use std::collections::{btree_map::Entry, BTreeMap};
use tokio::time::Duration;
/// Random `PeerId` generation attempts used to seed the candidate pool at startup.
const INITIAL_GENERATION_ATTEMPTS: usize = 10_000;
/// Random `PeerId` generation attempts used on each subsequent candidate refresh.
const GENERATION_ATTEMPTS: usize = 1_000;
/// Cap on the number of candidate addresses stored per ilog2 bucket.
const MAX_PEERS_PER_BUCKET: usize = 5;
/// Base cadence of the continuous network discovery process.
pub(crate) const NETWORK_DISCOVER_INTERVAL: Duration = Duration::from_secs(10);
/// If no peer has joined the routing table within this window, discovery slows down.
const LAST_PEER_ADDED_TIME_LIMIT: Duration = Duration::from_secs(180);
/// Upper bound (seconds) for the randomized slowed-down discovery interval.
const NO_PEER_ADDED_SLOWDOWN_INTERVAL_MAX_S: u64 = 1200;
/// Return type of `SwarmDriver::get_refresh_targets`.
type RefreshTargets = (
    // Addresses to issue get_closest queries for.
    Vec<NetworkAddress>,
    // Peers picked from one non-full bucket, to be version/liveness queried.
    Vec<(PeerId, Addresses)>,
    // Peers picked from one full bucket, to be version/liveness queried.
    Vec<(PeerId, Addresses)>,
);
impl SwarmDriver {
pub(crate) async fn run_network_discover_continuously(
&mut self,
current_interval: Duration,
round_robin_index: usize,
) -> Option<Interval> {
let (should_discover, new_interval) = self
.network_discovery
.should_we_discover(self.peers_in_rt as u32, current_interval)
.await;
if should_discover {
self.trigger_network_discovery(round_robin_index);
}
new_interval
}
pub(crate) fn trigger_network_discovery(&mut self, round_robin_index: usize) {
let now = Instant::now();
let (get_closest_candidates, picked_non_full_bucket_peers, picked_full_bucket_peers) =
self.get_refresh_targets(round_robin_index);
for addr in get_closest_candidates {
let query_id = self
.swarm
.behaviour_mut()
.kademlia
.get_closest_peers(addr.as_bytes());
let _ = self.pending_get_closest_peers.insert(
query_id,
(PendingGetClosestType::NetworkDiscovery, Default::default()),
);
}
if !picked_full_bucket_peers.is_empty() {
self.send_event(NetworkEvent::PeersForVersionQuery(picked_full_bucket_peers));
}
if !picked_non_full_bucket_peers.is_empty() {
self.send_event(NetworkEvent::PeersForVersionQuery(
picked_non_full_bucket_peers,
));
}
self.network_discovery.initiated();
debug!("Trigger network discovery took {:?}", now.elapsed());
}
fn get_refresh_targets(&mut self, round_robin_index: usize) -> RefreshTargets {
let kbuckets: Vec<_> = self.swarm.behaviour_mut().kademlia.kbuckets().collect();
let (full_buckets, non_full_buckets): (Vec<_>, Vec<_>) = kbuckets
.into_iter()
.partition(|kb| kb.num_entries() >= K_VALUE.get());
let non_full_non_empty_buckets_indexes = non_full_buckets
.iter()
.filter_map(|kbucket| kbucket.range().0.ilog2())
.collect::<Vec<_>>();
let full_buckets_index = full_buckets
.iter()
.filter_map(|kbucket| kbucket.range().0.ilog2())
.collect::<Vec<_>>();
let get_closest_candidates = self.network_discovery.candidates.get_candidates(
non_full_non_empty_buckets_indexes.clone(),
full_buckets_index,
round_robin_index,
);
info!(
"Going to undertake {} get_closest queries for non_full_buckets {non_full_non_empty_buckets_indexes:?}",
get_closest_candidates.len(),
);
if full_buckets.len() < 2 {
return (get_closest_candidates, vec![], vec![]);
}
let picked_full_bucket_index = round_robin_index % full_buckets.len();
let mut targeted_bucket = None;
let picked_full_bucket_peers =
if let Some(kbucket) = full_buckets.get(picked_full_bucket_index) {
targeted_bucket = kbucket.range().0.ilog2();
kbucket
.iter()
.map(|peer_entry| {
(
peer_entry.node.key.into_preimage(),
Addresses(peer_entry.node.value.clone().into_vec()),
)
})
.collect::<Vec<(PeerId, Addresses)>>()
} else {
error!(
"Full bucket {picked_full_bucket_index} doesn't exists among {} buckets.",
full_buckets.len()
);
vec![]
};
info!(
"Going to query {} peers of a full bucket {targeted_bucket:?} to check liveness.",
picked_full_bucket_peers.len()
);
if non_full_buckets.is_empty() {
return (get_closest_candidates, vec![], picked_full_bucket_peers);
}
let picked_non_full_bucket = round_robin_index % non_full_buckets.len();
let mut targeted_bucket = None;
let picked_non_full_bucket_peers =
if let Some(kbucket) = non_full_buckets.get(picked_non_full_bucket) {
targeted_bucket = kbucket.range().0.ilog2();
kbucket
.iter()
.map(|peer_entry| {
(
peer_entry.node.key.into_preimage(),
Addresses(peer_entry.node.value.clone().into_vec()),
)
})
.collect::<Vec<(PeerId, Addresses)>>()
} else {
error!(
"Non full bucket {picked_non_full_bucket} doesn't exists among {} buckets.",
non_full_buckets.len()
);
vec![]
};
info!(
"Going to query {} peers of a non-full bucket {targeted_bucket:?} to check liveness.",
picked_non_full_bucket_peers.len()
);
(
get_closest_candidates,
picked_non_full_bucket_peers,
picked_full_bucket_peers,
)
}
}
/// Tracks the state and cadence of the continuous network discovery process.
pub(crate) struct NetworkDiscovery {
    // Set once the first peer has been added to the routing table
    // (see `notify_new_peer`).
    initial_bootstrap_done: bool,
    // When a peer was last added to the RT; used to slow discovery down
    // once nothing has joined for LAST_PEER_ADDED_TIME_LIMIT.
    last_peer_added_instant: Instant,
    // When a discovery round was last triggered, if ever.
    last_network_discover_triggered: Option<Instant>,
    // Pre-generated addresses used as get_closest query targets.
    candidates: NetworkDiscoveryCandidates,
}
impl NetworkDiscovery {
    /// Builds a fresh discovery state for `self_peer_id`, pre-generating
    /// candidate addresses across the kbuckets.
    pub(crate) fn new(self_peer_id: &PeerId) -> Self {
        let candidates = NetworkDiscoveryCandidates::new(self_peer_id);
        Self {
            initial_bootstrap_done: false,
            last_peer_added_instant: Instant::now(),
            last_network_discover_triggered: None,
            candidates,
        }
    }

    /// Records that a discovery round has just been kicked off.
    pub(crate) fn initiated(&mut self) {
        self.last_network_discover_triggered = Some(Instant::now());
    }

    /// Notes that a peer was added to the routing table. Returns `true`
    /// exactly once — on the very first peer (initial bootstrap complete).
    pub(crate) fn notify_new_peer(&mut self) -> bool {
        self.last_peer_added_instant = Instant::now();
        let first_ever = !self.initial_bootstrap_done;
        self.initial_bootstrap_done = true;
        first_ever
    }

    /// Decides whether a discovery round should run now and, when the cadence
    /// has to change, returns a replacement ticker for the caller to adopt.
    pub(crate) async fn should_we_discover(
        &self,
        peers_in_rt: u32,
        current_interval: Duration,
    ) -> (bool, Option<Interval>) {
        let should_network_discover = peers_in_rt >= 1;
        let rt_stalled = peers_in_rt != 0
            && self.last_peer_added_instant.elapsed() > LAST_PEER_ADDED_TIME_LIMIT;
        if rt_stalled {
            // Nothing has joined the RT for a while: back off to a randomized,
            // much longer interval.
            let no_peer_added_slowdown_interval: u64 = OsRng.gen_range(
                NO_PEER_ADDED_SLOWDOWN_INTERVAL_MAX_S / 2..NO_PEER_ADDED_SLOWDOWN_INTERVAL_MAX_S,
            );
            let no_peer_added_slowdown_interval_duration =
                Duration::from_secs(no_peer_added_slowdown_interval);
            info!(
                "It has been {LAST_PEER_ADDED_TIME_LIMIT:?} since we last added a peer to RT. Slowing down the continuous network discovery process. Old interval: {current_interval:?}, New interval: {no_peer_added_slowdown_interval_duration:?}"
            );
            let mut new_interval = interval(no_peer_added_slowdown_interval_duration);
            new_interval.tick().await; // consume the immediate first tick
            return (should_network_discover, Some(new_interval));
        }
        // Otherwise, only swap the ticker when the peer-count-scaled duration
        // has grown past the current one.
        let duration_based_on_peers = Self::scaled_duration(peers_in_rt);
        if duration_based_on_peers <= current_interval {
            return (should_network_discover, None);
        }
        info!("More peers have been added to our RT!. Slowing down the continuous network discovery process. Old interval: {current_interval:?}, New interval: {duration_based_on_peers:?}");
        let mut slowed = interval(duration_based_on_peers);
        slowed.tick().await; // consume the immediate first tick
        (should_network_discover, Some(slowed))
    }

    /// Forwards peers from a completed get_closest query into the candidate pool.
    pub(crate) fn handle_get_closest_query(&mut self, closest_peers: Vec<(PeerId, Addresses)>) {
        self.candidates.handle_get_closest_query(closest_peers);
    }

    /// Maps routing-table size to a discovery interval: ~60s when empty,
    /// growing exponentially, and capped at 1200s from 450 peers onwards.
    fn scaled_duration(peers_in_rt: u32) -> Duration {
        if peers_in_rt >= 450 {
            return Duration::from_secs(1200);
        }
        let growth: f64 = 1.00673;
        Duration::from_secs_f64(60.0 * growth.powi(peers_in_rt as i32))
    }
}
/// Pool of randomly generated candidate addresses, grouped by the ilog2 of
/// their kbucket distance from our own key, used as get_closest query targets.
#[derive(Debug, Clone)]
struct NetworkDiscoveryCandidates {
    // Our own peer id as a kbucket key; all distances are measured against it.
    self_key: KBucketKey<PeerId>,
    // Our own peer id; used as the first target in `get_candidates`.
    self_peer_id: PeerId,
    // ilog2 bucket index -> candidate addresses (roughly capped at
    // MAX_PEERS_PER_BUCKET per bucket).
    candidates: BTreeMap<u32, Vec<NetworkAddress>>,
}
impl NetworkDiscoveryCandidates {
    /// Creates the candidate pool for `self_peer_id`, seeding it with a large
    /// batch of randomly generated addresses.
    fn new(self_peer_id: &PeerId) -> Self {
        let start = Instant::now();
        let self_key = KBucketKey::from(*self_peer_id);
        let candidates = Self::generate_candidates(&self_key, INITIAL_GENERATION_ATTEMPTS);
        info!(
            "Time to generate NetworkDiscoveryCandidates: {:?}",
            start.elapsed()
        );
        let buckets_covered = candidates
            .iter()
            .map(|(ilog2, candidates)| (*ilog2, candidates.len()))
            .collect::<Vec<_>>();
        info!("The generated network discovery candidates currently cover these ilog2 buckets: {buckets_covered:?}");
        Self {
            self_key,
            self_peer_id: *self_peer_id,
            candidates,
        }
    }

    /// Absorbs peers returned by a get_closest query into the pool, bucketing
    /// each peer by the ilog2 of its distance from our key.
    fn handle_get_closest_query(&mut self, closest_peers: Vec<(PeerId, Addresses)>) {
        let now = Instant::now();
        let candidates_map: BTreeMap<u32, Vec<NetworkAddress>> = closest_peers
            .into_iter()
            .filter_map(|(peer, _)| {
                let peer = NetworkAddress::from(peer);
                let peer_key = peer.as_kbucket_key();
                // ilog2() is None when the distance is zero, i.e. the peer is
                // ourselves; such entries are skipped.
                peer_key
                    .distance(&self.self_key)
                    .ilog2()
                    .map(|ilog2| (ilog2, peer))
            })
            .fold(BTreeMap::new(), |mut acc, (ilog2, peer)| {
                acc.entry(ilog2).or_default().push(peer);
                acc
            });
        for (ilog2, candidates) in candidates_map {
            self.insert_candidates(ilog2, candidates);
        }
        trace!(
            "It took {:?} to NetworkDiscovery::handle get closest query",
            now.elapsed()
        );
    }

    /// Returns up to 10 addresses to issue get_closest queries for: our own
    /// address, one candidate per non-full non-empty bucket, then candidates
    /// from buckets not covered by either input list.
    fn get_candidates(
        &mut self,
        non_full_non_empty_buckets: Vec<u32>,
        full_buckets: Vec<u32>,
        round_robin_index: usize,
    ) -> Vec<NetworkAddress> {
        self.try_refresh_candidates();
        // Always probe around our own address first.
        let mut targets = vec![NetworkAddress::from(self.self_peer_id)];
        targets.extend(non_full_non_empty_buckets.iter().filter_map(|ilog2| {
            let candidates = self.candidates.get(ilog2)?;
            // Guard: `%` by zero below would panic on an empty bucket list.
            if candidates.is_empty() {
                return None;
            }
            candidates.get(round_robin_index % candidates.len()).cloned()
        }));
        // Top up with candidates from buckets neither list covers.
        for (ilog2, candidates) in self.candidates.iter() {
            if targets.len() >= 10 {
                break;
            }
            if non_full_non_empty_buckets.contains(ilog2) || full_buckets.contains(ilog2) {
                continue;
            }
            if candidates.is_empty() {
                continue;
            }
            if let Some(candidate) = candidates.get(round_robin_index % candidates.len()) {
                targets.push(candidate.clone());
            }
        }
        targets
    }

    /// Tops the pool up with a small batch of freshly generated candidates.
    fn try_refresh_candidates(&mut self) {
        let candidates_vec = Self::generate_candidates(&self.self_key, GENERATION_ATTEMPTS);
        for (ilog2, candidates) in candidates_vec {
            self.insert_candidates(ilog2, candidates);
        }
    }

    /// Merges `new_candidates` into the `ilog2` bucket, deduplicating and
    /// enforcing the MAX_PEERS_PER_BUCKET cap.
    fn insert_candidates(&mut self, ilog2: u32, new_candidates: Vec<NetworkAddress>) {
        match self.candidates.entry(ilog2) {
            Entry::Occupied(mut entry) => {
                let existing_candidates = entry.get_mut();
                // Only append candidates we don't already hold.
                let new_candidates: Vec<_> = new_candidates
                    .into_iter()
                    .filter(|candidate| !existing_candidates.contains(candidate))
                    .collect();
                existing_candidates.extend(new_candidates);
                // Evict the oldest entries once over the cap, keeping the
                // freshest ones.
                let excess = existing_candidates
                    .len()
                    .saturating_sub(MAX_PEERS_PER_BUCKET);
                if excess > 0 {
                    existing_candidates.drain(..excess);
                }
            }
            Entry::Vacant(entry) => {
                // Enforce the cap on first insert too; previously a large
                // first batch could exceed MAX_PEERS_PER_BUCKET.
                let mut new_candidates = new_candidates;
                new_candidates.truncate(MAX_PEERS_PER_BUCKET);
                entry.insert(new_candidates);
            }
        }
    }

    /// Generates random addresses (in parallel) and groups them by the ilog2
    /// of their distance from `self_key`, keeping at most
    /// MAX_PEERS_PER_BUCKET per bucket.
    fn generate_candidates(
        self_key: &KBucketKey<PeerId>,
        num_to_generate: usize,
    ) -> BTreeMap<u32, Vec<NetworkAddress>> {
        (0..num_to_generate)
            .into_par_iter()
            .filter_map(|_| {
                let candidate = NetworkAddress::from(PeerId::random());
                let candidate_key = candidate.as_kbucket_key();
                // ilog2() is None for a zero distance; such a candidate is
                // useless and dropped.
                let ilog2 = candidate_key.distance(self_key).ilog2()?;
                Some((ilog2, candidate))
            })
            .fold(
                BTreeMap::new,
                |mut acc: BTreeMap<u32, Vec<NetworkAddress>>, (ilog2, candidate)| {
                    acc.entry(ilog2).or_default().push(candidate);
                    acc
                },
            )
            .reduce(
                BTreeMap::new,
                |mut acc: BTreeMap<u32, Vec<NetworkAddress>>, map| {
                    for (ilog2, candidates) in map {
                        let entry = acc.entry(ilog2).or_default();
                        for candidate in candidates {
                            if entry.len() < MAX_PEERS_PER_BUCKET {
                                entry.push(candidate);
                            } else {
                                break;
                            }
                        }
                    }
                    acc
                },
            )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_scaled_interval() {
        // (peers in RT, roughly expected interval in seconds); values must
        // land within 15% of the expected duration.
        let test_cases: [(u32, f64); 13] = [
            (0, 60.0),
            (50, 80.0),
            (100, 120.0),
            (150, 160.0),
            (200, 230.0),
            (220, 260.0),
            (250, 320.0),
            (300, 440.0),
            (350, 626.0),
            (400, 860.0),
            (425, 1040.0),
            (449, 1200.0),
            (1000, 1200.0),
        ];
        for (peers, expected_secs) in test_cases {
            let actual_secs = NetworkDiscovery::scaled_duration(peers).as_secs_f64();
            let allowed_error = 0.15 * expected_secs;
            assert!(
                (actual_secs - expected_secs).abs() < allowed_error,
                "For {peers} peers, expected duration {expected_secs:.2}s but got {actual_secs:.2}s",
            );
            println!("Peers: {peers}, Expected: {expected_secs:.2}s, Actual: {actual_secs:.2}s",);
        }
    }
}