#[macro_use]
extern crate tracing;
mod circular_vec;
mod cmd;
mod error;
mod event;
mod msg;
mod record_store;
mod replication_fetcher;
pub use self::{
cmd::SwarmLocalState,
error::Error,
event::{MsgResponder, NetworkEvent},
};
use self::{
circular_vec::CircularVec,
cmd::SwarmCmd,
error::Result,
event::NodeBehaviour,
record_store::{
DiskBackedRecordStore, DiskBackedRecordStoreConfig, REPLICATION_INTERVAL_LOWER_BOUND,
REPLICATION_INTERVAL_UPPER_BOUND,
},
replication_fetcher::ReplicationFetcher,
};
use futures::{future::select_all, StreamExt};
#[cfg(feature = "local-discovery")]
use libp2p::mdns;
use libp2p::{
identity::Keypair,
kad::{
KBucketDistance as Distance, KBucketKey, Kademlia, KademliaConfig, QueryId, Record,
RecordKey,
},
multiaddr::Protocol,
request_response::{self, Config as RequestResponseConfig, ProtocolSupport, RequestId},
swarm::{behaviour::toggle::Toggle, StreamProtocol, Swarm, SwarmBuilder},
Multiaddr, PeerId, Transport,
};
use rand::Rng;
use sn_protocol::{
messages::{Request, Response},
NetworkAddress,
};
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
num::NonZeroUsize,
path::PathBuf,
time::Duration,
};
use tokio::sync::{mpsc, oneshot};
use tracing::warn;
/// The number of peers forming the close group around an address; also used as the Kademlia replication factor.
pub const CLOSE_GROUP_SIZE: usize = 8;
/// Default timeout for request-response requests.
const REQUEST_TIMEOUT_DEFAULT_S: Duration = Duration::from_secs(30);
/// How long to keep an idle request-response connection alive.
const CONNECTION_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(30);
/// Prefix of the agent version string nodes advertise via identify.
pub const IDENTIFY_AGENT_STR: &str = "safe/node/";
/// Full agent version string advertised by nodes via identify.
const SN_NODE_VERSION_STR: &str = concat!("safe/node/", env!("CARGO_PKG_VERSION"));
/// Protocol id for the request-response behaviour (must start with '/').
const REQ_RESPONSE_VERSION_STR: &str = concat!("/safe/node/", env!("CARGO_PKG_VERSION"));
/// Agent version string advertised by clients via identify.
const IDENTIFY_CLIENT_VERSION_STR: &str = concat!("safe/client/", env!("CARGO_PKG_VERSION"));
/// Protocol version string used by the identify behaviour.
const IDENTIFY_PROTOCOL_STR: &str = concat!("safe/", env!("CARGO_PKG_VERSION"));
/// How long to wait between attempts to verify that a stored record is retrievable.
const VERIFICATION_TIMEOUT: Duration = Duration::from_secs(3);
/// How many times to attempt verification of a stored record before giving up.
const VERIFICATION_ATTEMPTS: usize = 3;
/// Capacity of the `SwarmCmd` and `NetworkEvent` channels.
const NETWORKING_CHANNEL_SIZE: usize = 10_000;
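/// Majority of a given group, i.e. strictly more than half.
///
/// A minimal doc-test sketch (assuming this crate is named `sn_networking`):
/// ```
/// use sn_networking::{close_group_majority, CLOSE_GROUP_SIZE};
/// // With the default CLOSE_GROUP_SIZE of 8, a majority is 5 peers.
/// assert_eq!(close_group_majority(), CLOSE_GROUP_SIZE / 2 + 1);
/// assert_eq!(close_group_majority(), 5);
/// ```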
#[inline]
pub const fn close_group_majority() -> usize {
CLOSE_GROUP_SIZE / 2 + 1
}
/// In-flight `get_closest_peers` queries: the channel to respond on, plus the
/// closest peers accumulated so far.
type PendingGetClosest = HashMap<QueryId, (oneshot::Sender<HashSet<PeerId>>, HashSet<PeerId>)>;
/// Drives the libp2p `Swarm`, executing `SwarmCmd`s received from the
/// `Network` handle and emitting `NetworkEvent`s to the upper layers.
pub struct SwarmDriver {
self_peer_id: PeerId,
swarm: Swarm<NodeBehaviour>,
cmd_receiver: mpsc::Receiver<SwarmCmd>,
event_sender: mpsc::Sender<NetworkEvent>,
pending_get_closest_peers: PendingGetClosest,
pending_requests: HashMap<RequestId, Option<oneshot::Sender<Result<Response>>>>,
pending_query: HashMap<QueryId, oneshot::Sender<Result<Record>>>,
replication_fetcher: ReplicationFetcher,
local: bool,
dialed_peers: CircularVec<PeerId>,
is_client: bool,
}
impl SwarmDriver {
    /// Create a `SwarmDriver` for a full node, backed by a disk-based record
    /// store under `root_dir`, and start listening on the given address.
    pub fn new(
keypair: Keypair,
addr: SocketAddr,
local: bool,
root_dir: PathBuf,
) -> Result<(Network, mpsc::Receiver<NetworkEvent>, Self)> {
        // Pick a random replication interval within the configured bounds so
        // that nodes do not all trigger replication in lock-step.
        let replication_interval = rand::thread_rng()
            .gen_range(REPLICATION_INTERVAL_LOWER_BOUND..REPLICATION_INTERVAL_UPPER_BOUND);
        let mut kad_cfg = KademliaConfig::default();
        let _ = kad_cfg
            // Peers are inserted into the routing table manually (on identify),
            // rather than automatically on every connection.
            .set_kbucket_inserts(libp2p::kad::KademliaBucketInserts::Manual)
            // Disable libp2p's periodic replication and publication; replication
            // is driven by the record store's own `replication_interval`.
            .set_replication_interval(None)
            .set_publication_interval(None)
            // 1MB max packet size.
            .set_max_packet_size(1024 * 1024)
            // The replication factor determines how many peers hold a record.
            .set_replication_factor(
                NonZeroUsize::new(CLOSE_GROUP_SIZE).ok_or_else(|| Error::InvalidCloseGroupSize)?,
            )
            .set_query_timeout(Duration::from_secs(5 * 60))
            // Use disjoint query paths for more resilient lookups.
            .disjoint_query_paths(true)
            // Records never expire.
            .set_record_ttl(None)
            .set_provider_publication_interval(None);
let (network, events_receiver, mut swarm_driver) = Self::with(
root_dir,
keypair,
kad_cfg,
local,
false,
replication_interval,
None,
ProtocolSupport::Full,
SN_NODE_VERSION_STR.to_string(),
)?;
let addr = Multiaddr::from(addr.ip()).with(Protocol::Tcp(addr.port()));
let _listener_id = swarm_driver
.swarm
.listen_on(addr)
.expect("Failed to listen on the provided address");
Ok((network, events_receiver, swarm_driver))
}
    /// Same as `new()`, but creates a client `SwarmDriver`: outbound-only
    /// request-response, an ephemeral record store, and an optional custom
    /// request timeout.
    pub fn new_client(
local: bool,
request_timeout: Option<Duration>,
) -> Result<(Network, mpsc::Receiver<NetworkEvent>, Self)> {
let mut kad_cfg = KademliaConfig::default();
let _ = kad_cfg
.set_max_packet_size(1024 * 1024)
.disjoint_query_paths(true)
.set_replication_factor(
NonZeroUsize::new(CLOSE_GROUP_SIZE).ok_or_else(|| Error::InvalidCloseGroupSize)?,
);
        Self::with(
            // Clients do not persist records, so a temp dir suffices for the store.
            std::env::temp_dir(),
            Keypair::generate_ed25519(),
            kad_cfg,
            local,
            true,
            // Clients do not replicate records; this interval is effectively arbitrary.
            Duration::from_secs(1000),
            request_timeout,
            // Clients only issue requests; they do not serve inbound ones.
            ProtocolSupport::Outbound,
            IDENTIFY_CLIENT_VERSION_STR.to_string(),
        )
}
    /// Forward a `NetworkEvent` to the event channel without blocking the driver.
    fn send_event(&self, event: NetworkEvent) {
        let event_sender = self.event_sender.clone();
        let capacity = event_sender.capacity();
        // If the channel is full, drop the event rather than stalling the driver loop.
        if capacity == 0 {
            warn!(
                "NetworkEvent channel is full. Dropping NetworkEvent: {:?}",
                event
            );
            return;
        }
        // Send off-thread so this call stays non-blocking.
        let _handle = tokio::spawn(async move {
if let Err(error) = event_sender.send(event).await {
error!("SwarmDriver failed to send event: {}", error);
}
});
}
    #[allow(clippy::too_many_arguments)]
    /// Shared constructor for node and client drivers: builds the behaviours,
    /// transport, and swarm, and returns the `Network` handle, the
    /// `NetworkEvent` receiver, and the driver itself.
    fn with(
root_dir_path: PathBuf,
keypair: Keypair,
kad_cfg: KademliaConfig,
local: bool,
is_client: bool,
replication_interval: Duration,
request_response_timeout: Option<Duration>,
req_res_protocol: ProtocolSupport,
identify_version: String,
) -> Result<(Network, mpsc::Receiver<NetworkEvent>, Self)> {
let peer_id = PeerId::from(keypair.public());
info!("Node (PID: {}) with PeerId: {peer_id}", std::process::id());
info!("PeerId: {peer_id} has replication interval of {replication_interval:?}");
let request_response = {
let mut cfg = RequestResponseConfig::default();
let _ = cfg
.set_request_timeout(request_response_timeout.unwrap_or(REQUEST_TIMEOUT_DEFAULT_S))
.set_connection_keep_alive(CONNECTION_KEEP_ALIVE_TIMEOUT);
request_response::cbor::Behaviour::new(
[(
StreamProtocol::new(REQ_RESPONSE_VERSION_STR),
req_res_protocol,
)],
cfg,
)
};
let (network_event_sender, network_event_receiver) = mpsc::channel(NETWORKING_CHANNEL_SIZE);
        let kademlia = {
            // Records are persisted to disk under `<root_dir>/record_store`.
            let storage_dir_path = root_dir_path.join("record_store");
if let Err(error) = std::fs::create_dir_all(&storage_dir_path) {
return Err(Error::FailedToCreateRecordStoreDir {
path: storage_dir_path,
source: error,
});
}
let store_cfg = DiskBackedRecordStoreConfig {
max_value_bytes: 1024 * 1024,
storage_dir: storage_dir_path,
replication_interval,
..Default::default()
};
Kademlia::with_config(
peer_id,
DiskBackedRecordStore::with_config(
peer_id,
store_cfg,
Some(network_event_sender.clone()),
),
kad_cfg,
)
};
        #[cfg(feature = "local-discovery")]
        let mdns_config = mdns::Config {
            // A shorter query interval speeds up peer discovery on local testnets.
            query_interval: Duration::from_secs(5),
            ..Default::default()
        };
#[cfg(feature = "local-discovery")]
let mdns = mdns::tokio::Behaviour::new(mdns_config, peer_id)?;
let identify = {
let cfg =
libp2p::identify::Config::new(IDENTIFY_PROTOCOL_STR.to_string(), keypair.public())
.with_agent_version(identify_version);
libp2p::identify::Behaviour::new(cfg)
};
        // TCP transport, upgraded with noise encryption and yamux multiplexing.
        let mut transport = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default())
            .upgrade(libp2p::core::upgrade::Version::V1)
            .authenticate(
                libp2p::noise::Config::new(&keypair)
                    .expect("Signing libp2p-noise static DH keypair failed."),
            )
            .multiplex(libp2p::yamux::Config::default())
            .boxed();
        if !local {
            debug!("Preventing non-global dials");
            // Wrap the transport so that only globally reachable addresses are dialled.
            transport = libp2p::core::transport::global_only::Transport::new(transport).boxed();
        }
        // AutoNAT lets a node discover its NAT status; it is only useful for
        // non-local nodes, and clients have no need of it.
        let autonat = if !local && !is_client {
            let cfg = libp2p::autonat::Config {
                // Start probing our NAT status shortly after boot.
                boot_delay: Duration::from_secs(3),
                timeout: Duration::from_secs(301),
                throttle_server_period: Duration::from_secs(15),
                ..Default::default()
            };
            Some(libp2p::autonat::Behaviour::new(peer_id, cfg))
        } else {
            None
        };
        let autonat = Toggle::from(autonat);
let behaviour = NodeBehaviour {
request_response,
kademlia,
identify,
#[cfg(feature = "local-discovery")]
mdns,
autonat,
};
let swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, peer_id).build();
let (swarm_cmd_sender, swarm_cmd_receiver) = mpsc::channel(NETWORKING_CHANNEL_SIZE);
let swarm_driver = Self {
self_peer_id: peer_id,
swarm,
cmd_receiver: swarm_cmd_receiver,
event_sender: network_event_sender,
pending_get_closest_peers: Default::default(),
pending_requests: Default::default(),
pending_query: Default::default(),
replication_fetcher: Default::default(),
local,
dialed_peers: CircularVec::new(63),
is_client,
};
Ok((
Network {
swarm_cmd_sender,
peer_id,
root_dir_path,
},
network_event_receiver,
swarm_driver,
))
}
    /// Drive the swarm: service swarm events and execute incoming `SwarmCmd`s.
    pub async fn run(mut self) {
loop {
tokio::select! {
swarm_event = self.swarm.select_next_some() => {
if let Err(err) = self.handle_swarm_events(swarm_event) {
warn!("Error while handling swarm event: {err}");
}
},
some_cmd = self.cmd_receiver.recv() => match some_cmd {
Some(cmd) => {
if let Err(err) = self.handle_cmd(cmd) {
warn!("Error while handling cmd: {err}");
}
},
                    // Cmd channel closed; keep driving the swarm.
                    None => continue,
},
}
}
}
}
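/// Sort the given peers by their distance to `address`, returning the
/// `expected_entries` closest. Errs with `Error::NotEnoughPeers` if fewer
/// than `CLOSE_GROUP_SIZE` peers remain after truncation.
///
/// A minimal doc-test sketch (assuming this crate is named `sn_networking`):
/// ```
/// use libp2p::PeerId;
/// use sn_networking::{sort_peers_by_address, CLOSE_GROUP_SIZE};
/// use sn_protocol::NetworkAddress;
///
/// let peers: Vec<PeerId> = (0..CLOSE_GROUP_SIZE).map(|_| PeerId::random()).collect();
/// let target = NetworkAddress::from_peer(PeerId::random());
/// let closest = sort_peers_by_address(peers, &target, CLOSE_GROUP_SIZE).unwrap();
/// assert_eq!(closest.len(), CLOSE_GROUP_SIZE);
/// ```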
pub fn sort_peers_by_address(
peers: Vec<PeerId>,
address: &NetworkAddress,
expected_entries: usize,
) -> Result<Vec<PeerId>> {
sort_peers_by_key(peers, &address.as_kbucket_key(), expected_entries)
}
/// Sort the given peers by their distance to the given `KBucketKey`,
/// returning the `expected_entries` closest.
pub fn sort_peers_by_key<T>(
mut peers: Vec<PeerId>,
key: &KBucketKey<T>,
expected_entries: usize,
) -> Result<Vec<PeerId>> {
peers.sort_by(|a, b| {
let a = NetworkAddress::from_peer(*a);
let b = NetworkAddress::from_peer(*b);
key.distance(&a.as_kbucket_key())
.cmp(&key.distance(&b.as_kbucket_key()))
});
    peers.truncate(expected_entries);
if CLOSE_GROUP_SIZE > peers.len() {
warn!("Not enough peers in the k-bucket to satisfy the request");
return Err(Error::NotEnoughPeers {
found: peers.len(),
required: CLOSE_GROUP_SIZE,
});
}
Ok(peers)
}
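/// A cloneable handle to the network: sends `SwarmCmd`s to the `SwarmDriver`
/// event loop and exposes the public messaging/record API.
///
/// A minimal wiring sketch (assuming this crate is named `sn_networking`; the
/// socket address and root dir below are illustrative only):
/// ```no_run
/// use libp2p::identity::Keypair;
/// use sn_networking::SwarmDriver;
/// use std::net::SocketAddr;
///
/// # async fn example() -> Result<(), sn_networking::Error> {
/// let addr: SocketAddr = "0.0.0.0:0".parse().expect("valid socket addr");
/// let (network, mut event_rx, driver) =
///     SwarmDriver::new(Keypair::generate_ed25519(), addr, true, std::env::temp_dir())?;
/// // Run the driver loop on its own task; keep the cloneable `Network` handle.
/// tokio::spawn(driver.run());
/// let _state = network.get_swarm_local_state().await?;
/// while let Some(_event) = event_rx.recv().await { /* handle NetworkEvent */ }
/// # Ok(())
/// # }
/// ```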
#[derive(Clone)]
pub struct Network {
pub swarm_cmd_sender: mpsc::Sender<SwarmCmd>,
pub peer_id: PeerId,
pub root_dir_path: PathBuf,
}
impl Network {
    /// Listen for incoming connections on the given address.
    pub async fn start_listening(&self, addr: Multiaddr) -> Result<()> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::StartListening { addr, sender })?;
receiver.await?
}
    /// Dial the peer at the given address.
    pub async fn dial(&self, addr: Multiaddr) -> Result<()> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::Dial { addr, sender })?;
receiver.await?
}
    /// Returns the closest peers to the given key, sorted by distance.
    /// Excludes our own `PeerId` from the candidates (client mode).
    pub async fn client_get_closest_peers(&self, key: &NetworkAddress) -> Result<Vec<PeerId>> {
self.get_closest_peers(key, true).await
}
    /// Returns the closest peers to the given key, sorted by distance.
    /// Includes our own `PeerId` among the candidates.
    pub async fn node_get_closest_peers(&self, key: &NetworkAddress) -> Result<Vec<PeerId>> {
self.get_closest_peers(key, false).await
}
    /// Returns the closest peers to the given key from our local routing
    /// table, without querying the network.
    pub async fn get_closest_local_peers(&self, key: &NetworkAddress) -> Result<Vec<PeerId>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetClosestLocalPeers {
key: key.clone(),
sender,
})?;
receiver
.await
.map_err(|_e| Error::InternalMsgChannelDropped)
}
    /// Returns all peers currently held in our local routing table.
    pub async fn get_all_local_peers(&self) -> Result<Vec<PeerId>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetAllLocalPeers { sender })?;
receiver
.await
.map_err(|_e| Error::InternalMsgChannelDropped)
}
    /// Send `request` to the peers closest to its destination and wait for a
    /// response from each of them.
    pub async fn node_send_to_closest(&self, request: &Request) -> Result<Vec<Result<Response>>> {
debug!(
"Sending {request:?} with dst {:?} to the closest peers.",
request.dst()
);
let closest_peers = self.node_get_closest_peers(&request.dst()).await?;
Ok(self
.send_and_get_responses(closest_peers, request, true)
.await)
}
    /// Send `request` to the peers closest to its destination, ignoring replies.
    pub async fn send_req_no_reply_to_closest(&self, request: &Request) -> Result<()> {
debug!(
"Sending {request:?} with dst {:?} to the closest peers.",
request.dst()
);
let closest_peers = self.node_get_closest_peers(&request.dst()).await?;
for peer in closest_peers {
self.send_req_ignore_reply(request.clone(), peer)?;
}
Ok(())
}
    /// Send `request` to the peers closest to its destination, excluding
    /// ourself from the candidates and ignoring replies.
    pub async fn send_req_no_reply_to_self_closest(&self, request: &Request) -> Result<()> {
debug!("Sending {request:?} to self closest peers.");
let closest_peers = self.client_get_closest_peers(&request.dst()).await?;
for peer in closest_peers {
self.send_req_ignore_reply(request.clone(), peer)?;
}
Ok(())
}
    /// Send `request` to the closest peers (excluding ourself). If
    /// `expect_all_responses` is false, return after the first success.
    pub async fn client_send_to_closest(
&self,
request: &Request,
expect_all_responses: bool,
) -> Result<Vec<Result<Response>>> {
debug!(
"Sending {request:?} with dst {:?} to the closest peers.",
request.dst()
);
let closest_peers = self.client_get_closest_peers(&request.dst()).await?;
Ok(self
.send_and_get_responses(closest_peers, request, expect_all_responses)
.await)
}
    /// Returns the locally stored record keys that lie within `distance` of
    /// the given target.
    pub async fn get_record_keys_closest_to_target(
&self,
target: &NetworkAddress,
distance: Distance,
) -> Result<Vec<RecordKey>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetRecordKeysClosestToTarget {
key: target.clone(),
distance,
sender,
})?;
receiver
.await
.map_err(|_e| Error::InternalMsgChannelDropped)
}
    /// Get a `Record` from the network.
    pub async fn get_record_from_network(&self, key: RecordKey) -> Result<Record> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetNetworkRecord { key, sender })?;
receiver
.await
.map_err(|_e| Error::InternalMsgChannelDropped)?
}
    /// Get a `Record` from the local record store, if present.
    pub async fn get_local_record(&self, key: &RecordKey) -> Result<Option<Record>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetLocalRecord {
key: key.clone(),
sender,
})?;
receiver
.await
.map_err(|_e| Error::InternalMsgChannelDropped)
}
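    /// Put a `Record` to the network, optionally verifying afterwards that a
    /// matching record can be fetched back.
    ///
    /// A minimal usage sketch (the key and value here are illustrative; this
    /// assumes the crate is named `sn_networking`):
    /// ```no_run
    /// use libp2p::kad::{Record, RecordKey};
    /// # async fn example(network: &sn_networking::Network) -> Result<(), sn_networking::Error> {
    /// let record = Record {
    ///     key: RecordKey::new(b"example-key"),
    ///     value: b"example-value".to_vec(),
    ///     publisher: None,
    ///     expires: None,
    /// };
    /// // With `verify_store = true` the record is re-fetched and compared, up
    /// // to VERIFICATION_ATTEMPTS times, before the put is reported as failed.
    /// network.put_record(record, true).await?;
    /// # Ok(())
    /// # }
    /// ```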
pub async fn put_record(&self, record: Record, verify_store: bool) -> Result<()> {
        // Keep a copy for verification below; `record` itself is moved into the cmd.
        let the_record = record.clone();
debug!(
"Putting record of {:?} - length {:?} to network",
the_record.key,
record.value.len()
);
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::PutRecord { record, sender })?;
let response = receiver.await?;
if !verify_store {
return response;
}
        let mut verification_attempts = 0;
        let mut something_different_was_found = false;
        while verification_attempts < VERIFICATION_ATTEMPTS {
            // Count the attempt up front so that a mismatching record cannot
            // spin this loop forever.
            verification_attempts += 1;
            something_different_was_found = false;
            match self.get_record_from_network(the_record.key.clone()).await {
                Ok(returned_record) => {
                    if the_record == returned_record {
                        return Ok(());
                    }
                    // A record came back, but it does not match what we stored.
                    something_different_was_found = true;
                    warn!(
                        "Record '{:?}' returned from the network does not match the one we put. Retrying...",
                        the_record.key
                    );
                }
                Err(error) => {
                    warn!(
                        "Did not retrieve Record '{:?}' from all nodes in the close group! Retrying...",
                        the_record.key
                    );
                    error!("{error:?}");
                }
            }
            // Give the network a moment before re-checking.
            tokio::time::sleep(VERIFICATION_TIMEOUT).await;
        }
if something_different_was_found {
Err(Error::ReturnedRecordDoesNotMatch(the_record.key))
} else {
Err(Error::FailedToVerifyRecordWasStored(the_record.key))
}
}
    /// Put a `Record` directly into the local record store.
    pub fn put_local_record(&self, record: Record) -> Result<()> {
debug!(
"Writing Record locally, for {:?} - length {:?}",
record.key,
record.value.len()
);
self.send_swarm_cmd(SwarmCmd::PutLocalRecord { record })
}
    /// Whether the local record store holds the given key.
    pub async fn is_key_present_locally(&self, key: &RecordKey) -> Result<bool> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::RecordStoreHasKey {
key: key.clone(),
sender,
})?;
receiver
.await
.map_err(|_e| Error::InternalMsgChannelDropped)
}
    /// Returns the addresses of all records held in the local store.
    pub async fn get_all_local_record_addresses(&self) -> Result<HashSet<NetworkAddress>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetAllLocalRecordAddresses { sender })?;
receiver
.await
.map_err(|_e| Error::InternalMsgChannelDropped)
}
    /// Add the keys reported by `peer` to the replication fetcher, returning
    /// the next batch of keys to fetch.
    pub async fn add_keys_to_replication_fetcher(
&self,
peer: PeerId,
keys: Vec<NetworkAddress>,
) -> Result<Vec<(PeerId, NetworkAddress)>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::AddKeysToReplicationFetcher { peer, keys, sender })?;
receiver
.await
.map_err(|_e| Error::InternalMsgChannelDropped)
}
    /// Notify the replication fetcher of a fetch outcome, returning the next
    /// batch of keys to fetch.
    pub async fn notify_fetch_result(
&self,
peer: PeerId,
key: NetworkAddress,
result: bool,
) -> Result<Vec<(PeerId, NetworkAddress)>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::NotifyFetchResult {
peer,
key,
result,
sender,
})?;
receiver
.await
.map_err(|_e| Error::InternalMsgChannelDropped)
}
    /// Set the distance range used by the underlying record store.
    pub fn set_record_distance_range(&self, distance: Distance) -> Result<()> {
self.send_swarm_cmd(SwarmCmd::SetRecordDistanceRange { distance })
}
    /// Send a `Request` to the given peer and await the `Response`.
    pub async fn send_request(&self, req: Request, peer: PeerId) -> Result<Response> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::SendRequest {
req,
peer,
sender: Some(sender),
})?;
receiver.await?
}
    /// Send a `Request` to the given peer without waiting for a `Response`.
    pub fn send_req_ignore_reply(&self, req: Request, peer: PeerId) -> Result<()> {
let swarm_cmd = SwarmCmd::SendRequest {
req,
peer,
sender: None,
};
self.send_swarm_cmd(swarm_cmd)
}
    /// Send a `Response` back through the channel opened by the requester.
    pub fn send_response(&self, resp: Response, channel: MsgResponder) -> Result<()> {
self.send_swarm_cmd(SwarmCmd::SendResponse { resp, channel })
}
    /// Returns a snapshot of the swarm's local state.
    pub async fn get_swarm_local_state(&self) -> Result<SwarmLocalState> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetSwarmLocalState(sender))?;
let state = receiver.await?;
Ok(state)
}
    /// Queue a `SwarmCmd` for the driver, erroring rather than blocking if
    /// the channel is full.
    fn send_swarm_cmd(&self, cmd: SwarmCmd) -> Result<()> {
let capacity = self.swarm_cmd_sender.capacity();
if capacity == 0 {
error!("SwarmCmd channel is full. Dropping SwarmCmd: {:?}", cmd);
return Err(Error::NoSwarmCmdChannelCapacity);
}
let cmd_sender = self.swarm_cmd_sender.clone();
let _handle = tokio::spawn(async move {
if let Err(error) = cmd_sender.send(cmd).await {
error!("Failed to send SwarmCmd: {}", error);
}
});
Ok(())
}
    /// Returns the `CLOSE_GROUP_SIZE` closest peers to the given key by
    /// querying the network; includes ourself unless `client` is true.
    async fn get_closest_peers(&self, key: &NetworkAddress, client: bool) -> Result<Vec<PeerId>> {
trace!("Getting the closest peers to {key:?}");
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetClosestPeers {
key: key.clone(),
sender,
})?;
let k_bucket_peers = receiver.await?;
let mut closest_peers: Vec<_> = k_bucket_peers.into_iter().collect();
if !client {
closest_peers.push(self.peer_id);
}
sort_peers_by_address(closest_peers, key, CLOSE_GROUP_SIZE)
}
    /// Send `req` to the given peers concurrently. If `get_all_responses` is
    /// true, wait for a result from every peer; otherwise return as soon as
    /// the first `Ok` response arrives.
    pub async fn send_and_get_responses(
&self,
peers: Vec<PeerId>,
req: &Request,
get_all_responses: bool,
) -> Vec<Result<Response>> {
trace!("send_and_get_responses for {req:?}");
let mut list_of_futures = peers
.iter()
.map(|peer| Box::pin(self.send_request(req.clone(), *peer)))
.collect::<Vec<_>>();
let mut responses = Vec::new();
while !list_of_futures.is_empty() {
let (res, _, remaining_futures) = select_all(list_of_futures).await;
let res_string = match &res {
Ok(res) => format!("{res}"),
Err(err) => format!("{err:?}"),
};
trace!("Got response for the req: {req:?}, res: {res_string}");
if !get_all_responses && res.is_ok() {
return vec![res];
}
responses.push(res);
list_of_futures = remaining_futures;
}
trace!("got all responses for {req:?}");
responses
}
}
/// Whether the given `Multiaddr` is a globally reachable IPv4 address, i.e.
/// not unspecified, private, loopback, link-local, documentation, or broadcast.
pub fn multiaddr_is_global(multiaddr: &Multiaddr) -> bool {
!multiaddr.iter().any(|addr| match addr {
Protocol::Ip4(ip) => {
ip.is_unspecified()
| ip.is_private()
| ip.is_loopback()
| ip.is_link_local()
| ip.is_documentation()
| ip.is_broadcast()
}
_ => false,
})
}
/// Pop a trailing `/p2p/<peer_id>` component from the `Multiaddr`, returning
/// the `PeerId` if one was present.
pub(crate) fn multiaddr_pop_p2p(multiaddr: &mut Multiaddr) -> Option<PeerId> {
if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() {
let _ = multiaddr.pop();
Some(peer_id)
} else {
None
}
}
/// Returns a copy of the `Multiaddr` with any `/p2p/...` components removed.
pub(crate) fn multiaddr_strip_p2p(multiaddr: &Multiaddr) -> Multiaddr {
multiaddr
.iter()
.filter(|p| !matches!(p, Protocol::P2p(_)))
.collect()
}
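// A few sanity tests for the helpers above. These are illustrative sketches:
// the addresses and peer counts used are arbitrary assumptions, not protocol
// requirements.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pop_and_strip_p2p_remove_trailing_peer_id() {
        let peer_id = PeerId::random();
        let mut addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().expect("valid multiaddr");
        addr.push(Protocol::P2p(peer_id));

        // Stripping removes every `/p2p/` component without touching the rest.
        let stripped = multiaddr_strip_p2p(&addr);
        assert!(stripped.iter().all(|p| !matches!(p, Protocol::P2p(_))));
        assert_eq!(stripped.iter().count(), 2);

        // Popping returns the trailing `PeerId` and removes it from the address.
        assert_eq!(multiaddr_pop_p2p(&mut addr), Some(peer_id));
        assert_eq!(multiaddr_pop_p2p(&mut addr), None);
    }

    #[test]
    fn loopback_and_private_addresses_are_not_global() {
        let loopback: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().expect("valid multiaddr");
        assert!(!multiaddr_is_global(&loopback));
        let private: Multiaddr = "/ip4/192.168.1.1/tcp/8080".parse().expect("valid multiaddr");
        assert!(!multiaddr_is_global(&private));
    }

    #[test]
    fn sort_peers_puts_the_target_peer_first() {
        let peers: Vec<PeerId> = (0..CLOSE_GROUP_SIZE + 2).map(|_| PeerId::random()).collect();
        // The address of peers[0] is at distance zero from itself, so it sorts first.
        let target = NetworkAddress::from_peer(peers[0]);
        let sorted =
            sort_peers_by_address(peers.clone(), &target, CLOSE_GROUP_SIZE).expect("enough peers");
        assert_eq!(sorted[0], peers[0]);
        assert_eq!(sorted.len(), CLOSE_GROUP_SIZE);
    }
}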