1use super::{
10 Marker, NodeEvent, error::Result, event::NodeEventsChannel, quote::quotes_verification,
11};
12#[cfg(feature = "open-metrics")]
13use crate::metrics::NodeMetricsRecorder;
14#[cfg(feature = "open-metrics")]
15use crate::networking::MetricsRegistries;
16use crate::networking::{Addresses, Network, NetworkConfig, NetworkError, NetworkEvent, NodeIssue};
17use crate::{PutValidationError, RunningNode};
18use ant_bootstrap::bootstrap::Bootstrap;
19use ant_evm::EvmNetwork;
20use ant_evm::RewardsAddress;
21use ant_protocol::{
22 CLOSE_GROUP_SIZE, NetworkAddress, PrettyPrintRecordKey,
23 error::Error as ProtocolError,
24 messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
25 storage::ValidationType,
26};
27use bytes::Bytes;
28use itertools::Itertools;
29use libp2p::{
30 Multiaddr, PeerId,
31 identity::Keypair,
32 kad::{Record, U256},
33 request_response::OutboundFailure,
34};
35use num_traits::cast::ToPrimitive;
36use rand::{
37 Rng, SeedableRng,
38 rngs::{OsRng, StdRng},
39 thread_rng,
40};
41use std::{
42 collections::HashMap,
43 net::SocketAddr,
44 path::PathBuf,
45 sync::{
46 Arc,
47 atomic::{AtomicUsize, Ordering},
48 },
49 time::{Duration, Instant},
50};
51use tokio::sync::watch;
52use tokio::{
53 sync::mpsc::Receiver,
54 task::{JoinSet, spawn},
55};
56
/// Upper bound (seconds) for the periodic replication interval; each node
/// randomises its actual interval within [MAX/2, MAX) (see `run`).
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;

/// Upper bound (seconds) for the randomised storage-challenge interval.
const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;

/// How often the uptime metric is refreshed (open-metrics feature only).
const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

/// Upper bound (seconds) for the randomised irrelevant-records cleanup interval.
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;

/// Maximum value of each storage-challenge score component.
const HIGHEST_SCORE: usize = 100;

/// A peer scoring at or below this (duration score × proof score) in a
/// storage challenge is reported as unhealthy.
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;

