use crate::{
bootstrap::{ContinuousBootstrap, BOOTSTRAP_INTERVAL},
circular_vec::CircularVec,
cmd::{LocalSwarmCmd, NetworkSwarmCmd},
error::{NetworkError, Result},
event::{NetworkEvent, NodeEvent},
external_address::ExternalAddressManager,
log_markers::Marker,
multiaddr_pop_p2p,
network_discovery::NetworkDiscovery,
record_store::{ClientRecordStore, NodeRecordStore, NodeRecordStoreConfig},
record_store_api::UnifiedRecordStore,
relay_manager::RelayManager,
replication_fetcher::ReplicationFetcher,
target_arch::{interval, spawn, Instant},
GetRecordError, Network, CLOSE_GROUP_SIZE,
};
#[cfg(feature = "open-metrics")]
use crate::{
metrics::service::run_metrics_server, metrics::NetworkMetricsRecorder, MetricsRegistries,
};
use crate::{transport, NodeIssue};
use futures::future::Either;
use futures::StreamExt;
#[cfg(feature = "local")]
use libp2p::mdns;
use libp2p::{core::muxing::StreamMuxerBox, relay};
use libp2p::{
identity::Keypair,
kad::{self, QueryId, Quorum, Record, RecordKey, K_VALUE},
multiaddr::Protocol,
request_response::{self, Config as RequestResponseConfig, OutboundRequestId, ProtocolSupport},
swarm::{
dial_opts::{DialOpts, PeerCondition},
ConnectionId, DialError, NetworkBehaviour, StreamProtocol, Swarm,
},
Multiaddr, PeerId,
};
use libp2p::{swarm::SwarmEvent, Transport as _};
#[cfg(feature = "open-metrics")]
use prometheus_client::metrics::info::Info;
use rand::Rng;
use sn_evm::PaymentQuote;
use sn_protocol::{
messages::{ChunkProof, Nonce, Request, Response},
storage::{try_deserialize_record, RetryStrategy},
version::{
get_key_version_str, IDENTIFY_CLIENT_VERSION_STR, IDENTIFY_NODE_VERSION_STR,
IDENTIFY_PROTOCOL_STR, REQ_RESPONSE_VERSION_STR,
},
NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey,
};
use sn_registers::SignedRegister;
use std::{
collections::{btree_map::Entry, BTreeMap, HashMap, HashSet},
convert::TryInto,
fmt::Debug,
fs,
io::{Read, Write},
net::{IpAddr, SocketAddr},
num::NonZeroUsize,
path::PathBuf,
};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Duration;
use tracing::warn;
use xor_name::XorName;
/// Interval between checks of the farthest locally-relevant peers, used to
/// refresh this node's responsible-distance estimate (see `SwarmDriver::run`).
/// NOTE(review): "CLOSET" is presumably a typo for "CLOSEST"; kept as-is since
/// the constant is referenced elsewhere.
pub(crate) const CLOSET_RECORD_CHECK_INTERVAL: Duration = Duration::from_secs(15);
/// Interval between attempts to (re)establish a relay reservation.
pub(crate) const RELAY_MANAGER_RESERVATION_INTERVAL: Duration = Duration::from_secs(30);
/// Kademlia stream protocol id used by this network.
const KAD_STREAM_PROTOCOL_ID: StreamProtocol = StreamProtocol::new("/autonomi/kad/1.0.0");
/// Why a get-closest-peers query was issued, so that its result can be routed
/// to the right consumer when the query completes.
pub(crate) enum PendingGetClosestType {
    /// Query was triggered internally for network discovery; no external
    /// caller is waiting on the result.
    NetworkDiscovery,
    /// Query was requested by a caller; the discovered peers are delivered
    /// over this channel.
    FunctionCall(oneshot::Sender<Vec<PeerId>>),
}
/// In-flight get-closest-peers queries: the reason/destination for each query
/// plus the peers accumulated so far.
type PendingGetClosest = HashMap<QueryId, (PendingGetClosestType, Vec<PeerId>)>;
/// Per-content results of a get-record query: each distinct record content
/// (keyed by `XorName`) with the set of peers that returned it.
type GetRecordResultMap = HashMap<XorName, (Record, HashSet<PeerId>)>;
/// In-flight get-record queries: the key being fetched, the channels awaiting
/// the result, the accumulated per-content results, and the query config.
pub(crate) type PendingGetRecord = HashMap<
    QueryId,
    (
        RecordKey,
        Vec<oneshot::Sender<std::result::Result<Record, GetRecordError>>>,
        GetRecordResultMap,
        GetRecordCfg,
    ),
