#[cfg(feature = "open-metrics")]
use crate::metrics::NetworkMetrics;
#[cfg(feature = "open-metrics")]
use crate::metrics_service::run_metrics_server;
use crate::{
bootstrap::{ContinuousBootstrap, BOOTSTRAP_INTERVAL},
circular_vec::CircularVec,
cmd::SwarmCmd,
error::{Error, Result},
event::NetworkEvent,
event::NodeEvent,
get_record_handler::PendingGetRecord,
multiaddr_pop_p2p,
network_discovery::NetworkDiscovery,
record_store::{ClientRecordStore, NodeRecordStore, NodeRecordStoreConfig},
record_store_api::UnifiedRecordStore,
replication_fetcher::ReplicationFetcher,
Network, CLOSE_GROUP_SIZE,
};
use futures::StreamExt;
#[cfg(all(not(feature = "websockets"), not(target_arch = "wasm32")))]
use libp2p::core::muxing::StreamMuxerBox;
#[cfg(feature = "local-discovery")]
use libp2p::mdns;
#[cfg(all(not(feature = "websockets"), not(target_arch = "wasm32")))]
use libp2p::quic::{tokio::Transport as TokioTransport, Config as TransportConfig};
#[cfg(all(not(target_arch = "wasm32"), feature = "websockets"))]
use libp2p::websocket::WsConfig;
use crate::target_arch::{interval, spawn, Instant};
#[cfg(all(not(target_arch = "wasm32"), feature = "websockets"))]
use libp2p::tcp::{tokio::Transport as TokioTransport, Config as TransportConfig};
#[cfg(target_arch = "wasm32")]
use libp2p::websocket_websys::Transport as WebSocketTransport;
use libp2p::{
identity::Keypair,
kad::{self, QueryId, Quorum, Record, K_VALUE},
multiaddr::Protocol,
request_response::{self, Config as RequestResponseConfig, OutboundRequestId, ProtocolSupport},
swarm::{
behaviour::toggle::Toggle,
dial_opts::{DialOpts, PeerCondition},
ConnectionId, DialError, NetworkBehaviour, StreamProtocol, Swarm,
},
Multiaddr, PeerId, Transport,
};
#[cfg(feature = "open-metrics")]
use prometheus_client::registry::Registry;
use sn_protocol::{
messages::{ChunkProof, Nonce, Request, Response},
storage::RetryStrategy,
NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey,
};
use std::{
collections::{btree_map::Entry, BTreeMap, HashMap, HashSet},
fmt::Debug,
net::SocketAddr,
num::NonZeroUsize,
path::PathBuf,
};
use tiny_keccak::{Hasher, Sha3};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Duration;
use tracing::warn;
/// The purpose a pending Kademlia `get_closest_peers` query was issued for.
pub(crate) enum PendingGetClosestType {
    /// Issued for network discovery; carries no reply channel.
    NetworkDiscovery,
    /// Issued on behalf of a caller, which receives the resulting peers over this channel.
    FunctionCall(oneshot::Sender<Vec<PeerId>>),
}
/// In-flight get-closest-peers queries keyed by Kademlia `QueryId`, each with its purpose and a
/// list of peers (NOTE(review): presumably the peers collected so far — accumulation happens
/// outside this file, confirm there).
type PendingGetClosest = HashMap<QueryId, (PendingGetClosestType, Vec<PeerId>)>;
/// Maximum Kademlia packet size (5 MiB); also used as the node record store's `max_value_bytes`.
const MAX_PACKET_SIZE: usize = 1024 * 1024 * 5;
/// Request/response timeout used when `NetworkBuilder::request_timeout` has not been set.
const REQUEST_TIMEOUT_DEFAULT_S: Duration = Duration::from_secs(30);
/// Idle-connection timeout passed to the swarm configuration.
const CONNECTION_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(30);
/// Identify agent version advertised by nodes (passed through `truncate_patch_version`).
const SN_NODE_VERSION_STR: &str = concat!("safe/node/", env!("CARGO_PKG_VERSION"));
/// Request/response stream protocol name (passed through `truncate_patch_version`, so for a plain
/// `X.Y.Z` crate version only `X.Y` ends up in the protocol name).
const REQ_RESPONSE_VERSION_STR: &str = concat!("/safe/node/", env!("CARGO_PKG_VERSION"));
/// Identify agent version advertised by clients (passed through `truncate_patch_version`).
const IDENTIFY_CLIENT_VERSION_STR: &str = concat!("safe/client/", env!("CARGO_PKG_VERSION"));
/// Identify protocol version shared by nodes and clients (passed through `truncate_patch_version`).
const IDENTIFY_PROTOCOL_STR: &str = concat!("safe/", env!("CARGO_PKG_VERSION"));
/// Capacity of both the `NetworkEvent` channel and the `SwarmCmd` channel.
const NETWORKING_CHANNEL_SIZE: usize = 10_000;
/// Kademlia query timeout; only applied to the node configuration in `build_node`.
const KAD_QUERY_TIMEOUT_S: Duration = Duration::from_secs(10);
/// Drops the final `.`-separated component of `full_str` when it contains exactly two dots,
/// e.g. `"safe/node/1.2.3"` becomes `"safe/node/1.2"`.
///
/// Any other input (fewer or more than two dots) is returned unchanged, so pre-release
/// versions such as `"1.2.3-alpha.1"` are left intact.
pub(crate) fn truncate_patch_version(full_str: &str) -> &str {
    let dots = full_str.bytes().filter(|&byte| byte == b'.').count();
    if dots != 2 {
        return full_str;
    }
    full_str
        .rsplit_once('.')
        .map_or(full_str, |(major_minor, _patch)| major_minor)
}
/// Parameters controlling how a record is fetched from the network.
#[derive(Clone)]
pub struct GetRecordCfg {
    /// Quorum used for the get-record query.
    pub get_quorum: Quorum,
    /// Optional retry strategy for the fetch.
    pub retry_strategy: Option<RetryStrategy>,
    /// The exact record the caller expects to receive, if known.
    pub target_record: Option<Record>,
    /// Peers the caller expects to hold the record.
    pub expected_holders: HashSet<PeerId>,
}

