use crate::{
bootstrap::{InitialBootstrap, InitialBootstrapTrigger},
circular_vec::CircularVec,
driver::NodeBehaviour,
error::{NetworkError, Result},
event::NetworkEvent,
external_address::ExternalAddressManager,
network_discovery::NetworkDiscovery,
record_store::{NodeRecordStore, NodeRecordStoreConfig},
relay_manager::RelayManager,
replication_fetcher::ReplicationFetcher,
time::Instant,
transport, Network, SwarmDriver, CLOSE_GROUP_SIZE,
};
#[cfg(feature = "open-metrics")]
use crate::{
metrics::service::run_metrics_server, metrics::NetworkMetricsRecorder, MetricsRegistries,
};
use ant_bootstrap::BootstrapCacheStore;
use ant_protocol::{
version::{get_network_id_str, IDENTIFY_PROTOCOL_STR, REQ_RESPONSE_VERSION_STR},
NetworkAddress, PrettyPrintKBucketKey,
};
use futures::future::Either;
use libp2p::Transport as _;
use libp2p::{core::muxing::StreamMuxerBox, relay};
use libp2p::{
identity::Keypair,
kad,
multiaddr::Protocol,
request_response::{self, Config as RequestResponseConfig, ProtocolSupport},
swarm::{StreamProtocol, Swarm},
Multiaddr, PeerId,
};
#[cfg(feature = "open-metrics")]
use prometheus_client::metrics::info::Info;
use rand::Rng;
use std::{
convert::TryInto,
fmt::Debug,
fs,
io::{Read, Write},
net::SocketAddr,
num::NonZeroUsize,
path::PathBuf,
time::Duration,
};
use tokio::sync::mpsc;
const REQUEST_TIMEOUT_DEFAULT_S: Duration = Duration::from_secs(30);
const CONNECTION_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10);
const RESEND_IDENTIFY_INVERVAL: Duration = Duration::from_secs(3600);
const NETWORKING_CHANNEL_SIZE: usize = 10_000;
const KAD_QUERY_TIMEOUT_S: Duration = Duration::from_secs(10);
const REPLICATION_FACTOR: NonZeroUsize = match NonZeroUsize::new(CLOSE_GROUP_SIZE + 2) {
Some(v) => v,
None => panic!("CLOSE_GROUP_SIZE should not be zero"),
};
const KAD_STREAM_PROTOCOL_ID: StreamProtocol = StreamProtocol::new("/autonomi/kad/1.0.0");
pub const MAX_PACKET_SIZE: usize = 1024 * 1024 * 5;
const PERIODIC_KAD_BOOTSTRAP_INTERVAL_MAX_S: u64 = 21600;
#[derive(Debug)]
pub struct NetworkBuilder {
bootstrap_cache: Option<BootstrapCacheStore>,
concurrency_limit: Option<usize>,
initial_contacts: Vec<Multiaddr>,
keypair: Keypair,
listen_addr: Option<SocketAddr>,
local: bool,
#[cfg(feature = "open-metrics")]
metrics_registries: Option<MetricsRegistries>,
#[cfg(feature = "open-metrics")]
metrics_server_port: Option<u16>,
no_upnp: bool,
relay_client: bool,
request_timeout: Option<Duration>,
}
impl NetworkBuilder {
pub fn new(keypair: Keypair, local: bool, initial_contacts: Vec<Multiaddr>) -> Self {
Self {
bootstrap_cache: None,
concurrency_limit: None,
initial_contacts,
keypair,
listen_addr: None,
local,
#[cfg(feature = "open-metrics")]
metrics_registries: None,
#[cfg(feature = "open-metrics")]
metrics_server_port: None,
no_upnp: true,
relay_client: false,
request_timeout: None,
}
}
pub fn bootstrap_cache(&mut self, bootstrap_cache: BootstrapCacheStore) {
self.bootstrap_cache = Some(bootstrap_cache);
}
pub fn relay_client(&mut self, relay_client: bool) {
self.relay_client = relay_client;
}
pub fn listen_addr(&mut self, listen_addr: SocketAddr) {
self.listen_addr = Some(listen_addr);
}
pub fn request_timeout(&mut self, request_timeout: Duration) {
self.request_timeout = Some(request_timeout);
}
pub fn concurrency_limit(&mut self, concurrency_limit: usize) {
self.concurrency_limit = Some(concurrency_limit);
}
#[cfg(feature = "open-metrics")]
pub fn metrics_registries(&mut self, registries: MetricsRegistries) {
self.metrics_registries = Some(registries);
}
#[cfg(feature = "open-metrics")]
pub fn metrics_server_port(&mut self, port: Option<u16>) {
self.metrics_server_port = port;
}
pub fn no_upnp(&mut self, no_upnp: bool) {
self.no_upnp = no_upnp;
}
pub fn build_node(
self,
root_dir: PathBuf,
) -> Result<(Network, mpsc::Receiver<NetworkEvent>, SwarmDriver)> {
let bootstrap_interval = rand::thread_rng().gen_range(
PERIODIC_KAD_BOOTSTRAP_INTERVAL_MAX_S / 2..PERIODIC_KAD_BOOTSTRAP_INTERVAL_MAX_S,
);
let mut kad_cfg = kad::Config::new(KAD_STREAM_PROTOCOL_ID);
let _ = kad_cfg
.set_kbucket_inserts(libp2p::kad::BucketInserts::Manual)
.set_replication_interval(None)
.set_publication_interval(None)
.set_max_packet_size(MAX_PACKET_SIZE)
.set_replication_factor(REPLICATION_FACTOR)
.set_query_timeout(KAD_QUERY_TIMEOUT_S)
.set_record_ttl(None)
.set_periodic_bootstrap_interval(Some(Duration::from_secs(bootstrap_interval)))
.set_provider_publication_interval(None);
let store_cfg = {
let storage_dir_path = root_dir.join("record_store");
check_and_wipe_storage_dir_if_necessary(
root_dir.clone(),
storage_dir_path.clone(),
get_network_id_str(),
)?;
if let Err(error) = std::fs::create_dir_all(&storage_dir_path) {
return Err(NetworkError::FailedToCreateRecordStoreDir {
path: storage_dir_path,
source: error,
});
}
let peer_id = PeerId::from(self.keypair.public());
let encryption_seed: [u8; 16] = peer_id
.to_bytes()
.get(..16)
.expect("Cann't get encryption_seed from keypair")
.try_into()
.expect("Cann't get 16 bytes from serialised key_pair");
NodeRecordStoreConfig {
max_value_bytes: MAX_PACKET_SIZE, storage_dir: storage_dir_path,
historic_quote_dir: root_dir.clone(),
encryption_seed,
..Default::default()
}
};
let listen_addr = self.listen_addr;
let (network, events_receiver, mut swarm_driver) =
self.build(kad_cfg, store_cfg, ProtocolSupport::Full);
let listen_socket_addr = listen_addr.ok_or(NetworkError::ListenAddressNotProvided)?;
let addr_quic = Multiaddr::from(listen_socket_addr.ip())
.with(Protocol::Udp(listen_socket_addr.port()))
.with(Protocol::QuicV1);
swarm_driver
.listen_on(addr_quic)
.expect("Multiaddr should be supported by our configured transports");
Ok((network, events_receiver, swarm_driver))
}
fn build(
self,
kad_cfg: kad::Config,
record_store_cfg: NodeRecordStoreConfig,
req_res_protocol: ProtocolSupport,
) -> (Network, mpsc::Receiver<NetworkEvent>, SwarmDriver) {
let identify_protocol_str = IDENTIFY_PROTOCOL_STR
.read()
.expect("Failed to obtain read lock for IDENTIFY_PROTOCOL_STR")
.clone();
let peer_id = PeerId::from(self.keypair.public());
info!(
"Process (PID: {}) with PeerId: {peer_id}",
std::process::id()
);
info!(
"Self PeerID {peer_id} is represented as kbucket_key {:?}",
PrettyPrintKBucketKey(NetworkAddress::from(peer_id).as_kbucket_key())
);
#[cfg(feature = "open-metrics")]
let mut metrics_registries = self.metrics_registries.unwrap_or_default();
#[cfg(feature = "open-metrics")]
let main_transport = transport::build_transport(&self.keypair, &mut metrics_registries);
#[cfg(not(feature = "open-metrics"))]
let main_transport = transport::build_transport(&self.keypair);
let transport = if !self.local {
debug!("Preventing non-global dials");
libp2p::core::transport::global_only::Transport::new(main_transport).boxed()
} else {
main_transport
};
let (relay_transport, relay_behaviour) =
libp2p::relay::client::new(self.keypair.public().to_peer_id());
let relay_transport = relay_transport
.upgrade(libp2p::core::upgrade::Version::V1Lazy)
.authenticate(
libp2p::noise::Config::new(&self.keypair)
.expect("Signing libp2p-noise static DH keypair failed."),
)
.multiplex(libp2p::yamux::Config::default())
.or_transport(transport);
let transport = relay_transport
.map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.boxed();
#[cfg(feature = "open-metrics")]
let metrics_recorder = if let Some(port) = self.metrics_server_port {
let metrics_recorder = NetworkMetricsRecorder::new(&mut metrics_registries);
let metadata_sub_reg = metrics_registries
.metadata
.sub_registry_with_prefix("ant_networking");
metadata_sub_reg.register(
"peer_id",
"Identifier of a peer of the network",
Info::new(vec![("peer_id".to_string(), peer_id.to_string())]),
);
metadata_sub_reg.register(
"identify_protocol_str",
"The protocol version string that is used to connect to the correct network",
Info::new(vec![(
"identify_protocol_str".to_string(),
identify_protocol_str.clone(),
)]),
);
run_metrics_server(metrics_registries, port);
Some(metrics_recorder)
} else {
None
};
let request_response = {
let cfg = RequestResponseConfig::default()
.with_request_timeout(self.request_timeout.unwrap_or(REQUEST_TIMEOUT_DEFAULT_S));
let req_res_version_str = REQ_RESPONSE_VERSION_STR
.read()
.expect("Failed to obtain read lock for REQ_RESPONSE_VERSION_STR")
.clone();
info!("Building request response with {req_res_version_str:?}",);
request_response::cbor::Behaviour::new(
[(
StreamProtocol::try_from_owned(req_res_version_str)
.expect("StreamProtocol should start with a /"),
req_res_protocol,
)],
cfg,
)
};
let (network_event_sender, network_event_receiver) = mpsc::channel(NETWORKING_CHANNEL_SIZE);
let (network_swarm_cmd_sender, network_swarm_cmd_receiver) =
mpsc::channel(NETWORKING_CHANNEL_SIZE);
let (local_swarm_cmd_sender, local_swarm_cmd_receiver) =
mpsc::channel(NETWORKING_CHANNEL_SIZE);
let kademlia = {
#[cfg(feature = "open-metrics")]
let record_stored_metrics = metrics_recorder.as_ref().map(|r| r.records_stored.clone());
let node_record_store = NodeRecordStore::with_config(
peer_id,
record_store_cfg,
network_event_sender.clone(),
local_swarm_cmd_sender.clone(),
#[cfg(feature = "open-metrics")]
record_stored_metrics,
);
let store = node_record_store;
debug!("Using Kademlia with NodeRecordStore!");
kad::Behaviour::with_config(peer_id, store, kad_cfg)
};
let agent_version =
ant_protocol::version::construct_node_user_agent(env!("CARGO_PKG_VERSION").to_string());
info!("Building Identify with identify_protocol_str: {identify_protocol_str:?} and agent_version: {agent_version:?}");
let identify = {
let cfg = libp2p::identify::Config::new(identify_protocol_str, self.keypair.public())
.with_agent_version(agent_version)
.with_interval(RESEND_IDENTIFY_INVERVAL)
.with_hide_listen_addrs(true);
libp2p::identify::Behaviour::new(cfg)
};
let upnp = if !self.local && !self.no_upnp && !self.relay_client {
debug!("Enabling UPnP port opening behavior");
Some(libp2p::upnp::tokio::Behaviour::default())
} else {
None
}
.into();
let relay_server = if !self.relay_client {
let relay_server_cfg = relay::Config {
max_reservations: 128, max_circuits: 1024, max_circuits_per_peer: 256, circuit_src_rate_limiters: vec![], max_circuit_bytes: MAX_PACKET_SIZE as u64,
..Default::default()
};
Some(libp2p::relay::Behaviour::new(peer_id, relay_server_cfg))
} else {
None
}
.into();
let behaviour = NodeBehaviour {
blocklist: libp2p::allow_block_list::Behaviour::default(),
do_not_disturb: crate::behaviour::do_not_disturb::Behaviour::default(),
relay_client: relay_behaviour,
relay_server,
upnp,
request_response,
kademlia,
identify,
};
let swarm_config = libp2p::swarm::Config::with_tokio_executor()
.with_idle_connection_timeout(CONNECTION_KEEP_ALIVE_TIMEOUT);
let swarm = Swarm::new(transport, behaviour, peer_id, swarm_config);
let replication_fetcher = ReplicationFetcher::new(peer_id, network_event_sender.clone());
let relay_manager = if self.relay_client {
let relay_manager = RelayManager::new(peer_id);
#[cfg(feature = "open-metrics")]
let mut relay_manager = relay_manager;
#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = &metrics_recorder {
relay_manager.set_reservation_health_metrics(
metrics_recorder.relay_reservation_health.clone(),
);
}
Some(relay_manager)
} else {
info!("Relay manager is disabled for this node.");
None
};
let external_address_manager = if !self.local && !self.relay_client {
Some(ExternalAddressManager::new(peer_id))
} else {
info!("External address manager is disabled for this node.");
None
};
let is_upnp_enabled = swarm.behaviour().upnp.is_enabled();
let swarm_driver = SwarmDriver {
swarm,
self_peer_id: peer_id,
local: self.local,
is_relay_client: self.relay_client,
#[cfg(feature = "open-metrics")]
close_group: Vec::with_capacity(CLOSE_GROUP_SIZE),
peers_in_rt: 0,
initial_bootstrap: InitialBootstrap::new(self.initial_contacts),
initial_bootstrap_trigger: InitialBootstrapTrigger::new(is_upnp_enabled),
bootstrap_cache: self.bootstrap_cache,
dial_queue: Default::default(),
relay_manager,
connected_relay_clients: Default::default(),
external_address_manager,
replication_fetcher,
#[cfg(feature = "open-metrics")]
metrics_recorder,
network_cmd_sender: network_swarm_cmd_sender.clone(),
network_cmd_receiver: network_swarm_cmd_receiver,
local_cmd_sender: local_swarm_cmd_sender.clone(),
local_cmd_receiver: local_swarm_cmd_receiver,
event_sender: network_event_sender,
pending_get_closest_peers: Default::default(),
pending_requests: Default::default(),
dialed_peers: CircularVec::new(255),
network_discovery: NetworkDiscovery::new(&peer_id),
live_connected_peers: Default::default(),
latest_established_connection_ids: Default::default(),
handling_statistics: Default::default(),
handled_times: 0,
hard_disk_write_error: 0,
bad_nodes: Default::default(),
quotes_history: Default::default(),
replication_targets: Default::default(),
last_replication: None,
last_connection_pruning_time: Instant::now(),
peers_version: Default::default(),
};
let network = Network::new(
network_swarm_cmd_sender,
local_swarm_cmd_sender,
peer_id,
self.keypair,
);
(network, network_event_receiver, swarm_driver)
}
}
fn check_and_wipe_storage_dir_if_necessary(
root_dir: PathBuf,
storage_dir_path: PathBuf,
cur_version_str: String,
) -> Result<()> {
let mut prev_version_str = String::new();
let version_file = root_dir.join("network_key_version");
{
match fs::File::open(version_file.clone()) {
Ok(mut file) => {
file.read_to_string(&mut prev_version_str)?;
}
Err(err) => {
warn!("Failed in accessing version file {version_file:?}: {err:?}");
info!("Creating a new version file at {version_file:?}");
fs::File::create(version_file.clone())?;
}
}
}
if cur_version_str != prev_version_str {
warn!("Trying to wipe out storage dir {storage_dir_path:?}, as cur_version {cur_version_str:?} doesn't match prev_version {prev_version_str:?}");
let _ = fs::remove_dir_all(storage_dir_path);
let mut file = fs::OpenOptions::new()
.write(true)
.truncate(true)
.open(version_file.clone())?;
info!("Writing cur_version {cur_version_str:?} into version file at {version_file:?}");
file.write_all(cur_version_str.as_bytes())?;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::check_and_wipe_storage_dir_if_necessary;
use std::{fs, io::Read};
#[tokio::test]
async fn version_file_update() {
let temp_dir = std::env::temp_dir();
let unique_dir_name = uuid::Uuid::new_v4().to_string();
let root_dir = temp_dir.join(unique_dir_name);
fs::create_dir_all(&root_dir).expect("Failed to create root directory");
let version_file = root_dir.join("network_key_version");
let storage_dir = root_dir.join("record_store");
let cur_version = uuid::Uuid::new_v4().to_string();
assert!(check_and_wipe_storage_dir_if_necessary(
root_dir.clone(),
storage_dir.clone(),
cur_version.clone()
)
.is_ok());
{
let mut content_str = String::new();
let mut file = fs::OpenOptions::new()
.read(true)
.open(version_file.clone())
.expect("Failed to open version file");
file.read_to_string(&mut content_str)
.expect("Failed to read from version file");
assert_eq!(content_str, cur_version);
drop(file);
}
fs::create_dir_all(&storage_dir).expect("Failed to create storage directory");
assert!(fs::metadata(storage_dir.clone()).is_ok());
let cur_version = uuid::Uuid::new_v4().to_string();
assert!(check_and_wipe_storage_dir_if_necessary(
root_dir.clone(),
storage_dir.clone(),
cur_version.clone()
)
.is_ok());
{
let mut content_str = String::new();
let mut file = fs::OpenOptions::new()
.read(true)
.open(version_file.clone())
.expect("Failed to open version file");
file.read_to_string(&mut content_str)
.expect("Failed to read from version file");
assert_eq!(content_str, cur_version);
drop(file);
}
assert!(fs::metadata(storage_dir.clone()).is_err());
}
}