>;
/// Tracked misbehaving peers: the observed issues (with timestamps) and a
/// flag per peer.
/// NOTE(review): the bool's exact meaning (e.g. "already considered bad") is
/// not visible in this file — confirm at the sites that populate it.
pub(crate) type BadNodes = BTreeMap<PeerId, (Vec<(NodeIssue, Instant)>, bool)>;
/// Maximum size of a single record/packet: 5 MB.
pub const MAX_PACKET_SIZE: usize = 1024 * 1024 * 5;
/// Default request-response timeout when the builder does not set one.
const REQUEST_TIMEOUT_DEFAULT_S: Duration = Duration::from_secs(30);
/// Idle connections are closed after this long without activity.
const CONNECTION_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10);
/// How often identify info is re-sent to connected peers (1 hour).
/// NOTE(review): "INVERVAL" looks like a typo for "INTERVAL"; kept as-is since
/// the constant is referenced elsewhere.
const RESEND_IDENTIFY_INVERVAL: Duration = Duration::from_secs(3600);
/// Capacity of the event/command mpsc channels.
const NETWORKING_CHANNEL_SIZE: usize = 10_000;
/// Timeout for a single Kademlia query.
const KAD_QUERY_TIMEOUT_S: Duration = Duration::from_secs(10);
/// Upper bound (seconds, 6 h) for the jittered periodic kad bootstrap; the
/// actual interval is sampled from [max/2, max) in `build_node`.
const PERIODIC_KAD_BOOTSTRAP_INTERVAL_MAX_S: u64 = 21600;
/// Kademlia replication factor, pinned to our close-group size.
/// Evaluated in a const context, so a zero `CLOSE_GROUP_SIZE` fails the build.
const REPLICATION_FACTOR: NonZeroUsize = match NonZeroUsize::new(CLOSE_GROUP_SIZE) {
    Some(v) => v,
    None => panic!("CLOSE_GROUP_SIZE should not be zero"),
};
/// Configuration for a Kademlia get-record query.
#[derive(Clone)]
pub struct GetRecordCfg {
    /// How many matching responses are required for the query to succeed.
    pub get_quorum: Quorum,
    /// Optional retry behaviour when the quorum is not reached.
    pub retry_strategy: Option<RetryStrategy>,
    /// If set, fetched records are validated against this expected record via
    /// `does_target_match`.
    pub target_record: Option<Record>,
    /// Peers expected to hold the record.
    /// NOTE(review): how this set is consumed is not visible in this file.
    pub expected_holders: HashSet<PeerId>,
    /// Whether the record is a register; registers are compared semantically
    /// (base register + ops) rather than byte-for-byte in `does_target_match`.
    pub is_register: bool,
}
impl GetRecordCfg {
    /// Returns whether `record` satisfies this config's `target_record`.
    ///
    /// With no target configured, every record matches. For registers the two
    /// records are deserialized and compared semantically (base register and
    /// ops), since equivalent registers may differ byte-wise; otherwise the
    /// records must be byte-identical. Deserialization failures are logged
    /// and treated as a mismatch.
    pub fn does_target_match(&self, record: &Record) -> bool {
        // No expectation configured: anything matches.
        let Some(target_record) = &self.target_record else {
            return true;
        };
        // Non-register records must match exactly.
        if !self.is_register {
            return target_record == record;
        }
        let pretty_key = PrettyPrintRecordKey::from(&target_record.key);
        let fetched_register = match try_deserialize_record::<SignedRegister>(record) {
            Ok(fetched_register) => fetched_register,
            Err(err) => {
                error!("When try to deserialize register from fetched record {pretty_key:?}, have error {err:?}");
                return false;
            }
        };
        let target_register = match try_deserialize_record::<SignedRegister>(target_record) {
            Ok(target_register) => target_register,
            Err(err) => {
                error!("When try to deserialize register from target record {pretty_key:?}, have error {err:?}");
                return false;
            }
        };
        // Semantic equality: same base register and same set of operations.
        target_register.base_register() == fetched_register.base_register()
            && target_register.ops() == fetched_register.ops()
    }
}
impl Debug for GetRecordCfg {
    /// Manual `Debug` impl: `target_record` may carry a large payload, so only
    /// its pretty-printed key is shown instead of the full record bytes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut f = f.debug_struct("GetRecordCfg");
        f.field("get_quorum", &self.get_quorum)
            .field("retry_strategy", &self.retry_strategy);
        match &self.target_record {
            Some(record) => {
                let pretty_key = PrettyPrintRecordKey::from(&record.key);
                f.field("target_record", &pretty_key);
            }
            None => {
                f.field("target_record", &"None");
            }
        };
        f.field("expected_holders", &self.expected_holders)
            // Previously omitted from the output; included so the debug
            // representation covers the whole config.
            .field("is_register", &self.is_register)
            .finish()
    }
}
/// Configuration for a Kademlia put-record operation.
#[derive(Debug, Clone)]
pub struct PutRecordCfg {
    /// How many peers must acknowledge storage for the put to succeed.
    pub put_quorum: Quorum,
    /// Optional retry behaviour on failure.
    pub retry_strategy: Option<RetryStrategy>,
    /// If set, the record is sent directly to these peers instead of the
    /// peers closest to the record key.
    pub use_put_record_to: Option<Vec<PeerId>>,
    /// Optional post-put verification: the kind of check to perform and the
    /// get-record config used to fetch the record back.
    pub verification: Option<(VerificationKind, GetRecordCfg)>,
}
/// How a stored record should be verified after a put.
#[derive(Debug, Clone)]
pub enum VerificationKind {
    /// Fetch the record back from the network and compare against expectation.
    Network,
    /// CRDT-style verification for convergent types (e.g. registers), where
    /// replicas may differ byte-wise but agree semantically.
    Crdt,
    /// Challenge the holder to prove chunk possession.
    ChunkProof {
        /// The proof the holder is expected to produce.
        expected_proof: ChunkProof,
        /// Nonce for the challenge, preventing replayed proofs.
        nonce: Nonce,
    },
}
/// The composed libp2p behaviour driven by `SwarmDriver`; events from every
/// sub-behaviour are folded into `NodeEvent`.
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "NodeEvent")]
pub(super) struct NodeBehaviour {
    /// Refuses connections from peers we have blocked.
    pub(super) blocklist:
        libp2p::allow_block_list::Behaviour<libp2p::allow_block_list::BlockedPeers>,
    /// Exchanges identity/protocol info with connected peers.
    pub(super) identify: libp2p::identify::Behaviour,
    /// Local-network peer discovery (local builds only).
    #[cfg(feature = "local")]
    pub(super) mdns: mdns::tokio::Behaviour,
    /// Optional UPnP port mapping; toggled off for clients/local nodes (see
    /// `NetworkBuilder::build`).
    #[cfg(feature = "upnp")]
    pub(super) upnp: libp2p::swarm::behaviour::toggle::Toggle<libp2p::upnp::tokio::Behaviour>,
    /// Client side of the relay protocol (used when behind NAT).
    pub(super) relay_client: libp2p::relay::client::Behaviour,
    /// Server side of the relay protocol (this node relaying for others).
    pub(super) relay_server: libp2p::relay::Behaviour,
    /// Kademlia DHT over the unified (node or client) record store.
    pub(super) kademlia: kad::Behaviour<UnifiedRecordStore>,
    /// CBOR-encoded request-response protocol for direct messages.
    pub(super) request_response: request_response::cbor::Behaviour<Request, Response>,
}
/// Builder collecting the options needed to construct a `Network` plus its
/// `SwarmDriver`, either as a full node (`build_node`) or a client
/// (`build_client`).
#[derive(Debug)]
pub struct NetworkBuilder {
    // Whether this node sits behind a home NAT and must use relays.
    is_behind_home_network: bool,
    // The node's identity keypair; its public key determines the PeerId.
    keypair: Keypair,
    // Local/testing mode: enables mDNS and allows non-global dials.
    local: bool,
    // Address to listen on; required for nodes, unused for clients.
    listen_addr: Option<SocketAddr>,
    // Request-response timeout override; defaults to REQUEST_TIMEOUT_DEFAULT_S.
    request_timeout: Option<Duration>,
    // NOTE(review): not read anywhere in this file — confirm it is consumed
    // elsewhere before relying on it.
    concurrency_limit: Option<usize>,
    // NOTE(review): not read anywhere in this file — confirm it is consumed
    // elsewhere before relying on it.
    initial_peers: Vec<Multiaddr>,
    // Pre-populated metrics registries; defaults are created if absent.
    #[cfg(feature = "open-metrics")]
    metrics_registries: Option<MetricsRegistries>,
    // Port for the metrics HTTP server; None disables it.
    #[cfg(feature = "open-metrics")]
    metrics_server_port: Option<u16>,
    // Whether UPnP port mapping should be attempted (nodes only).
    #[cfg(feature = "upnp")]
    upnp: bool,
}
impl NetworkBuilder {
/// Creates a builder with the given identity. `local` enables mDNS discovery
/// and lifts the global-only dial restriction (see `build`).
pub fn new(keypair: Keypair, local: bool) -> Self {
    Self {
        is_behind_home_network: false,
        keypair,
        local,
        listen_addr: None,
        request_timeout: None,
        concurrency_limit: None,
        initial_peers: Default::default(),
        #[cfg(feature = "open-metrics")]
        metrics_registries: None,
        #[cfg(feature = "open-metrics")]
        metrics_server_port: None,
        #[cfg(feature = "upnp")]
        upnp: false,
    }
}
/// Marks the node as behind a home network (NAT); it will then rely on
/// relays for reachability.
pub fn is_behind_home_network(&mut self, enable: bool) {
    self.is_behind_home_network = enable;
}
/// Sets the socket address to listen on (required for nodes).
pub fn listen_addr(&mut self, listen_addr: SocketAddr) {
    self.listen_addr = Some(listen_addr);
}
/// Overrides the default request-response timeout.
pub fn request_timeout(&mut self, request_timeout: Duration) {
    self.request_timeout = Some(request_timeout);
}
/// Sets a concurrency limit.
/// NOTE(review): the field is not read in this file — confirm its consumer.
pub fn concurrency_limit(&mut self, concurrency_limit: usize) {
    self.concurrency_limit = Some(concurrency_limit);
}
/// Sets the initial (bootstrap) peers.
/// NOTE(review): the field is not read in this file — confirm its consumer.
pub fn initial_peers(&mut self, initial_peers: Vec<Multiaddr>) {
    self.initial_peers = initial_peers;
}
/// Supplies pre-populated metrics registries (otherwise defaults are used).
#[cfg(feature = "open-metrics")]
pub fn metrics_registries(&mut self, registries: MetricsRegistries) {
    self.metrics_registries = Some(registries);
}
/// Sets the metrics HTTP server port; `None` disables the server.
#[cfg(feature = "open-metrics")]
pub fn metrics_server_port(&mut self, port: Option<u16>) {
    self.metrics_server_port = port;
}
/// Enables or disables UPnP port mapping (nodes only).
#[cfg(feature = "upnp")]
pub fn upnp(&mut self, upnp: bool) {
    self.upnp = upnp;
}
/// Builds a full node: a jittered periodic kad bootstrap, a persistent
/// `NodeRecordStore` under `root_dir/record_store` (wiped when the network
/// key version changes), and a QUIC listener on the configured address.
///
/// Returns the `Network` handle, the network-event receiver and the
/// `SwarmDriver` the caller must `run()`.
///
/// # Errors
/// Fails if no listen address was provided, the record-store directory cannot
/// be created, or the shared `build` step fails.
pub fn build_node(
    self,
    root_dir: PathBuf,
) -> Result<(Network, mpsc::Receiver<NetworkEvent>, SwarmDriver)> {
    // Jitter the periodic bootstrap into [max/2, max) so nodes started
    // together don't all re-bootstrap at the same moment.
    let bootstrap_interval = rand::thread_rng().gen_range(
        PERIODIC_KAD_BOOTSTRAP_INTERVAL_MAX_S / 2..PERIODIC_KAD_BOOTSTRAP_INTERVAL_MAX_S,
    );
    let mut kad_cfg = kad::Config::new(KAD_STREAM_PROTOCOL_ID);
    let _ = kad_cfg
        // Peers only enter the routing table when we insert them explicitly.
        .set_kbucket_inserts(libp2p::kad::BucketInserts::Manual)
        // Replication/publication is driven by our own logic, not by
        // Kademlia's periodic jobs.
        .set_replication_interval(None)
        .set_publication_interval(None)
        .set_max_packet_size(MAX_PACKET_SIZE)
        // Fix: this was previously called twice in the same chain; the
        // redundant duplicate has been removed.
        .set_replication_factor(REPLICATION_FACTOR)
        .set_query_timeout(KAD_QUERY_TIMEOUT_S)
        // Disjoint query paths improve resistance to adversarial peers.
        .disjoint_query_paths(true)
        // Records never expire on their own; lifetime is managed by us.
        .set_record_ttl(None)
        .set_periodic_bootstrap_interval(Some(Duration::from_secs(bootstrap_interval)))
        .set_provider_publication_interval(None);
    let store_cfg = {
        let storage_dir_path = root_dir.join("record_store");
        // Wipe stale records if the network key version changed.
        check_and_wipe_storage_dir_if_necessary(
            root_dir.clone(),
            storage_dir_path.clone(),
            get_key_version_str(),
        )?;
        if let Err(error) = std::fs::create_dir_all(&storage_dir_path) {
            return Err(NetworkError::FailedToCreateRecordStoreDir {
                path: storage_dir_path,
                source: error,
            });
        }
        let peer_id = PeerId::from(self.keypair.public());
        // Derive a deterministic 16-byte encryption seed from our peer id.
        let encryption_seed: [u8; 16] = peer_id
            .to_bytes()
            .get(..16)
            .expect("Can't get encryption_seed from keypair")
            .try_into()
            .expect("Can't get 16 bytes from serialised key_pair");
        NodeRecordStoreConfig {
            max_value_bytes: MAX_PACKET_SIZE,
            storage_dir: storage_dir_path,
            historic_quote_dir: root_dir.clone(),
            encryption_seed,
            ..Default::default()
        }
    };
    let listen_addr = self.listen_addr;
    #[cfg(feature = "upnp")]
    let upnp = self.upnp;
    let (network, events_receiver, mut swarm_driver) = self.build(
        kad_cfg,
        Some(store_cfg),
        false,
        ProtocolSupport::Full,
        IDENTIFY_NODE_VERSION_STR.to_string(),
        #[cfg(feature = "upnp")]
        upnp,
    )?;
    // Nodes must listen (clients — see `build_client` — do not).
    let listen_socket_addr = listen_addr.ok_or(NetworkError::ListenAddressNotProvided)?;
    // Listen on QUIC.
    let addr_quic = Multiaddr::from(listen_socket_addr.ip())
        .with(Protocol::Udp(listen_socket_addr.port()))
        .with(Protocol::QuicV1);
    swarm_driver
        .listen_on(addr_quic)
        .expect("Multiaddr should be supported by our configured transports");
    // Additionally listen on WebSocket for wasm / websockets builds.
    #[cfg(any(feature = "websockets", target_arch = "wasm32"))]
    {
        let addr_ws = Multiaddr::from(listen_socket_addr.ip())
            .with(Protocol::Tcp(listen_socket_addr.port()))
            .with(Protocol::Ws("/".into()));
        swarm_driver
            .listen_on(addr_ws)
            .expect("Multiaddr should be supported by our configured transports");
    }
    Ok((network, events_receiver, swarm_driver))
}
/// Builds a client: no record store, outbound-only request-response, and no
/// kad replication/bootstrap jobs (clients don't hold records for others).
///
/// # Errors
/// Fails if the shared `build` step fails.
pub fn build_client(self) -> Result<(Network, mpsc::Receiver<NetworkEvent>, SwarmDriver)> {
    let mut kad_cfg = kad::Config::new(KAD_STREAM_PROTOCOL_ID);
    let _ = kad_cfg
        // Peers only enter the routing table when we insert them explicitly.
        .set_kbucket_inserts(libp2p::kad::BucketInserts::Manual)
        .set_max_packet_size(MAX_PACKET_SIZE)
        // Fix: this was previously called twice in the same chain; the
        // redundant duplicate has been removed.
        .set_replication_factor(REPLICATION_FACTOR)
        // Disjoint query paths improve resistance to adversarial peers.
        .disjoint_query_paths(true);
    let (network, net_event_recv, driver) = self.build(
        kad_cfg,
        None,
        true,
        ProtocolSupport::Outbound,
        IDENTIFY_CLIENT_VERSION_STR.to_string(),
        // Clients never open ports via UPnP.
        #[cfg(feature = "upnp")]
        false,
    )?;
    Ok((network, net_event_recv, driver))
}
/// Shared builder for node and client `SwarmDriver`s: wires up the transport
/// (QUIC with relay-client fallback), all behaviours (kademlia over the
/// appropriate record store, identify, relay client/server, request-response,
/// optional mdns/upnp), the metrics server when enabled, the command/event
/// channels, and the `SwarmDriver` state itself.
fn build(
    self,
    kad_cfg: kad::Config,
    // `Some` -> node with a persistent store; `None` -> client store.
    record_store_cfg: Option<NodeRecordStoreConfig>,
    is_client: bool,
    req_res_protocol: ProtocolSupport,
    identify_version: String,
    #[cfg(feature = "upnp")] upnp: bool,
) -> Result<(Network, mpsc::Receiver<NetworkEvent>, SwarmDriver)> {
    let peer_id = PeerId::from(self.keypair.public());
    #[cfg(not(target_arch = "wasm32"))]
    info!(
        "Process (PID: {}) with PeerId: {peer_id}",
        std::process::id()
    );
    info!(
        "Self PeerID {peer_id} is represented as kbucket_key {:?}",
        PrettyPrintKBucketKey(NetworkAddress::from_peer(peer_id).as_kbucket_key())
    );
    #[cfg(feature = "open-metrics")]
    let mut metrics_registries = self.metrics_registries.unwrap_or_default();
    #[cfg(feature = "open-metrics")]
    let main_transport = transport::build_transport(&self.keypair, &mut metrics_registries);
    #[cfg(not(feature = "open-metrics"))]
    let main_transport = transport::build_transport(&self.keypair);
    // Outside local mode, refuse to dial non-global (private/loopback) addrs.
    let transport = if !self.local {
        debug!("Preventing non-global dials");
        libp2p::core::transport::global_only::Transport::new(main_transport).boxed()
    } else {
        main_transport
    };
    // Relay client transport layered over the main transport, so connections
    // can fall back to going through a relay.
    let (relay_transport, relay_behaviour) =
        libp2p::relay::client::new(self.keypair.public().to_peer_id());
    let relay_transport = relay_transport
        .upgrade(libp2p::core::upgrade::Version::V1Lazy)
        .authenticate(
            libp2p::noise::Config::new(&self.keypair)
                .expect("Signing libp2p-noise static DH keypair failed."),
        )
        .multiplex(libp2p::yamux::Config::default())
        .or_transport(transport);
    // Unify both branches into a single boxed (PeerId, muxer) transport.
    let transport = relay_transport
        .map(|either_output, _| match either_output {
            Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
            Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
        })
        .boxed();
    // Metrics recorder + HTTP server, only when a port was configured.
    #[cfg(feature = "open-metrics")]
    let metrics_recorder = if let Some(port) = self.metrics_server_port {
        let metrics_recorder = NetworkMetricsRecorder::new(&mut metrics_registries);
        let metadata_sub_reg = metrics_registries
            .metadata
            .sub_registry_with_prefix("sn_networking");
        metadata_sub_reg.register(
            "peer_id",
            "Identifier of a peer of the network",
            Info::new(vec![("peer_id".to_string(), peer_id.to_string())]),
        );
        metadata_sub_reg.register(
            "identify_protocol_str",
            "The protocol version string that is used to connect to the correct network",
            Info::new(vec![(
                "identify_protocol_str".to_string(),
                IDENTIFY_PROTOCOL_STR.to_string(),
            )]),
        );
        run_metrics_server(metrics_registries, port);
        Some(metrics_recorder)
    } else {
        None
    };
    let request_response = {
        let cfg = RequestResponseConfig::default()
            .with_request_timeout(self.request_timeout.unwrap_or(REQUEST_TIMEOUT_DEFAULT_S));
        info!(
            "Building request response with {:?}",
            REQ_RESPONSE_VERSION_STR.as_str()
        );
        request_response::cbor::Behaviour::new(
            [(
                StreamProtocol::new(&REQ_RESPONSE_VERSION_STR),
                req_res_protocol,
            )],
            cfg,
        )
    };
    let (network_event_sender, network_event_receiver) = mpsc::channel(NETWORKING_CHANNEL_SIZE);
    let (network_swarm_cmd_sender, network_swarm_cmd_receiver) =
        mpsc::channel(NETWORKING_CHANNEL_SIZE);
    let (local_swarm_cmd_sender, local_swarm_cmd_receiver) =
        mpsc::channel(NETWORKING_CHANNEL_SIZE);
    // Kademlia over the node store (persistent) or the client store.
    let kademlia = {
        match record_store_cfg {
            Some(store_cfg) => {
                let node_record_store = NodeRecordStore::with_config(
                    peer_id,
                    store_cfg,
                    network_event_sender.clone(),
                    local_swarm_cmd_sender.clone(),
                );
                // Rebind mutably only when metrics may need to decorate it.
                #[cfg(feature = "open-metrics")]
                let mut node_record_store = node_record_store;
                #[cfg(feature = "open-metrics")]
                if let Some(metrics_recorder) = &metrics_recorder {
                    node_record_store = node_record_store
                        .set_record_count_metric(metrics_recorder.records_stored.clone());
                }
                let store = UnifiedRecordStore::Node(node_record_store);
                debug!("Using Kademlia with NodeRecordStore!");
                kad::Behaviour::with_config(peer_id, store, kad_cfg)
            }
            None => {
                let store = UnifiedRecordStore::Client(ClientRecordStore::default());
                debug!("Using Kademlia with ClientRecordStore!");
                kad::Behaviour::with_config(peer_id, store, kad_cfg)
            }
        }
    };
    #[cfg(feature = "local")]
    let mdns_config = mdns::Config {
        // Lower the default query interval to speed up local peer discovery.
        query_interval: Duration::from_secs(5),
        ..Default::default()
    };
    #[cfg(feature = "local")]
    let mdns = mdns::tokio::Behaviour::new(mdns_config, peer_id)?;
    let identify_protocol_str = IDENTIFY_PROTOCOL_STR.to_string();
    info!("Building Identify with identify_protocol_str: {identify_protocol_str:?} and identify_version: {identify_version:?}");
    let identify = {
        let mut cfg =
            libp2p::identify::Config::new(identify_protocol_str, self.keypair.public())
                .with_agent_version(identify_version);
        // Re-send identify info only once an hour instead of the default.
        cfg.interval = RESEND_IDENTIFY_INVERVAL;
        libp2p::identify::Behaviour::new(cfg)
    };
    // UPnP only makes sense for non-local nodes that requested it.
    #[cfg(feature = "upnp")]
    let upnp = if !self.local && !is_client && upnp {
        debug!("Enabling UPnP port opening behavior");
        Some(libp2p::upnp::tokio::Behaviour::default())
    } else {
        None
    }
    .into();
    let relay_server = {
        let relay_server_cfg = relay::Config {
            max_reservations: 128,
            max_circuits: 1024,
            max_circuits_per_peer: 256,
            // No per-source rate limiting on relayed circuits.
            circuit_src_rate_limiters: vec![],
            // Cap relayed traffic at our record size limit.
            max_circuit_bytes: MAX_PACKET_SIZE as u64,
            ..Default::default()
        };
        libp2p::relay::Behaviour::new(peer_id, relay_server_cfg)
    };
    let behaviour = NodeBehaviour {
        blocklist: libp2p::allow_block_list::Behaviour::default(),
        relay_client: relay_behaviour,
        relay_server,
        #[cfg(feature = "upnp")]
        upnp,
        request_response,
        kademlia,
        identify,
        #[cfg(feature = "local")]
        mdns,
    };
    #[cfg(not(target_arch = "wasm32"))]
    let swarm_config = libp2p::swarm::Config::with_tokio_executor()
        .with_idle_connection_timeout(CONNECTION_KEEP_ALIVE_TIMEOUT);
    #[cfg(target_arch = "wasm32")]
    let swarm_config = libp2p::swarm::Config::with_wasm_executor()
        .with_idle_connection_timeout(CONNECTION_KEEP_ALIVE_TIMEOUT);
    let swarm = Swarm::new(transport, behaviour, peer_id, swarm_config);
    let bootstrap = ContinuousBootstrap::new();
    let replication_fetcher = ReplicationFetcher::new(peer_id, network_event_sender.clone());
    let mut relay_manager = RelayManager::new(peer_id);
    if !is_client {
        relay_manager.enable_hole_punching(self.is_behind_home_network);
    }
    let external_address_manager = ExternalAddressManager::new(peer_id);
    let swarm_driver = SwarmDriver {
        swarm,
        self_peer_id: peer_id,
        local: self.local,
        is_client,
        is_behind_home_network: self.is_behind_home_network,
        #[cfg(feature = "open-metrics")]
        close_group: Vec::with_capacity(CLOSE_GROUP_SIZE),
        peers_in_rt: 0,
        bootstrap,
        relay_manager,
        external_address_manager,
        replication_fetcher,
        #[cfg(feature = "open-metrics")]
        metrics_recorder,
        network_cmd_sender: network_swarm_cmd_sender.clone(),
        network_cmd_receiver: network_swarm_cmd_receiver,
        local_cmd_sender: local_swarm_cmd_sender.clone(),
        local_cmd_receiver: local_swarm_cmd_receiver,
        event_sender: network_event_sender,
        pending_get_closest_peers: Default::default(),
        pending_requests: Default::default(),
        pending_get_record: Default::default(),
        dialed_peers: CircularVec::new(255),
        network_discovery: NetworkDiscovery::new(&peer_id),
        bootstrap_peers: Default::default(),
        live_connected_peers: Default::default(),
        latest_established_connection_ids: Default::default(),
        handling_statistics: Default::default(),
        handled_times: 0,
        hard_disk_write_error: 0,
        bad_nodes: Default::default(),
        quotes_history: Default::default(),
        replication_targets: Default::default(),
        last_replication: None,
        last_connection_pruning_time: Instant::now(),
    };
    // The `Network` handle shares the command senders with the driver.
    let network = Network::new(
        network_swarm_cmd_sender,
        local_swarm_cmd_sender,
        peer_id,
        self.keypair,
    );
    Ok((network, network_event_receiver, swarm_driver))
}
}
/// Wipes the record store when the persisted network-key version differs from
/// `cur_version_str`, then persists the current version.
///
/// The version string lives in `<root_dir>/network_key_version`. If that file
/// is missing or unreadable it is (re)created; the previous version then reads
/// as empty, so a mismatch is detected and the storage dir is wiped as a
/// precaution.
///
/// # Errors
/// Propagates I/O errors from reading/creating/writing the version file.
/// Failure to remove the storage dir is deliberately ignored (it may not
/// exist yet).
fn check_and_wipe_storage_dir_if_necessary(
    root_dir: PathBuf,
    storage_dir_path: PathBuf,
    cur_version_str: String,
) -> Result<()> {
    let mut prev_version_str = String::new();
    let version_file = root_dir.join("network_key_version");
    {
        match fs::File::open(version_file.clone()) {
            Ok(mut file) => {
                file.read_to_string(&mut prev_version_str)?;
            }
            Err(err) => {
                // Missing/unreadable version file: log and start fresh.
                warn!("Failed in accessing version file {version_file:?}: {err:?}");
                info!("Creating a new version file at {version_file:?}");
                fs::File::create(version_file.clone())?;
            }
        }
    }
    if cur_version_str != prev_version_str {
        // The stored records were written under a different network key
        // version, so drop them entirely. (Log typo "storege" fixed.)
        warn!("Trying to wipe out storage dir {storage_dir_path:?}, as cur_version {cur_version_str:?} doesn't match prev_version {prev_version_str:?}");
        // Best-effort: the dir may legitimately not exist yet.
        let _ = fs::remove_dir_all(storage_dir_path);
        let mut file = fs::OpenOptions::new()
            .write(true)
            .truncate(true)
            .open(version_file.clone())?;
        info!("Writing cur_version {cur_version_str:?} into version file at {version_file:?}");
        file.write_all(cur_version_str.as_bytes())?;
    }
    Ok(())
}
/// The state machine driving the libp2p swarm: owns the swarm, the command
/// and event channels, and all bookkeeping for in-flight queries. Consumed by
/// `run()`.
pub struct SwarmDriver {
    pub(crate) swarm: Swarm<NodeBehaviour>,
    // Our own peer id (derived from the builder's keypair).
    pub(crate) self_peer_id: PeerId,
    // Local/testing mode (mDNS enabled, non-global dials allowed).
    pub(crate) local: bool,
    pub(crate) is_client: bool,
    pub(crate) is_behind_home_network: bool,
    // Current close group, tracked for metrics reporting.
    #[cfg(feature = "open-metrics")]
    pub(crate) close_group: Vec<PeerId>,
    // Number of peers currently in the routing table.
    pub(crate) peers_in_rt: usize,
    pub(crate) bootstrap: ContinuousBootstrap,
    pub(crate) external_address_manager: ExternalAddressManager,
    pub(crate) relay_manager: RelayManager,
    pub(crate) replication_fetcher: ReplicationFetcher,
    #[cfg(feature = "open-metrics")]
    pub(crate) metrics_recorder: Option<NetworkMetricsRecorder>,
    // Sender half kept so the driver can re-queue commands to itself
    // (see `queue_network_swarm_cmd`).
    network_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
    pub(crate) local_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
    local_cmd_receiver: mpsc::Receiver<LocalSwarmCmd>,
    network_cmd_receiver: mpsc::Receiver<NetworkSwarmCmd>,
    // Outgoing network events towards the embedding application.
    event_sender: mpsc::Sender<NetworkEvent>,
    // In-flight get-closest-peers queries.
    pub(crate) pending_get_closest_peers: PendingGetClosest,
    // In-flight request-response requests; `None` means fire-and-forget.
    pub(crate) pending_requests:
        HashMap<OutboundRequestId, Option<oneshot::Sender<Result<Response>>>>,
    // In-flight get-record queries.
    pub(crate) pending_get_record: PendingGetRecord,
    // Recently dialed peers, bounded ring buffer.
    pub(crate) dialed_peers: CircularVec<PeerId>,
    pub(crate) network_discovery: NetworkDiscovery,
    // Bootstrap peers grouped by key.
    // NOTE(review): the meaning of the Option<u32> key (presumably an ilog2
    // bucket distance) is not visible in this file — confirm at its use sites.
    pub(crate) bootstrap_peers: BTreeMap<Option<u32>, HashSet<PeerId>>,
    // Currently live connections with the peer and the time they were seen.
    pub(crate) live_connected_peers: BTreeMap<ConnectionId, (PeerId, Instant)>,
    // Most recently established connections keyed by id, with remote IP.
    pub(crate) latest_established_connection_ids: HashMap<usize, (IpAddr, Instant)>,
    // Per-command-kind handling durations; drained every ~100 handled cmds
    // by `log_handling`.
    handling_statistics: BTreeMap<String, Vec<Duration>>,
    handled_times: usize,
    pub(crate) hard_disk_write_error: usize,
    pub(crate) bad_nodes: BadNodes,
    // Latest payment quote seen per peer.
    pub(crate) quotes_history: BTreeMap<PeerId, PaymentQuote>,
    pub(crate) replication_targets: BTreeMap<PeerId, Instant>,
    pub(crate) last_replication: Option<Instant>,
    pub(crate) last_connection_pruning_time: Instant,
}
impl SwarmDriver {
/// Main event loop: processes local commands, network commands and swarm
/// events, plus periodic maintenance (kad bootstrap, responsible-range
/// re-estimation, relay reservation). Loops indefinitely; consumes `self`.
pub async fn run(mut self) {
    let mut bootstrap_interval = interval(BOOTSTRAP_INTERVAL);
    let mut set_farthest_record_interval = interval(CLOSET_RECORD_CHECK_INTERVAL);
    let mut relay_manager_reservation_interval = interval(RELAY_MANAGER_RESERVATION_INTERVAL);
    // A stashed IncomingConnectionError swarm event; it is deferred and
    // replayed right after the *next* swarm event (see the swarm_event arm).
    let mut previous_incoming_connection_error_event = None;
    loop {
        tokio::select! {
            // `biased` polls arms in written order: local commands first,
            // then network commands, then swarm events, then the timers.
            biased;
            local_cmd = self.local_cmd_receiver.recv() => match local_cmd {
                Some(cmd) => {
                    let start = Instant::now();
                    // Capture the Debug form before the cmd is consumed.
                    let cmd_string = format!("{cmd:?}");
                    if let Err(err) = self.handle_local_cmd(cmd) {
                        warn!("Error while handling local cmd: {err}");
                    }
                    trace!("LocalCmd handled in {:?}: {cmd_string:?}", start.elapsed());
                },
                None => continue,
            },
            some_cmd = self.network_cmd_receiver.recv() => match some_cmd {
                Some(cmd) => {
                    let start = Instant::now();
                    let cmd_string = format!("{cmd:?}");
                    if let Err(err) = self.handle_network_cmd(cmd) {
                        warn!("Error while handling cmd: {err}");
                    }
                    trace!("SwarmCmd handled in {:?}: {cmd_string:?}", start.elapsed());
                },
                None => continue,
            },
            swarm_event = self.swarm.select_next_some() => {
                // If an IncomingConnectionError was stashed on the previous
                // iteration, handle the fresh event first and then replay the
                // stashed error.
                // NOTE(review): the rationale for this ordering is not visible
                // in this file — presumably it lets a related follow-up event
                // be processed before the error; confirm against the swarm
                // event handler.
                if let Some(previous_event) = previous_incoming_connection_error_event.take() {
                    if let Err(err) = self.handle_swarm_events(swarm_event) {
                        warn!("Error while handling swarm event: {err}");
                    }
                    if let Err(err) = self.handle_swarm_events(previous_event) {
                        warn!("Error while handling swarm event: {err}");
                    }
                    continue;
                }
                // Defer IncomingConnectionError events by one iteration.
                if matches!(swarm_event, SwarmEvent::IncomingConnectionError {..}) {
                    previous_incoming_connection_error_event = Some(swarm_event);
                    continue;
                }
                if let Err(err) = self.handle_swarm_events(swarm_event) {
                    warn!("Error while handling swarm event: {err}");
                }
            },
            // Periodic kad bootstrap; the helper may hand back a replacement
            // interval (e.g. with a different period) to use from now on.
            _ = bootstrap_interval.tick() => {
                if let Some(new_interval) = self.run_bootstrap_continuously(bootstrap_interval.period()).await {
                    bootstrap_interval = new_interval;
                }
            }
            // Periodically re-estimate the distance range this node is
            // responsible for and propagate it to the store and the fetcher.
            _ = set_farthest_record_interval.tick() => {
                if !self.is_client {
                    let closest_k_peers = self.get_closest_k_value_local_peers();
                    if let Some(distance) = self.get_responsbile_range_estimate(&closest_k_peers) {
                        info!("Set responsible range to {distance}");
                        self.swarm.behaviour_mut().kademlia.store_mut().set_distance_range(distance);
                        self.replication_fetcher.set_replication_distance_range(distance);
                    }
                }
            }
            // Keep the relay reservation alive (nodes behind NAT).
            _ = relay_manager_reservation_interval.tick() => self.relay_manager.try_connecting_to_relay(&mut self.swarm, &self.bad_nodes),
        }
    }
}
/// Estimates the distance range (an ilog2 bucket index) this node is
/// responsible for, from the distance to the peer at roughly the K_VALUE/2-th
/// position among our closest known peers (or the last one, if fewer are
/// known). Returns `None` when no peers are known or the distance's ilog2 is
/// undefined.
/// NOTE(review): "responsbile" in the name is a typo for "responsible"; kept
/// to avoid touching call sites.
fn get_responsbile_range_estimate(
    &mut self,
    closest_k_peers: &[PeerId],
) -> Option<u32> {
    // Without any known peer there is nothing to estimate from.
    if closest_k_peers.is_empty() {
        return None;
    }
    let our_address = NetworkAddress::from_peer(self.self_peer_id);
    // Pick the middle-of-K peer, clamped to the peers we actually have.
    let target_index = std::cmp::min(K_VALUE.get() / 2, closest_k_peers.len()) - 1;
    let target_address = NetworkAddress::from_peer(closest_k_peers[target_index]);
    our_address.distance(&target_address).ilog2()
}
/// Re-queues a `NetworkSwarmCmd` onto the driver's own command channel from a
/// spawned task, so the event loop is never blocked on a full channel; a full
/// channel is logged before awaiting capacity.
pub(crate) fn queue_network_swarm_cmd(&self, event: NetworkSwarmCmd) {
    let sender = self.network_cmd_sender.clone();
    let remaining_capacity = sender.capacity();
    let _handle = spawn(async move {
        if remaining_capacity == 0 {
            warn!(
                "NetworkSwarmCmd channel is full. Await capacity to send: {:?}",
                event
            );
        }
        // Send failures (receiver dropped) are only logged; there is no
        // caller to propagate them to.
        if let Err(error) = sender.send(event).await {
            error!("SwarmDriver failed to send event: {}", error);
        }
    });
}
/// Emits a `NetworkEvent` to the application from a spawned task, so the
/// event loop is never blocked on a full channel; a full channel is logged
/// before awaiting capacity.
pub(crate) fn send_event(&self, event: NetworkEvent) {
    let sender = self.event_sender.clone();
    let remaining_capacity = sender.capacity();
    let _handle = spawn(async move {
        if remaining_capacity == 0 {
            warn!(
                "NetworkEvent channel is full. Await capacity to send: {:?}",
                event
            );
        }
        // Send failures (receiver dropped) are only logged; there is no
        // caller to propagate them to.
        if let Err(error) = sender.send(event).await {
            error!("SwarmDriver failed to send event: {}", error);
        }
    });
}
/// Returns up to K_VALUE peers closest to us according to the local routing
/// table, with our own peer id always first.
pub(crate) fn get_closest_k_value_local_peers(&mut self) -> Vec<PeerId> {
    let target_key = self.self_peer_id.into();
    // Start with ourselves, then fill up with the closest local peers,
    // keeping at most K_VALUE entries in total.
    let mut peers = vec![self.self_peer_id];
    peers.extend(
        self.swarm
            .behaviour_mut()
            .kademlia
            .get_closest_local_peers(&target_key)
            .map(|entry| entry.into_preimage())
            .take(K_VALUE.get() - 1),
    );
    peers
}
/// Dials the given address manually. Any trailing `/p2p/<peer-id>` component
/// is stripped from the address; when present, the dial targets that peer id
/// (skipped if a dial to it is already in progress), otherwise the bare
/// address is dialed.
pub(crate) fn dial(&mut self, mut addr: Multiaddr) -> Result<(), DialError> {
    debug!(%addr, "Dialing manually");
    let opts = if let Some(peer_id) = multiaddr_pop_p2p(&mut addr) {
        DialOpts::peer_id(peer_id)
            // Don't start a second dial to a peer we're already dialing.
            .condition(PeerCondition::NotDialing)
            .addresses(vec![addr])
            .build()
    } else {
        DialOpts::unknown_peer_id().address(addr).build()
    };
    self.swarm.dial(opts)
}
/// Records how long handling `handle_string` took; every 100 recorded
/// handlings, traces per-kind (count, average duration) statistics sorted by
/// count and resets the accumulators. Empty kinds are ignored.
pub(crate) fn log_handling(&mut self, handle_string: String, handle_time: Duration) {
    if handle_string.is_empty() {
        return;
    }
    // Accumulate the duration under its kind, creating the entry on demand.
    self.handling_statistics
        .entry(handle_string)
        .or_default()
        .push(handle_time);
    self.handled_times += 1;
    if self.handled_times >= 100 {
        self.handled_times = 0;
        let mut stats: Vec<(String, usize, Duration)> = self
            .handling_statistics
            .iter()
            .map(|(kind, durations)| {
                let count = durations.len();
                // count >= 1 is guaranteed: entries are only created with a push.
                let avg_time = durations.iter().sum::<Duration>() / count as u32;
                (kind.clone(), count, avg_time)
            })
            .collect();
        // Most frequently handled kinds first.
        stats.sort_by(|a, b| b.1.cmp(&a.1));
        trace!("SwarmDriver Handling Statistics: {:?}", stats);
        self.handling_statistics.clear();
    }
}
/// Logs the marker and, when metrics are enabled and configured, records it.
pub(crate) fn record_metrics(&self, marker: Marker) {
    marker.log();
    #[cfg(feature = "open-metrics")]
    if let Some(recorder) = &self.metrics_recorder {
        recorder.record_from_marker(marker)
    }
}
/// Reports a close-group change to the metrics recorder, if one is configured.
#[cfg(feature = "open-metrics")]
pub(crate) fn record_change_in_close_group(&self, new_close_group: Vec<PeerId>) {
    let Some(recorder) = self.metrics_recorder.as_ref() else {
        return;
    };
    recorder.record_change_in_close_group(new_close_group);
}
/// Starts listening on `addr`, logging the assigned listener id on success.
///
/// # Errors
/// Returns an error when the configured transports do not support the address.
pub(crate) fn listen_on(&mut self, addr: Multiaddr) -> Result<()> {
    match self.swarm.listen_on(addr.clone()) {
        Ok(id) => {
            info!("Listening on {id:?} with addr: {addr:?}");
            Ok(())
        }
        Err(err) => Err(err.into()),
    }
}
}
#[cfg(test)]
mod tests {
    use super::check_and_wipe_storage_dir_if_necessary;
    use std::{fs, io::Read, path::Path};