/// Milliseconds of response latency per point deducted from the duration score.
const TIME_STEP: usize = 20;
/// Builder that collects a node's configuration before `build_and_run`
/// starts the network layer and the node event loop.
pub struct NodeBuilder {
    /// Socket address the node listens on.
    addr: SocketAddr,
    /// Bootstrap flow used to join the network.
    bootstrap: Bootstrap,
    /// Rewards address advertised in store quotes.
    evm_address: RewardsAddress,
    /// EVM network the node settles payments on.
    evm_network: EvmNetwork,
    /// libp2p identity of this node.
    identity_keypair: Keypair,
    /// Local (development) mode flag; defaults to false.
    local: bool,
    /// Port for the open-metrics server; `None` leaves metrics disabled.
    #[cfg(feature = "open-metrics")]
    metrics_server_port: Option<u16>,
    /// When true, UPnP port mapping is not attempted.
    no_upnp: bool,
    /// When true, the node acts as a relay client.
    relay_client: bool,
    /// Root directory for the node's on-disk state.
    root_dir: PathBuf,
}
97
98impl NodeBuilder {
    /// Create a builder with the mandatory configuration; the optional flags
    /// (local mode, metrics port, relay client, UPnP) start at their defaults
    /// and can be toggled via the setter methods below.
    pub fn new(
        identity_keypair: Keypair,
        bootstrap_flow: Bootstrap,
        evm_address: RewardsAddress,
        evm_network: EvmNetwork,
        addr: SocketAddr,
        root_dir: PathBuf,
    ) -> Self {
        Self {
            addr,
            bootstrap: bootstrap_flow,
            evm_address,
            evm_network,
            identity_keypair,
            local: false,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: None,
            no_upnp: false,
            relay_client: false,
            root_dir,
        }
    }
123
    /// Set whether the node runs in local (development) mode.
    pub fn local(&mut self, local: bool) {
        self.local = local;
    }
128
    /// Set the port for the open-metrics server; `None` keeps the metrics
    /// recorder disabled (see `build_and_run`).
    #[cfg(feature = "open-metrics")]
    pub fn metrics_server_port(&mut self, port: Option<u16>) {
        self.metrics_server_port = port;
    }
134
    /// Set whether the node should act as a relay client.
    pub fn relay_client(&mut self, relay_client: bool) {
        self.relay_client = relay_client;
    }
139
    /// Set whether UPnP port mapping should be skipped.
    pub fn no_upnp(&mut self, no_upnp: bool) {
        self.no_upnp = no_upnp;
    }
144
    /// Initialise the network layer from this builder's configuration,
    /// assemble the `Node`, spawn its event loop, and hand back a
    /// [`RunningNode`] holding the shutdown sender and other handles.
    ///
    /// # Errors
    /// Returns an error if `Network::init` fails.
    pub fn build_and_run(self) -> Result<RunningNode> {
        // Metrics recorder and registries only exist when a server port was
        // requested; otherwise default (empty) registries are passed through.
        #[cfg(feature = "open-metrics")]
        let (metrics_recorder, metrics_registries) = if self.metrics_server_port.is_some() {
            let mut metrics_registries = MetricsRegistries::default();
            let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);

            (Some(metrics_recorder), metrics_registries)
        } else {
            (None, MetricsRegistries::default())
        };

        // Watch channel used to signal shutdown to the network layer and the
        // node event loop; the sender ends up in the RunningNode handle.
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        let network_config = NetworkConfig {
            keypair: self.identity_keypair,
            local: self.local,
            listen_addr: self.addr,
            root_dir: self.root_dir.clone(),
            shutdown_rx: shutdown_rx.clone(),
            bootstrap: self.bootstrap,
            no_upnp: self.no_upnp,
            relay_client: self.relay_client,
            custom_request_timeout: None,
            #[cfg(feature = "open-metrics")]
            metrics_registries,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: self.metrics_server_port,
        };
        let (network, network_event_receiver) = Network::init(network_config)?;

        let node_events_channel = NodeEventsChannel::default();
        let node = NodeInner {
            network: network.clone(),
            events_channel: node_events_channel.clone(),
            reward_address: self.evm_address,
            #[cfg(feature = "open-metrics")]
            metrics_recorder,
            evm_network: self.evm_network,
        };
        let node = Node {
            inner: Arc::new(node),
        };

        // Spawns the background event loop; `run` does not block.
        node.run(network_event_receiver, shutdown_rx);

        let running_node = RunningNode {
            shutdown_sender: shutdown_tx,
            network,
            node_events_channel,
            root_dir_path: self.root_dir,
            rewards_address: self.evm_address,
        };

        Ok(running_node)
    }
217}
218
/// A single node in the network; handles network events and serves incoming
/// requests. Cheap to clone: all state lives behind an `Arc`.
#[derive(Clone)]
pub(crate) struct Node {
    inner: Arc<NodeInner>,
}
226
/// Shared state behind [`Node`]'s `Arc`.
struct NodeInner {
    /// Channel used to broadcast `NodeEvent`s to subscribers.
    events_channel: NodeEventsChannel,
    /// Handle to the network layer.
    network: Network,
    /// Metrics recorder; `None` when no metrics server port was configured.
    #[cfg(feature = "open-metrics")]
    metrics_recorder: Option<NodeMetricsRecorder>,
    /// Rewards address advertised in store quotes.
    reward_address: RewardsAddress,
    /// EVM network used for payment handling.
    evm_network: EvmNetwork,
}
237
238impl Node {
    /// Returns the node's event broadcast channel.
    pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
        &self.inner.events_channel
    }
243
    /// Returns a reference to the network layer handle.
    pub(crate) fn network(&self) -> &Network {
        &self.inner.network
    }
248
    /// Returns the metrics recorder, if one was configured at build time.
    #[cfg(feature = "open-metrics")]
    pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
        self.inner.metrics_recorder.as_ref()
    }
255
    /// Returns the rewards address advertised in store quotes.
    pub(crate) fn reward_address(&self) -> &RewardsAddress {
        &self.inner.reward_address
    }
260
    /// Returns the EVM network this node operates against.
    pub(crate) fn evm_network(&self) -> &EvmNetwork {
        &self.inner.evm_network
    }