impl GetRecordCfg {
    /// Returns `true` only when a target record is configured and it equals `record`.
    pub fn does_target_match(&self, record: &Record) -> bool {
        match &self.target_record {
            Some(target) => target == record,
            None => false,
        }
    }
}

impl Debug for GetRecordCfg {
    /// Formats the config, printing only the pretty-printed key of the target record
    /// (or `"None"`) instead of the whole record.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("GetRecordCfg");
        let _ = builder
            .field("get_quorum", &self.get_quorum)
            .field("retry_strategy", &self.retry_strategy);
        if let Some(record) = &self.target_record {
            let _ = builder.field("target_record", &PrettyPrintRecordKey::from(&record.key));
        } else {
            let _ = builder.field("target_record", &"None");
        }
        builder
            .field("expected_holders", &self.expected_holders)
            .finish()
    }
}
/// Parameters controlling how a record is put to the network.
#[derive(Debug, Clone)]
pub struct PutRecordCfg {
    /// Quorum used for the put-record operation.
    pub put_quorum: Quorum,
    /// Optional retry strategy for the put.
    pub retry_strategy: Option<RetryStrategy>,
    /// Optional explicit list of peers to put the record to
    /// (NOTE(review): presumably used instead of Kademlia-selected peers — confirm at call site).
    pub use_put_record_to: Option<Vec<PeerId>>,
    /// Optional post-put verification: the kind of check plus the `GetRecordCfg` presumably
    /// used to fetch the record back (confirm with the put handler).
    pub verification: Option<(VerificationKind, GetRecordCfg)>,
}
/// How a record should be verified after it has been put.
#[derive(Debug, Clone)]
pub enum VerificationKind {
    /// Verify through the network (NOTE(review): presumably by fetching the record back).
    Network,
    /// Verify with a chunk storage proof: the proof expected for the given `nonce`.
    ChunkProof {
        expected_proof: ChunkProof,
        nonce: Nonce,
    },
}
/// Composite libp2p behaviour driven by the swarm; events of the sub-behaviours are converted
/// into `NodeEvent` (per the `to_swarm` attribute).
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "NodeEvent")]
pub(super) struct NodeBehaviour {
    /// CBOR request/response protocol for `Request`/`Response` messages.
    pub(super) request_response: request_response::cbor::Behaviour<Request, Response>,
    /// Kademlia DHT, backed by either a node or a client record store.
    pub(super) kademlia: kad::Behaviour<UnifiedRecordStore>,
    /// mDNS local peer discovery (only with the `local-discovery` feature).
    #[cfg(feature = "local-discovery")]
    pub(super) mdns: mdns::tokio::Behaviour,
    /// Identify protocol advertising our protocol and agent versions.
    pub(super) identify: libp2p::identify::Behaviour,
    /// Gossipsub; disabled unless `NetworkBuilder::enable_gossip` was called.
    pub(super) gossipsub: Toggle<libp2p::gossipsub::Behaviour>,
}
/// Builder for a node's or a client's networking stack (see `build_node` / `build_client`).
#[derive(Debug)]
pub struct NetworkBuilder {
    /// Identity keypair; the local `PeerId` is derived from its public key.
    keypair: Keypair,
    /// When `false`, the transport is restricted to dialing global addresses only.
    local: bool,
    /// Root directory; the node record store is created in its `record_store` subdirectory.
    root_dir: PathBuf,
    /// Socket address to listen on; required by `build_node`.
    listen_addr: Option<SocketAddr>,
    /// Whether the gossipsub behaviour is enabled.
    enable_gossip: bool,
    /// Request/response timeout override (defaults to `REQUEST_TIMEOUT_DEFAULT_S`).
    request_timeout: Option<Duration>,
    /// NOTE(review): set by `concurrency_limit()` but not read anywhere in this file — confirm usage.
    concurrency_limit: Option<usize>,
    /// Registry that network metrics are registered into; a default one is used if unset.
    #[cfg(feature = "open-metrics")]
    metrics_registry: Option<Registry>,
    /// Port passed to the metrics server started during building.
    #[cfg(feature = "open-metrics")]
    metrics_server_port: u16,
}
impl NetworkBuilder {
    /// Creates a builder for the given identity `keypair` and `root_dir`.
    ///
    /// When `local` is `false`, the built transport only allows dialing global addresses.
    /// All optional settings start unset: no listen address, gossip disabled, default request
    /// timeout, no concurrency limit and (with `open-metrics`) no registry and port `0`.
    pub fn new(keypair: Keypair, local: bool, root_dir: PathBuf) -> Self {
        Self {
            keypair,
            local,
            root_dir,
            listen_addr: None,
            enable_gossip: false,
            request_timeout: None,
            concurrency_limit: None,
            #[cfg(feature = "open-metrics")]
            metrics_registry: None,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: 0,
        }
    }

