#[cfg(feature = "open-metrics")]
use crate::metrics::NetworkMetricsRecorder;
use crate::{
bootstrap::{InitialBootstrap, InitialBootstrapTrigger, INITIAL_BOOTSTRAP_CHECK_INTERVAL},
circular_vec::CircularVec,
cmd::{LocalSwarmCmd, NetworkSwarmCmd},
driver::kad::U256,
error::Result,
event::{NetworkEvent, NodeEvent},
external_address::ExternalAddressManager,
log_markers::Marker,
network_discovery::{NetworkDiscovery, NETWORK_DISCOVER_INTERVAL},
relay_manager::RelayManager,
replication_fetcher::ReplicationFetcher,
time::{interval, spawn, Instant, Interval},
Addresses, NodeIssue, NodeRecordStore, CLOSE_GROUP_SIZE,
};
use ant_bootstrap::BootstrapCacheStore;
use ant_evm::PaymentQuote;
use ant_protocol::messages::ConnectionInfo;
use ant_protocol::{
messages::{Request, Response},
NetworkAddress,
};
use futures::StreamExt;
use libp2p::{
kad::{self, KBucketDistance as Distance, QueryId, K_VALUE},
request_response::OutboundRequestId,
swarm::{
dial_opts::{DialOpts, PeerCondition},
ConnectionId, Swarm,
},
Multiaddr, PeerId,
};
use libp2p::{
request_response,
swarm::{behaviour::toggle::Toggle, NetworkBehaviour, SwarmEvent},
};
use rand::Rng;
use std::collections::{btree_map::Entry, BTreeMap, HashMap, HashSet};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::Duration;
use tracing::warn;
/// Per-peer issue tracking: the list of (issue, time-reported) pairs accumulated for the
/// peer, plus a flag marking whether the peer has been judged bad.
pub(crate) type BadNodes = BTreeMap<PeerId, (Vec<(NodeIssue, Instant)>, bool)>;

/// How often the responsible/farthest record distance range is re-evaluated in `run`.
/// NOTE(review): "CLOSET" looks like a typo for "CLOSEST", but this is `pub(crate)` and
/// may be referenced elsewhere, so the name is left unchanged.
pub(crate) const CLOSET_RECORD_CHECK_INTERVAL: Duration = Duration::from_secs(15);
/// How often the relay manager is given a chance to (re)connect to relays.
pub(crate) const RELAY_MANAGER_RESERVATION_INTERVAL: Duration = Duration::from_secs(30);
/// How often queued dials are checked and fired (see the dial-queue branch in `run`).
const DIAL_QUEUE_CHECK_INTERVAL: Duration = Duration::from_secs(2);
/// Why a Kademlia `get_closest_peers` query was started, so its result can be routed to
/// the right consumer when the query finishes.
pub(crate) enum PendingGetClosestType {
    /// Started by the periodic network-discovery process; results feed back into
    /// `NetworkDiscovery` rather than being returned to a caller.
    NetworkDiscovery,
    /// Started on behalf of a caller, who receives the accumulated peers through the
    /// provided oneshot sender.
    FunctionCall(oneshot::Sender<Vec<(PeerId, Addresses)>>),
}