    /// Reads the persisted network-key version string from `version_file`.
    /// (Extracted to remove the duplicated read-and-assert block.)
    fn read_version_file(version_file: &Path) -> String {
        let mut content_str = String::new();
        let mut file = fs::OpenOptions::new()
            .read(true)
            .open(version_file)
            .expect("Failed to open version file");
        file.read_to_string(&mut content_str)
            .expect("Failed to read from version file");
        content_str
    }

    #[tokio::test]
    async fn version_file_update() {
        // Unique temp root so parallel test runs don't collide.
        let temp_dir = std::env::temp_dir();
        let unique_dir_name = uuid::Uuid::new_v4().to_string();
        let root_dir = temp_dir.join(unique_dir_name);
        fs::create_dir_all(&root_dir).expect("Failed to create root directory");
        let version_file = root_dir.join("network_key_version");
        let storage_dir = root_dir.join("record_store");

        // First run: no version file exists yet; one must be created and
        // populated with the current version.
        let cur_version = uuid::Uuid::new_v4().to_string();
        assert!(check_and_wipe_storage_dir_if_necessary(
            root_dir.clone(),
            storage_dir.clone(),
            cur_version.clone()
        )
        .is_ok());
        assert_eq!(read_version_file(&version_file), cur_version);

        // Second run with a *different* version: the storage dir must be
        // wiped and the version file updated.
        fs::create_dir_all(&storage_dir).expect("Failed to create storage directory");
        assert!(fs::metadata(storage_dir.clone()).is_ok());
        let cur_version = uuid::Uuid::new_v4().to_string();
        assert!(check_and_wipe_storage_dir_if_necessary(
            root_dir.clone(),
            storage_dir.clone(),
            cur_version.clone()
        )
        .is_ok());
        assert_eq!(read_version_file(&version_file), cur_version);
        assert!(fs::metadata(storage_dir.clone()).is_err());
    }
}