    /// Sets the socket address to listen on. Required by `build_node`.
    pub fn listen_addr(&mut self, listen_addr: SocketAddr) {
        self.listen_addr = Some(listen_addr);
    }

    /// Enables the gossipsub behaviour (disabled by default).
    pub fn enable_gossip(&mut self) {
        self.enable_gossip = true;
    }

    /// Overrides the request/response timeout (default `REQUEST_TIMEOUT_DEFAULT_S`).
    pub fn request_timeout(&mut self, request_timeout: Duration) {
        self.request_timeout = Some(request_timeout);
    }

    /// Stores a concurrency limit.
    ///
    /// NOTE(review): the stored value is not read anywhere in this file — confirm it is used.
    pub fn concurrency_limit(&mut self, concurrency_limit: usize) {
        self.concurrency_limit = Some(concurrency_limit);
    }

    /// Supplies the registry that network metrics are registered into.
    /// If never called, a default registry is created while building.
    #[cfg(feature = "open-metrics")]
    pub fn metrics_registry(&mut self, metrics_registry: Registry) {
        self.metrics_registry = Some(metrics_registry);
    }

    /// Sets the port passed to the metrics server started while building.
    #[cfg(feature = "open-metrics")]
    pub fn metrics_server_port(&mut self, port: u16) {
        self.metrics_server_port = port;
    }