264
    /// Spawn the node's main event loop as a background task and return
    /// immediately.
    ///
    /// The loop multiplexes:
    /// - incoming `NetworkEvent`s,
    /// - the shutdown watch channel,
    /// - periodic timers for replication, uptime metrics, irrelevant-record
    ///   cleanup and storage challenges. Each timer's period is randomised
    ///   per node within [MAX/2, MAX) so nodes do not act in lockstep.
    fn run(
        self,
        mut network_event_receiver: Receiver<NetworkEvent>,
        mut shutdown_rx: watch::Receiver<bool>,
    ) {
        let mut rng = StdRng::from_entropy();

        // Counts routing-table additions; used by handle_network_event to
        // detect when CLOSE_GROUP_SIZE peers have been added.
        let peers_connected = Arc::new(AtomicUsize::new(0));

        let _node_task = spawn(async move {
            let replication_interval: u64 = rng.gen_range(
                PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
            );
            let replication_interval_time = Duration::from_secs(replication_interval);
            debug!("Replication interval set to {replication_interval_time:?}");

            let mut replication_interval = tokio::time::interval(replication_interval_time);
            // Consume the immediate first tick so work starts after one period.
            let _ = replication_interval.tick().await;

            let mut uptime_metrics_update_interval =
                tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
            let _ = uptime_metrics_update_interval.tick().await;

            let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
                UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
                    ..UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
            );
            let irrelevant_records_cleanup_interval_time =
                Duration::from_secs(irrelevant_records_cleanup_interval);
            let mut irrelevant_records_cleanup_interval =
                tokio::time::interval(irrelevant_records_cleanup_interval_time);
            let _ = irrelevant_records_cleanup_interval.tick().await;

            let storage_challenge_interval: u64 =
                rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
            let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
            debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");

            let mut storage_challenge_interval =
                tokio::time::interval(storage_challenge_interval_time);
            let _ = storage_challenge_interval.tick().await;

            loop {
                let peers_connected = &peers_connected;

                tokio::select! {
                    // Exit when the shutdown flag flips true or the sender is dropped.
                    result = shutdown_rx.changed() => {
                        if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
                            info!("Shutdown signal received or sender dropped. Exiting network events loop.");
                            break;
                        }
                    },
                    net_event = network_event_receiver.recv() => {
                        match net_event {
                            Some(event) => {
                                let start = Instant::now();
                                let event_string = format!("{event:?}");

                                self.handle_network_event(event, peers_connected);
                                trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);

                            }
                            None => {
                                error!("The `NetworkEvent` channel is closed");
                                self.events_channel().broadcast(NodeEvent::ChannelClosed);
                                break;
                            }
                        }
                    }
                    // Periodic replication round, carried out off-thread.
                    _ = replication_interval.tick() => {
                        let start = Instant::now();
                        let network = self.network().clone();
                        self.record_metrics(Marker::IntervalReplicationTriggered);

                        let _handle = spawn(async move {
                            Self::try_interval_replication(network);
                            trace!("Periodic replication took {:?}", start.elapsed());
                        });
                    }
                    // Refresh the uptime gauge (metrics feature only).
                    _ = uptime_metrics_update_interval.tick() => {
                        #[cfg(feature = "open-metrics")]
                        if let Some(metrics_recorder) = self.metrics_recorder() {
                            let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
                        }
                    }
                    // Drop records we are no longer responsible for.
                    _ = irrelevant_records_cleanup_interval.tick() => {
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::trigger_irrelevant_record_cleanup(network);
                        });
                    }
                    // Challenge neighbours to prove they still hold records.
                    _ = storage_challenge_interval.tick() => {
                        let start = Instant::now();
                        debug!("Periodic storage challenge triggered");
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::storage_challenge(network).await;
                            trace!("Periodic storage challenge took {:?}", start.elapsed());
                        });
                    }
                }
            }
        });
    }
382
    /// Log the marker, and additionally record it as a metric when the
    /// open-metrics feature is on and a recorder is configured.
    pub(crate) fn record_metrics(&self, marker: Marker) {
        marker.log();
        #[cfg(feature = "open-metrics")]
        if let Some(metrics_recorder) = self.metrics_recorder() {
            metrics_recorder.record(marker)
        }
    }
