#[macro_use]
extern crate tracing;
mod bootstrap;
mod circular_vec;
mod cmd;
mod driver;
mod error;
mod event;
mod get_record_handler;
mod log_markers;
#[cfg(feature = "open-metrics")]
mod metrics;
#[cfg(feature = "open-metrics")]
mod metrics_service;
mod network_discovery;
mod record_store;
mod record_store_api;
mod replication_fetcher;
mod spends;
pub mod target_arch;
mod transfers;
mod transport;
pub use target_arch::{interval, sleep, spawn, Instant, Interval};
pub use self::{
cmd::{NodeIssue, SwarmLocalState},
driver::{GetRecordCfg, NetworkBuilder, PutRecordCfg, SwarmDriver, VerificationKind},
error::{GetRecordError, NetworkError},
event::{MsgResponder, NetworkEvent},
record_store::{calculate_cost_for_records, NodeRecordStore},
transfers::{get_raw_signed_spends_from_record, get_signed_spend_from_record},
};
use self::{cmd::SwarmCmd, error::Result};
use backoff::{Error as BackoffError, ExponentialBackoff};
use futures::future::select_all;
use libp2p::{
identity::Keypair,
kad::{KBucketDistance, KBucketKey, Quorum, Record, RecordKey},
multiaddr::Protocol,
Multiaddr, PeerId,
};
use rand::Rng;
use sn_protocol::{
error::Error as ProtocolError,
messages::{ChunkProof, Cmd, Nonce, Query, QueryResponse, Request, Response},
storage::{RecordType, RetryStrategy},
NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey,
};
use sn_transfers::{MainPubkey, NanoTokens, PaymentQuote, QuotingMetrics};
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
path::PathBuf,
sync::Arc,
};
use tokio::sync::{
mpsc::{self, Sender},
oneshot,
};
use tokio::time::Duration;
use tracing::trace;
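/// A payee for a storage payment: the peer to pay, the main public key to pay to, and its quote.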
pub type PayeeQuote = (PeerId, MainPubkey, PaymentQuote);
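/// The number of peers closest to an address that form its close group.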
pub const CLOSE_GROUP_SIZE: usize = 5;
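/// The number of closest peers considered as replication targets for a record (the close group plus a small buffer).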
pub const REPLICATE_RANGE: usize = CLOSE_GROUP_SIZE + 2;
#[inline]
pub const fn close_group_majority() -> usize {
CLOSE_GROUP_SIZE / 2 + 1
}
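/// Upper bound of the random delay applied before reading back a freshly PUT record.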
const MAX_WAIT_BEFORE_READING_A_PUT: Duration = Duration::from_millis(750);
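/// Lower bound of the random delay applied before reading back a freshly PUT record.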
const MIN_WAIT_BEFORE_READING_A_PUT: Duration = Duration::from_millis(300);
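/// Sorts the given peers by their distance to `address` and returns up to `expected_entries`
/// of the closest ones. Errors if fewer than `CLOSE_GROUP_SIZE` peers are supplied.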
#[allow(clippy::result_large_err)]
pub fn sort_peers_by_address<'a>(
peers: &'a Vec<PeerId>,
address: &NetworkAddress,
expected_entries: usize,
) -> Result<Vec<&'a PeerId>> {
sort_peers_by_key(peers, &address.as_kbucket_key(), expected_entries)
}
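/// Sorts the given peers by their distance to `key` and returns up to `expected_entries`
/// of the closest ones. Errors if fewer than `CLOSE_GROUP_SIZE` peers are supplied.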
#[allow(clippy::result_large_err)]
pub fn sort_peers_by_key<'a, T>(
peers: &'a Vec<PeerId>,
key: &KBucketKey<T>,
expected_entries: usize,
) -> Result<Vec<&'a PeerId>> {
if CLOSE_GROUP_SIZE > peers.len() {
warn!("Not enough peers in the k-bucket to satisfy the request");
return Err(NetworkError::NotEnoughPeers {
found: peers.len(),
required: CLOSE_GROUP_SIZE,
});
}
let mut peer_distances: Vec<(&PeerId, KBucketDistance)> = Vec::with_capacity(peers.len());
for peer_id in peers {
let addr = NetworkAddress::from_peer(*peer_id);
let distance = key.distance(&addr.as_kbucket_key());
peer_distances.push((peer_id, distance));
}
peer_distances.sort_by(|a, b| a.1.cmp(&b.1));
let sorted_peers: Vec<_> = peer_distances
.into_iter()
.take(expected_entries)
.map(|(peer_id, _)| peer_id)
.collect();
Ok(sorted_peers)
}
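/// Handle used by clients and nodes to interact with the swarm: every call is translated into a
/// `SwarmCmd` forwarded over `swarm_cmd_sender` to the swarm-driving task.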
#[derive(Clone)]
pub struct Network {
pub swarm_cmd_sender: mpsc::Sender<SwarmCmd>,
pub peer_id: Arc<PeerId>,
pub root_dir_path: Arc<PathBuf>,
keypair: Arc<Keypair>,
}
impl Network {
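    /// Signs the given data with the node's keypair.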
pub fn sign(&self, msg: &[u8]) -> Result<Vec<u8>> {
self.keypair.sign(msg).map_err(NetworkError::from)
}
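    /// Verifies a signature over the given data against the node's public key.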
pub fn verify(&self, msg: &[u8], sig: &[u8]) -> bool {
self.keypair.public().verify(msg, sig)
}
pub fn get_pub_key(&self) -> Vec<u8> {
self.keypair.public().encode_protobuf()
}
pub async fn dial(&self, addr: Multiaddr) -> Result<()> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::Dial { addr, sender });
receiver.await?
}
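    /// Returns the closest peers to the given address from the network, excluding our own peer id (client variant).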
pub async fn client_get_closest_peers(&self, key: &NetworkAddress) -> Result<Vec<PeerId>> {
self.get_closest_peers(key, true).await
}
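    /// Returns the closest peers to the given address from the network; our own peer id may be included (node variant).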
pub async fn node_get_closest_peers(&self, key: &NetworkAddress) -> Result<Vec<PeerId>> {
self.get_closest_peers(key, false).await
}
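    /// Returns the local routing table as a map from k-bucket index to the peers it currently holds.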
pub async fn get_kbuckets(&self) -> Result<BTreeMap<u32, Vec<PeerId>>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetKBuckets { sender });
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
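    /// Returns the close group of the given address based solely on local routing-table knowledge.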
pub async fn get_close_group_local_peers(&self, key: &NetworkAddress) -> Result<Vec<PeerId>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetCloseGroupLocalPeers {
key: key.clone(),
sender,
});
match receiver.await {
Ok(close_peers) => {
if tracing::level_enabled!(tracing::Level::TRACE) {
let close_peers_pretty_print: Vec<_> = close_peers
.iter()
.map(|peer_id| {
format!(
"{peer_id:?}({:?})",
PrettyPrintKBucketKey(
NetworkAddress::from_peer(*peer_id).as_kbucket_key()
)
)
})
.collect();
trace!(
"Local knowledge of close peers to {key:?} are: {close_peers_pretty_print:?}"
);
}
Ok(close_peers)
}
Err(err) => {
error!("When getting local knowledge of close peers to {key:?}, failed with error {err:?}");
Err(NetworkError::InternalMsgChannelDropped)
}
}
}
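    /// Returns all peers currently known in the local routing table.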
pub async fn get_all_local_peers(&self) -> Result<Vec<PeerId>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetAllLocalPeers { sender });
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
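    /// Returns the closest K-value peers known in the local routing table.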
pub async fn get_closest_k_value_local_peers(&self) -> Result<Vec<PeerId>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetClosestKLocalPeers { sender });
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
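    /// Verifies that a chunk is held by its close group by issuing `GetChunkExistenceProof`
    /// queries, retrying per `retry_strategy` until `quorum` valid proofs are received.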
pub async fn verify_chunk_existence(
&self,
chunk_address: NetworkAddress,
nonce: Nonce,
expected_proof: ChunkProof,
quorum: Quorum,
retry_strategy: Option<RetryStrategy>,
) -> Result<()> {
let mut total_attempts = 1;
total_attempts += retry_strategy
.map(|strategy| strategy.get_count())
.unwrap_or(0);
let pretty_key = PrettyPrintRecordKey::from(&chunk_address.to_record_key()).into_owned();
let expected_n_verified = get_quorum_value(&quorum);
let mut close_nodes = Vec::new();
let mut retry_attempts = 0;
while retry_attempts < total_attempts {
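            // Fetch a fresh close group every other attempt; in between, reuse the previous one.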
if retry_attempts % 2 == 0 {
close_nodes = self.get_closest_peers(&chunk_address, true).await?;
}
retry_attempts += 1;
info!(
"Getting ChunkProof for {pretty_key:?}. Attempts: {retry_attempts:?}/{total_attempts:?}",
);
let request = Request::Query(Query::GetChunkExistenceProof {
key: chunk_address.clone(),
nonce,
});
let responses = self
.send_and_get_responses(&close_nodes, &request, true)
.await;
let n_verified = responses
.into_iter()
.filter_map(|(peer, resp)| {
if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(Ok(proof)))) =
resp
{
if expected_proof.verify(&proof) {
debug!("Got a valid ChunkProof from {peer:?}");
Some(())
} else {
warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered?");
None
}
} else {
debug!("Did not get a valid response for the ChunkProof from {peer:?}");
None
}
})
.count();
debug!("Got {n_verified} verified chunk existence proofs for chunk_address {chunk_address:?}");
if n_verified >= expected_n_verified {
return Ok(());
}
warn!("The obtained {n_verified} verified proofs did not match the expected {expected_n_verified} verified proofs");
            // Wait before the next attempt, waiting longer after the first try.
            let waiting_time = if retry_attempts == 1 {
                MIN_WAIT_BEFORE_READING_A_PUT
            } else {
                MIN_WAIT_BEFORE_READING_A_PUT * 2
            };
sleep(waiting_time).await;
}
Err(NetworkError::FailedToVerifyChunkProof(
chunk_address.clone(),
))
}
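    /// Collects store-cost quotes from the close group of `record_address`, forwards the quotes
    /// back to those peers for verification, and returns the cheapest acceptable payee,
    /// skipping any peer listed in `ignore_peers`.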
pub async fn get_store_costs_from_network(
&self,
record_address: NetworkAddress,
ignore_peers: Vec<PeerId>,
) -> Result<PayeeQuote> {
let close_nodes = self.get_closest_peers(&record_address, true).await?;
let request = Request::Query(Query::GetStoreCost(record_address.clone()));
let responses = self
.send_and_get_responses(&close_nodes, &request, true)
.await;
let mut all_costs = vec![];
let mut all_quotes = vec![];
for response in responses.into_values().flatten() {
debug!(
"StoreCostReq for {record_address:?} received response: {:?}",
response
);
match response {
Response::Query(QueryResponse::GetStoreCost {
quote: Ok(quote),
payment_address,
peer_address,
}) => {
all_costs.push((peer_address.clone(), payment_address, quote.clone()));
all_quotes.push((peer_address, quote));
}
Response::Query(QueryResponse::GetStoreCost {
quote: Err(ProtocolError::RecordExists(_)),
payment_address,
peer_address,
}) => {
all_costs.push((peer_address, payment_address, PaymentQuote::zero()));
}
_ => {
error!("Non store cost response received, was {:?}", response);
}
}
}
for peer_id in close_nodes.iter() {
let request = Request::Cmd(Cmd::QuoteVerification {
target: NetworkAddress::from_peer(*peer_id),
quotes: all_quotes.clone(),
});
self.send_req_ignore_reply(request, *peer_id);
}
all_costs.sort_by(|(peer_address_a, _, _), (peer_address_b, _, _)| {
record_address
.distance(peer_address_a)
.cmp(&record_address.distance(peer_address_b))
});
#[allow(clippy::mutable_key_type)]
let ignore_peers = ignore_peers
.into_iter()
.map(NetworkAddress::from_peer)
.collect::<BTreeSet<_>>();
let all_costs = all_costs
.into_iter()
.filter(|(peer_address, ..)| !ignore_peers.contains(peer_address))
.take(close_group_majority())
.collect();
get_fees_from_store_cost_responses(all_costs)
}
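    /// Fetches a `Record` from the network (single-attempt wasm variant).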
#[cfg(target_arch = "wasm32")]
pub async fn get_record_from_network(
&self,
key: RecordKey,
cfg: &GetRecordCfg,
) -> Result<Record> {
let pretty_key = PrettyPrintRecordKey::from(&key);
info!("Getting record from network of {pretty_key:?}. with cfg {cfg:?}",);
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetNetworkRecord {
key: key.clone(),
sender,
cfg: cfg.clone(),
});
let result = receiver.await.map_err(|e| {
error!("When fetching record {pretty_key:?}, encountered a channel error {e:?}");
NetworkError::InternalMsgChannelDropped
})?;
result.map_err(NetworkError::from)
}
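    /// Fetches a `Record` from the network, retrying with exponential backoff bounded by the
    /// `GetRecordCfg` retry strategy.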
#[cfg(not(target_arch = "wasm32"))]
pub async fn get_record_from_network(
&self,
key: RecordKey,
cfg: &GetRecordCfg,
) -> Result<Record> {
let retry_duration = cfg.retry_strategy.map(|strategy| strategy.get_duration());
backoff::future::retry(
ExponentialBackoff {
max_elapsed_time: retry_duration,
..Default::default()
},
|| async {
let pretty_key = PrettyPrintRecordKey::from(&key);
info!("Getting record from network of {pretty_key:?}. with cfg {cfg:?}",);
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetNetworkRecord {
key: key.clone(),
sender,
cfg: cfg.clone(),
});
let result = receiver.await.map_err(|e| {
error!("When fetching record {pretty_key:?}, encountered a channel error {e:?}");
NetworkError::InternalMsgChannelDropped
}).map_err(|err| BackoffError::Transient { err, retry_after: None })?;
match &result {
Ok(_) => {
info!("Record returned: {pretty_key:?}.");
}
Err(GetRecordError::RecordDoesNotMatch(_)) => {
warn!("The returned record does not match target {pretty_key:?}.");
}
Err(GetRecordError::NotEnoughCopies { expected, got, .. }) => {
warn!("Not enough copies ({got}/{expected}) found yet for {pretty_key:?}.");
}
Err(GetRecordError::RecordNotFound) => {
warn!("No holder of record '{pretty_key:?}' found.");
}
Err(GetRecordError::SplitRecord { .. }) => {
error!("Encountered a split record for {pretty_key:?}.");
}
Err(GetRecordError::QueryTimeout) => {
error!("Encountered query timeout for {pretty_key:?}.");
}
};
if cfg.retry_strategy.is_none() {
if let Err(e) = result {
return Err(BackoffError::Permanent(NetworkError::from(e)));
}
}
if result.is_err() {
trace!("Getting record from network of {pretty_key:?} via backoff...");
}
result.map_err(|err| BackoffError::Transient {
err: NetworkError::from(err),
retry_after: None,
})
},
)
.await
}
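    /// Returns the store cost quoted by the local record store for the given key, along with its quoting metrics.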
pub async fn get_local_storecost(
&self,
key: RecordKey,
) -> Result<(NanoTokens, QuotingMetrics)> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetLocalStoreCost { key, sender });
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
pub fn notify_payment_received(&self) {
self.send_swarm_cmd(SwarmCmd::PaymentReceived);
}
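    /// Returns the record stored locally under the given key, if any.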
pub async fn get_local_record(&self, key: &RecordKey) -> Result<Option<Record>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetLocalRecord {
key: key.clone(),
sender,
});
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
pub async fn is_peer_shunned(&self, target: NetworkAddress) -> Result<bool> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::IsPeerShunned { target, sender });
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
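    /// PUTs a `Record` to the network, retrying with exponential backoff when a retry strategy
    /// is configured; each attempt goes through `put_record_once`.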
pub async fn put_record(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
let pretty_key = PrettyPrintRecordKey::from(&record.key);
let retry_duration = cfg.retry_strategy.map(|strategy| strategy.get_duration());
backoff::future::retry(
ExponentialBackoff {
max_elapsed_time: retry_duration,
..Default::default()
}, || async {
info!(
"Attempting to PUT record with key: {pretty_key:?} to network, with cfg {cfg:?}, retrying via backoff..."
);
self.put_record_once(record.clone(), cfg).await.map_err(|err|
{
warn!("Failed to PUT record with key: {pretty_key:?} to network (retry via backoff) with error: {err:?}");
if cfg.retry_strategy.is_some() {
BackoffError::Transient { err, retry_after: None }
} else {
BackoffError::Permanent(err)
}
})
}).await
}
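    /// A single PUT attempt: sends the record (optionally to a fixed set of peers), then
    /// optionally verifies it via a chunk-proof check or a fresh GET.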
async fn put_record_once(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
let record_key = record.key.clone();
let pretty_key = PrettyPrintRecordKey::from(&record_key);
        info!(
            "Putting record {pretty_key} ({} bytes) to the network",
            record.value.len()
        );
let (sender, receiver) = oneshot::channel();
if let Some(put_record_to_peers) = &cfg.use_put_record_to {
self.send_swarm_cmd(SwarmCmd::PutRecordTo {
peers: put_record_to_peers.clone(),
record: record.clone(),
sender,
quorum: cfg.put_quorum,
});
} else {
self.send_swarm_cmd(SwarmCmd::PutRecord {
record: record.clone(),
sender,
quorum: cfg.put_quorum,
});
}
let response = receiver.await?;
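        // When verification is configured, give the network a moment before reading the record back.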
if let Some((verification_kind, get_cfg)) = &cfg.verification {
let wait_duration = rand::thread_rng()
.gen_range(MIN_WAIT_BEFORE_READING_A_PUT..MAX_WAIT_BEFORE_READING_A_PUT);
sleep(wait_duration).await;
debug!("Attempting to verify {pretty_key:?} after we've slept for {wait_duration:?}");
if let VerificationKind::ChunkProof {
expected_proof,
nonce,
} = verification_kind
{
self.verify_chunk_existence(
NetworkAddress::from_record_key(&record_key),
*nonce,
expected_proof.clone(),
get_cfg.get_quorum,
get_cfg.retry_strategy,
)
.await?;
} else {
match self
.get_record_from_network(record.key.clone(), get_cfg)
.await
{
Ok(_) => {
debug!("Record {pretty_key:?} verified to be stored.");
}
Err(NetworkError::GetRecordError(GetRecordError::RecordNotFound)) => {
warn!("Record {pretty_key:?} not found after PUT, either rejected or not yet stored by nodes when we asked");
return Err(NetworkError::RecordNotStoredByNodes(
NetworkAddress::from_record_key(&record_key),
));
}
Err(e) => {
debug!(
"Failed to verify record {pretty_key:?} to be stored with error: {e:?}"
);
return Err(e);
}
}
}
}
response
}
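    /// Writes a record directly to the local record store without touching the network.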
pub fn put_local_record(&self, record: Record) {
trace!(
"Writing Record locally, for {:?} - length {:?}",
PrettyPrintRecordKey::from(&record.key),
record.value.len()
);
self.send_swarm_cmd(SwarmCmd::PutLocalRecord { record })
}
pub async fn is_record_key_present_locally(&self, key: &RecordKey) -> Result<bool> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::RecordStoreHasKey {
key: key.clone(),
sender,
});
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
pub async fn get_all_local_record_addresses(
&self,
) -> Result<HashMap<NetworkAddress, RecordType>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetAllLocalRecordAddresses { sender });
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
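    /// Sends a `Request` to the given peer and awaits its `Response`.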
pub async fn send_request(&self, req: Request, peer: PeerId) -> Result<Response> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::SendRequest {
req,
peer,
sender: Some(sender),
});
receiver.await?
}
pub fn send_req_ignore_reply(&self, req: Request, peer: PeerId) {
let swarm_cmd = SwarmCmd::SendRequest {
req,
peer,
sender: None,
};
self.send_swarm_cmd(swarm_cmd)
}
pub fn send_response(&self, resp: Response, channel: MsgResponder) {
self.send_swarm_cmd(SwarmCmd::SendResponse { resp, channel })
}
pub async fn get_swarm_local_state(&self) -> Result<SwarmLocalState> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetSwarmLocalState(sender));
let state = receiver.await?;
Ok(state)
}
pub fn trigger_interval_replication(&self) {
self.send_swarm_cmd(SwarmCmd::TriggerIntervalReplication)
}
pub fn record_node_issues(&self, peer_id: PeerId, issue: NodeIssue) {
self.send_swarm_cmd(SwarmCmd::RecordNodeIssue { peer_id, issue });
}
pub fn historical_verify_quotes(&self, quotes: Vec<(PeerId, PaymentQuote)>) {
self.send_swarm_cmd(SwarmCmd::QuoteVerification { quotes });
}
fn send_swarm_cmd(&self, cmd: SwarmCmd) {
send_swarm_cmd(self.swarm_cmd_sender.clone(), cmd);
}
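    /// Queries the network for the peers closest to the given address; when `client` is true our
    /// own peer id is filtered out. The result is sorted and truncated to `CLOSE_GROUP_SIZE`.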
pub async fn get_closest_peers(
&self,
key: &NetworkAddress,
client: bool,
) -> Result<Vec<PeerId>> {
trace!("Getting the closest peers to {key:?}");
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetClosestPeersToAddressFromNetwork {
key: key.clone(),
sender,
});
let k_bucket_peers = receiver.await?;
let mut closest_peers = k_bucket_peers;
if client {
closest_peers.retain(|&x| x != *self.peer_id);
}
if tracing::level_enabled!(tracing::Level::TRACE) {
let close_peers_pretty_print: Vec<_> = closest_peers
.iter()
.map(|peer_id| {
format!(
"{peer_id:?}({:?})",
PrettyPrintKBucketKey(NetworkAddress::from_peer(*peer_id).as_kbucket_key())
)
})
.collect();
trace!("Network knowledge of close peers to {key:?} are: {close_peers_pretty_print:?}");
}
let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE)?;
Ok(closest_peers.into_iter().cloned().collect())
}
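    /// Sends a request to every peer in `peers` concurrently. Returns all responses, or only the
    /// first successful one when `get_all_responses` is false.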
pub async fn send_and_get_responses(
&self,
peers: &[PeerId],
req: &Request,
get_all_responses: bool,
) -> BTreeMap<PeerId, Result<Response>> {
debug!("send_and_get_responses for {req:?}");
let mut list_of_futures = peers
.iter()
.map(|peer| {
Box::pin(async {
let resp = self.send_request(req.clone(), *peer).await;
(*peer, resp)
})
})
.collect::<Vec<_>>();
let mut responses = BTreeMap::new();
while !list_of_futures.is_empty() {
let ((peer, resp), _, remaining_futures) = select_all(list_of_futures).await;
let resp_string = match &resp {
Ok(resp) => format!("{resp}"),
Err(err) => format!("{err:?}"),
};
debug!("Got response from {peer:?} for the req: {req:?}, resp: {resp_string}");
if !get_all_responses && resp.is_ok() {
return BTreeMap::from([(peer, resp)]);
}
responses.insert(peer, resp);
list_of_futures = remaining_futures;
}
debug!("Received all responses for {req:?}");
responses
}
}
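/// Sorts the collected store-cost quotes by cost (ties broken by address) and returns the
/// cheapest one as the payee.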
fn get_fees_from_store_cost_responses(
mut all_costs: Vec<(NetworkAddress, MainPubkey, PaymentQuote)>,
) -> Result<PayeeQuote> {
all_costs.sort_by(
|(address_a, _main_key_a, cost_a), (address_b, _main_key_b, cost_b)| match cost_a
.cost
.cmp(&cost_b.cost)
{
std::cmp::Ordering::Equal => address_a.cmp(address_b),
other => other,
},
);
trace!("Got all costs: {all_costs:?}");
let payee = all_costs
.into_iter()
.next()
.ok_or(NetworkError::NoStoreCostResponses)?;
info!("Final fees calculated as: {payee:?}");
let payee_id = if let Some(peer_id) = payee.0.as_peer_id() {
peer_id
} else {
error!("Can't get PeerId from payee {:?}", payee.0);
return Err(NetworkError::NoStoreCostResponses);
};
Ok((payee_id, payee.1, payee.2))
}
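/// Maps a `Quorum` to the number of responses it requires within a close group.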
pub fn get_quorum_value(quorum: &Quorum) -> usize {
match quorum {
Quorum::Majority => close_group_majority(),
Quorum::All => CLOSE_GROUP_SIZE,
Quorum::N(v) => v.get(),
Quorum::One => 1,
}
}
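/// Returns true unless the multiaddr contains an IPv4 address that is clearly non-global
/// (unspecified, private, loopback, link-local, documentation or broadcast).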
pub fn multiaddr_is_global(multiaddr: &Multiaddr) -> bool {
!multiaddr.iter().any(|addr| match addr {
        Protocol::Ip4(ip) => {
            ip.is_unspecified()
                || ip.is_private()
                || ip.is_loopback()
                || ip.is_link_local()
                || ip.is_documentation()
                || ip.is_broadcast()
        }
_ => false,
})
}
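/// Removes a trailing `/p2p/<PeerId>` component from the multiaddr, if present, and returns the `PeerId`.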
pub(crate) fn multiaddr_pop_p2p(multiaddr: &mut Multiaddr) -> Option<PeerId> {
if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() {
let _ = multiaddr.pop();
Some(peer_id)
} else {
None
}
}
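/// Returns a copy of the multiaddr with all `/p2p` components removed.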
pub(crate) fn multiaddr_strip_p2p(multiaddr: &Multiaddr) -> Multiaddr {
multiaddr
.iter()
.filter(|p| !matches!(p, Protocol::P2p(_)))
.collect()
}
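/// Forwards a `SwarmCmd` to the swarm-driving task from a spawned task; logs an error if the
/// channel is at capacity or has been closed.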
pub(crate) fn send_swarm_cmd(swarm_cmd_sender: Sender<SwarmCmd>, cmd: SwarmCmd) {
let capacity = swarm_cmd_sender.capacity();
if capacity == 0 {
        error!(
            "SwarmCmd channel is full; awaiting capacity to send: {:?}",
            cmd
        );
}
let _handle = spawn(async move {
if let Err(error) = swarm_cmd_sender.send(cmd).await {
error!("Failed to send SwarmCmd: {}", error);
}
});
}
#[cfg(test)]
mod tests {
use eyre::bail;
use super::*;
use sn_transfers::PaymentQuote;
#[test]
fn test_get_fee_from_store_cost_responses() -> Result<()> {
let mut costs = vec![];
for i in 1..CLOSE_GROUP_SIZE {
let addr = MainPubkey::new(bls::SecretKey::random().public_key());
costs.push((
NetworkAddress::from_peer(PeerId::random()),
addr,
PaymentQuote::test_dummy(Default::default(), NanoTokens::from(i as u64)),
));
}
let expected_price = costs[0].2.cost.as_nano();
let (_peer_id, _key, price) = get_fees_from_store_cost_responses(costs)?;
assert_eq!(
price.cost.as_nano(),
expected_price,
"price should be {expected_price}"
);
Ok(())
}
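    // A minimal sanity check of the close-group arithmetic: a majority is strictly more than
    // half of CLOSE_GROUP_SIZE, and `get_quorum_value` maps each `Quorum` variant onto the
    // matching response count.
    #[test]
    fn test_quorum_and_majority_values() {
        use std::num::NonZeroUsize;
        assert_eq!(close_group_majority(), CLOSE_GROUP_SIZE / 2 + 1);
        assert_eq!(get_quorum_value(&Quorum::One), 1);
        assert_eq!(get_quorum_value(&Quorum::Majority), close_group_majority());
        assert_eq!(get_quorum_value(&Quorum::All), CLOSE_GROUP_SIZE);
        let n = NonZeroUsize::new(2).expect("2 is non-zero");
        assert_eq!(get_quorum_value(&Quorum::N(n)), 2);
    }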
#[test]
fn test_get_some_fee_from_store_cost_responses_even_if_one_errs_and_sufficient(
) -> eyre::Result<()> {
let responses_count = CLOSE_GROUP_SIZE as u64 - 1;
let mut costs = vec![];
for i in 1..responses_count {
let addr = MainPubkey::new(bls::SecretKey::random().public_key());
costs.push((
NetworkAddress::from_peer(PeerId::random()),
addr,
PaymentQuote::test_dummy(Default::default(), NanoTokens::from(i)),
));
println!("price added {i}");
}
let expected_price = costs[0].2.cost.as_nano();
let (_peer_id, _key, price) = match get_fees_from_store_cost_responses(costs) {
Err(_) => bail!("Should not have errored as we have enough responses"),
Ok(cost) => cost,
};
assert_eq!(
price.cost.as_nano(),
expected_price,
"price should be {expected_price}"
);
Ok(())
}
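    // A small sketch of the closest-peer sorting: results come back ordered by XOR distance to
    // the target and capped at the requested count, while a peer set smaller than
    // CLOSE_GROUP_SIZE is rejected.
    #[test]
    fn test_sort_peers_by_address_orders_and_caps() -> Result<()> {
        let target = NetworkAddress::from_peer(PeerId::random());
        let peers: Vec<PeerId> = (0..CLOSE_GROUP_SIZE + 3).map(|_| PeerId::random()).collect();
        let sorted = sort_peers_by_address(&peers, &target, CLOSE_GROUP_SIZE)?;
        assert_eq!(sorted.len(), CLOSE_GROUP_SIZE);
        // Each returned peer should be no further from the target than the next one.
        let target_key = target.as_kbucket_key();
        for pair in sorted.windows(2) {
            let d0 = target_key.distance(&NetworkAddress::from_peer(*pair[0]).as_kbucket_key());
            let d1 = target_key.distance(&NetworkAddress::from_peer(*pair[1]).as_kbucket_key());
            assert!(d0 <= d1);
        }
        // Fewer peers than CLOSE_GROUP_SIZE is not enough to form a close group.
        let too_few: Vec<PeerId> = (0..CLOSE_GROUP_SIZE - 1).map(|_| PeerId::random()).collect();
        assert!(sort_peers_by_address(&too_few, &target, CLOSE_GROUP_SIZE).is_err());
        Ok(())
    }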
#[test]
fn test_network_sign_verify() -> eyre::Result<()> {
let (network, _, _) =
NetworkBuilder::new(Keypair::generate_ed25519(), false, std::env::temp_dir())
.build_client()?;
let msg = b"test message";
let sig = network.sign(msg)?;
assert!(network.verify(msg, &sig));
Ok(())
}
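    // A minimal sketch of the multiaddr helpers: stripping removes every /p2p component,
    // popping removes only a trailing one and returns its PeerId, and a loopback address is
    // not treated as globally reachable.
    #[test]
    fn test_multiaddr_p2p_helpers() -> eyre::Result<()> {
        let peer_id = PeerId::random();
        let mut addr: Multiaddr = "/ip4/127.0.0.1/tcp/12000".parse()?;
        addr.push(Protocol::P2p(peer_id));
        // Stripping leaves only the transport components behind.
        let stripped = multiaddr_strip_p2p(&addr);
        assert!(stripped.iter().all(|p| !matches!(p, Protocol::P2p(_))));
        // Popping removes the trailing /p2p component and hands back the PeerId.
        assert_eq!(multiaddr_pop_p2p(&mut addr), Some(peer_id));
        assert_eq!(addr, stripped);
        // 127.0.0.1 is a loopback address, so it is not considered global.
        assert!(!multiaddr_is_global(&stripped));
        Ok(())
    }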
}