    /// Builds a full node and starts it listening on the configured address.
    ///
    /// Returns the `Network` handle, the `NetworkEvent` receiver, and the `SwarmDriver` that
    /// must be run to drive the swarm.
    ///
    /// The node's Kademlia config uses manual k-bucket inserts and has no periodic
    /// replication/(provider) publication and no record TTL. Queries time out after
    /// `KAD_QUERY_TIMEOUT_S`. Records are stored under `<root_dir>/record_store`. The listen
    /// address uses TCP + WebSocket with the `websockets` feature or on wasm32, otherwise
    /// UDP + QUIC v1.
    ///
    /// # Errors
    ///
    /// - `Error::ListenAddressNotProvided` if no listen address was set. This is checked before
    ///   any directory is created or (with `open-metrics`) any metrics server is started.
    /// - `Error::InvalidCloseGroupSize` if `CLOSE_GROUP_SIZE` is zero.
    /// - `Error::FailedToCreateRecordStoreDir` if the record store directory cannot be created.
    /// - Any error from assembling the swarm.
    ///
    /// # Panics
    ///
    /// Panics if the swarm cannot start listening on the address.
    pub fn build_node(self) -> Result<(Network, mpsc::Receiver<NetworkEvent>, SwarmDriver)> {
        // Fail fast: validate the listen address before any side effects (record-store
        // directory creation below, metrics server start-up inside `build`).
        let listen_socket_addr = self.listen_addr.ok_or(Error::ListenAddressNotProvided)?;

        let mut kad_cfg = kad::Config::default();
        let _ = kad_cfg
            .set_kbucket_inserts(libp2p::kad::BucketInserts::Manual)
            .set_replication_interval(None)
            .set_publication_interval(None)
            .set_max_packet_size(MAX_PACKET_SIZE)
            .set_replication_factor(
                NonZeroUsize::new(CLOSE_GROUP_SIZE).ok_or_else(|| Error::InvalidCloseGroupSize)?,
            )
            .set_query_timeout(KAD_QUERY_TIMEOUT_S)
            .disjoint_query_paths(true)
            .set_record_ttl(None)
            .set_provider_publication_interval(None);

        let store_cfg = {
            let storage_dir_path = self.root_dir.join("record_store");
            if let Err(error) = std::fs::create_dir_all(&storage_dir_path) {
                return Err(Error::FailedToCreateRecordStoreDir {
                    path: storage_dir_path,
                    source: error,
                });
            }
            NodeRecordStoreConfig {
                // A stored value must fit into a single Kademlia packet.
                max_value_bytes: MAX_PACKET_SIZE,
                storage_dir: storage_dir_path,
                ..Default::default()
            }
        };

        let (network, events_receiver, mut swarm_driver) = self.build(
            kad_cfg,
            Some(store_cfg),
            false,
            ProtocolSupport::Full,
            truncate_patch_version(SN_NODE_VERSION_STR).to_string(),
        )?;

        // The protocol stack here must match the transport compiled into `build`.
        let start_addr = Multiaddr::from(listen_socket_addr.ip());
        let listen_addr = if cfg!(any(feature = "websockets", target_arch = "wasm32")) {
            start_addr
                .with(Protocol::Tcp(listen_socket_addr.port()))
                .with(Protocol::Ws("/".into()))
        } else {
            start_addr
                .with(Protocol::Udp(listen_socket_addr.port()))
                .with(Protocol::QuicV1)
        };
        debug!("Attempting to listen on: {listen_addr:?}");
        let _listener_id = swarm_driver
            .swarm
            .listen_on(listen_addr)
            .expect("Failed to listen on the provided address");

        Ok((network, events_receiver, swarm_driver))
    }

    /// Builds a client.
    ///
    /// A client has Kademlia without a persistent record store, request/response in
    /// outbound-only mode, and advertises the client identify agent version. Clients do not
    /// start listening.
    ///
    /// # Errors
    ///
    /// `Error::InvalidCloseGroupSize` if `CLOSE_GROUP_SIZE` is zero, or any error from
    /// assembling the swarm.
    pub fn build_client(self) -> Result<(Network, mpsc::Receiver<NetworkEvent>, SwarmDriver)> {
        let mut kad_cfg = kad::Config::default();
        let _ = kad_cfg
            .set_max_packet_size(MAX_PACKET_SIZE)
            .disjoint_query_paths(true)
            .set_replication_factor(
                NonZeroUsize::new(CLOSE_GROUP_SIZE).ok_or_else(|| Error::InvalidCloseGroupSize)?,
            );

        self.build(
            kad_cfg,
            None,
            true,
            ProtocolSupport::Outbound,
            truncate_patch_version(IDENTIFY_CLIENT_VERSION_STR).to_string(),
        )
    }