392
    /// Dispatch a single `NetworkEvent`.
    ///
    /// Anything potentially slow (queries, validation, replication) is
    /// spawned onto a separate task so this handler never blocks the event
    /// loop. `peers_connected` counts routing-table additions so that
    /// `NodeEvent::ConnectedToNetwork` is emitted once `CLOSE_GROUP_SIZE`
    /// peers have been added.
    fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
        let start = Instant::now();
        let event_string = format!("{event:?}");
        let event_header;

        // GetVersion queries are frequent; keep them at trace level.
        if let NetworkEvent::QueryRequestReceived {
            query: Query::GetVersion { .. },
            ..
        } = event
        {
            trace!("Handling NetworkEvent {event_string}");
        } else {
            debug!("Handling NetworkEvent {event_string}");
        }

        match event {
            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
                event_header = "PeerAdded";
                let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
                // NOTE(review): separate load after fetch_add — assumes peer
                // additions are effectively serialised so the equality check
                // fires exactly once; confirm against the event-loop caller.
                if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
                    self.events_channel()
                        .broadcast(NodeEvent::ConnectedToNetwork);
                }

                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));

                // Ask the new peer for its version, off-thread.
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::try_query_peer_version(network, peer_id, Default::default()).await;
                });

                // A routing-table change also triggers a replication round.
                let network = self.network().clone();
                self.record_metrics(Marker::IntervalReplicationTriggered);
                let _handle = spawn(async move {
                    Self::try_interval_replication(network);
                });
            }
            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
                event_header = "PeerRemoved";
                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));

                let self_id = self.network().peer_id();
                let distance =
                    NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
                info!(
                    "Node {self_id:?} removed peer from routing table: {peer_id:?}. It has a {:?} distance to us.",
                    distance.ilog2()
                );

                // Removals trigger replication as well.
                let network = self.network().clone();
                self.record_metrics(Marker::IntervalReplicationTriggered);
                let _handle = spawn(async move {
                    Self::try_interval_replication(network);
                });
            }
            NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
                event_header = "PeerWithUnsupportedProtocol";
            }
            NetworkEvent::NewListenAddr(_) => {
                event_header = "NewListenAddr";
            }
            NetworkEvent::ResponseReceived { res } => {
                event_header = "ResponseReceived";
                if let Err(err) = self.handle_response(res) {
                    error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
                }
            }
            NetworkEvent::KeysToFetchForReplication(keys) => {
                event_header = "KeysToFetchForReplication";
                self.record_metrics(Marker::fetching_keys_for_replication(&keys));

                if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
                    error!("Error while trying to fetch replicated data {err:?}");
                }
            }
            NetworkEvent::QueryRequestReceived { query, channel } => {
                event_header = "QueryRequestReceived";
                let node = self.clone();
                let payment_address = *self.reward_address();

                // Serve the query off-thread; the answer goes back over the
                // provided response channel.
                let _handle = spawn(async move {
                    let network = node.network().clone();
                    let res = Self::handle_query(node, query, payment_address).await;

                    if let Response::Query(QueryResponse::GetVersion { .. }) = res {
                        trace!("Sending response {res:?}");
                    } else {
                        debug!("Sending response {res:?}");
                    }

                    network.send_response(res, channel);
                });
            }
            NetworkEvent::UnverifiedRecord(record) => {
                event_header = "UnverifiedRecord";
                // Validate and store off-thread; rejections are recorded as
                // metrics rather than propagated.
                let self_clone = self.clone();
                let _handle = spawn(async move {
                    let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                    match self_clone.validate_and_store_record(record).await {
                        Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
                        Err(err) => {
                            self_clone.record_metrics(Marker::RecordRejected(&key, &err));
                        }
                    }
                });
            }
            NetworkEvent::TerminateNode { reason } => {
                event_header = "TerminateNode";
                error!("Received termination from swarm_driver due to {reason:?}");
                self.events_channel()
                    .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
            }
            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
                event_header = "FailedToFetchHolders";
                let network = self.network().clone();
                let pretty_log: Vec<_> = bad_nodes
                    .iter()
                    .map(|(peer_id, record_key)| {
                        let pretty_key = PrettyPrintRecordKey::from(record_key);
                        (peer_id, pretty_key)
                    })
                    .collect();
                debug!(
                    "Received notification from replication_fetcher, notifying {pretty_log:?} failed to fetch replication copies from."
                );
                // Only raise an issue against a peer if we still don't hold
                // the record locally (no other holder supplied it meanwhile).
                let _handle = spawn(async move {
                    for (peer_id, record_key) in bad_nodes {
                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
                        {
                            error!(
                                "From peer {peer_id:?}, failed to fetch record {:?}",
                                PrettyPrintRecordKey::from(&record_key)
                            );
                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
                        }
                    }
                });
            }
            NetworkEvent::QuoteVerification { quotes } => {
                event_header = "QuoteVerification";
                let network = self.network().clone();

                let _handle = spawn(async move {
                    quotes_verification(&network, quotes).await;
                });
            }
            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
                event_header = "FreshReplicateToFetch";
                self.fresh_replicate_to_fetch(holder, keys);
            }
            NetworkEvent::PeersForVersionQuery(peers) => {
                event_header = "PeersForVersionQuery";
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::query_peers_version(network, peers).await;
                });
            }
        }

        trace!(
            "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
            start.elapsed()
        );
    }
