#[macro_use]
extern crate tracing;
mod bootstrap;
mod circular_vec;
mod cmd;
mod driver;
mod error;
mod event;
#[cfg(feature = "open-metrics")]
mod metrics;
#[cfg(feature = "open-metrics")]
mod metrics_service;
mod network_discovery;
mod record_store;
mod record_store_api;
mod replication_fetcher;
mod transfers;
use self::{cmd::SwarmCmd, error::Result};
pub use self::{
cmd::SwarmLocalState,
driver::{GetRecordCfg, NetworkBuilder, PutRecordCfg, SwarmDriver},
error::{Error, GetRecordError},
event::{MsgResponder, NetworkEvent},
record_store::NodeRecordStore,
transfers::get_singed_spends_from_record,
};
use bytes::Bytes;
use futures::future::select_all;
use libp2p::{
identity::Keypair,
kad::{KBucketDistance, KBucketKey, Quorum, Record, RecordKey},
multiaddr::Protocol,
Multiaddr, PeerId,
};
use rand::Rng;
use sn_protocol::{
error::Error as ProtocolError,
messages::{Query, QueryResponse, Request, Response},
storage::{RecordHeader, RecordKind, RecordType},
NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey,
};
use sn_transfers::{MainPubkey, NanoTokens, PaymentQuote};
use std::{
collections::{BTreeMap, HashMap, HashSet},
path::PathBuf,
};
use tokio::sync::{mpsc, oneshot};
use tracing::warn;
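/// The number of peers that make up the close group around a given network address.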
pub const CLOSE_GROUP_SIZE: usize = 5;
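/// The range of peers a record is replicated to: the close group plus two extra peers.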
pub const REPLICATE_RANGE: usize = CLOSE_GROUP_SIZE + 2;
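/// The majority threshold within a close group (more than half of `CLOSE_GROUP_SIZE`).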
#[inline]
pub const fn close_group_majority() -> usize {
CLOSE_GROUP_SIZE / 2 + 1
}
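// Bounds for the randomised wait applied before re-verification fetches.
// Note: despite the `_S` suffix, both durations are specified in milliseconds.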
const MAX_REVERIFICATION_WAIT_TIME_S: std::time::Duration = std::time::Duration::from_millis(750);
const MIN_REVERIFICATION_WAIT_TIME_S: std::time::Duration = std::time::Duration::from_millis(150);
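// Maximum number of attempts made for network GETs and PUTs when `re_attempt` is requested.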
const GET_RETRY_ATTEMPTS: usize = 3;
const PUT_RETRY_ATTEMPTS: usize = 10;
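/// Sorts `peers` by their distance to the given `NetworkAddress` and returns up to
/// `expected_entries` of the closest ones.
///
/// A minimal usage sketch (illustrative only, hence not compiled as a doctest):
/// ```ignore
/// let peers: HashSet<PeerId> = (0..CLOSE_GROUP_SIZE).map(|_| PeerId::random()).collect();
/// let target = NetworkAddress::from_peer(PeerId::random());
/// let closest = sort_peers_by_address(&peers, &target, CLOSE_GROUP_SIZE)?;
/// ```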
#[allow(clippy::result_large_err)]
pub fn sort_peers_by_address<'a>(
peers: &'a HashSet<PeerId>,
address: &NetworkAddress,
expected_entries: usize,
) -> Result<Vec<&'a PeerId>> {
sort_peers_by_key(peers, &address.as_kbucket_key(), expected_entries)
}
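/// Sorts `peers` by their distance to the given `KBucketKey`, returning up to
/// `expected_entries` peers, or `Error::NotEnoughPeers` if fewer than
/// `CLOSE_GROUP_SIZE` peers were provided.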
#[allow(clippy::result_large_err)]
pub fn sort_peers_by_key<'a, T>(
peers: &'a HashSet<PeerId>,
key: &KBucketKey<T>,
expected_entries: usize,
) -> Result<Vec<&'a PeerId>> {
if CLOSE_GROUP_SIZE > peers.len() {
warn!("Not enough peers in the k-bucket to satisfy the request");
return Err(Error::NotEnoughPeers {
found: peers.len(),
required: CLOSE_GROUP_SIZE,
});
}
let mut peer_distances: Vec<(&PeerId, KBucketDistance)> = Vec::with_capacity(peers.len());
for peer_id in peers {
let addr = NetworkAddress::from_peer(*peer_id);
let distance = key.distance(&addr.as_kbucket_key());
peer_distances.push((peer_id, distance));
}
peer_distances.sort_by(|a, b| a.1.cmp(&b.1));
let sorted_peers: Vec<_> = peer_distances
.into_iter()
.take(expected_entries)
.map(|(peer_id, _)| peer_id)
.collect();
Ok(sorted_peers)
}
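/// Handle for interacting with the running swarm: commands are sent over
/// `swarm_cmd_sender` and processed by the `SwarmDriver` event loop.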
#[derive(Clone)]
pub struct Network {
pub swarm_cmd_sender: mpsc::Sender<SwarmCmd>,
pub peer_id: PeerId,
pub root_dir_path: PathBuf,
keypair: Keypair,
}
impl Network {
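/// Signs the given message with the node's `Keypair`.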
pub fn sign(&self, msg: &[u8]) -> Result<Vec<u8>> {
self.keypair.sign(msg).map_err(Error::from)
}
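/// Verifies a signature over `msg` against the node's public key.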
pub fn verify(&self, msg: &[u8], sig: &[u8]) -> bool {
self.keypair.public().verify(msg, sig)
}
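/// Dials the given multiaddress and waits for the dial attempt to complete.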
pub async fn dial(&self, addr: Multiaddr) -> Result<()> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::Dial { addr, sender })?;
receiver.await?
}
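/// Returns the closest peers to the given address from the network, removing this
/// node itself from the result (client behaviour).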
pub async fn client_get_closest_peers(&self, key: &NetworkAddress) -> Result<Vec<PeerId>> {
self.get_closest_peers(key, true).await
}
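/// Returns the closest peers to the given address from the network, keeping this
/// node in the result (node behaviour).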
pub async fn node_get_closest_peers(&self, key: &NetworkAddress) -> Result<Vec<PeerId>> {
self.get_closest_peers(key, false).await
}
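/// Returns the k-buckets of the local routing table, keyed by bucket index.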
pub async fn get_kbuckets(&self) -> Result<BTreeMap<u32, Vec<PeerId>>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetKBuckets { sender })?;
receiver
.await
.map_err(|_e| Error::InternalMsgChannelDropped)
}
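/// Returns the close group for the given address from local knowledge only (no network query).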
pub async fn get_close_group_local_peers(&self, key: &NetworkAddress) -> Result<Vec<PeerId>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetCloseGroupLocalPeers {
key: key.clone(),
sender,
})?;
match receiver.await {
Ok(close_peers) => {
if tracing::level_enabled!(tracing::Level::TRACE) {
let close_peers_pretty_print: Vec<_> = close_peers
.iter()
.map(|peer_id| {
format!(
"{peer_id:?}({:?})",
PrettyPrintKBucketKey(
NetworkAddress::from_peer(*peer_id).as_kbucket_key()
)
)
})
.collect();
trace!(
"Local knowledge of close peers to {key:?} are: {close_peers_pretty_print:?}"
);
}
Ok(close_peers)
}
Err(err) => {
error!("When getting local knowledge of close peers to {key:?}, failed with error {err:?}");
Err(Error::InternalMsgChannelDropped)
}
}
}
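/// Returns all peers currently held in the local routing table.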
pub async fn get_all_local_peers(&self) -> Result<Vec<PeerId>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetAllLocalPeers { sender })?;
receiver
.await
.map_err(|_e| Error::InternalMsgChannelDropped)
}
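/// Returns the closest peers to this node from the local routing table (bounded by the Kademlia K value).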
pub async fn get_closest_k_value_local_peers(&self) -> Result<HashSet<PeerId>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetClosestKLocalPeers { sender })?;
receiver
.await
.map_err(|_e| Error::InternalMsgChannelDropped)
}
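/// Asks a majority of the closest nodes to the record address for store cost quotes
/// and returns the cheapest quote together with the payee's `MainPubkey`.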
pub async fn get_store_costs_from_network(
&self,
record_address: NetworkAddress,
) -> Result<(MainPubkey, PaymentQuote)> {
let mut close_nodes = self.get_closest_peers(&record_address, true).await?;
close_nodes.sort_by(|a, b| {
let a = NetworkAddress::from_peer(*a);
let b = NetworkAddress::from_peer(*b);
record_address
.distance(&a)
.cmp(&record_address.distance(&b))
});
close_nodes.truncate(close_group_majority());
let request = Request::Query(Query::GetStoreCost(record_address.clone()));
let responses = self
.send_and_get_responses(close_nodes, &request, true)
.await;
let mut all_costs = vec![];
for response in responses.into_iter().flatten() {
debug!(
"StoreCostReq for {record_address:?} received response: {:?}",
response
);
match response {
Response::Query(QueryResponse::GetStoreCost {
quote: Ok(quote),
payment_address,
}) => {
all_costs.push((payment_address, quote));
}
Response::Query(QueryResponse::GetStoreCost {
quote: Err(ProtocolError::RecordExists(_)),
payment_address,
}) => {
all_costs.push((payment_address, PaymentQuote::zero()));
}
_ => {
error!("Non store cost response received, was {:?}", response);
}
}
}
get_fees_from_store_cost_responses(all_costs)
}
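/// Subscribes this node to the given gossipsub topic.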
pub fn subscribe_to_topic(&self, topic_id: String) -> Result<()> {
self.send_swarm_cmd(SwarmCmd::GossipsubSubscribe(topic_id))?;
Ok(())
}
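/// Unsubscribes this node from the given gossipsub topic.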
pub fn unsubscribe_from_topic(&self, topic_id: String) -> Result<()> {
self.send_swarm_cmd(SwarmCmd::GossipsubUnsubscribe(topic_id))?;
Ok(())
}
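/// Publishes a message on the given gossipsub topic.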
pub fn publish_on_topic(&self, topic_id: String, msg: Bytes) -> Result<()> {
self.send_swarm_cmd(SwarmCmd::GossipsubPublish { topic_id, msg })?;
Ok(())
}
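/// Fetches a `Record` from the network. When `cfg.re_attempt` is set the fetch is
/// retried up to `GET_RETRY_ATTEMPTS` times, and the result is checked against
/// `cfg.target_record` when one is provided (chunks are accepted without comparison).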
pub async fn get_record_from_network(
&self,
key: RecordKey,
cfg: &GetRecordCfg,
) -> Result<Record> {
let total_attempts = if cfg.re_attempt {
GET_RETRY_ATTEMPTS
} else {
1
};
let mut retry_attempts = 0;
let pretty_key = PrettyPrintRecordKey::from(&key);
while retry_attempts < total_attempts {
retry_attempts += 1;
info!(
"Getting record of {pretty_key:?} attempts {retry_attempts:?}/{total_attempts:?} with cfg {cfg:?}",
);
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetNetworkRecord {
key: key.clone(),
sender,
quorum: cfg.get_quorum,
expected_holders: cfg.expected_holders.clone(),
})?;
match receiver.await.map_err(|e| {
error!("When fetching record {pretty_key:?} , encountered a channel error {e:?}.");
Error::InternalMsgChannelDropped
})? {
Ok(returned_record) => {
let header = RecordHeader::from_record(&returned_record)?;
let is_chunk = matches!(header.kind, RecordKind::Chunk);
info!("Record returned: {pretty_key:?}",);
if cfg.target_record.is_none()
|| cfg.target_record == Some(returned_record.clone())
|| is_chunk
{
return Ok(returned_record);
} else if retry_attempts >= total_attempts {
info!("Error: Returned record does not match target");
return Err(Error::ReturnedRecordDoesNotMatch(
PrettyPrintRecordKey::from(&returned_record.key).into_owned(),
));
}
}
Err(GetRecordError::RecordNotEnoughCopies(returned_record)) => {
debug!("Not enough copies found yet for {pretty_key:?}");
if retry_attempts >= total_attempts && matches!(cfg.get_quorum, Quorum::One) {
if cfg.target_record.is_none()
|| cfg.target_record == Some(returned_record.clone())
{
return Ok(returned_record);
} else {
return Err(Error::ReturnedRecordDoesNotMatch(
PrettyPrintRecordKey::from(&returned_record.key).into_owned(),
));
}
} else if retry_attempts >= total_attempts {
return Err(GetRecordError::RecordNotEnoughCopies(returned_record).into());
}
}
Err(GetRecordError::RecordNotFound) => {
if retry_attempts >= total_attempts {
break;
}
warn!("No holder of record '{pretty_key:?}' found. Retrying the fetch ...",);
}
Err(GetRecordError::SplitRecord { result_map }) => {
error!("Getting record {pretty_key:?} attempts #{retry_attempts}/{total_attempts} , encountered split");
if retry_attempts >= total_attempts {
return Err(GetRecordError::SplitRecord { result_map }.into());
}
warn!("Fetched split Record '{pretty_key:?}' from network!. Retrying...",);
}
Err(GetRecordError::QueryTimeout) => {
error!("Getting record {pretty_key:?} attempts #{retry_attempts}/{total_attempts} , encountered Timeout");
if retry_attempts >= total_attempts {
break;
}
warn!("Did not retrieve Record '{pretty_key:?}' from network!. Retrying...",);
}
}
if cfg.re_attempt {
let wait_duration = rand::thread_rng()
.gen_range(MIN_REVERIFICATION_WAIT_TIME_S..MAX_REVERIFICATION_WAIT_TIME_S);
tokio::time::sleep(wait_duration).await;
}
}
Err(GetRecordError::RecordNotFound.into())
}
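/// Returns the store cost reported by the local record store.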
pub async fn get_local_storecost(&self) -> Result<NanoTokens> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetLocalStoreCost { sender })?;
receiver
.await
.map_err(|_e| Error::InternalMsgChannelDropped)
}
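/// Returns the `Record` for the given key from the local record store, if present.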
pub async fn get_local_record(&self, key: &RecordKey) -> Result<Option<Record>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetLocalRecord {
key: key.clone(),
sender,
})?;
receiver
.await
.map_err(|_e| Error::InternalMsgChannelDropped)
}
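/// PUTs a `Record` to the network. When `cfg.re_attempt` is set the operation is
/// retried up to `PUT_RETRY_ATTEMPTS` times; the last error is returned if all attempts fail.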
pub async fn put_record(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
let pretty_key = PrettyPrintRecordKey::from(&record.key);
let mut last_err = Error::FailedToVerifyRecordWasStored(pretty_key.clone().into_owned());
let total_attempts = if cfg.re_attempt {
PUT_RETRY_ATTEMPTS
} else {
1
};
for retry in 1..=total_attempts {
info!(
"Attempting to PUT record with key: {pretty_key:?} to network. Attempts {retry:?}/{total_attempts:?} with cfg {cfg:?}"
);
let res = self.put_record_once(record.clone(), cfg).await;
match res {
Ok(_) => return Ok(()),
Err(e) => {
warn!("Failed to PUT record with key: {pretty_key:?} to network. Attempts {retry:?}/{total_attempts:?} with error: {e:?}");
last_err = e;
}
}
}
Err(last_err)
}
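/// Performs a single PUT attempt and, when `cfg.verification` is set, re-fetches the
/// record after a short randomised wait to confirm it was stored.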
async fn put_record_once(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
let record_key = record.key.clone();
let pretty_key = PrettyPrintRecordKey::from(&record_key);
info!(
"Putting record of {} - length {:?} to network",
pretty_key,
record.value.len()
);
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::PutRecord {
record: record.clone(),
sender,
quorum: cfg.put_quorum,
})?;
let response = receiver.await?;
if let Some((_record_kind, get_cfg)) = &cfg.verification {
let wait_duration = rand::thread_rng()
.gen_range(MIN_REVERIFICATION_WAIT_TIME_S..MAX_REVERIFICATION_WAIT_TIME_S);
tokio::time::sleep(wait_duration).await;
debug!("Attempting to verify {pretty_key:?} after we've slept for {wait_duration:?}");
self.get_record_from_network(record.key.clone(), get_cfg)
.await
.map_err(|e| {
warn!("Failed to verify record {pretty_key:?} was stored: {e:?}");
Error::FailedToVerifyRecordWasStored(pretty_key.clone().into_owned())
})?;
}
response
}
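/// Writes a `Record` to the local record store without going over the network.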
pub fn put_local_record(&self, record: Record) -> Result<()> {
trace!(
"Writing Record locally, for {:?} - length {:?}",
PrettyPrintRecordKey::from(&record.key),
record.value.len()
);
self.send_swarm_cmd(SwarmCmd::PutLocalRecord { record })
}
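/// Removes a failed record from the local record store.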
pub fn remove_failed_local_record(&self, key: RecordKey) -> Result<()> {
trace!("Removing Record locally, for {:?}", key);
self.send_swarm_cmd(SwarmCmd::RemoveFailedLocalRecord { key })
}
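/// Returns whether the local record store holds a record for the given key.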
pub async fn is_record_key_present_locally(&self, key: &RecordKey) -> Result<bool> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::RecordStoreHasKey {
key: key.clone(),
sender,
})?;
receiver
.await
.map_err(|_e| Error::InternalMsgChannelDropped)
}
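/// Returns the addresses (and `RecordType`s) of all records held in the local store.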
pub async fn get_all_local_record_addresses(
&self,
) -> Result<HashMap<NetworkAddress, RecordType>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetAllLocalRecordAddresses { sender })?;
receiver
.await
.map_err(|_e| Error::InternalMsgChannelDropped)
}
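/// Hands the record keys advertised by `holder` to the replication fetcher.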
#[allow(clippy::mutable_key_type)]
pub fn add_keys_to_replication_fetcher(
&self,
holder: PeerId,
keys: HashMap<NetworkAddress, RecordType>,
) -> Result<()> {
self.send_swarm_cmd(SwarmCmd::AddKeysToReplicationFetcher { holder, keys })
}
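/// Sends a `Request` to the given peer and awaits its `Response`.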
pub async fn send_request(&self, req: Request, peer: PeerId) -> Result<Response> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::SendRequest {
req,
peer,
sender: Some(sender),
})?;
receiver.await?
}
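/// Sends a `Request` to the given peer without waiting for a reply.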
pub fn send_req_ignore_reply(&self, req: Request, peer: PeerId) -> Result<()> {
let swarm_cmd = SwarmCmd::SendRequest {
req,
peer,
sender: None,
};
self.send_swarm_cmd(swarm_cmd)
}
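/// Sends a `Response` back over the given response channel.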
pub fn send_response(&self, resp: Response, channel: MsgResponder) -> Result<()> {
self.send_swarm_cmd(SwarmCmd::SendResponse { resp, channel })
}
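/// Returns a snapshot of the swarm's local state.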
pub async fn get_swarm_local_state(&self) -> Result<SwarmLocalState> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetSwarmLocalState(sender))?;
let state = receiver.await?;
Ok(state)
}
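/// Instructs the swarm driver to start handling received gossip messages.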
pub fn start_handle_gossip(&self) -> Result<()> {
self.send_swarm_cmd(SwarmCmd::GossipHandler)
}
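/// Dispatches a `SwarmCmd` to the swarm driver on a spawned task. If the channel is at
/// capacity, `AddKeysToReplicationFetcher` commands are dropped; other commands are still
/// queued and wait for capacity.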
fn send_swarm_cmd(&self, cmd: SwarmCmd) -> Result<()> {
let capacity = self.swarm_cmd_sender.capacity();
let cmd_sender = self.swarm_cmd_sender.clone();
if capacity == 0 {
if matches!(cmd, SwarmCmd::AddKeysToReplicationFetcher { .. }) {
warn!(
"SwarmCmd channel is full. Dropping AddKeysToReplicationFetcher: {:?}",
cmd
);
return Ok(());
} else {
error!(
"SwarmCmd channel is full. Await capacity to send: {:?}",
cmd
);
}
}
let _handle = tokio::spawn(async move {
if let Err(error) = cmd_sender.send(cmd).await {
error!("Failed to send SwarmCmd: {}", error);
}
});
Ok(())
}
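/// Queries the network for the peers closest to the given address, optionally removing
/// this node from the result (`client == true`), and returns the `CLOSE_GROUP_SIZE` closest.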
pub async fn get_closest_peers(
&self,
key: &NetworkAddress,
client: bool,
) -> Result<Vec<PeerId>> {
trace!("Getting the closest peers to {key:?}");
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetClosestPeersToAddressFromNetwork {
key: key.clone(),
sender,
})?;
let mut closest_peers = receiver.await?;
if client {
let _existed = closest_peers.remove(&self.peer_id);
}
if tracing::level_enabled!(tracing::Level::TRACE) {
let close_peers_pretty_print: Vec<_> = closest_peers
.iter()
.map(|peer_id| {
format!(
"{peer_id:?}({:?})",
PrettyPrintKBucketKey(NetworkAddress::from_peer(*peer_id).as_kbucket_key())
)
})
.collect();
trace!("Network knowledge of close peers to {key:?} are: {close_peers_pretty_print:?}");
}
let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE)?;
Ok(closest_peers.into_iter().cloned().collect())
}
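/// Sends a request to each of the given peers concurrently. Returns every response when
/// `get_all_responses` is true, otherwise returns as soon as the first successful response arrives.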
pub async fn send_and_get_responses(
&self,
peers: Vec<PeerId>,
req: &Request,
get_all_responses: bool,
) -> Vec<Result<Response>> {
debug!("send_and_get_responses for {req:?}");
let mut list_of_futures = peers
.iter()
.map(|peer| Box::pin(self.send_request(req.clone(), *peer)))
.collect::<Vec<_>>();
let mut responses = Vec::new();
while !list_of_futures.is_empty() {
let (res, _, remaining_futures) = select_all(list_of_futures).await;
let res_string = match &res {
Ok(res) => format!("{res}"),
Err(err) => format!("{err:?}"),
};
debug!("Got response for the req: {req:?}, res: {res_string}");
if !get_all_responses && res.is_ok() {
return vec![res];
}
responses.push(res);
list_of_futures = remaining_futures;
}
debug!("Received all responses for {req:?}");
responses
}
}
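/// Sorts the collected quotes by cost (ties broken by payee public key) and returns the
/// cheapest one, or `Error::NoStoreCostResponses` if the list is empty.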
fn get_fees_from_store_cost_responses(
mut all_costs: Vec<(MainPubkey, PaymentQuote)>,
) -> Result<(MainPubkey, PaymentQuote)> {
all_costs.sort_by(|(pub_key_a, cost_a), (pub_key_b, cost_b)| {
match cost_a.cost.cmp(&cost_b.cost) {
std::cmp::Ordering::Equal => pub_key_a.cmp(pub_key_b),
other => other,
}
});
trace!("Got all costs: {all_costs:?}");
let lowest = all_costs
.into_iter()
.next()
.ok_or(Error::NoStoreCostResponses)?;
info!("Final fees calculated as: {lowest:?}");
Ok(lowest)
}
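/// Returns true if the multiaddr contains no IPv4 component that is unspecified, private,
/// loopback, link-local, documentation or broadcast (i.e. it looks globally reachable).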
pub fn multiaddr_is_global(multiaddr: &Multiaddr) -> bool {
!multiaddr.iter().any(|addr| match addr {
Protocol::Ip4(ip) => {
ip.is_unspecified()
|| ip.is_private()
|| ip.is_loopback()
|| ip.is_link_local()
|| ip.is_documentation()
|| ip.is_broadcast()
}
_ => false,
})
}
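/// Pops a trailing `/p2p/<PeerId>` component off the multiaddr, returning the `PeerId` if one was present.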
pub(crate) fn multiaddr_pop_p2p(multiaddr: &mut Multiaddr) -> Option<PeerId> {
if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() {
let _ = multiaddr.pop();
Some(peer_id)
} else {
None
}
}
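/// Returns a copy of the multiaddr with any `/p2p/<PeerId>` components removed.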
pub(crate) fn multiaddr_strip_p2p(multiaddr: &Multiaddr) -> Multiaddr {
multiaddr
.iter()
.filter(|p| !matches!(p, Protocol::P2p(_)))
.collect()
}
#[cfg(test)]
mod tests {
use eyre::bail;
use super::*;
use sn_transfers::PaymentQuote;
#[test]
fn test_get_fee_from_store_cost_responses() -> Result<()> {
let mut costs = vec![];
for i in 1..CLOSE_GROUP_SIZE {
let addr = MainPubkey::new(bls::SecretKey::random().public_key());
costs.push((
addr,
PaymentQuote::test_dummy(Default::default(), NanoTokens::from(i as u64)),
));
}
let expected_price = costs[0].1.cost.as_nano();
let (_key, price) = get_fees_from_store_cost_responses(costs)?;
assert_eq!(
price.cost.as_nano(),
expected_price,
"price should be {expected_price}"
);
Ok(())
}
#[test]
fn test_get_some_fee_from_store_cost_responses_even_if_one_errs_and_sufficient(
) -> eyre::Result<()> {
let responses_count = CLOSE_GROUP_SIZE as u64 - 1;
let mut costs = vec![];
for i in 1..responses_count {
let addr = MainPubkey::new(bls::SecretKey::random().public_key());
costs.push((
addr,
PaymentQuote::test_dummy(Default::default(), NanoTokens::from(i)),
));
println!("price added {i}");
}
let expected_price = costs[0].1.cost.as_nano();
let (_key, price) = match get_fees_from_store_cost_responses(costs) {
Err(_) => bail!("Should not have errored as we have enough responses"),
Ok(cost) => cost,
};
assert_eq!(
price.cost.as_nano(),
expected_price,
"price should be {expected_price}"
);
Ok(())
}
#[test]
fn test_network_sign_verify() -> eyre::Result<()> {
let (network, _, _) =
NetworkBuilder::new(Keypair::generate_ed25519(), false, std::env::temp_dir())
.build_client()?;
let msg = b"test message";
let sig = network.sign(msg)?;
assert!(network.verify(msg, &sig));
Ok(())
}
}