    /// Assembles the swarm shared by nodes and clients.
    ///
    /// - `kad_cfg`: Kademlia configuration prepared by the caller.
    /// - `record_store_cfg`: `Some` selects a `NodeRecordStore`, `None` a `ClientRecordStore`.
    /// - `is_client`: recorded on the resulting `SwarmDriver`.
    /// - `req_res_protocol`: request/response protocol support to register.
    /// - `identify_version`: agent version advertised through identify.
    ///
    /// Returns the `Network` handle, the `NetworkEvent` receiver and the `SwarmDriver`.
    /// With `open-metrics`, the metrics server is started early, before the later fallible steps.
    ///
    /// # Errors
    ///
    /// Fails if the mDNS behaviour cannot be created (`local-discovery` feature). It also fails
    /// with `Error::GossipsubConfigError` if gossip is enabled and its config or behaviour
    /// cannot be built.
    ///
    /// # Panics
    ///
    /// Panics if creating the noise config from the keypair fails (websocket / wasm32 transports).
    fn build(
        self,
        kad_cfg: kad::Config,
        record_store_cfg: Option<NodeRecordStoreConfig>,
        is_client: bool,
        req_res_protocol: ProtocolSupport,
        identify_version: String,
    ) -> Result<(Network, mpsc::Receiver<NetworkEvent>, SwarmDriver)> {
        let peer_id = PeerId::from(self.keypair.public());
        #[cfg(not(target_arch = "wasm32"))]
        info!(
            "Process (PID: {}) with PeerId: {peer_id}",
            std::process::id()
        );
        info!(
            "Self PeerID {peer_id} is represented as kbucket_key {:?}",
            PrettyPrintKBucketKey(NetworkAddress::from_peer(peer_id).as_kbucket_key())
        );

        #[cfg(feature = "open-metrics")]
        let network_metrics = {
            let mut metrics_registry = self.metrics_registry.unwrap_or_default();
            let metrics = NetworkMetrics::new(&mut metrics_registry);
            run_metrics_server(metrics_registry, self.metrics_server_port);
            metrics
        };

        let request_response = {
            let cfg = RequestResponseConfig::default()
                .with_request_timeout(self.request_timeout.unwrap_or(REQUEST_TIMEOUT_DEFAULT_S));
            request_response::cbor::Behaviour::new(
                [(
                    StreamProtocol::new(truncate_patch_version(REQ_RESPONSE_VERSION_STR)),
                    req_res_protocol,
                )],
                cfg,
            )
        };

        let (network_event_sender, network_event_receiver) = mpsc::channel(NETWORKING_CHANNEL_SIZE);
        let (swarm_cmd_sender, swarm_cmd_receiver) = mpsc::channel(NETWORKING_CHANNEL_SIZE);

        let kademlia = match record_store_cfg {
            Some(store_cfg) => {
                // The node record store gets its own handles to both channels.
                let node_record_store = NodeRecordStore::with_config(
                    peer_id,
                    store_cfg,
                    network_event_sender.clone(),
                    swarm_cmd_sender.clone(),
                );
                #[cfg(feature = "open-metrics")]
                let node_record_store = node_record_store
                    .set_record_count_metric(network_metrics.records_stored.clone());
                let store = UnifiedRecordStore::Node(node_record_store);
                debug!("Using Kademlia with NodeRecordStore!");
                kad::Behaviour::with_config(peer_id, store, kad_cfg)
            }
            None => {
                let store = UnifiedRecordStore::Client(ClientRecordStore::default());
                debug!("Using Kademlia with ClientRecordStore!");
                kad::Behaviour::with_config(peer_id, store, kad_cfg)
            }
        };

        #[cfg(feature = "local-discovery")]
        let mdns_config = mdns::Config {
            query_interval: Duration::from_secs(5),
            ..Default::default()
        };
        #[cfg(feature = "local-discovery")]
        let mdns = mdns::tokio::Behaviour::new(mdns_config, peer_id)?;

        let identify = {
            let cfg = libp2p::identify::Config::new(
                truncate_patch_version(IDENTIFY_PROTOCOL_STR).to_string(),
                self.keypair.public(),
            )
            .with_agent_version(identify_version);
            libp2p::identify::Behaviour::new(cfg)
        };

        // Default native transport: QUIC.
        #[cfg(all(not(feature = "websockets"), not(target_arch = "wasm32")))]
        let main_transport = TokioTransport::new(TransportConfig::new(&self.keypair))
            .map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer)))
            .boxed();

        // Browser transport: websocket upgraded with noise + yamux.
        #[cfg(target_arch = "wasm32")]
        let main_transport = WebSocketTransport::default()
            .upgrade(libp2p::core::upgrade::Version::V1)
            .authenticate(
                libp2p::noise::Config::new(&self.keypair)
                    .expect("Signing libp2p-noise static DH keypair failed."),
            )
            .multiplex(libp2p::yamux::Config::default())
            .boxed();

        // Native websocket transport over TCP, upgraded with noise + yamux.
        #[cfg(all(not(target_arch = "wasm32"), feature = "websockets"))]
        let tcp = TokioTransport::new(TransportConfig::default());
        #[cfg(all(not(target_arch = "wasm32"), feature = "websockets"))]
        let main_transport = WsConfig::new(tcp)
            .upgrade(libp2p::core::upgrade::Version::V1)
            .authenticate(
                libp2p::noise::Config::new(&self.keypair)
                    .expect("Signing libp2p-noise static DH keypair failed."),
            )
            .multiplex(libp2p::yamux::Config::default())
            .boxed();

        let gossipsub = if self.enable_gossip {
            let gossipsub_config = libp2p::gossipsub::ConfigBuilder::default()
                .validation_mode(libp2p::gossipsub::ValidationMode::Permissive)
                // Content-addressed message ids: SHA3-256 of the payload, so identical payloads
                // are treated as the same message.
                .message_id_fn(|msg| {
                    let mut sha3 = Sha3::v256();
                    let mut msg_id = [0; 32];
                    sha3.update(&msg.data);
                    sha3.finalize(&mut msg_id);
                    msg_id.into()
                })
                .heartbeat_interval(Duration::from_secs(5))
                .iwant_followup_time(Duration::from_secs(10))
                .published_message_ids_cache_time(Duration::from_secs(60))
                .build()
                .map_err(|err| Error::GossipsubConfigError(err.to_string()))?;
            let message_authenticity = libp2p::gossipsub::MessageAuthenticity::Anonymous;
            // Report a rejected config/authenticity combination as an error instead of panicking.
            let gossipsub: libp2p::gossipsub::Behaviour =
                libp2p::gossipsub::Behaviour::new(message_authenticity, gossipsub_config)
                    .map_err(|err| Error::GossipsubConfigError(err.to_string()))?;
            Some(gossipsub)
        } else {
            None
        };
        let gossipsub = Toggle::from(gossipsub);

        let transport = if !self.local {
            debug!("Preventing non-global dials");
            libp2p::core::transport::global_only::Transport::new(main_transport).boxed()
        } else {
            main_transport
        };

        let behaviour = NodeBehaviour {
            request_response,
            kademlia,
            identify,
            #[cfg(feature = "local-discovery")]
            mdns,
            gossipsub,
        };

        #[cfg(not(target_arch = "wasm32"))]
        let swarm_config = libp2p::swarm::Config::with_tokio_executor()
            .with_idle_connection_timeout(CONNECTION_KEEP_ALIVE_TIMEOUT);
        #[cfg(target_arch = "wasm32")]
        let swarm_config = libp2p::swarm::Config::with_wasm_executor()
            .with_idle_connection_timeout(CONNECTION_KEEP_ALIVE_TIMEOUT);

        let swarm = Swarm::new(transport, behaviour, peer_id, swarm_config);
        let bootstrap = ContinuousBootstrap::new();
        let replication_fetcher = ReplicationFetcher::new(peer_id);

        let swarm_driver = SwarmDriver {
            swarm,
            self_peer_id: peer_id,
            local: self.local,
            is_client,
            connected_peers: 0,
            bootstrap,
            close_group: Default::default(),
            replication_fetcher,
            #[cfg(feature = "open-metrics")]
            network_metrics,
            cmd_receiver: swarm_cmd_receiver,
            event_sender: network_event_sender,
            pending_get_closest_peers: Default::default(),
            pending_requests: Default::default(),
            pending_get_record: Default::default(),
            dialed_peers: CircularVec::new(255),
            is_gossip_handler: false,
            network_discovery: NetworkDiscovery::new(&peer_id),
            bootstrap_peers: Default::default(),
            live_connected_peers: Default::default(),
            handling_statistics: Default::default(),
            handled_times: 0,
        };

        Ok((
            Network {
                swarm_cmd_sender,
                peer_id,
                root_dir_path: self.root_dir,
                keypair: self.keypair,
            },
            network_event_receiver,
            swarm_driver,
        ))
    }
}
/// Owns the libp2p swarm together with the networking state; `run` drives it.
pub struct SwarmDriver {
    /// The libp2p swarm running `NodeBehaviour`.
    pub(crate) swarm: Swarm<NodeBehaviour>,
    /// This peer's id.
    pub(crate) self_peer_id: PeerId,
    /// Copy of the builder's `local` flag.
    pub(crate) local: bool,
    /// `true` when built via `build_client`.
    pub(crate) is_client: bool,
    /// Connected-peer count; starts at 0 (NOTE(review): presumably maintained by event handlers elsewhere).
    pub(crate) connected_peers: usize,
    /// Continuous bootstrap state used by the bootstrap tick in `run`.
    pub(crate) bootstrap: ContinuousBootstrap,
    /// Current close group; starts empty.
    pub(crate) close_group: Vec<PeerId>,
    /// Replication fetcher for this peer.
    pub(crate) replication_fetcher: ReplicationFetcher,
    /// Network metrics (only with `open-metrics`).
    #[cfg(feature = "open-metrics")]
    #[allow(unused)]
    pub(crate) network_metrics: NetworkMetrics,
    /// Receives `SwarmCmd`s sent through the `Network` handle (and, for nodes, the record store).
    cmd_receiver: mpsc::Receiver<SwarmCmd>,
    /// Sends `NetworkEvent`s to the receiver returned by the builder; see `send_event`.
    event_sender: mpsc::Sender<NetworkEvent>,
    /// In-flight get-closest-peers queries.
    pub(crate) pending_get_closest_peers: PendingGetClosest,
    /// Outbound requests, each with an optional channel to deliver the response on.
    pub(crate) pending_requests:
        HashMap<OutboundRequestId, Option<oneshot::Sender<Result<Response>>>>,
    /// In-flight get-record queries.
    pub(crate) pending_get_record: PendingGetRecord,
    /// Recently dialed peers, held in a `CircularVec` created with capacity 255.
    pub(crate) dialed_peers: CircularVec<PeerId>,
    /// Starts as `false`.
    pub(crate) is_gossip_handler: bool,
    /// Network discovery state seeded from this peer's id.
    pub(crate) network_discovery: NetworkDiscovery,
    /// Bootstrap peers grouped by an optional `u32` key (NOTE(review): presumably a bucket index — confirm).
    pub(crate) bootstrap_peers: BTreeMap<Option<u32>, HashSet<PeerId>>,
    /// Live connections: the peer on each connection and an `Instant` recorded for it.
    pub(crate) live_connected_peers: BTreeMap<ConnectionId, (PeerId, Instant)>,
    /// Recent handling durations per kind; maintained by `log_handling`.
    handling_statistics: BTreeMap<String, Vec<Duration>>,
    /// Samples recorded since the statistics were last traced by `log_handling`.
    handled_times: usize,
}
impl SwarmDriver {
    /// Drives the swarm forever; the loop never returns on its own.
    ///
    /// Each iteration handles whichever is ready first:
    /// - a swarm event;
    /// - a `SwarmCmd` from the command channel;
    /// - a bootstrap interval tick (which may replace the interval).
    ///
    /// Errors from handling events or commands are logged, not propagated.
    pub async fn run(mut self) {
        let mut bootstrap_interval = interval(BOOTSTRAP_INTERVAL);
        // `recv()` on a closed channel resolves to `None` immediately on every poll. Once all
        // command senders are gone, stop polling it; otherwise `select!` keeps picking that
        // branch and the loop spins at full CPU.
        let mut cmd_channel_open = true;
        loop {
            tokio::select! {
                swarm_event = self.swarm.select_next_some() => {
                    if let Err(err) = self.handle_swarm_events(swarm_event) {
                        warn!("Error while handling swarm event: {err}");
                    }
                },
                some_cmd = self.cmd_receiver.recv(), if cmd_channel_open => match some_cmd {
                    Some(cmd) => {
                        let start = Instant::now();
                        let cmd_string = format!("{cmd:?}");
                        if let Err(err) = self.handle_cmd(cmd) {
                            warn!("Error while handling cmd: {err}");
                        }
                        trace!("SwarmCmd handled in {:?}: {cmd_string:?}", start.elapsed());
                    },
                    None => {
                        warn!("SwarmCmd channel closed; SwarmDriver will no longer receive commands");
                        cmd_channel_open = false;
                    }
                },
                _ = bootstrap_interval.tick() => {
                    if let Some(new_interval) = self.run_bootstrap_continuously(bootstrap_interval.period()).await {
                        bootstrap_interval = new_interval;
                    }
                }
            }
        }
    }