574
    /// Fallback handler for responses surfaced as
    /// `NetworkEvent::ResponseReceived`.
    ///
    /// Replication-related responses are expected to be consumed by their
    /// dedicated flows before reaching this point, so these arms only log.
    /// Always returns `Ok(())`.
    fn handle_response(&self, response: Response) -> Result<()> {
        match response {
            Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
                // Should have been consumed by the replication flow already.
                warn!("Mishandled replicate response, should be handled earlier");
            }
            Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
                error!(
                    "Response to replication shall be handled by called not by common handler, {resp:?}"
                );
            }
            Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
                // Intentionally ignored: fresh-replicate acks need no action here.
            }
            other => {
                warn!("handle_response not implemented for {other:?}");
            }
        };

        Ok(())
    }
597
598 async fn handle_query(node: Self, query: Query, payment_address: RewardsAddress) -> Response {
599 let network = node.network();
600 let resp: QueryResponse = match query {
601 Query::GetStoreQuote {
602 key,
603 data_type,
604 data_size,
605 nonce,
606 difficulty,
607 } => {
608 let record_key = key.to_record_key();
609 let self_id = network.peer_id();
610
611 let maybe_quoting_metrics = network
612 .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
613 .await;
614
615 let storage_proofs = if let Some(nonce) = nonce {
616 Self::respond_x_closest_record_proof(
617 network,
618 key.clone(),
619 nonce,
620 difficulty,
621 false,
622 )
623 .await
624 } else {
625 vec![]
626 };
627
628 match maybe_quoting_metrics {
629 Ok((quoting_metrics, is_already_stored)) => {
630 if is_already_stored {
631 QueryResponse::GetStoreQuote {
632 quote: Err(ProtocolError::RecordExists(
633 PrettyPrintRecordKey::from(&record_key).into_owned(),
634 )),
635 peer_address: NetworkAddress::from(self_id),
636 storage_proofs,
637 }
638 } else {
639 QueryResponse::GetStoreQuote {
640 quote: Self::create_quote_for_storecost(
641 network,
642 &key,
643 "ing_metrics,
644 &payment_address,
645 ),
646 peer_address: NetworkAddress::from(self_id),
647 storage_proofs,
648 }
649 }
650 }
651 Err(err) => {
652 warn!("GetStoreQuote failed for {key:?}: {err}");
653 QueryResponse::GetStoreQuote {
654 quote: Err(ProtocolError::GetStoreQuoteFailed),
655 peer_address: NetworkAddress::from(self_id),
656 storage_proofs,
657 }
658 }
659 }
660 }
661 Query::GetReplicatedRecord { requester: _, key } => {
662 let our_address = NetworkAddress::from(network.peer_id());
663 let mut result = Err(ProtocolError::ReplicatedRecordNotFound {
664 holder: Box::new(our_address.clone()),
665 key: Box::new(key.clone()),
666 });
667 let record_key = key.to_record_key();
668
669 if let Ok(Some(record)) = network.get_local_record(&record_key).await {
670 result = Ok((our_address, Bytes::from(record.value)));
671 }
672
673 QueryResponse::GetReplicatedRecord(result)
674 }
675 Query::GetChunkExistenceProof {
676 key,
677 nonce,
678 difficulty,
679 } => QueryResponse::GetChunkExistenceProof(
680 Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
681 ),
682 Query::CheckNodeInProblem(target_address) => {
683 debug!("Got CheckNodeInProblem for peer {target_address:?}");
684
685 let is_in_trouble =
686 if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
687 result
688 } else {
689 debug!("Could not get status of {target_address:?}.");
690 false
691 };
692
693 QueryResponse::CheckNodeInProblem {
694 reporter_address: NetworkAddress::from(network.peer_id()),
695 target_address,
696 is_in_trouble,
697 }
698 }
699 Query::GetClosestPeers {
700 key,
701 num_of_peers,
702 range,
703 sign_result,
704 } => {
705 debug!(
706 "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
707 );
708 Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
709 .await
710 }
711 Query::GetVersion(_) => QueryResponse::GetVersion {
712 peer: NetworkAddress::from(network.peer_id()),
713 version: ant_build_info::package_version(),
714 },
715 Query::PutRecord {
716 holder,
717 address,
718 serialized_record,
719 } => {
720 let record = Record {
721 key: address.to_record_key(),
722 value: serialized_record,
723 publisher: None,
724 expires: None,
725 };
726
727 let key = PrettyPrintRecordKey::from(&record.key).into_owned();
728 let result = match node.validate_and_store_record(record).await {
729 Ok(()) => Ok(()),
730 Err(PutValidationError::OutdatedRecordCounter { counter, expected }) => {
731 node.record_metrics(Marker::RecordRejected(
732 &key,
733 &PutValidationError::OutdatedRecordCounter { counter, expected },
734 ));
735 Err(ProtocolError::OutdatedRecordCounter { counter, expected })
736 }
737 Err(err) => {
738 node.record_metrics(Marker::RecordRejected(&key, &err));
739 Err(ProtocolError::PutRecordFailed(format!("{err:?}")))
740 }
741 };
742 QueryResponse::PutRecord {
743 result,
744 peer_address: holder,
745 record_addr: address,
746 }
747 }
748 };
749 Response::Query(resp)
750 }
751
752 async fn respond_get_closest_peers(
753 network: &Network,
754 target: NetworkAddress,
755 num_of_peers: Option<usize>,
756 range: Option<[u8; 32]>,
757 sign_result: bool,
758 ) -> QueryResponse {
759 let local_peers = network.get_local_peers_with_multiaddr().await;
760 let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
761 Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
762 } else {
763 vec![]
764 };
765
766 let signature = if sign_result {
767 let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
768 bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
769 network.sign(&bytes).ok()
770 } else {
771 None
772 };
773
774 QueryResponse::GetClosestPeers {
775 target,
776 peers,
777 signature,
778 }
779 }
780
781 fn calculate_get_closest_peers(
782 peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
783 target: NetworkAddress,
784 num_of_peers: Option<usize>,
785 range: Option<[u8; 32]>,
786 ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
787 match (num_of_peers, range) {
788 (_, Some(value)) => {
789 let distance = U256::from_big_endian(&value);
790 peer_addrs
791 .iter()
792 .filter_map(|(peer_id, multi_addrs)| {
793 let addr = NetworkAddress::from(*peer_id);
794 if target.distance(&addr).0 <= distance {
795 Some((addr, multi_addrs.clone()))
796 } else {
797 None
798 }
799 })
800 .collect()
801 }
802 (Some(num_of_peers), _) => {
803 let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
804 .iter()
805 .map(|(peer_id, multi_addrs)| {
806 let addr = NetworkAddress::from(*peer_id);
807 (addr, multi_addrs.clone())
808 })
809 .collect();
810 result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
811 result.into_iter().take(num_of_peers).collect()
812 }
813 (None, None) => vec![],
814 }
815 }
816
    /// Build chunk-existence proofs in answer to a storage challenge.
    ///
    /// With `difficulty == 1` only the queried `key` itself is proven; a
    /// higher difficulty proves up to `min(difficulty, CLOSE_GROUP_SIZE)`
    /// locally held records closest to `key`. `chunk_only` restricts the
    /// candidate set to chunk-typed records.
    async fn respond_x_closest_record_proof(
        network: &Network,
        key: NetworkAddress,
        nonce: Nonce,
        difficulty: usize,
        chunk_only: bool,
    ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
        let start = Instant::now();
        let mut results = vec![];
        if difficulty == 1 {
            // Single-record challenge: prove the requested key only.
            let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
            if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
                let proof = ChunkProof::new(&record.value, nonce);
                debug!("Chunk proof for {key:?} is {proof:?}");
                result = Ok(proof)
            } else {
                debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
            }

            results.push((key.clone(), result));
        } else {
            let all_local_records = network.get_all_local_record_addresses().await;

            if let Ok(all_local_records) = all_local_records {
                // Candidate set: either chunks only, or all local records.
                let mut all_chunk_addrs: Vec<_> = if chunk_only {
                    all_local_records
                        .iter()
                        .filter_map(|(addr, record_type)| {
                            if *record_type == ValidationType::Chunk {
                                Some(addr.clone())
                            } else {
                                None
                            }
                        })
                        .collect()
                } else {
                    all_local_records.keys().cloned().collect()
                };

                // Prove the records closest to the challenged key, capping
                // the workload at CLOSE_GROUP_SIZE.
                all_chunk_addrs.sort_by_key(|addr| key.distance(addr));

                let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);

                for addr in all_chunk_addrs.iter().take(workload_factor) {
                    if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
                    {
                        let proof = ChunkProof::new(&record.value, nonce);
                        debug!("Chunk proof for {key:?} is {proof:?}");
                        results.push((addr.clone(), Ok(proof)));
                    }
                }
            }

            info!(
                "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
                results.len(),
                start.elapsed()
            );
        }

        results
    }