/// In-flight `get_closest_peers` queries: query id -> (origin, peers gathered so far).
type PendingGetClosest = HashMap<QueryId, (PendingGetClosestType, Vec<(PeerId, Addresses)>)>;
/// Required by the `NetworkBehaviour` derive on `NodeBehaviour`: every sub-behaviour's
/// event type must be convertible into `NodeEvent`. `Infallible` is uninhabited, so this
/// conversion can never actually be called at runtime.
impl From<std::convert::Infallible> for NodeEvent {
    fn from(_: std::convert::Infallible) -> Self {
        panic!("NodeBehaviour is not Infallible!")
    }
}
/// The composed libp2p behaviour driven by the [`SwarmDriver`].
///
/// `Toggle`-wrapped behaviours (UPnP, relay server) are enabled or disabled when the
/// swarm is built, depending on node configuration.
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "NodeEvent")]
pub(super) struct NodeBehaviour {
    /// Refuses connections from peers placed on the block list.
    pub(super) blocklist:
        libp2p::allow_block_list::Behaviour<libp2p::allow_block_list::BlockedPeers>,
    /// Project-specific "do not disturb" behaviour; see `crate::behaviour::do_not_disturb`.
    pub(super) do_not_disturb: crate::behaviour::do_not_disturb::Behaviour,
    /// Peer identification protocol (exchanges addresses, protocol versions, etc.).
    pub(super) identify: libp2p::identify::Behaviour,
    /// Optional UPnP port-mapping support (tokio backend).
    pub(super) upnp: Toggle<libp2p::upnp::tokio::Behaviour>,
    /// Relay client, used when this node must be reached through a relay.
    pub(super) relay_client: libp2p::relay::client::Behaviour,
    /// Optional relay server, offered to other (NATed) peers when enabled.
    pub(super) relay_server: Toggle<libp2p::relay::Behaviour>,
    /// Kademlia DHT, backed by our custom `NodeRecordStore`.
    pub(super) kademlia: kad::Behaviour<NodeRecordStore>,
    /// CBOR-encoded request/response protocol carrying `Request`/`Response` messages.
    pub(super) request_response: request_response::cbor::Behaviour<Request, Response>,
}
/// Owns the libp2p [`Swarm`] plus all driver-side bookkeeping. Constructed once, then
/// consumed by [`SwarmDriver::run`], which loops until shutdown.
pub struct SwarmDriver {
    /// The underlying libp2p swarm.
    pub(crate) swarm: Swarm<NodeBehaviour>,
    /// Our own peer id.
    pub(crate) self_peer_id: PeerId,
    /// Whether this node runs against a local network — presumably a testnet flag;
    /// TODO confirm against the constructor.
    pub(crate) local: bool,
    /// Whether this node acts as a relay client (reachable via relays).
    pub(crate) is_relay_client: bool,
    /// Current close group, tracked for metrics reporting.
    #[cfg(feature = "open-metrics")]
    pub(crate) close_group: Vec<PeerId>,
    /// Number of peers currently in the routing table.
    pub(crate) peers_in_rt: usize,
    /// One-time initial bootstrap process and its trigger condition.
    pub(crate) initial_bootstrap: InitialBootstrap,
    pub(crate) initial_bootstrap_trigger: InitialBootstrapTrigger,
    /// Continuous network-discovery state.
    pub(crate) network_discovery: NetworkDiscovery,
    /// Optional persistent cache of known peers; `None` disables caching entirely.
    pub(crate) bootstrap_cache: Option<BootstrapCacheStore>,
    /// Manages our externally observed addresses, when enabled.
    pub(crate) external_address_manager: Option<ExternalAddressManager>,
    /// Manages relay reservations, when enabled.
    pub(crate) relay_manager: Option<RelayManager>,
    /// Peers currently connected to us as relay clients — NOTE(review): inferred from
    /// the name; confirm where this set is populated.
    pub(crate) connected_relay_clients: HashSet<PeerId>,
    /// Fetches records that need replicating to us.
    pub(crate) replication_fetcher: ReplicationFetcher,
    #[cfg(feature = "open-metrics")]
    pub(crate) metrics_recorder: Option<NetworkMetricsRecorder>,
    // Channels: commands flow in via the two receivers; the matching senders are kept so
    // the driver can re-queue commands to itself (see `queue_network_swarm_cmd`).
    pub(crate) network_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
    pub(crate) local_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
    pub(crate) local_cmd_receiver: mpsc::Receiver<LocalSwarmCmd>,
    pub(crate) network_cmd_receiver: mpsc::Receiver<NetworkSwarmCmd>,
    /// Outgoing events towards the upper (node) layer.
    pub(crate) event_sender: mpsc::Sender<NetworkEvent>,
    /// In-flight `get_closest_peers` queries (see `PendingGetClosest`).
    pub(crate) pending_get_closest_peers: PendingGetClosest,
    /// In-flight outbound requests; the sender is `None` when no reply is expected.
    #[allow(clippy::type_complexity)]
    pub(crate) pending_requests: HashMap<
        OutboundRequestId,
        Option<oneshot::Sender<Result<(Response, Option<ConnectionInfo>)>>>,
    >,
    /// Bounded history of peers we have dialed.
    pub(crate) dialed_peers: CircularVec<PeerId>,
    /// Deferred dials: peer -> (addresses, dial-after instant, reset count). The `run`
    /// loop dials entries once `Instant::now()` passes the stored instant.
    pub(crate) dial_queue: HashMap<PeerId, (Addresses, Instant, usize)>,
    /// Live connections: connection id -> (peer, remote address, timestamp — presumably
    /// establishment or expiry time; confirm at the insertion site).
    pub(crate) live_connected_peers: BTreeMap<ConnectionId, (PeerId, Multiaddr, Instant)>,
    /// Recently established connection ids with their remote address and time.
    pub(crate) latest_established_connection_ids: HashMap<usize, (Multiaddr, Instant)>,
    /// Per-command-kind handling durations; dumped and cleared by `log_handling`.
    pub(crate) handling_statistics: BTreeMap<String, Vec<Duration>>,
    /// Commands handled since the last statistics dump (reset at 100).
    pub(crate) handled_times: usize,
    /// Count of hard-disk write failures observed.
    pub(crate) hard_disk_write_error: usize,
    /// Issue history / bad-flag per peer (see [`BadNodes`]).
    pub(crate) bad_nodes: BadNodes,
    /// Latest payment quote seen per peer.
    pub(crate) quotes_history: BTreeMap<PeerId, PaymentQuote>,
    /// Peers we last replicated to, with the time of replication.
    pub(crate) replication_targets: BTreeMap<PeerId, Instant>,
    /// When we last carried out replication, if ever.
    pub(crate) last_replication: Option<Instant>,
    /// When connections were last pruned.
    pub(crate) last_connection_pruning_time: Instant,
    /// Reported node version per peer; pruned in `run` to peers in non-full kbuckets.
    pub(crate) peers_version: HashMap<PeerId, String>,
}
impl SwarmDriver {
/// The main event loop of the node: drives the swarm, services command channels, and
/// runs all periodic maintenance tasks.
///
/// Runs until the `shutdown_rx` watch channel flips to `true` or its sender is dropped.
/// The `select!` is `biased`, so branch order is priority order: local commands first,
/// then network commands, shutdown, swarm events, and finally the timers.
pub async fn run(mut self, mut shutdown_rx: watch::Receiver<bool>) {
    let mut network_discover_interval = interval(NETWORK_DISCOVER_INTERVAL);
    let mut set_farthest_record_interval = interval(CLOSET_RECORD_CHECK_INTERVAL);
    let mut relay_manager_reservation_interval = interval(RELAY_MANAGER_RESERVATION_INTERVAL);
    // Wrapped in Option so the branch can be disabled (set to None) once the one-time
    // initial bootstrap has been triggered.
    let mut initial_bootstrap_trigger_check_interval =
        Some(interval(INITIAL_BOOTSTRAP_CHECK_INTERVAL));
    let mut dial_queue_check_interval = interval(DIAL_QUEUE_CHECK_INTERVAL);
    // Consume the interval's immediate first tick so the first real check happens one
    // full period from now.
    dial_queue_check_interval.tick().await;
    // Periodic bootstrap-cache persistence; None when cache writing is disabled.
    let mut bootstrap_cache_save_interval = self.bootstrap_cache.as_ref().and_then(|cache| {
        if cache.config().disable_cache_writing {
            None
        } else {
            // Add up to 10% jitter so many nodes do not all flush at the same moment.
            let duration =
                Self::duration_with_variance(cache.config().min_cache_save_duration, 10);
            Some(interval(duration))
        }
    });
    if let Some(interval) = bootstrap_cache_save_interval.as_mut() {
        // Consume the immediate first tick here as well.
        interval.tick().await; info!(
            "Bootstrap cache save interval is set to {:?}",
            interval.period()
        );
    }
    // Rotating counter (wraps after 255) used to spread network-discovery work.
    let mut round_robin_index = 0;
    // An IncomingConnectionError is buffered for one iteration so the event that follows
    // it can be handled first (then the buffered error is handled right after).
    let mut previous_incoming_connection_error_event = None;
    loop {
        tokio::select! {
            // Make branch order the priority order.
            biased;
            // Highest priority: local commands.
            local_cmd = self.local_cmd_receiver.recv() => match local_cmd {
                Some(cmd) => {
                    let start = Instant::now();
                    let cmd_string = format!("{cmd:?}");
                    if let Err(err) = self.handle_local_cmd(cmd) {
                        warn!("Error while handling local cmd: {err}");
                    }
                    trace!("LocalCmd handled in {:?}: {cmd_string:?}", start.elapsed());
                },
                None => continue,
            },
            // Next: network-level commands.
            some_cmd = self.network_cmd_receiver.recv() => match some_cmd {
                Some(cmd) => {
                    let start = Instant::now();
                    let cmd_string = format!("{cmd:?}");
                    if let Err(err) = self.handle_network_cmd(cmd) {
                        warn!("Error while handling cmd: {err}");
                    }
                    trace!("SwarmCmd handled in {:?}: {cmd_string:?}", start.elapsed());
                },
                None => continue,
            },
            // Shutdown: exit the loop when the flag is set or the sender is gone.
            result = shutdown_rx.changed() => {
                if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
                    info!("Shutdown signal received or sender dropped. Exiting swarm driver loop.");
                    break;
                }
            },
            swarm_event = self.swarm.select_next_some() => {
                // If an IncomingConnectionError was buffered last iteration, handle the
                // current event first and the buffered error second.
                if let Some(previous_event) = previous_incoming_connection_error_event.take() {
                    if let Err(err) = self.handle_swarm_events(swarm_event) {
                        warn!("Error while handling swarm event: {err}");
                    }
                    if let Err(err) = self.handle_swarm_events(previous_event) {
                        warn!("Error while handling swarm event: {err}");
                    }
                    continue;
                }
                // Defer IncomingConnectionError by one event (see buffer comment above).
                if matches!(swarm_event, SwarmEvent::IncomingConnectionError {..}) {
                    previous_incoming_connection_error_event = Some(swarm_event);
                    continue;
                }
                if let Err(err) = self.handle_swarm_events(swarm_event) {
                    warn!("Error while handling swarm event: {err}");
                }
            },
            // Fire any queued dials whose dial-after time has passed.
            _ = dial_queue_check_interval.tick() => {
                let now = Instant::now();
                let mut to_remove = vec![];
                for (peer_id, (addrs, wait_time, _resets)) in self.dial_queue.iter() {
                    if now > *wait_time {
                        info!("Dialing peer {peer_id:?} from dial queue with addresses {addrs:?}");
                        to_remove.push(*peer_id);
                        if let Err(err) = self.swarm.dial(
                            DialOpts::peer_id(*peer_id)
                                .condition(PeerCondition::NotDialing)
                                .addresses(addrs.0.clone())
                                .build(),
                        ) {
                            warn!(%peer_id, ?addrs, "dialing error: {err:?}");
                        }
                    }
                }
                // Remove after iteration: the map cannot be mutated while borrowed above.
                for peer_id in to_remove.iter() {
                    self.dial_queue.remove(peer_id);
                }
            },
            // One-time initial bootstrap; the interval is dropped once triggered.
            Some(()) = Self::conditional_interval(&mut initial_bootstrap_trigger_check_interval) => {
                if self.initial_bootstrap_trigger.should_trigger_initial_bootstrap() {
                    info!("Triggering initial bootstrap process. This is a one-time operation.");
                    self.initial_bootstrap.trigger_bootstrapping_process(&mut self.swarm, self.peers_in_rt);
                    initial_bootstrap_trigger_check_interval = None;
                }
            }
            // Periodic network discovery + peers_version pruning.
            _ = network_discover_interval.tick() => {
                round_robin_index += 1;
                if round_robin_index > 255 {
                    round_robin_index = 0;
                }
                // Discovery may hand back a replacement interval (e.g. to back off).
                if let Some(new_interval) = self.run_network_discover_continuously(network_discover_interval.period(), round_robin_index).await {
                    network_discover_interval = new_interval;
                }
                // Collect peers that sit in buckets that are not yet full; only those
                // retain a tracked version string below.
                let mut peers_in_non_full_buckets = vec![];
                for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
                    let num_entires = kbucket.num_entries();
                    if num_entires >= K_VALUE.get() {
                        continue;
                    } else {
                        let peers_in_kbucket = kbucket
                            .iter()
                            .map(|peer_entry| peer_entry.node.key.into_preimage())
                            .collect::<Vec<PeerId>>();
                        peers_in_non_full_buckets.extend(peers_in_kbucket);
                    }
                }
                self.peers_version
                    .retain(|peer_id, _version| peers_in_non_full_buckets.contains(peer_id));
                #[cfg(feature = "open-metrics")]
                if let Some(metrics_recorder) = &self.metrics_recorder {
                    metrics_recorder.update_node_versions(&self.peers_version);
                }
            }
            // Periodically recompute the distance range we are responsible for.
            _ = set_farthest_record_interval.tick() => {
                let kbucket_status = self.get_kbuckets_status();
                self.update_on_kbucket_status(&kbucket_status);
                // Too small a network: skip until we know more peers.
                if kbucket_status.estimated_network_size <= CLOSE_GROUP_SIZE {
                    info!("Not enough estimated network size {}, with {} peers_in_non_full_buckets and {} num_of_full_buckets.",
                        kbucket_status.estimated_network_size,
                        kbucket_status.peers_in_non_full_buckets,
                        kbucket_status.num_of_full_buckets);
                    continue;
                }
                // Estimated per-peer keyspace density, scaled up to the close group.
                let density = U256::MAX / U256::from(kbucket_status.estimated_network_size);
                let density_distance = density * U256::from(CLOSE_GROUP_SIZE);
                let closest_k_peers = self.get_closest_k_local_peers_to_self();
                if closest_k_peers.len() <= CLOSE_GROUP_SIZE + 2 {
                    continue;
                }
                // Distance to the (CLOSE_GROUP_SIZE + 1)-th closest peer (index skips
                // ourselves at position 0).
                let self_addr = NetworkAddress::from(self.self_peer_id);
                let close_peers_distance = self_addr.distance(&NetworkAddress::from(closest_k_peers[CLOSE_GROUP_SIZE + 1].0));
                // Use the larger of the two estimates as the responsible range.
                let distance = std::cmp::max(Distance(density_distance), close_peers_distance);
                info!("Set responsible range to {distance:?}({:?})", distance.ilog2());
                self.swarm.behaviour_mut().kademlia.store_mut().set_responsible_distance_range(distance);
                self.replication_fetcher.set_replication_distance_range(distance);
            }
            // Keep relay reservations alive, when a relay manager is configured.
            _ = relay_manager_reservation_interval.tick() => {
                if let Some(relay_manager) = &mut self.relay_manager {
                    relay_manager.try_connecting_to_relay(&mut self.swarm, &self.bad_nodes)
                }
            },
            // Periodically persist the bootstrap cache, scaling the interval up over
            // time until it reaches the configured maximum.
            Some(()) = Self::conditional_interval(&mut bootstrap_cache_save_interval) => {
                let Some(current_interval) = bootstrap_cache_save_interval.as_mut() else {
                    continue;
                };
                let start = Instant::now();
                if self.sync_and_flush_cache().is_err() {
                    warn!("Failed to sync and flush bootstrap cache, skipping this interval");
                    continue;
                }
                let Some(bootstrap_config) = self.bootstrap_cache.as_ref().map(|cache|cache.config()) else {
                    continue;
                };
                // Already at (or beyond) the max save interval: nothing to scale.
                if current_interval.period() >= bootstrap_config.max_cache_save_duration {
                    continue;
                }
                // Jittered cap, then scale the current period by the configured factor.
                let max_cache_save_duration =
                    Self::duration_with_variance(bootstrap_config.max_cache_save_duration, 1);
                let scaled = current_interval.period().as_secs().saturating_mul(bootstrap_config.cache_save_scaling_factor);
                let new_duration = Duration::from_secs(std::cmp::min(scaled, max_cache_save_duration.as_secs()));
                info!("Scaling up the bootstrap cache save interval to {new_duration:?}");
                *current_interval = interval(new_duration);
                // Consume the new interval's immediate first tick.
                current_interval.tick().await;
                trace!("Bootstrap cache synced in {:?}", start.elapsed());
            },
        }
    }
}
/// Re-queues a `NetworkSwarmCmd` onto the network command channel from a spawned task,
/// so the driver loop is never blocked waiting for channel capacity. A warning is logged
/// when the channel is already full at queue time.
pub(crate) fn queue_network_swarm_cmd(&self, event: NetworkSwarmCmd) {
    let sender = self.network_cmd_sender.clone();
    let free_slots = sender.capacity();
    let _handle = spawn(async move {
        if free_slots == 0 {
            warn!(
                "NetworkSwarmCmd channel is full. Await capacity to send: {:?}",
                event
            );
        }
        if let Err(error) = sender.send(event).await {
            error!("SwarmDriver failed to send event: {}", error);
        }
    });
}
/// Emits a `NetworkEvent` to the upper layer from a spawned task, mirroring
/// `queue_network_swarm_cmd`: the driver loop never awaits channel capacity, and a full
/// channel is logged as a warning before the (possibly blocking) send.
pub(crate) fn send_event(&self, event: NetworkEvent) {
    let sender = self.event_sender.clone();
    let free_slots = sender.capacity();
    let _handle = spawn(async move {
        if free_slots == 0 {
            warn!(
                "NetworkEvent channel is full. Await capacity to send: {:?}",
                event
            );
        }
        if let Err(error) = sender.send(event).await {
            error!("SwarmDriver failed to send event: {}", error);
        }
    });
}
/// Returns the K closest known peers to our own address, with ourselves included as the
/// first entry (see `get_closest_k_local_peers_to_target`).
pub(crate) fn get_closest_k_local_peers_to_self(&mut self) -> Vec<(PeerId, Addresses)> {
    let self_addr = NetworkAddress::from(self.self_peer_id);
    self.get_closest_k_local_peers_to_target(&self_addr, true)
}
/// Returns up to K peers from the local routing table closest to `target`, together with
/// their known addresses.
///
/// When `include_self` is true, we are prepended as the first entry (with empty
/// addresses) and one slot fewer is taken from the routing table, so the total stays K.
pub(crate) fn get_closest_k_local_peers_to_target(
    &mut self,
    target: &NetworkAddress,
    include_self: bool,
) -> Vec<(PeerId, Addresses)> {
    // Reserve one slot for ourselves when we are part of the result.
    let take_count = if include_self {
        K_VALUE.get() - 1
    } else {
        K_VALUE.get()
    };
    let closest_ids: Vec<PeerId> = self
        .swarm
        .behaviour_mut()
        .kademlia
        .get_closest_local_peers(&target.as_kbucket_key())
        .take(take_count)
        .map(|key| key.into_preimage())
        .collect();
    let mut result = if include_self {
        vec![(self.self_peer_id, Default::default())]
    } else {
        Vec::new()
    };
    result.extend(self.collect_peers_info(closest_ids));
    result
}
/// Looks up the known addresses for each given peer in the routing table. Peers with no
/// kbucket or no matching entry are silently skipped, so the result may be shorter than
/// the input.
fn collect_peers_info(&mut self, peers: Vec<PeerId>) -> Vec<(PeerId, Addresses)> {
    let mut with_addrs = Vec::with_capacity(peers.len());
    for peer_id in peers {
        let Some(kbucket) = self.swarm.behaviour_mut().kademlia.kbucket(peer_id) else {
            continue;
        };
        let found = kbucket
            .iter()
            .find(|entry| entry.node.key.preimage() == &peer_id);
        if let Some(entry) = found {
            with_addrs.push((peer_id, Addresses(entry.node.value.clone().into_vec())));
        }
    }
    with_addrs
}
/// Records how long handling one command/event (`handle_string`) took. Every 100 calls,
/// dumps per-kind count and average duration at trace level and resets the statistics.
///
/// Empty `handle_string`s are ignored.
pub(crate) fn log_handling(&mut self, handle_string: String, handle_time: Duration) {
    if handle_string.is_empty() {
        return;
    }
    // Entry API: one map lookup instead of the manual match on `Entry`.
    self.handling_statistics
        .entry(handle_string)
        .or_default()
        .push(handle_time);
    self.handled_times += 1;
    if self.handled_times >= 100 {
        self.handled_times = 0;
        let mut stats: Vec<(String, usize, Duration)> = self
            .handling_statistics
            .iter()
            .map(|(kind, durations)| {
                // `count` is always >= 1: every entry is created with a first sample.
                let count = durations.len();
                let avg_time = durations.iter().sum::<Duration>() / count as u32;
                (kind.clone(), count, avg_time)
            })
            .collect();
        // Most frequently handled kinds first.
        stats.sort_by(|a, b| b.1.cmp(&a.1));
        trace!("SwarmDriver Handling Statistics: {:?}", stats);
        self.handling_statistics.clear();
    }
}
/// Logs the marker and, when built with `open-metrics` and a recorder is configured,
/// also records it as a metric.
pub(crate) fn record_metrics(&self, marker: Marker) {
    marker.log();
    #[cfg(feature = "open-metrics")]
    if let Some(metrics_recorder) = self.metrics_recorder.as_ref() {
        metrics_recorder.record_from_marker(marker)
    }
}
#[cfg(feature = "open-metrics")]
pub(crate) fn record_change_in_close_group(&self, new_close_group: Vec<PeerId>) {
if let Some(metrics_recorder) = self.metrics_recorder.as_ref() {
metrics_recorder.record_change_in_close_group(new_close_group);
}
}
/// Asks the swarm to start listening on `addr`; logs the assigned listener id.
///
/// # Errors
/// Propagates the transport error if the swarm cannot listen on the address.
pub(crate) fn listen_on(&mut self, addr: Multiaddr) -> Result<()> {
    let id = self.swarm.listen_on(addr.clone())?;
    info!("Listening on {id:?} with addr: {addr:?}");
    Ok(())
}
pub(crate) fn sync_and_flush_cache(&mut self) -> Result<()> {
if let Some(bootstrap_cache) = self.bootstrap_cache.as_mut() {
let config = bootstrap_cache.config().clone();
let mut old_cache = bootstrap_cache.clone();
if let Ok(new) = BootstrapCacheStore::new(config) {
self.bootstrap_cache = Some(new);
crate::time::spawn(async move {
if let Err(err) = old_cache.sync_and_flush_to_disk() {
error!("Failed to save bootstrap cache: {err}");
}
});
}
}
Ok(())
}
/// Returns `duration` adjusted by a random amount of up to `variance` percent; the
/// parity of the random adjustment decides whether it is subtracted (even) or added
/// (odd), giving a roughly symmetric jitter.
///
/// Fix: for small durations/variances the variance in whole seconds truncates to 0 and
/// `gen_range(0..0)` panics on an empty range (reachable via the call with
/// `variance = 1` when `max_cache_save_duration` is under 100s). Clamp the upper bound
/// to at least 1 so the range is always valid — an adjustment of 0 simply returns the
/// duration unchanged. Subtraction saturates so `variance > 100` cannot underflow.
fn duration_with_variance(duration: Duration, variance: u32) -> Duration {
    let variance_secs = (duration.as_secs() as f64 * (variance as f64 / 100.0)) as u64;
    // `max(1)` keeps the range non-empty; gen_range(0..1) always yields 0.
    let adjustment_secs = rand::thread_rng().gen_range(0..variance_secs.max(1));
    let random_adjustment = Duration::from_secs(adjustment_secs);
    if adjustment_secs % 2 == 0 {
        duration.saturating_sub(random_adjustment)
    } else {
        duration + random_adjustment
    }
}
/// Awaits one tick of the interval when present; resolves to `None` immediately when the
/// interval has been disabled (`None`), which leaves the corresponding `select!` branch
/// inert (`Some(()) = ...` never matches).
async fn conditional_interval(i: &mut Option<Interval>) -> Option<()> {
    let interval = i.as_mut()?;
    interval.tick().await;
    Some(())
}
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    /// Samples the jittered duration many times and checks every sample stays within
    /// +/- the expected variance (10% of 150s = 15s).
    #[tokio::test]
    async fn test_duration_variance_fn() {
        let duration = Duration::from_secs(150);
        let variance = 10;
        let expected_variance = Duration::from_secs(15);
        // No per-iteration println!: 10k prints made the test slow and drowned the
        // output; a failing sample is reported by the assert message instead.
        for _ in 0..10000 {
            let new_duration = crate::SwarmDriver::duration_with_variance(duration, variance);
            assert!(
                new_duration >= duration - expected_variance
                    && new_duration <= duration + expected_variance,
                "new_duration: {new_duration:?} is not within the expected range",
            );
        }
    }
}