    /// Sends `event` to the network event receiver without blocking the caller.
    ///
    /// The send runs in a spawned task, so relative ordering of events sent in quick succession
    /// is not guaranteed. A warning is logged if the channel was full when the task was spawned,
    /// and an error is logged if the send fails.
    pub(crate) fn send_event(&self, event: NetworkEvent) {
        let event_sender = self.event_sender.clone();
        let capacity = event_sender.capacity();
        let _handle = spawn(async move {
            if capacity == 0 {
                warn!(
                    "NetworkEvent channel is full. Await capacity to send: {:?}",
                    event
                );
            }
            if let Err(error) = event_sender.send(event).await {
                error!("SwarmDriver failed to send event: {}", error);
            }
        });
    }

    /// Returns every peer in the local Kademlia routing table, followed by this peer itself.
    pub(crate) fn get_all_local_peers(&mut self) -> Vec<PeerId> {
        let mut all_peers: Vec<PeerId> = vec![];
        for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
            for entry in kbucket.iter() {
                all_peers.push(entry.node.key.clone().into_preimage());
            }
        }
        all_peers.push(self.self_peer_id);
        all_peers
    }

    /// Returns this peer followed by the locally known peers closest to it.
    ///
    /// At most `K_VALUE` entries are returned, and this peer counts towards that limit.
    pub(crate) fn get_closest_k_value_local_peers(&mut self) -> Vec<PeerId> {
        let self_peer_id = self.self_peer_id.into();
        let peers = self
            .swarm
            .behaviour_mut()
            .kademlia
            .get_closest_local_peers(&self_peer_id)
            .map(|key| key.into_preimage());
        std::iter::once(self.self_peer_id)
            .chain(peers)
            .take(K_VALUE.get())
            .collect()
    }

    /// Dials `addr`.
    ///
    /// If the address ends in a `/p2p/<peer id>` component, that component is removed. The peer
    /// is then dialed by id (skipped if a dial to it is already in progress). Otherwise the
    /// address is dialed as an unknown peer.
    pub(crate) fn dial(&mut self, mut addr: Multiaddr) -> Result<(), DialError> {
        trace!(%addr, "Dialing manually");
        let peer_id = multiaddr_pop_p2p(&mut addr);
        let opts = match peer_id {
            Some(peer_id) => DialOpts::peer_id(peer_id)
                .condition(PeerCondition::NotDialing)
                .addresses(vec![addr])
                .build(),
            None => DialOpts::unknown_peer_id().address(addr).build(),
        };
        self.swarm.dial(opts)
    }

    /// Dials with caller-supplied `DialOpts`.
    pub(crate) fn dial_with_opts(&mut self, opts: DialOpts) -> Result<(), DialError> {
        trace!(?opts, "Dialing manually");
        self.swarm.dial(opts)
    }

    /// Records how long handling one item of kind `handle_string` took.
    ///
    /// Every `REPORT_EVERY` samples, it traces the average duration per kind. Only the most
    /// recent `HANDLING_WINDOW` samples are kept per kind, and the oldest is evicted first.
    /// Empty kinds are ignored.
    pub(crate) fn log_handling(&mut self, handle_string: String, handle_time: Duration) {
        /// Number of recent samples retained per handling kind.
        const HANDLING_WINDOW: usize = 10;
        /// Emit the statistics trace once per this many recorded samples.
        const REPORT_EVERY: usize = 100;

        if handle_string.is_empty() {
            return;
        }

        match self.handling_statistics.entry(handle_string) {
            Entry::Occupied(mut entry) => {
                let records = entry.get_mut();
                // Rolling window: evict the oldest sample so the newest is always recorded.
                if records.len() >= HANDLING_WINDOW {
                    let _ = records.remove(0);
                }
                records.push(handle_time);
            }
            Entry::Vacant(entry) => {
                entry.insert(vec![handle_time]);
            }
        }

        self.handled_times += 1;
        if self.handled_times >= REPORT_EVERY {
            self.handled_times = 0;
            let kinds: Vec<String> = self.handling_statistics.keys().cloned().collect();
            let avg_times: Vec<Duration> = self
                .handling_statistics
                .values()
                .map(|durations| {
                    // Never empty: entries are created with one sample, and eviction only happens
                    // when the window is full. `len()` is at most HANDLING_WINDOW, so it fits in u32.
                    let sum: Duration = durations.iter().sum();
                    sum / durations.len() as u32
                })
                .collect();
            trace!("SwarmDriver Handling Statistics have kinds: {kinds:?}");
            trace!("SwarmDriver Handling Statistics have ave_times: {avg_times:?}");
        }
    }
}