884
    /// Run one round of the periodic storage challenge against our closest
    /// neighbours: pick a locally held chunk as target, pre-compute the
    /// proofs we expect, challenge each neighbour concurrently, score the
    /// answers and report unhealthy peers to the network layer.
    async fn storage_challenge(network: Network) {
        let start = Instant::now();
        // Require a full close group of neighbours for the round to be meaningful.
        let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
            network.get_k_closest_local_peers_to_the_target(None).await
        {
            closest_peers
                .into_iter()
                .take(CLOSE_GROUP_SIZE)
                .collect_vec()
        } else {
            error!("Cannot get local neighbours");
            return;
        };
        if closest_peers.len() < CLOSE_GROUP_SIZE {
            debug!(
                "Not enough neighbours ({}/{}) to carry out storage challenge.",
                closest_peers.len(),
                CLOSE_GROUP_SIZE
            );
            return;
        }

        // Only chunk-typed records are eligible challenge targets.
        let mut verify_candidates: Vec<NetworkAddress> =
            if let Ok(all_keys) = network.get_all_local_record_addresses().await {
                all_keys
                    .iter()
                    .filter_map(|(addr, record_type)| {
                        if ValidationType::Chunk == *record_type {
                            Some(addr.clone())
                        } else {
                            None
                        }
                    })
                    .collect()
            } else {
                error!("Failed to get local record addresses.");
                return;
            };
        let num_of_targets = verify_candidates.len();
        if num_of_targets < 50 {
            debug!("Not enough candidates({num_of_targets}/50) to be checked against neighbours.");
            return;
        }

        // Pick a random target from the half of our records closest to us,
        // then compute the proofs we expect for the `difficulty` records
        // closest to that target.
        let self_addr = NetworkAddress::from(network.peer_id());
        verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
        let index: usize = OsRng.gen_range(0..num_of_targets / 2);
        let target = verify_candidates[index].clone();
        let difficulty = CLOSE_GROUP_SIZE;
        verify_candidates.sort_by_key(|addr| target.distance(addr));
        let expected_targets = verify_candidates.into_iter().take(difficulty);
        let nonce: Nonce = thread_rng().r#gen::<u64>();
        let mut expected_proofs = HashMap::new();
        for addr in expected_targets {
            if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
                let expected_proof = ChunkProof::new(&record.value, nonce);
                let _ = expected_proofs.insert(addr, expected_proof);
            } else {
                error!("Local record {addr:?} cann't be loaded from disk.");
            }
        }
        let request = Request::Query(Query::GetChunkExistenceProof {
            key: target.clone(),
            nonce,
            difficulty,
        });

        // Challenge each neighbour concurrently, skipping ourselves.
        let mut tasks = JoinSet::new();
        for (peer_id, addresses) in closest_peers {
            if peer_id == network.peer_id() {
                continue;
            }
            let network_clone = network.clone();
            let request_clone = request.clone();
            let expected_proofs_clone = expected_proofs.clone();
            let _ = tasks.spawn(async move {
                let res = scoring_peer(
                    network_clone,
                    (peer_id, addresses),
                    request_clone,
                    expected_proofs_clone,
                )
                .await;
                (peer_id, res)
            });
        }

        // Collect scores; peers at or below the healthy threshold get a node
        // issue recorded against them.
        let mut peer_scores = vec![];
        while let Some(res) = tasks.join_next().await {
            match res {
                Ok((peer_id, score)) => {
                    let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
                    if !is_healthy {
                        info!(
                            "Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."
                        );
                        network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
                    }
                    peer_scores.push((peer_id, is_healthy));
                }
                Err(e) => {
                    info!("StorageChallenge task completed with error {e:?}");
                }
            }
        }
        if !peer_scores.is_empty() {
            network.notify_peer_scores(peer_scores);
        }

        info!(
            "Completed node StorageChallenge against neighbours in {:?}!",
            start.elapsed()
        );
    }
