#[macro_use]
extern crate tracing;
mod bootstrap;
mod circular_vec;
mod cmd;
mod driver;
mod error;
mod event;
mod external_address;
mod fifo_register;
mod log_markers;
#[cfg(feature = "open-metrics")]
mod metrics;
mod network_discovery;
mod record_store;
mod record_store_api;
mod relay_manager;
mod replication_fetcher;
pub mod target_arch;
mod transactions;
mod transport;
use cmd::LocalSwarmCmd;
use xor_name::XorName;
pub use self::{
cmd::{NodeIssue, SwarmLocalState},
driver::{
GetRecordCfg, NetworkBuilder, PutRecordCfg, SwarmDriver, VerificationKind, MAX_PACKET_SIZE,
},
error::{GetRecordError, NetworkError},
event::{MsgResponder, NetworkEvent},
record_store::NodeRecordStore,
transactions::get_transactions_from_record,
};
#[cfg(feature = "open-metrics")]
pub use metrics::service::MetricsRegistries;
pub use target_arch::{interval, sleep, spawn, Instant, Interval};
use self::{cmd::NetworkSwarmCmd, error::Result};
use ant_evm::{PaymentQuote, QuotingMetrics};
use ant_protocol::{
error::Error as ProtocolError,
messages::{ChunkProof, Nonce, Query, QueryResponse, Request, Response},
storage::{RecordType, RetryStrategy, Scratchpad},
NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
};
use futures::future::select_all;
use libp2p::{
identity::Keypair,
kad::{KBucketDistance, KBucketKey, Quorum, Record, RecordKey},
multiaddr::Protocol,
request_response::OutboundFailure,
Multiaddr, PeerId,
};
use rand::Rng;
use std::{
collections::{BTreeMap, HashMap},
net::IpAddr,
sync::Arc,
};
use tokio::sync::{
mpsc::{self, Sender},
oneshot,
};
use tokio::time::Duration;
use ant_protocol::storage::{
    try_deserialize_record, try_serialize_record, RecordHeader, RecordKind, Transaction,
};
use ant_registers::SignedRegister;
use std::collections::HashSet;
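/// Majority of a given group (i.e. > 50%).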
#[inline]
pub const fn close_group_majority() -> usize {
CLOSE_GROUP_SIZE / 2 + 1
}
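// Bounds of the wait applied before attempting to read back a record that was just PUT.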
const MAX_WAIT_BEFORE_READING_A_PUT: Duration = Duration::from_millis(750);
const MIN_WAIT_BEFORE_READING_A_PUT: Duration = Duration::from_millis(300);
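/// Sorts the provided peers by their distance to the given `NetworkAddress`,
/// returning at most `expected_entries` of the closest.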
pub fn sort_peers_by_address<'a>(
    peers: &'a [PeerId],
address: &NetworkAddress,
expected_entries: usize,
) -> Result<Vec<&'a PeerId>> {
sort_peers_by_key(peers, &address.as_kbucket_key(), expected_entries)
}
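/// Sorts the provided peers by their distance to the given `KBucketKey`,
/// returning at most `expected_entries` of the closest.
/// Errors if fewer than `CLOSE_GROUP_SIZE` peers are provided.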
pub fn sort_peers_by_key<'a, T>(
    peers: &'a [PeerId],
key: &KBucketKey<T>,
expected_entries: usize,
) -> Result<Vec<&'a PeerId>> {
if CLOSE_GROUP_SIZE > peers.len() {
warn!("Not enough peers in the k-bucket to satisfy the request");
return Err(NetworkError::NotEnoughPeers {
found: peers.len(),
required: CLOSE_GROUP_SIZE,
});
}
let mut peer_distances: Vec<(&PeerId, KBucketDistance)> = Vec::with_capacity(peers.len());
for peer_id in peers {
let addr = NetworkAddress::from_peer(*peer_id);
let distance = key.distance(&addr.as_kbucket_key());
peer_distances.push((peer_id, distance));
}
peer_distances.sort_by(|a, b| a.1.cmp(&b.1));
let sorted_peers: Vec<_> = peer_distances
.into_iter()
.take(expected_entries)
.map(|(peer_id, _)| peer_id)
.collect();
Ok(sorted_peers)
}
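/// Handle onto the running network. Clones are cheap: all of them share the same
/// `NetworkInner` and forward commands through the wrapped channel senders.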
#[derive(Clone, Debug)]
pub struct Network {
inner: Arc<NetworkInner>,
}
#[derive(Debug)]
struct NetworkInner {
network_swarm_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
local_swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
peer_id: PeerId,
keypair: Keypair,
}
impl Network {
pub fn new(
network_swarm_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
local_swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
peer_id: PeerId,
keypair: Keypair,
) -> Self {
Self {
inner: Arc::new(NetworkInner {
network_swarm_cmd_sender,
local_swarm_cmd_sender,
peer_id,
keypair,
}),
}
}
pub fn peer_id(&self) -> PeerId {
self.inner.peer_id
}
pub fn keypair(&self) -> &Keypair {
&self.inner.keypair
}
pub(crate) fn network_swarm_cmd_sender(&self) -> &mpsc::Sender<NetworkSwarmCmd> {
&self.inner.network_swarm_cmd_sender
}
pub(crate) fn local_swarm_cmd_sender(&self) -> &mpsc::Sender<LocalSwarmCmd> {
&self.inner.local_swarm_cmd_sender
}
pub fn sign(&self, msg: &[u8]) -> Result<Vec<u8>> {
self.keypair().sign(msg).map_err(NetworkError::from)
}
pub fn verify(&self, msg: &[u8], sig: &[u8]) -> bool {
self.keypair().public().verify(msg, sig)
}
pub fn get_pub_key(&self) -> Vec<u8> {
self.keypair().public().encode_protobuf()
}
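    /// Dial the given address and wait for the outcome of the connection attempt.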
pub async fn dial(&self, addr: Multiaddr) -> Result<()> {
let (sender, receiver) = oneshot::channel();
self.send_network_swarm_cmd(NetworkSwarmCmd::Dial { addr, sender });
receiver.await?
}
pub async fn client_get_all_close_peers_in_range_or_close_group(
&self,
key: &NetworkAddress,
) -> Result<Vec<PeerId>> {
self.get_all_close_peers_in_range_or_close_group(key, true)
.await
}
pub async fn node_get_closest_peers(&self, key: &NetworkAddress) -> Result<Vec<PeerId>> {
self.get_all_close_peers_in_range_or_close_group(key, false)
.await
}
pub async fn get_local_peers_with_multiaddr(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetPeersWithMultiaddr { sender });
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
pub async fn get_kbuckets(&self) -> Result<BTreeMap<u32, Vec<PeerId>>> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetKBuckets { sender });
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
pub async fn get_closest_k_value_local_peers(&self) -> Result<Vec<PeerId>> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetClosestKLocalPeers { sender });
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
pub async fn get_replicate_candidates(&self, data_addr: NetworkAddress) -> Result<Vec<PeerId>> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetReplicateCandidates { data_addr, sender });
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
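    /// Verify that a chunk is stored by its close group: request `ChunkProof`s from the
    /// close peers and check them against `expected_proof`, retrying per `retry_strategy`
    /// until `quorum` many valid proofs are received.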
pub async fn verify_chunk_existence(
&self,
chunk_address: NetworkAddress,
nonce: Nonce,
expected_proof: ChunkProof,
quorum: Quorum,
retry_strategy: Option<RetryStrategy>,
) -> Result<()> {
let total_attempts = retry_strategy
.map(|strategy| strategy.attempts())
.unwrap_or(1);
let pretty_key = PrettyPrintRecordKey::from(&chunk_address.to_record_key()).into_owned();
let expected_n_verified = get_quorum_value(&quorum);
let mut close_nodes = Vec::new();
let mut retry_attempts = 0;
while retry_attempts < total_attempts {
if retry_attempts % 2 == 0 {
close_nodes = self
.client_get_all_close_peers_in_range_or_close_group(&chunk_address)
.await?;
}
retry_attempts += 1;
info!(
"Getting ChunkProof for {pretty_key:?}. Attempts: {retry_attempts:?}/{total_attempts:?}",
);
let request = Request::Query(Query::GetChunkExistenceProof {
key: chunk_address.clone(),
nonce,
difficulty: 1,
});
let responses = self
.send_and_get_responses(&close_nodes, &request, true)
.await;
let n_verified = responses
.into_iter()
.filter_map(|(peer, resp)| {
if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(proofs))) =
resp
{
if proofs.is_empty() {
warn!("Failed to verify the ChunkProof from {peer:?}. Returned proof is empty.");
None
} else if let Ok(ref proof) = proofs[0].1 {
if expected_proof.verify(proof) {
debug!("Got a valid ChunkProof from {peer:?}");
Some(())
} else {
warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered?");
None
}
} else {
warn!("Failed to verify the ChunkProof from {peer:?}, returned with error {:?}", proofs[0].1);
None
}
} else {
debug!("Did not get a valid response for the ChunkProof from {peer:?}");
None
}
})
.count();
debug!("Got {n_verified} verified chunk existence proofs for chunk_address {chunk_address:?}");
if n_verified >= expected_n_verified {
return Ok(());
}
warn!("The obtained {n_verified} verified proofs did not match the expected {expected_n_verified} verified proofs");
let waiting_time = if retry_attempts == 1 {
MIN_WAIT_BEFORE_READING_A_PUT
} else {
                MIN_WAIT_BEFORE_READING_A_PUT * 2
};
sleep(waiting_time).await;
}
Err(NetworkError::FailedToVerifyChunkProof(
chunk_address.clone(),
))
}
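    /// Collect storage quotes for `record_address` from its close group, skipping
    /// `ignore_peers`. Returns an empty list when enough peers report the record as
    /// already paid for.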
pub async fn get_store_quote_from_network(
&self,
record_address: NetworkAddress,
ignore_peers: Vec<PeerId>,
) -> Result<Vec<(PeerId, PaymentQuote)>> {
let mut close_nodes = self
.client_get_all_close_peers_in_range_or_close_group(&record_address)
.await?;
close_nodes.retain(|peer_id| !ignore_peers.contains(peer_id));
info!(
"For record {record_address:?} quoting {} nodes. ignore_peers is {ignore_peers:?}",
close_nodes.len()
);
if close_nodes.is_empty() {
error!("Can't get store_cost of {record_address:?}, as all close_nodes are ignored");
return Err(NetworkError::NoStoreCostResponses);
}
let request = Request::Query(Query::GetStoreQuote {
key: record_address.clone(),
nonce: None,
difficulty: 0,
});
let responses = self
.send_and_get_responses(&close_nodes, &request, true)
.await;
let mut peer_already_have_it = 0;
let enough_peers_already_have_it = close_nodes.len() / 2;
let mut all_quotes = vec![];
let mut quotes_to_pay = vec![];
for (peer, response) in responses {
info!("StoreCostReq for {record_address:?} received response: {response:?}");
match response {
Ok(Response::Query(QueryResponse::GetStoreQuote {
quote: Ok(quote),
peer_address,
storage_proofs,
})) => {
if !storage_proofs.is_empty() {
debug!("Storage proofing during GetStoreQuote to be implemented.");
}
if !quote.check_is_signed_by_claimed_peer(peer) {
warn!("Received invalid quote from {peer_address:?}, {quote:?}");
continue;
}
all_quotes.push((peer_address.clone(), quote.clone()));
quotes_to_pay.push((peer, quote));
}
Ok(Response::Query(QueryResponse::GetStoreQuote {
quote: Err(ProtocolError::RecordExists(_)),
peer_address,
storage_proofs,
})) => {
if !storage_proofs.is_empty() {
debug!("Storage proofing during GetStoreQuote to be implemented.");
}
peer_already_have_it += 1;
info!("Address {record_address:?} was already paid for according to {peer_address:?} ({peer_already_have_it}/{enough_peers_already_have_it})");
if peer_already_have_it >= enough_peers_already_have_it {
info!("Address {record_address:?} was already paid for according to {peer_already_have_it} peers, ending quote request");
return Ok(vec![]);
}
}
Err(err) => {
error!("Got an error while requesting quote from peer {peer:?}: {err}");
}
_ => {
error!("Got an unexpected response while requesting quote from peer {peer:?}: {response:?}");
}
}
}
Ok(quotes_to_pay)
}
pub async fn get_register_record_from_network(
&self,
key: RecordKey,
) -> Result<HashMap<XorName, Record>> {
let record_address = NetworkAddress::from_record_key(&key);
let close_nodes = self
.client_get_all_close_peers_in_range_or_close_group(&record_address)
.await?;
let self_address = NetworkAddress::from_peer(self.peer_id());
let request = Request::Query(Query::GetRegisterRecord {
requester: self_address,
key: record_address.clone(),
});
let responses = self
.send_and_get_responses(&close_nodes, &request, true)
.await;
let mut all_register_copies = HashMap::new();
for response in responses.into_values().flatten() {
match response {
Response::Query(QueryResponse::GetRegisterRecord(Ok((holder, content)))) => {
let register_record = Record::new(key.clone(), content.to_vec());
                    let content_hash = XorName::from_content(&register_record.value);
debug!(
"RegisterRecordReq of {record_address:?} received register of version {content_hash:?} from {holder:?}"
);
let _ = all_register_copies.insert(content_hash, register_record);
}
_ => {
error!(
"RegisterRecordReq of {record_address:?} received error response, was {:?}",
response
);
}
}
}
Ok(all_register_copies)
}
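    /// Get a record from the network, retrying with backoff per `cfg.retry_strategy`.
    /// Split records are merged into a single record where possible.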
pub async fn get_record_from_network(
&self,
key: RecordKey,
cfg: &GetRecordCfg,
) -> Result<Record> {
let pretty_key = PrettyPrintRecordKey::from(&key);
let mut backoff = cfg
.retry_strategy
.unwrap_or(RetryStrategy::None)
.backoff()
.into_iter();
loop {
info!("Getting record from network of {pretty_key:?}. with cfg {cfg:?}",);
let (sender, receiver) = oneshot::channel();
self.send_network_swarm_cmd(NetworkSwarmCmd::GetNetworkRecord {
key: key.clone(),
sender,
cfg: cfg.clone(),
});
let result = match receiver.await {
Ok(result) => result,
Err(err) => {
error!(
"When fetching record {pretty_key:?}, encountered a channel error {err:?}"
);
return Err(NetworkError::InternalMsgChannelDropped);
}
};
let err = match result {
Ok(record) => {
info!("Record returned: {pretty_key:?}.");
return Ok(record);
}
Err(err) => err,
};
match &err {
GetRecordError::RecordDoesNotMatch(_) => {
warn!("The returned record does not match target {pretty_key:?}.");
}
GetRecordError::NotEnoughCopies { expected, got, .. } => {
warn!("Not enough copies ({got}/{expected}) found yet for {pretty_key:?}.");
}
GetRecordError::RecordNotFound => {
warn!("No holder of record '{pretty_key:?}' found.");
}
GetRecordError::RecordKindMismatch => {
error!("Record kind mismatch for {pretty_key:?}. This error should not happen here.");
}
GetRecordError::SplitRecord { result_map } => {
error!("Encountered a split record for {pretty_key:?}.");
if let Some(record) = Self::handle_split_record_error(result_map, &key)? {
info!("Merged the split record (register) for {pretty_key:?}, into a single record");
return Ok(record);
}
}
GetRecordError::QueryTimeout => {
error!("Encountered query timeout for {pretty_key:?}.");
}
}
match backoff.next() {
Some(Some(duration)) => {
crate::target_arch::sleep(duration).await;
debug!("Getting record from network of {pretty_key:?} via backoff...");
}
_ => break Err(err.into()),
}
}
}
fn handle_split_record_error(
result_map: &HashMap<XorName, (Record, HashSet<PeerId>)>,
key: &RecordKey,
) -> std::result::Result<Option<Record>, NetworkError> {
let pretty_key = PrettyPrintRecordKey::from(key);
let results_count = result_map.len();
let mut accumulated_transactions = HashSet::new();
let mut collected_registers = Vec::new();
let mut valid_scratchpad: Option<Scratchpad> = None;
if results_count > 1 {
let mut record_kind = None;
info!("For record {pretty_key:?}, we have more than one result returned.");
for (record, _) in result_map.values() {
let Ok(header) = RecordHeader::from_record(record) else {
continue;
};
let kind = record_kind.get_or_insert(header.kind);
if *kind != header.kind {
error!("Encountered a split record for {pretty_key:?} with different RecordHeaders. Expected {kind:?} but got {:?}. Skipping",header.kind);
continue;
}
match kind {
RecordKind::Chunk
| RecordKind::ChunkWithPayment
| RecordKind::TransactionWithPayment
| RecordKind::RegisterWithPayment
| RecordKind::ScratchpadWithPayment => {
error!("Encountered a split record for {pretty_key:?} with unexpected RecordKind {kind:?}, skipping.");
continue;
}
RecordKind::Transaction => {
info!("For record {pretty_key:?}, we have a split record for a transaction attempt. Accumulating transactions");
match get_transactions_from_record(record) {
Ok(transactions) => {
accumulated_transactions.extend(transactions);
}
Err(_) => {
continue;
}
}
}
RecordKind::Register => {
info!("For record {pretty_key:?}, we have a split record for a register. Accumulating registers");
let Ok(register) = try_deserialize_record::<SignedRegister>(record) else {
error!(
"Failed to deserialize register {pretty_key}. Skipping accumulation"
);
continue;
};
match register.verify() {
Ok(_) => {
collected_registers.push(register);
}
Err(_) => {
error!(
"Failed to verify register for {pretty_key} at address: {}. Skipping accumulation",
register.address()
);
continue;
}
}
}
RecordKind::Scratchpad => {
info!("For record {pretty_key:?}, we have a split record for a scratchpad. Selecting the one with the highest count");
let Ok(scratchpad) = try_deserialize_record::<Scratchpad>(record) else {
error!(
"Failed to deserialize scratchpad {pretty_key}. Skipping accumulation"
);
continue;
};
if !scratchpad.is_valid() {
warn!(
"Rejecting Scratchpad for {pretty_key} PUT with invalid signature during split record error"
);
continue;
}
                        if let Some(old) = &valid_scratchpad {
                            if old.count() >= scratchpad.count() {
                                info!(
                                    "Rejecting Scratchpad for {pretty_key} with a count no higher than the current candidate"
                                );
                                continue;
                            }
                        }
                        valid_scratchpad = Some(scratchpad);
}
}
}
}
if accumulated_transactions.len() > 1 {
info!("For record {pretty_key:?} task found split record for a transaction, accumulated and sending them as a single record");
let accumulated_transactions = accumulated_transactions
.into_iter()
.collect::<Vec<Transaction>>();
let record = Record {
key: key.clone(),
value: try_serialize_record(&accumulated_transactions, RecordKind::Transaction)
.map_err(|err| {
error!(
"Error while serializing the accumulated transactions for {pretty_key:?}: {err:?}"
);
NetworkError::from(err)
})?
.to_vec(),
publisher: None,
expires: None,
};
return Ok(Some(record));
} else if !collected_registers.is_empty() {
info!("For record {pretty_key:?} task found multiple registers, merging them.");
let signed_register = collected_registers.iter().fold(collected_registers[0].clone(), |mut acc, x| {
if let Err(e) = acc.merge(x) {
warn!("Ignoring forked register as we failed to merge conflicting registers at {}: {e}", x.address());
}
acc
});
let record_value = try_serialize_record(&signed_register, RecordKind::Register)
.map_err(|err| {
error!(
"Error while serializing the merged register for {pretty_key:?}: {err:?}"
);
NetworkError::from(err)
})?
.to_vec();
let record = Record {
key: key.clone(),
value: record_value,
publisher: None,
expires: None,
};
return Ok(Some(record));
} else if let Some(scratchpad) = valid_scratchpad {
info!("Found a valid scratchpad for {pretty_key:?}, returning it");
let record = Record {
key: key.clone(),
value: try_serialize_record(&scratchpad, RecordKind::Scratchpad)
.map_err(|err| {
error!(
"Error while serializing valid scratchpad for {pretty_key:?}: {err:?}"
);
NetworkError::from(err)
})?
.to_vec(),
publisher: None,
expires: None,
};
return Ok(Some(record));
}
Ok(None)
}
pub async fn get_local_quoting_metrics(
&self,
key: RecordKey,
) -> Result<(QuotingMetrics, bool)> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalQuotingMetrics { key, sender });
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
pub fn notify_payment_received(&self) {
self.send_local_swarm_cmd(LocalSwarmCmd::PaymentReceived);
}
pub async fn get_local_record(&self, key: &RecordKey) -> Result<Option<Record>> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalRecord {
key: key.clone(),
sender,
});
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
pub async fn is_peer_shunned(&self, target: NetworkAddress) -> Result<bool> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::IsPeerShunned { target, sender });
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
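    /// Put a record to the network, retrying with backoff per `cfg.retry_strategy` and
    /// performing the verification requested in `cfg.verification`.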
pub async fn put_record(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
let pretty_key = PrettyPrintRecordKey::from(&record.key);
let mut backoff = cfg
.retry_strategy
.unwrap_or(RetryStrategy::None)
.backoff()
.into_iter();
loop {
info!(
"Attempting to PUT record with key: {pretty_key:?} to network, with cfg {cfg:?}, retrying via backoff..."
);
let err = match self.put_record_once(record.clone(), cfg).await {
Ok(_) => break Ok(()),
Err(err) => err,
};
warn!("Failed to PUT record with key: {pretty_key:?} to network (retry via backoff) with error: {err:?}");
match backoff.next() {
Some(Some(duration)) => {
crate::target_arch::sleep(duration).await;
}
_ => break Err(err),
}
}
}
async fn put_record_once(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
let record_key = record.key.clone();
let pretty_key = PrettyPrintRecordKey::from(&record_key);
info!(
"Putting record of {} - length {:?} to network",
pretty_key,
record.value.len()
);
let (sender, receiver) = oneshot::channel();
if let Some(put_record_to_peers) = &cfg.use_put_record_to {
self.send_network_swarm_cmd(NetworkSwarmCmd::PutRecordTo {
peers: put_record_to_peers.clone(),
record: record.clone(),
sender,
quorum: cfg.put_quorum,
});
} else {
self.send_network_swarm_cmd(NetworkSwarmCmd::PutRecord {
record: record.clone(),
sender,
quorum: cfg.put_quorum,
});
}
let response = receiver.await?;
if let Some((verification_kind, get_cfg)) = &cfg.verification {
let wait_duration = rand::thread_rng()
.gen_range(MIN_WAIT_BEFORE_READING_A_PUT..MAX_WAIT_BEFORE_READING_A_PUT);
sleep(wait_duration).await;
debug!("Attempting to verify {pretty_key:?} after we've slept for {wait_duration:?}");
if let VerificationKind::ChunkProof {
expected_proof,
nonce,
} = verification_kind
{
self.verify_chunk_existence(
NetworkAddress::from_record_key(&record_key),
*nonce,
expected_proof.clone(),
get_cfg.get_quorum,
get_cfg.retry_strategy,
)
.await?;
} else {
match self
.get_record_from_network(record.key.clone(), get_cfg)
.await
{
Ok(_) => {
debug!("Record {pretty_key:?} verified to be stored.");
}
Err(NetworkError::GetRecordError(GetRecordError::RecordNotFound)) => {
warn!("Record {pretty_key:?} not found after PUT, either rejected or not yet stored by nodes when we asked");
return Err(NetworkError::RecordNotStoredByNodes(
NetworkAddress::from_record_key(&record_key),
));
}
Err(NetworkError::GetRecordError(GetRecordError::SplitRecord { .. }))
if matches!(verification_kind, VerificationKind::Crdt) =>
{
warn!("Record {pretty_key:?} is split, which is okay since we're dealing with CRDTs");
}
Err(e) => {
debug!(
"Failed to verify record {pretty_key:?} to be stored with error: {e:?}"
);
return Err(e);
}
}
}
}
response
}
pub fn notify_fetch_completed(&self, key: RecordKey, record_type: RecordType) {
self.send_local_swarm_cmd(LocalSwarmCmd::FetchCompleted((key, record_type)))
}
pub fn put_local_record(&self, record: Record) {
debug!(
"Writing Record locally, for {:?} - length {:?}",
PrettyPrintRecordKey::from(&record.key),
record.value.len()
);
self.send_local_swarm_cmd(LocalSwarmCmd::PutLocalRecord { record })
}
pub async fn is_record_key_present_locally(&self, key: &RecordKey) -> Result<bool> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::RecordStoreHasKey {
key: key.clone(),
sender,
});
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
pub async fn get_all_local_record_addresses(
&self,
) -> Result<HashMap<NetworkAddress, RecordType>> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetAllLocalRecordAddresses { sender });
receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}
pub async fn send_request(&self, req: Request, peer: PeerId) -> Result<Response> {
let (sender, receiver) = oneshot::channel();
self.send_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
req: req.clone(),
peer,
sender: Some(sender),
});
let mut r = receiver.await?;
if let Err(error) = &r {
error!("Error in response: {:?}", error);
match error {
NetworkError::OutboundError(OutboundFailure::Io(_))
| NetworkError::OutboundError(OutboundFailure::ConnectionClosed) => {
                    warn!(
                        "Outbound request {req:?} failed with {error:?}; retrying the request once"
                    );
let (sender, receiver) = oneshot::channel();
debug!("Reattempting to send_request {req:?} to {peer:?}");
self.send_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
req,
peer,
sender: Some(sender),
});
r = receiver.await?;
}
_ => {
warn!("Error in response: {:?}", error);
}
}
}
r
}
pub fn send_req_ignore_reply(&self, req: Request, peer: PeerId) {
let swarm_cmd = NetworkSwarmCmd::SendRequest {
req,
peer,
sender: None,
};
self.send_network_swarm_cmd(swarm_cmd)
}
pub fn send_response(&self, resp: Response, channel: MsgResponder) {
self.send_network_swarm_cmd(NetworkSwarmCmd::SendResponse { resp, channel })
}
pub async fn get_swarm_local_state(&self) -> Result<SwarmLocalState> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetSwarmLocalState(sender));
let state = receiver.await?;
Ok(state)
}
pub fn trigger_interval_replication(&self) {
self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIntervalReplication)
}
pub fn record_node_issues(&self, peer_id: PeerId, issue: NodeIssue) {
self.send_local_swarm_cmd(LocalSwarmCmd::RecordNodeIssue { peer_id, issue });
}
pub fn historical_verify_quotes(&self, quotes: Vec<(PeerId, PaymentQuote)>) {
self.send_local_swarm_cmd(LocalSwarmCmd::QuoteVerification { quotes });
}
pub fn trigger_irrelevant_record_cleanup(&self) {
self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIrrelevantRecordCleanup)
}
pub fn add_network_density_sample(&self, distance: KBucketDistance) {
self.send_local_swarm_cmd(LocalSwarmCmd::AddNetworkDensitySample { distance })
}
fn send_network_swarm_cmd(&self, cmd: NetworkSwarmCmd) {
send_network_swarm_cmd(self.network_swarm_cmd_sender().clone(), cmd);
}
fn send_local_swarm_cmd(&self, cmd: LocalSwarmCmd) {
send_local_swarm_cmd(self.local_swarm_cmd_sender().clone(), cmd);
}
pub async fn get_all_close_peers_in_range_or_close_group(
&self,
key: &NetworkAddress,
client: bool,
) -> Result<Vec<PeerId>> {
let pretty_key = PrettyPrintKBucketKey(key.as_kbucket_key());
debug!("Getting the all closest peers in range of {pretty_key:?}");
let (sender, receiver) = oneshot::channel();
self.send_network_swarm_cmd(NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork {
key: key.clone(),
sender,
});
let found_peers = receiver.await?;
let result_len = found_peers.len();
let mut closest_peers = found_peers;
if client {
closest_peers.retain(|&x| x != self.peer_id());
if result_len != closest_peers.len() {
info!("Remove self client from the closest_peers");
}
}
if tracing::level_enabled!(tracing::Level::DEBUG) {
let close_peers_pretty_print: Vec<_> = closest_peers
.iter()
.map(|peer_id| {
format!(
"{peer_id:?}({:?})",
PrettyPrintKBucketKey(NetworkAddress::from_peer(*peer_id).as_kbucket_key())
)
})
.collect();
debug!(
"Network knowledge of closest peers to {pretty_key:?} are: {close_peers_pretty_print:?}"
);
}
let expanded_close_group = CLOSE_GROUP_SIZE + CLOSE_GROUP_SIZE / 2;
let closest_peers = sort_peers_by_address(&closest_peers, key, expanded_close_group)?;
Ok(closest_peers.into_iter().cloned().collect())
}
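    /// Send `req` to each of `peers` concurrently. With `get_all_responses` set, wait for
    /// every reply; otherwise return as soon as the first successful response arrives.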
pub async fn send_and_get_responses(
&self,
peers: &[PeerId],
req: &Request,
get_all_responses: bool,
) -> BTreeMap<PeerId, Result<Response>> {
debug!("send_and_get_responses for {req:?}");
let mut list_of_futures = peers
.iter()
.map(|peer| {
Box::pin(async {
let resp = self.send_request(req.clone(), *peer).await;
(*peer, resp)
})
})
.collect::<Vec<_>>();
let mut responses = BTreeMap::new();
while !list_of_futures.is_empty() {
let ((peer, resp), _, remaining_futures) = select_all(list_of_futures).await;
let resp_string = match &resp {
Ok(resp) => format!("{resp}"),
Err(err) => format!("{err:?}"),
};
debug!("Got response from {peer:?} for the req: {req:?}, resp: {resp_string}");
if !get_all_responses && resp.is_ok() {
return BTreeMap::from([(peer, resp)]);
}
responses.insert(peer, resp);
list_of_futures = remaining_futures;
}
debug!("Received all responses for {req:?}");
responses
}
}
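/// Number of peers required by the given kad `Quorum`.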
pub fn get_quorum_value(quorum: &Quorum) -> usize {
match quorum {
Quorum::Majority => close_group_majority(),
Quorum::All => CLOSE_GROUP_SIZE,
Quorum::N(v) => v.get(),
Quorum::One => 1,
}
}
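/// Returns true unless the multiaddr contains an IPv4 component that is clearly not
/// globally routable (unspecified, private, loopback, link-local, documentation or broadcast).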
pub fn multiaddr_is_global(multiaddr: &Multiaddr) -> bool {
!multiaddr.iter().any(|addr| match addr {
Protocol::Ip4(ip) => {
            ip.is_unspecified()
                || ip.is_private()
                || ip.is_loopback()
                || ip.is_link_local()
                || ip.is_documentation()
                || ip.is_broadcast()
}
_ => false,
})
}
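/// Pop a trailing `/p2p/<peer_id>` component off the multiaddr, if present, returning the peer id.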
pub(crate) fn multiaddr_pop_p2p(multiaddr: &mut Multiaddr) -> Option<PeerId> {
if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() {
let _ = multiaddr.pop();
Some(peer_id)
} else {
None
}
}
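/// Return a copy of the multiaddr with `/p2p/...` components removed; for relayed addresses
/// the relay's own `/p2p` component (the one before `/p2p-circuit`) is kept.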
pub(crate) fn multiaddr_strip_p2p(multiaddr: &Multiaddr) -> Multiaddr {
let is_relayed = multiaddr.iter().any(|p| matches!(p, Protocol::P2pCircuit));
if is_relayed {
let mut before_relay_protocol = true;
let mut new_multi_addr = Multiaddr::empty();
for p in multiaddr.iter() {
if matches!(p, Protocol::P2pCircuit) {
before_relay_protocol = false;
}
if matches!(p, Protocol::P2p(_)) && !before_relay_protocol {
continue;
}
new_multi_addr.push(p);
}
new_multi_addr
} else {
multiaddr
.iter()
.filter(|p| !matches!(p, Protocol::P2p(_)))
.collect()
}
}
pub(crate) fn multiaddr_get_ip(addr: &Multiaddr) -> Option<IpAddr> {
addr.iter().find_map(|p| match p {
Protocol::Ip4(addr) => Some(IpAddr::V4(addr)),
Protocol::Ip6(addr) => Some(IpAddr::V6(addr)),
_ => None,
})
}
pub(crate) fn multiaddr_get_port(addr: &Multiaddr) -> Option<u16> {
addr.iter().find_map(|p| match p {
Protocol::Udp(port) => Some(port),
_ => None,
})
}
pub(crate) fn send_local_swarm_cmd(swarm_cmd_sender: Sender<LocalSwarmCmd>, cmd: LocalSwarmCmd) {
let capacity = swarm_cmd_sender.capacity();
if capacity == 0 {
error!(
"SwarmCmd channel is full. Await capacity to send: {:?}",
cmd
);
}
let _handle = spawn(async move {
if let Err(error) = swarm_cmd_sender.send(cmd).await {
error!("Failed to send SwarmCmd: {}", error);
}
});
}
pub(crate) fn send_network_swarm_cmd(
swarm_cmd_sender: Sender<NetworkSwarmCmd>,
cmd: NetworkSwarmCmd,
) {
let capacity = swarm_cmd_sender.capacity();
if capacity == 0 {
error!(
"SwarmCmd channel is full. Await capacity to send: {:?}",
cmd
);
}
let _handle = spawn(async move {
if let Err(error) = swarm_cmd_sender.send(cmd).await {
error!("Failed to send SwarmCmd: {}", error);
}
});
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_network_sign_verify() -> eyre::Result<()> {
let (network, _, _) =
NetworkBuilder::new(Keypair::generate_ed25519(), false).build_client()?;
let msg = b"test message";
let sig = network.sign(msg)?;
assert!(network.verify(msg, &sig));
Ok(())
}
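
    // A minimal sanity check of the quorum arithmetic defined above; assumes
    // CLOSE_GROUP_SIZE is the group size behind the Quorum::All / Majority mapping.
    #[test]
    fn test_quorum_values() {
        assert_eq!(close_group_majority(), CLOSE_GROUP_SIZE / 2 + 1);
        assert_eq!(get_quorum_value(&Quorum::One), 1);
        assert_eq!(get_quorum_value(&Quorum::Majority), close_group_majority());
        assert_eq!(get_quorum_value(&Quorum::All), CLOSE_GROUP_SIZE);
        let n = std::num::NonZeroUsize::new(3).expect("3 is non-zero");
        assert_eq!(get_quorum_value(&Quorum::N(n)), 3);
    }

    // A sketch of sort_peers_by_address behaviour with random peers; assumes
    // CLOSE_GROUP_SIZE >= 1 so the "too few peers" case below is meaningful.
    #[test]
    fn test_sort_peers_by_address_requires_close_group() {
        let target = NetworkAddress::from_peer(PeerId::random());

        // Fewer peers than CLOSE_GROUP_SIZE must be rejected.
        let too_few: Vec<PeerId> = (1..CLOSE_GROUP_SIZE).map(|_| PeerId::random()).collect();
        assert!(sort_peers_by_address(&too_few, &target, CLOSE_GROUP_SIZE).is_err());

        // With enough peers, the result is truncated to the requested length.
        let peers: Vec<PeerId> = (0..CLOSE_GROUP_SIZE + 5).map(|_| PeerId::random()).collect();
        let sorted = sort_peers_by_address(&peers, &target, CLOSE_GROUP_SIZE)
            .expect("enough peers should sort successfully");
        assert_eq!(sorted.len(), CLOSE_GROUP_SIZE);
    }

    // Exercises the multiaddr helpers on a typical private UDP/QUIC address.
    #[test]
    fn test_multiaddr_helpers() {
        let peer_id = PeerId::random();
        let mut addr: Multiaddr = "/ip4/192.168.1.1/udp/12345/quic-v1"
            .parse()
            .expect("valid multiaddr");
        addr.push(Protocol::P2p(peer_id));

        assert_eq!(
            multiaddr_get_ip(&addr),
            Some(IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 1)))
        );
        assert_eq!(multiaddr_get_port(&addr), Some(12345));
        // A private address is not considered globally reachable.
        assert!(!multiaddr_is_global(&addr));
        // Stripping removes every /p2p component of a non-relayed address.
        assert!(multiaddr_strip_p2p(&addr)
            .iter()
            .all(|p| !matches!(p, Protocol::P2p(_))));
        // Popping returns the trailing peer id and removes it from the address.
        assert_eq!(multiaddr_pop_p2p(&mut addr), Some(peer_id));
        assert_eq!(multiaddr_pop_p2p(&mut addr), None);
    }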
}