use super::{
error::Result, event::NodeEventsChannel, quote::quotes_verification, Marker, NodeEvent,
};
#[cfg(feature = "open-metrics")]
use crate::metrics::NodeMetricsRecorder;
use crate::RunningNode;
use ant_bootstrap::BootstrapCacheStore;
use ant_evm::RewardsAddress;
#[cfg(feature = "open-metrics")]
use ant_networking::MetricsRegistries;
use ant_networking::{
target_arch::sleep, Instant, Network, NetworkBuilder, NetworkEvent, NodeIssue, SwarmDriver,
};
use ant_protocol::{
convert_distance_to_u256,
error::Error as ProtocolError,
messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
storage::RecordType,
NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
};
use bytes::Bytes;
use itertools::Itertools;
use libp2p::{identity::Keypair, Multiaddr, PeerId};
use num_traits::cast::ToPrimitive;
use rand::{
rngs::{OsRng, StdRng},
thread_rng, Rng, SeedableRng,
};
use std::{
collections::HashMap,
net::SocketAddr,
path::PathBuf,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use tokio::{
sync::mpsc::Receiver,
task::{spawn, JoinSet},
};
use ant_evm::{EvmNetwork, U256};
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;
const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;
const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL: Duration = Duration::from_secs(3600);
const HIGHEST_SCORE: usize = 100;
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 5000;
const TIME_STEP: usize = 20;
const NETWORK_DENSITY_SAMPLING_INTERVAL_MAX_S: u64 = 200;
pub struct NodeBuilder {
bootstrap_cache: Option<BootstrapCacheStore>,
initial_peers: Vec<Multiaddr>,
identity_keypair: Keypair,
evm_address: RewardsAddress,
evm_network: EvmNetwork,
addr: SocketAddr,
local: bool,
root_dir: PathBuf,
#[cfg(feature = "open-metrics")]
metrics_server_port: Option<u16>,
is_behind_home_network: bool,
#[cfg(feature = "upnp")]
upnp: bool,
}
impl NodeBuilder {
pub fn new(
identity_keypair: Keypair,
evm_address: RewardsAddress,
evm_network: EvmNetwork,
addr: SocketAddr,
local: bool,
root_dir: PathBuf,
#[cfg(feature = "upnp")] upnp: bool,
) -> Self {
Self {
bootstrap_cache: None,
initial_peers: vec![],
identity_keypair,
evm_address,
evm_network,
addr,
local,
root_dir,
#[cfg(feature = "open-metrics")]
metrics_server_port: None,
is_behind_home_network: false,
#[cfg(feature = "upnp")]
upnp,
}
}
#[cfg(feature = "open-metrics")]
pub fn metrics_server_port(&mut self, port: Option<u16>) {
self.metrics_server_port = port;
}
pub fn bootstrap_cache(&mut self, cache: BootstrapCacheStore) {
self.bootstrap_cache = Some(cache);
}
pub fn initial_peers(&mut self, peers: Vec<Multiaddr>) {
self.initial_peers = peers;
}
pub fn is_behind_home_network(&mut self, is_behind_home_network: bool) {
self.is_behind_home_network = is_behind_home_network;
}
pub fn build_and_run(self) -> Result<RunningNode> {
let mut network_builder = NetworkBuilder::new(self.identity_keypair, self.local);
#[cfg(feature = "open-metrics")]
let metrics_recorder = if self.metrics_server_port.is_some() {
let mut metrics_registries = MetricsRegistries::default();
let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);
network_builder.metrics_registries(metrics_registries);
Some(metrics_recorder)
} else {
None
};
network_builder.listen_addr(self.addr);
#[cfg(feature = "open-metrics")]
network_builder.metrics_server_port(self.metrics_server_port);
network_builder.is_behind_home_network(self.is_behind_home_network);
if let Some(cache) = self.bootstrap_cache {
network_builder.bootstrap_cache(cache);
}
#[cfg(feature = "upnp")]
network_builder.upnp(self.upnp);
let (network, network_event_receiver, swarm_driver) =
network_builder.build_node(self.root_dir.clone())?;
let node_events_channel = NodeEventsChannel::default();
let node = NodeInner {
network: network.clone(),
events_channel: node_events_channel.clone(),
initial_peers: self.initial_peers,
reward_address: self.evm_address,
#[cfg(feature = "open-metrics")]
metrics_recorder,
evm_network: self.evm_network,
};
let node = Node {
inner: Arc::new(node),
};
let running_node = RunningNode {
network,
node_events_channel,
root_dir_path: self.root_dir,
rewards_address: self.evm_address,
};
node.run(swarm_driver, network_event_receiver);
Ok(running_node)
}
}
#[derive(Clone)]
pub(crate) struct Node {
inner: Arc<NodeInner>,
}
struct NodeInner {
events_channel: NodeEventsChannel,
initial_peers: Vec<Multiaddr>,
network: Network,
#[cfg(feature = "open-metrics")]
metrics_recorder: Option<NodeMetricsRecorder>,
reward_address: RewardsAddress,
evm_network: EvmNetwork,
}
impl Node {
pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
&self.inner.events_channel
}
pub(crate) fn initial_peers(&self) -> &Vec<Multiaddr> {
&self.inner.initial_peers
}
pub(crate) fn network(&self) -> &Network {
&self.inner.network
}
#[cfg(feature = "open-metrics")]
pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
self.inner.metrics_recorder.as_ref()
}
pub(crate) fn reward_address(&self) -> &RewardsAddress {
&self.inner.reward_address
}
pub(crate) fn evm_network(&self) -> &EvmNetwork {
&self.inner.evm_network
}
fn run(self, swarm_driver: SwarmDriver, mut network_event_receiver: Receiver<NetworkEvent>) {
let mut rng = StdRng::from_entropy();
let peers_connected = Arc::new(AtomicUsize::new(0));
let _handle = spawn(swarm_driver.run());
let _handle = spawn(async move {
let replication_interval: u64 = rng.gen_range(
PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
);
let replication_interval_time = Duration::from_secs(replication_interval);
debug!("Replication interval set to {replication_interval_time:?}");
let mut replication_interval = tokio::time::interval(replication_interval_time);
let _ = replication_interval.tick().await; let mut uptime_metrics_update_interval =
tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
let _ = uptime_metrics_update_interval.tick().await; let mut irrelevant_records_cleanup_interval =
tokio::time::interval(UNRELEVANT_RECORDS_CLEANUP_INTERVAL);
let _ = irrelevant_records_cleanup_interval.tick().await; let storage_challenge_interval: u64 =
rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");
let mut storage_challenge_interval =
tokio::time::interval(storage_challenge_interval_time);
let _ = storage_challenge_interval.tick().await; let network_density_sampling_interval: u64 = rng.gen_range(
NETWORK_DENSITY_SAMPLING_INTERVAL_MAX_S / 2
..NETWORK_DENSITY_SAMPLING_INTERVAL_MAX_S,
);
let network_density_sampling_interval_time =
Duration::from_secs(network_density_sampling_interval);
debug!(
"Network density sampling interval set to {network_density_sampling_interval:?}"
);
let mut network_density_sampling_interval =
tokio::time::interval(network_density_sampling_interval_time);
let _ = network_density_sampling_interval.tick().await; loop {
let peers_connected = &peers_connected;
tokio::select! {
net_event = network_event_receiver.recv() => {
match net_event {
Some(event) => {
let start = Instant::now();
let event_string = format!("{event:?}");
self.handle_network_event(event, peers_connected);
trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);
}
None => {
error!("The `NetworkEvent` channel is closed");
self.events_channel().broadcast(NodeEvent::ChannelClosed);
break;
}
}
}
_ = replication_interval.tick() => {
let start = Instant::now();
debug!("Periodic replication triggered");
let network = self.network().clone();
self.record_metrics(Marker::IntervalReplicationTriggered);
let _handle = spawn(async move {
Self::try_interval_replication(network);
trace!("Periodic replication took {:?}", start.elapsed());
});
}
_ = uptime_metrics_update_interval.tick() => {
#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = self.metrics_recorder() {
let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
}
}
_ = irrelevant_records_cleanup_interval.tick() => {
let network = self.network().clone();
let _handle = spawn(async move {
Self::trigger_irrelevant_record_cleanup(network);
});
}
_ = storage_challenge_interval.tick() => {
let start = Instant::now();
debug!("Periodic storage challenge triggered");
let network = self.network().clone();
let _handle = spawn(async move {
Self::storage_challenge(network).await;
trace!("Periodic storage challenge took {:?}", start.elapsed());
});
}
_ = network_density_sampling_interval.tick() => {
continue;
}
}
}
});
}
pub(crate) fn record_metrics(&self, marker: Marker) {
marker.log();
#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = self.metrics_recorder() {
metrics_recorder.record(marker)
}
}
fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
let start = Instant::now();
let event_string = format!("{event:?}");
let event_header;
debug!("Handling NetworkEvent {event_string:?}");
match event {
NetworkEvent::PeerAdded(peer_id, connected_peers) => {
event_header = "PeerAdded";
let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
self.events_channel()
.broadcast(NodeEvent::ConnectedToNetwork);
}
self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));
let network = self.network().clone();
self.record_metrics(Marker::IntervalReplicationTriggered);
let _handle = spawn(async move {
Self::try_interval_replication(network);
});
}
NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
event_header = "PeerRemoved";
self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));
let network = self.network().clone();
self.record_metrics(Marker::IntervalReplicationTriggered);
let _handle = spawn(async move {
Self::try_interval_replication(network);
});
}
NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
event_header = "PeerWithUnsupportedProtocol";
}
NetworkEvent::NewListenAddr(_) => {
event_header = "NewListenAddr";
if !cfg!(feature = "local") {
let network = self.network().clone();
let peers = self.initial_peers().clone();
let _handle = spawn(async move {
for addr in peers {
if let Err(err) = network.dial(addr.clone()).await {
tracing::error!("Failed to dial {addr}: {err:?}");
};
}
});
}
}
NetworkEvent::ResponseReceived { res } => {
event_header = "ResponseReceived";
debug!("NetworkEvent::ResponseReceived {res:?}");
if let Err(err) = self.handle_response(res) {
error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
}
}
NetworkEvent::KeysToFetchForReplication(keys) => {
event_header = "KeysToFetchForReplication";
debug!("Going to fetch {:?} keys for replication", keys.len());
self.record_metrics(Marker::fetching_keys_for_replication(&keys));
if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
error!("Error while trying to fetch replicated data {err:?}");
}
}
NetworkEvent::QueryRequestReceived { query, channel } => {
event_header = "QueryRequestReceived";
let network = self.network().clone();
let payment_address = *self.reward_address();
let _handle = spawn(async move {
let res = Self::handle_query(&network, query, payment_address).await;
debug!("Sending response {res:?}");
network.send_response(res, channel);
});
}
NetworkEvent::UnverifiedRecord(record) => {
event_header = "UnverifiedRecord";
let self_clone = self.clone();
let _handle = spawn(async move {
let key = PrettyPrintRecordKey::from(&record.key).into_owned();
match self_clone.validate_and_store_record(record).await {
Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
Err(err) => {
self_clone.record_metrics(Marker::RecordRejected(&key, &err));
}
}
});
}
NetworkEvent::TerminateNode { reason } => {
event_header = "TerminateNode";
error!("Received termination from swarm_driver due to {reason:?}");
self.events_channel()
.broadcast(NodeEvent::TerminateNode(format!("{reason:?}")));
}
NetworkEvent::FailedToFetchHolders(bad_nodes) => {
event_header = "FailedToFetchHolders";
let network = self.network().clone();
error!("Received notification from replication_fetcher, notifying {bad_nodes:?} failed to fetch replication copies from.");
let _handle = spawn(async move {
for peer_id in bad_nodes {
network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
}
});
}
NetworkEvent::QuoteVerification { quotes } => {
event_header = "QuoteVerification";
let network = self.network().clone();
let _handle = spawn(async move {
quotes_verification(&network, quotes).await;
});
}
}
trace!(
"Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
start.elapsed()
);
}
fn handle_response(&self, response: Response) -> Result<()> {
match response {
Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
warn!("Mishandled replicate response, should be handled earlier");
}
Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
error!("Response to replication shall be handled by called not by common handler, {resp:?}");
}
other => {
warn!("handle_response not implemented for {other:?}");
}
};
Ok(())
}
async fn handle_query(
network: &Network,
query: Query,
payment_address: RewardsAddress,
) -> Response {
let resp: QueryResponse = match query {
Query::GetStoreQuote {
key,
nonce,
difficulty,
} => {
debug!("Got GetStoreQuote request for {key:?} with difficulty {difficulty}");
let record_key = key.to_record_key();
let self_id = network.peer_id();
let maybe_quoting_metrics =
network.get_local_quoting_metrics(record_key.clone()).await;
let storage_proofs = if let Some(nonce) = nonce {
Self::respond_x_closest_record_proof(
network,
key.clone(),
nonce,
difficulty,
false,
)
.await
} else {
vec![]
};
match maybe_quoting_metrics {
Ok((quoting_metrics, is_already_stored)) => {
if is_already_stored {
QueryResponse::GetStoreQuote {
quote: Err(ProtocolError::RecordExists(
PrettyPrintRecordKey::from(&record_key).into_owned(),
)),
peer_address: NetworkAddress::from_peer(self_id),
storage_proofs,
}
} else {
QueryResponse::GetStoreQuote {
quote: Self::create_quote_for_storecost(
network,
&key,
"ing_metrics,
&payment_address,
),
peer_address: NetworkAddress::from_peer(self_id),
storage_proofs,
}
}
}
Err(err) => {
warn!("GetStoreQuote failed for {key:?}: {err}");
QueryResponse::GetStoreQuote {
quote: Err(ProtocolError::GetStoreQuoteFailed),
peer_address: NetworkAddress::from_peer(self_id),
storage_proofs,
}
}
}
}
Query::GetRegisterRecord { requester, key } => {
debug!("Got GetRegisterRecord from {requester:?} regarding {key:?} ");
let our_address = NetworkAddress::from_peer(network.peer_id());
let mut result = Err(ProtocolError::RegisterRecordNotFound {
holder: Box::new(our_address.clone()),
key: Box::new(key.clone()),
});
let record_key = key.as_record_key();
if let Some(record_key) = record_key {
if let Ok(Some(record)) = network.get_local_record(&record_key).await {
result = Ok((our_address, Bytes::from(record.value)));
}
}
QueryResponse::GetRegisterRecord(result)
}
Query::GetReplicatedRecord { requester, key } => {
debug!("Got GetReplicatedRecord from {requester:?} regarding {key:?}");
let our_address = NetworkAddress::from_peer(network.peer_id());
let mut result = Err(ProtocolError::ReplicatedRecordNotFound {
holder: Box::new(our_address.clone()),
key: Box::new(key.clone()),
});
let record_key = key.as_record_key();
if let Some(record_key) = record_key {
if let Ok(Some(record)) = network.get_local_record(&record_key).await {
result = Ok((our_address, Bytes::from(record.value)));
}
}
QueryResponse::GetReplicatedRecord(result)
}
Query::GetChunkExistenceProof {
key,
nonce,
difficulty,
} => {
debug!(
"Got GetChunkExistenceProof targeting chunk {key:?} with {difficulty} answers."
);
QueryResponse::GetChunkExistenceProof(
Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true)
.await,
)
}
Query::CheckNodeInProblem(target_address) => {
debug!("Got CheckNodeInProblem for peer {target_address:?}");
let is_in_trouble =
if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
result
} else {
debug!("Could not get status of {target_address:?}.");
false
};
QueryResponse::CheckNodeInProblem {
reporter_address: NetworkAddress::from_peer(network.peer_id()),
target_address,
is_in_trouble,
}
}
Query::GetClosestPeers {
key,
num_of_peers,
range,
sign_result,
} => {
debug!(
"Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
);
Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
.await
}
};
Response::Query(resp)
}
async fn respond_get_closest_peers(
network: &Network,
target: NetworkAddress,
num_of_peers: Option<usize>,
range: Option<[u8; 32]>,
sign_result: bool,
) -> QueryResponse {
let local_peers = network.get_local_peers_with_multiaddr().await;
let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
} else {
vec![]
};
let signature = if sign_result {
let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
if let Ok(sig) = network.sign(&bytes) {
Some(sig)
} else {
None
}
} else {
None
};
QueryResponse::GetClosestPeers {
target,
peers,
signature,
}
}
fn calculate_get_closest_peers(
peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
target: NetworkAddress,
num_of_peers: Option<usize>,
range: Option<[u8; 32]>,
) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
match (num_of_peers, range) {
(_, Some(value)) => {
let distance = U256::from_be_bytes(value);
peer_addrs
.iter()
.filter_map(|(peer_id, multi_addrs)| {
let addr = NetworkAddress::from_peer(*peer_id);
if convert_distance_to_u256(&target.distance(&addr)) <= distance {
Some((addr, multi_addrs.clone()))
} else {
None
}
})
.collect()
}
(Some(num_of_peers), _) => {
let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
.iter()
.map(|(peer_id, multi_addrs)| {
let addr = NetworkAddress::from_peer(*peer_id);
(addr, multi_addrs.clone())
})
.collect();
result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
result.into_iter().take(num_of_peers).collect()
}
(None, None) => vec![],
}
}
async fn respond_x_closest_record_proof(
network: &Network,
key: NetworkAddress,
nonce: Nonce,
difficulty: usize,
chunk_only: bool,
) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
let start = Instant::now();
let mut results = vec![];
if difficulty == 1 {
let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
let proof = ChunkProof::new(&record.value, nonce);
debug!("Chunk proof for {key:?} is {proof:?}");
result = Ok(proof)
} else {
debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
}
results.push((key.clone(), result));
} else {
let all_local_records = network.get_all_local_record_addresses().await;
if let Ok(all_local_records) = all_local_records {
let mut all_chunk_addrs: Vec<_> = if chunk_only {
all_local_records
.iter()
.filter_map(|(addr, record_type)| {
if *record_type == RecordType::Chunk {
Some(addr.clone())
} else {
None
}
})
.collect()
} else {
all_local_records.keys().cloned().collect()
};
all_chunk_addrs.sort_by_key(|addr| key.distance(addr));
let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);
for addr in all_chunk_addrs.iter().take(workload_factor) {
if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
{
let proof = ChunkProof::new(&record.value, nonce);
debug!("Chunk proof for {key:?} is {proof:?}");
results.push((addr.clone(), Ok(proof)));
}
}
}
}
info!(
"Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
results.len(), start.elapsed()
);
results
}
async fn storage_challenge(network: Network) {
let start = Instant::now();
let closest_peers: Vec<PeerId> =
if let Ok(closest_peers) = network.get_closest_k_value_local_peers().await {
closest_peers
.into_iter()
.take(CLOSE_GROUP_SIZE)
.collect_vec()
} else {
error!("Cannot get local neighbours");
return;
};
if closest_peers.len() < CLOSE_GROUP_SIZE {
debug!(
"Not enough neighbours ({}/{}) to carry out storage challenge.",
closest_peers.len(),
CLOSE_GROUP_SIZE
);
return;
}
let mut verify_candidates: Vec<NetworkAddress> =
if let Ok(all_keys) = network.get_all_local_record_addresses().await {
all_keys
.iter()
.filter_map(|(addr, record_type)| {
if RecordType::Chunk == *record_type {
Some(addr.clone())
} else {
None
}
})
.collect()
} else {
error!("Failed to get local record addresses.");
return;
};
let num_of_targets = verify_candidates.len();
if num_of_targets < 50 {
debug!("Not enough candidates({num_of_targets}/50) to be checked against neighbours.");
return;
}
let self_addr = NetworkAddress::from_peer(network.peer_id());
verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
let index: usize = OsRng.gen_range(0..num_of_targets / 2);
let target = verify_candidates[index].clone();
let difficulty = CLOSE_GROUP_SIZE;
verify_candidates.sort_by_key(|addr| target.distance(addr));
let expected_targets = verify_candidates.into_iter().take(difficulty);
let nonce: Nonce = thread_rng().gen::<u64>();
let mut expected_proofs = HashMap::new();
for addr in expected_targets {
if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
let expected_proof = ChunkProof::new(&record.value, nonce);
let _ = expected_proofs.insert(addr, expected_proof);
} else {
error!("Local record {addr:?} cann't be loaded from disk.");
}
}
let request = Request::Query(Query::GetChunkExistenceProof {
key: target.clone(),
nonce,
difficulty,
});
let mut tasks = JoinSet::new();
for peer_id in closest_peers {
if peer_id == network.peer_id() {
continue;
}
let network_clone = network.clone();
let request_clone = request.clone();
let expected_proofs_clone = expected_proofs.clone();
let _ = tasks.spawn(async move {
let res =
scoring_peer(network_clone, peer_id, request_clone, expected_proofs_clone)
.await;
(peer_id, res)
});
}
while let Some(res) = tasks.join_next().await {
match res {
Ok((peer_id, score)) => {
if score < MIN_ACCEPTABLE_HEALTHY_SCORE {
info!("Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}.");
network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
}
}
Err(e) => {
info!("StorageChallenge task completed with error {e:?}");
}
}
}
info!(
"Completed node StorageChallenge against neighbours in {:?}!",
start.elapsed()
);
}
#[allow(dead_code)]
async fn network_density_sampling(network: Network) {
for _ in 0..10 {
let target = NetworkAddress::from_peer(PeerId::random());
let peers = network.node_get_closest_peers(&target).await;
if let Ok(peers) = peers {
if peers.len() >= CLOSE_GROUP_SIZE {
let distance =
target.distance(&NetworkAddress::from_peer(peers[CLOSE_GROUP_SIZE - 1]));
network.add_network_density_sample(distance);
}
}
sleep(std::time::Duration::from_secs(10)).await;
}
}
}
async fn scoring_peer(
network: Network,
peer_id: PeerId,
request: Request,
expected_proofs: HashMap<NetworkAddress, ChunkProof>,
) -> usize {
let start = Instant::now();
let responses = network
.send_and_get_responses(&[peer_id], &request, true)
.await;
if let Some(Ok(Response::Query(QueryResponse::GetChunkExistenceProof(answers)))) =
responses.get(&peer_id)
{
if answers.is_empty() {
info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
return 0;
}
let elapsed = start.elapsed();
let mut received_proofs = vec![];
for (addr, proof) in answers {
if let Ok(proof) = proof {
received_proofs.push((addr.clone(), proof.clone()));
}
}
let score = mark_peer(elapsed, received_proofs, &expected_proofs);
info!(
"Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.",
answers.len()
);
score
} else {
info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error.");
0
}
}
fn mark_peer(
duration: Duration,
answers: Vec<(NetworkAddress, ChunkProof)>,
expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
) -> usize {
let duration_score = duration_score_scheme(duration);
let challenge_score = challenge_score_scheme(answers, expected_proofs);
duration_score * challenge_score
}
fn duration_score_scheme(duration: Duration) -> usize {
let in_ms = if let Some(value) = duration.as_millis().to_usize() {
value
} else {
info!("Cannot get milli seconds from {duration:?}, using a default value of 1000ms.");
1000
};
let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
HIGHEST_SCORE - step
}
fn challenge_score_scheme(
answers: Vec<(NetworkAddress, ChunkProof)>,
expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
) -> usize {
let mut correct_answers = 0;
for (addr, chunk_proof) in answers {
if let Some(expected_proof) = expected_proofs.get(&addr) {
if expected_proof.verify(&chunk_proof) {
correct_answers += 1;
} else {
info!("Spot a false answer to the challenge regarding {addr:?}");
return 0;
}
}
}
std::cmp::min(
HIGHEST_SCORE,
HIGHEST_SCORE * correct_answers / expected_proofs.len(),
)
}
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
#[test]
fn test_no_local_peers() {
let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
let target = NetworkAddress::from_peer(PeerId::random());
let num_of_peers = Some(5);
let range = None;
let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);
assert_eq!(result, vec![]);
}
#[test]
fn test_fewer_local_peers_than_num_of_peers() {
let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
(
PeerId::random(),
vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
),
(
PeerId::random(),
vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
),
(
PeerId::random(),
vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
),
];
let target = NetworkAddress::from_peer(PeerId::random());
let num_of_peers = Some(2);
let range = None;
let result = Node::calculate_get_closest_peers(
local_peers.clone(),
target.clone(),
num_of_peers,
range,
);
let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
.iter()
.map(|(peer_id, multi_addrs)| {
let addr = NetworkAddress::from_peer(*peer_id);
(addr, multi_addrs.clone())
})
.collect();
expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();
assert_eq!(expected_result, result);
}
#[test]
fn test_with_range_and_num_of_peers() {
let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
(
PeerId::random(),
vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
),
(
PeerId::random(),
vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
),
(
PeerId::random(),
vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
),
];
let target = NetworkAddress::from_peer(PeerId::random());
let num_of_peers = Some(0);
let range_value = [128; 32];
let range = Some(range_value);
let result = Node::calculate_get_closest_peers(
local_peers.clone(),
target.clone(),
num_of_peers,
range,
);
let distance = U256::from_be_bytes(range_value);
let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
.into_iter()
.filter_map(|(peer_id, multi_addrs)| {
let addr = NetworkAddress::from_peer(peer_id);
if convert_distance_to_u256(&target.distance(&addr)) <= distance {
Some((addr, multi_addrs.clone()))
} else {
None
}
})
.collect();
assert_eq!(expected_result, result);
}
}