1006
    /// Query the version of each given peer, one after another.
    async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
        for (peer_id, addrs) in peers {
            Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
        }
    }
1014
    /// Ask a peer for its version and record the outcome with the network
    /// layer. An unexpected reply records "none", a request failure records
    /// "old", and a dial failure removes the peer without recording.
    async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
        let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
        let version = match network.send_request(request, peer, addrs).await {
            Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
                trace!("Fetched peer version {peer:?} as {version:?}");
                version
            }
            Ok(other) => {
                info!("Not a fetched peer version {peer:?}, {other:?}");
                "none".to_string()
            }
            Err(err) => {
                info!("Failed to fetch peer version {peer:?} with error {err:?}");
                // Unreachable peer: drop it from the routing table instead of
                // recording a version for it.
                if let NetworkError::OutboundError(OutboundFailure::DialFailure) = err {
                    network.remove_peer(peer);
                    return;
                }
                "old".to_string()
            }
        };
        network.notify_node_version(peer, version);
    }
1042}
1043
1044async fn scoring_peer(
1045 network: Network,
1046 peer: (PeerId, Addresses),
1047 request: Request,
1048 expected_proofs: HashMap<NetworkAddress, ChunkProof>,
1049) -> usize {
1050 let peer_id = peer.0;
1051 let start = Instant::now();
1052 let responses = network
1053 .send_and_get_responses(&[peer], &request, true)
1054 .await;
1055
1056 if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
1057 responses.get(&peer_id)
1058 {
1059 if answers.is_empty() {
1060 info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
1061 return 0;
1062 }
1063 let elapsed = start.elapsed();
1064
1065 let mut received_proofs = vec![];
1066 for (addr, proof) in answers {
1067 if let Ok(proof) = proof {
1068 received_proofs.push((addr.clone(), proof.clone()));
1069 }
1070 }
1071
1072 let score = mark_peer(elapsed, received_proofs, &expected_proofs);
1073 info!(
1074 "Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.",
1075 answers.len()
1076 );
1077 score
1078 } else {
1079 info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error.");
1080 0
1081 }
1082}
1083
1084fn mark_peer(
1090 duration: Duration,
1091 answers: Vec<(NetworkAddress, ChunkProof)>,
1092 expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
1093) -> usize {
1094 let duration_score = duration_score_scheme(duration);
1095 let challenge_score = challenge_score_scheme(answers, expected_proofs);
1096
1097 duration_score * challenge_score
1098}
1099
1100fn duration_score_scheme(duration: Duration) -> usize {
1102 let in_ms = if let Some(value) = duration.as_millis().to_usize() {
1104 value
1105 } else {
1106 info!("Cannot get milli seconds from {duration:?}, using a default value of 1000ms.");
1107 1000
1108 };
1109
1110 let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
1111 HIGHEST_SCORE - step
1112}
1113
1114fn challenge_score_scheme(
1116 answers: Vec<(NetworkAddress, ChunkProof)>,
1117 expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
1118) -> usize {
1119 let mut correct_answers = 0;
1120 for (addr, chunk_proof) in answers {
1121 if let Some(expected_proof) = expected_proofs.get(&addr) {
1122 if expected_proof.verify(&chunk_proof) {
1123 correct_answers += 1;
1124 } else {
1125 info!("Spot a false answer to the challenge regarding {addr:?}");
1126 return 0;
1128 }
1129 }
1130 }
1131 std::cmp::min(
1138 HIGHEST_SCORE,
1139 HIGHEST_SCORE * correct_answers / expected_proofs.len(),
1140 )
1141}
1142
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    /// With no local peers, the result must be empty regardless of the
    /// requested count.
    #[test]
    fn test_no_local_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(5);
        let range = None;
        let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);

        assert_eq!(result, vec![]);
    }

    /// Requesting fewer peers than available returns the requested count,
    /// ordered by distance to the target.
    #[test]
    fn test_fewer_local_peers_than_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(2);
        let range = None;
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Independently compute the two distance-closest peers.
        let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .iter()
            .map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(*peer_id);
                (addr, multi_addrs.clone())
            })
            .collect();
        expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
        let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();

        assert_eq!(expected_result, result);
    }

    /// When both a range and a count are given, the range takes precedence:
    /// every peer within the distance is returned (count is ignored).
    #[test]
    fn test_with_range_and_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(0);
        let range_value = [128; 32];
        let range = Some(range_value);
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Independently apply the same distance filter.
        let distance = U256::from_big_endian(&range_value);
        let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .into_iter()
            .filter_map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(peer_id);
                if target.distance(&addr).0 <= distance {
                    Some((addr, multi_addrs.clone()))
                } else {
                    None
                }
            })
            .collect();

        assert_eq!(expected_result, result);
    }
}