use super::{
    Marker, NodeEvent, error::Result, event::NodeEventsChannel, quote::quotes_verification,
};
#[cfg(feature = "open-metrics")]
use crate::metrics::NodeMetricsRecorder;
#[cfg(feature = "open-metrics")]
use crate::networking::MetricsRegistries;
use crate::networking::{Addresses, Network, NetworkConfig, NetworkError, NetworkEvent, NodeIssue};
use crate::{PutValidationError, RunningNode};
use ant_bootstrap::BootstrapCacheStore;
use ant_evm::EvmNetwork;
use ant_evm::RewardsAddress;
use ant_protocol::{
    CLOSE_GROUP_SIZE, NetworkAddress, PrettyPrintRecordKey,
    error::Error as ProtocolError,
    messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
    storage::ValidationType,
};
use bytes::Bytes;
use itertools::Itertools;
use libp2p::{
    Multiaddr, PeerId,
    identity::Keypair,
    kad::{Record, U256},
    request_response::OutboundFailure,
};
use num_traits::cast::ToPrimitive;
use rand::{
    Rng, SeedableRng,
    rngs::{OsRng, StdRng},
    thread_rng,
};
use std::{
    collections::HashMap,
    net::SocketAddr,
    path::PathBuf,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    time::{Duration, Instant},
};
use tokio::sync::watch;
use tokio::{
    sync::mpsc::Receiver,
    task::{JoinSet, spawn},
};
/// Upper bound (in seconds) of the periodic replication interval; the actual
/// interval is randomised between half this value and this value.
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;

/// Upper bound (in seconds) of the periodic storage-challenge interval,
/// randomised in the same way as the replication interval.
const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;

/// How often the uptime metric is refreshed (only with the `open-metrics` feature).
const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

/// Upper bound (in seconds) of the interval at which records that are no longer
/// relevant to this node are cleaned up.
const IRRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;

/// Highest score a peer can obtain for either component of a storage challenge.
const HIGHEST_SCORE: usize = 100;

/// Minimum combined score (duration score multiplied by challenge score) for a
/// peer to be considered healthy.
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;

/// Every `TIME_STEP` milliseconds of response latency reduces the duration score
/// by one point.
const TIME_STEP: usize = 20;
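
/// Builder that collects the configuration needed to construct and start a node:
/// its identity keypair, initial peers, rewards address, EVM network, listen
/// address and root directory.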
pub struct NodeBuilder {
    addr: SocketAddr,
    bootstrap_cache: Option<BootstrapCacheStore>,
    evm_address: RewardsAddress,
    evm_network: EvmNetwork,
    initial_peers: Vec<Multiaddr>,
    identity_keypair: Keypair,
    local: bool,
    #[cfg(feature = "open-metrics")]
    metrics_server_port: Option<u16>,
    no_upnp: bool,
    relay_client: bool,
    root_dir: PathBuf,
}

impl NodeBuilder {
    pub fn new(
        identity_keypair: Keypair,
        initial_peers: Vec<Multiaddr>,
        evm_address: RewardsAddress,
        evm_network: EvmNetwork,
        addr: SocketAddr,
        root_dir: PathBuf,
    ) -> Self {
        Self {
            addr,
            bootstrap_cache: None,
            evm_address,
            evm_network,
            initial_peers,
            identity_keypair,
            local: false,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: None,
            no_upnp: false,
            relay_client: false,
            root_dir,
        }
    }

    pub fn local(&mut self, local: bool) {
        self.local = local;
    }

    #[cfg(feature = "open-metrics")]
    pub fn metrics_server_port(&mut self, port: Option<u16>) {
        self.metrics_server_port = port;
    }

    pub fn bootstrap_cache(&mut self, cache: BootstrapCacheStore) {
        self.bootstrap_cache = Some(cache);
    }

    pub fn relay_client(&mut self, relay_client: bool) {
        self.relay_client = relay_client;
    }

    pub fn no_upnp(&mut self, no_upnp: bool) {
        self.no_upnp = no_upnp;
    }
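
    /// Consumes the builder, initialises the network layer (and the metrics
    /// recorder when the `open-metrics` feature is enabled and a port is set),
    /// spawns the node's main event loop and returns a [`RunningNode`] handle.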
    pub fn build_and_run(self) -> Result<RunningNode> {
        #[cfg(feature = "open-metrics")]
        let (metrics_recorder, metrics_registries) = if self.metrics_server_port.is_some() {
            let mut metrics_registries = MetricsRegistries::default();
            let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);

            (Some(metrics_recorder), metrics_registries)
        } else {
            (None, MetricsRegistries::default())
        };

        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        let network_config = NetworkConfig {
            keypair: self.identity_keypair,
            local: self.local,
            initial_contacts: self.initial_peers,
            listen_addr: self.addr,
            root_dir: self.root_dir.clone(),
            shutdown_rx: shutdown_rx.clone(),
            bootstrap_cache: self.bootstrap_cache,
            no_upnp: self.no_upnp,
            relay_client: self.relay_client,
            custom_request_timeout: None,
            #[cfg(feature = "open-metrics")]
            metrics_registries,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: self.metrics_server_port,
        };
        let (network, network_event_receiver) = Network::init(network_config)?;

        let node_events_channel = NodeEventsChannel::default();
        let node = NodeInner {
            network: network.clone(),
            events_channel: node_events_channel.clone(),
            reward_address: self.evm_address,
            #[cfg(feature = "open-metrics")]
            metrics_recorder,
            evm_network: self.evm_network,
        };
        let node = Node {
            inner: Arc::new(node),
        };

        node.run(network_event_receiver, shutdown_rx);

        let running_node = RunningNode {
            shutdown_sender: shutdown_tx,
            network,
            node_events_channel,
            root_dir_path: self.root_dir,
            rewards_address: self.evm_address,
        };

        Ok(running_node)
    }
}
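
/// The main node type: a thin, cloneable handle around [`NodeInner`] that reacts
/// to [`NetworkEvent`]s and drives the node's periodic maintenance tasks.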
#[derive(Clone)]
pub(crate) struct Node {
    inner: Arc<NodeInner>,
}

/// The shared state behind a [`Node`]. Keeping it in a single `Arc` makes
/// cloning a [`Node`] cheap.
struct NodeInner {
    events_channel: NodeEventsChannel,
    network: Network,
    #[cfg(feature = "open-metrics")]
    metrics_recorder: Option<NodeMetricsRecorder>,
    reward_address: RewardsAddress,
    evm_network: EvmNetwork,
}
impl Node {
    pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
        &self.inner.events_channel
    }

    pub(crate) fn network(&self) -> &Network {
        &self.inner.network
    }

    #[cfg(feature = "open-metrics")]
    pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
        self.inner.metrics_recorder.as_ref()
    }

    pub(crate) fn reward_address(&self) -> &RewardsAddress {
        &self.inner.reward_address
    }

    pub(crate) fn evm_network(&self) -> &EvmNetwork {
        &self.inner.evm_network
    }
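
    /// Spawns the node's main task: it processes incoming [`NetworkEvent`]s and
    /// drives the periodic replication, uptime-metrics, irrelevant-record cleanup
    /// and storage-challenge timers until a shutdown signal is received or the
    /// event channel closes.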
    fn run(
        self,
        mut network_event_receiver: Receiver<NetworkEvent>,
        mut shutdown_rx: watch::Receiver<bool>,
    ) {
        let mut rng = StdRng::from_entropy();

        let peers_connected = Arc::new(AtomicUsize::new(0));

        let _node_task = spawn(async move {
            // Randomise the replication interval so that nodes don't all replicate at once.
            let replication_interval: u64 = rng.gen_range(
                PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
            );
            let replication_interval_time = Duration::from_secs(replication_interval);
            debug!("Replication interval set to {replication_interval_time:?}");

            let mut replication_interval = tokio::time::interval(replication_interval_time);
            // The first tick of a tokio interval completes immediately; consume it so the
            // first real trigger only fires after a full interval.
            let _ = replication_interval.tick().await;

            let mut uptime_metrics_update_interval =
                tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
            let _ = uptime_metrics_update_interval.tick().await;

            let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
                IRRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
                    ..IRRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
            );
            let irrelevant_records_cleanup_interval_time =
                Duration::from_secs(irrelevant_records_cleanup_interval);
            let mut irrelevant_records_cleanup_interval =
                tokio::time::interval(irrelevant_records_cleanup_interval_time);
            let _ = irrelevant_records_cleanup_interval.tick().await;

            let storage_challenge_interval: u64 =
                rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
            let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
            debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");

            let mut storage_challenge_interval =
                tokio::time::interval(storage_challenge_interval_time);
            let _ = storage_challenge_interval.tick().await;

            loop {
                let peers_connected = &peers_connected;

                tokio::select! {
                    result = shutdown_rx.changed() => {
                        if (result.is_ok() && *shutdown_rx.borrow()) || result.is_err() {
                            info!("Shutdown signal received or sender dropped. Exiting network events loop.");
                            break;
                        }
                    },
                    net_event = network_event_receiver.recv() => {
                        match net_event {
                            Some(event) => {
                                let start = Instant::now();
                                let event_string = format!("{event:?}");

                                self.handle_network_event(event, peers_connected);
                                trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);
                            }
                            None => {
                                error!("The `NetworkEvent` channel is closed");
                                self.events_channel().broadcast(NodeEvent::ChannelClosed);
                                break;
                            }
                        }
                    }
                    _ = replication_interval.tick() => {
                        let start = Instant::now();
                        let network = self.network().clone();
                        self.record_metrics(Marker::IntervalReplicationTriggered);

                        let _handle = spawn(async move {
                            Self::try_interval_replication(network);
                            trace!("Periodic replication took {:?}", start.elapsed());
                        });
                    }
                    _ = uptime_metrics_update_interval.tick() => {
                        #[cfg(feature = "open-metrics")]
                        if let Some(metrics_recorder) = self.metrics_recorder() {
                            let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
                        }
                    }
                    _ = irrelevant_records_cleanup_interval.tick() => {
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::trigger_irrelevant_record_cleanup(network);
                        });
                    }
                    _ = storage_challenge_interval.tick() => {
                        let start = Instant::now();
                        debug!("Periodic storage challenge triggered");
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::storage_challenge(network).await;
                            trace!("Periodic storage challenge took {:?}", start.elapsed());
                        });
                    }
                }
            }
        });
    }
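
    /// Logs the given marker and, when the `open-metrics` feature is enabled,
    /// records it with the metrics recorder.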
    pub(crate) fn record_metrics(&self, marker: Marker) {
        marker.log();
        #[cfg(feature = "open-metrics")]
        if let Some(metrics_recorder) = self.metrics_recorder() {
            metrics_recorder.record(marker)
        }
    }
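
    /// Handles a single [`NetworkEvent`]. Long-running work is spawned onto
    /// separate tasks so that this handler never blocks the event loop.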
    fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
        let start = Instant::now();
        let event_string = format!("{event:?}");
        let event_header;

        if let NetworkEvent::QueryRequestReceived {
            query: Query::GetVersion { .. },
            ..
        } = event
        {
            trace!("Handling NetworkEvent {event_string}");
        } else {
            debug!("Handling NetworkEvent {event_string}");
        }

        match event {
            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
                event_header = "PeerAdded";
                let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
                if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
                    self.events_channel()
                        .broadcast(NodeEvent::ConnectedToNetwork);
                }

                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));

                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::try_query_peer_version(network, peer_id, Default::default()).await;
                });

                let network = self.network().clone();
                self.record_metrics(Marker::IntervalReplicationTriggered);
                let _handle = spawn(async move {
                    Self::try_interval_replication(network);
                });
            }
            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
                event_header = "PeerRemoved";
                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));

                let self_id = self.network().peer_id();
                let distance =
                    NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
                info!(
                    "Node {self_id:?} removed peer {peer_id:?} from its routing table. Its distance to us is {:?}.",
                    distance.ilog2()
                );

                let network = self.network().clone();
                self.record_metrics(Marker::IntervalReplicationTriggered);
                let _handle = spawn(async move {
                    Self::try_interval_replication(network);
                });
            }
            NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
                event_header = "PeerWithUnsupportedProtocol";
            }
            NetworkEvent::NewListenAddr(_) => {
                event_header = "NewListenAddr";
            }
            NetworkEvent::ResponseReceived { res } => {
                event_header = "ResponseReceived";
                if let Err(err) = self.handle_response(res) {
                    error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
                }
            }
            NetworkEvent::KeysToFetchForReplication(keys) => {
                event_header = "KeysToFetchForReplication";
                self.record_metrics(Marker::fetching_keys_for_replication(&keys));

                if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
                    error!("Error while trying to fetch replicated data {err:?}");
                }
            }
            NetworkEvent::QueryRequestReceived { query, channel } => {
                event_header = "QueryRequestReceived";
                let node = self.clone();
                let payment_address = *self.reward_address();

                let _handle = spawn(async move {
                    let network = node.network().clone();
                    let res = Self::handle_query(node, query, payment_address).await;

                    if let Response::Query(QueryResponse::GetVersion { .. }) = res {
                        trace!("Sending response {res:?}");
                    } else {
                        debug!("Sending response {res:?}");
                    }

                    network.send_response(res, channel);
                });
            }
            NetworkEvent::UnverifiedRecord(record) => {
                event_header = "UnverifiedRecord";
                let self_clone = self.clone();
                let _handle = spawn(async move {
                    let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                    match self_clone.validate_and_store_record(record).await {
                        Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
                        Err(err) => {
                            self_clone.record_metrics(Marker::RecordRejected(&key, &err));
                        }
                    }
                });
            }
            NetworkEvent::TerminateNode { reason } => {
                event_header = "TerminateNode";
                error!("Received termination from swarm_driver due to {reason:?}");
                self.events_channel()
                    .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
            }
            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
                event_header = "FailedToFetchHolders";
                let network = self.network().clone();
                let pretty_log: Vec<_> = bad_nodes
                    .iter()
                    .map(|(peer_id, record_key)| {
                        let pretty_key = PrettyPrintRecordKey::from(record_key);
                        (peer_id, pretty_key)
                    })
                    .collect();
                debug!(
                    "Received notification from replication_fetcher: failed to fetch replication copies from {pretty_log:?}."
                );
                let _handle = spawn(async move {
                    for (peer_id, record_key) in bad_nodes {
                        // Only report the peer if we still don't hold a copy of the record ourselves.
                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
                        {
                            error!(
                                "From peer {peer_id:?}, failed to fetch record {:?}",
                                PrettyPrintRecordKey::from(&record_key)
                            );
                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
                        }
                    }
                });
            }
            NetworkEvent::QuoteVerification { quotes } => {
                event_header = "QuoteVerification";
                let network = self.network().clone();

                let _handle = spawn(async move {
                    quotes_verification(&network, quotes).await;
                });
            }
            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
                event_header = "FreshReplicateToFetch";
                self.fresh_replicate_to_fetch(holder, keys);
            }
            NetworkEvent::PeersForVersionQuery(peers) => {
                event_header = "PeersForVersionQuery";
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::query_peers_version(network, peers).await;
                });
            }
        }

        trace!(
            "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
            start.elapsed()
        );
    }
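
    /// Handles responses received outside of an explicit request/response flow.
    /// Most responses are consumed by their callers; anything arriving here is
    /// logged and otherwise ignored.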
    fn handle_response(&self, response: Response) -> Result<()> {
        match response {
            Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
                warn!("Mishandled replicate response, should be handled earlier");
            }
            Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
                error!(
                    "Responses to replication queries should be handled by the caller, not by the common handler: {resp:?}"
                );
            }
            Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
                // A successful fresh-replicate acknowledgement needs no further handling.
            }
            other => {
                warn!("handle_response not implemented for {other:?}");
            }
        };

        Ok(())
    }
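
    /// Builds the [`QueryResponse`] for an incoming [`Query`], covering store
    /// quotes, replicated-record retrieval, chunk existence proofs, node health
    /// checks, closest-peer lookups, version queries and record puts.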
    async fn handle_query(node: Self, query: Query, payment_address: RewardsAddress) -> Response {
        let network = node.network();
        let resp: QueryResponse = match query {
            Query::GetStoreQuote {
                key,
                data_type,
                data_size,
                nonce,
                difficulty,
            } => {
                let record_key = key.to_record_key();
                let self_id = network.peer_id();

                let maybe_quoting_metrics = network
                    .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
                    .await;

                let storage_proofs = if let Some(nonce) = nonce {
                    Self::respond_x_closest_record_proof(
                        network,
                        key.clone(),
                        nonce,
                        difficulty,
                        false,
                    )
                    .await
                } else {
                    vec![]
                };

                match maybe_quoting_metrics {
                    Ok((quoting_metrics, is_already_stored)) => {
                        if is_already_stored {
                            QueryResponse::GetStoreQuote {
                                quote: Err(ProtocolError::RecordExists(
                                    PrettyPrintRecordKey::from(&record_key).into_owned(),
                                )),
                                peer_address: NetworkAddress::from(self_id),
                                storage_proofs,
                            }
                        } else {
                            QueryResponse::GetStoreQuote {
                                quote: Self::create_quote_for_storecost(
                                    network,
                                    &key,
                                    &quoting_metrics,
                                    &payment_address,
                                ),
                                peer_address: NetworkAddress::from(self_id),
                                storage_proofs,
                            }
                        }
                    }
                    Err(err) => {
                        warn!("GetStoreQuote failed for {key:?}: {err}");
                        QueryResponse::GetStoreQuote {
                            quote: Err(ProtocolError::GetStoreQuoteFailed),
                            peer_address: NetworkAddress::from(self_id),
                            storage_proofs,
                        }
                    }
                }
            }
            Query::GetReplicatedRecord { requester: _, key } => {
                let our_address = NetworkAddress::from(network.peer_id());
                let mut result = Err(ProtocolError::ReplicatedRecordNotFound {
                    holder: Box::new(our_address.clone()),
                    key: Box::new(key.clone()),
                });
                let record_key = key.as_record_key();

                if let Some(record_key) = record_key
                    && let Ok(Some(record)) = network.get_local_record(&record_key).await
                {
                    result = Ok((our_address, Bytes::from(record.value)));
                }

                QueryResponse::GetReplicatedRecord(result)
            }
            Query::GetChunkExistenceProof {
                key,
                nonce,
                difficulty,
            } => QueryResponse::GetChunkExistenceProof(
                Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
            ),
            Query::CheckNodeInProblem(target_address) => {
                debug!("Got CheckNodeInProblem for peer {target_address:?}");

                let is_in_trouble =
                    if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
                        result
                    } else {
                        debug!("Could not get status of {target_address:?}.");
                        false
                    };

                QueryResponse::CheckNodeInProblem {
                    reporter_address: NetworkAddress::from(network.peer_id()),
                    target_address,
                    is_in_trouble,
                }
            }
            Query::GetClosestPeers {
                key,
                num_of_peers,
                range,
                sign_result,
            } => {
                debug!(
                    "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
                );
                Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
                    .await
            }
            Query::GetVersion(_) => QueryResponse::GetVersion {
                peer: NetworkAddress::from(network.peer_id()),
                version: ant_build_info::package_version(),
            },
            Query::PutRecord {
                holder,
                address,
                serialized_record,
            } => {
                let record = Record {
                    key: address.to_record_key(),
                    value: serialized_record,
                    publisher: None,
                    expires: None,
                };

                let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                let result = match node.validate_and_store_record(record).await {
                    Ok(()) => Ok(()),
                    Err(PutValidationError::OutdatedRecordCounter { counter, expected }) => {
                        node.record_metrics(Marker::RecordRejected(
                            &key,
                            &PutValidationError::OutdatedRecordCounter { counter, expected },
                        ));
                        Err(ProtocolError::OutdatedRecordCounter { counter, expected })
                    }
                    Err(err) => {
                        node.record_metrics(Marker::RecordRejected(&key, &err));
                        Err(ProtocolError::PutRecordFailed(format!("{err:?}")))
                    }
                };
                QueryResponse::PutRecord {
                    result,
                    peer_address: holder,
                    record_addr: address,
                }
            }
        };
        Response::Query(resp)
    }
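
    /// Answers a `GetClosestPeers` query from the local routing table, optionally
    /// signing the (target, peers) tuple so the requester can verify the answer.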
    async fn respond_get_closest_peers(
        network: &Network,
        target: NetworkAddress,
        num_of_peers: Option<usize>,
        range: Option<[u8; 32]>,
        sign_result: bool,
    ) -> QueryResponse {
        let local_peers = network.get_local_peers_with_multiaddr().await;
        let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
            Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
        } else {
            vec![]
        };

        let signature = if sign_result {
            let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
            bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
            network.sign(&bytes).ok()
        } else {
            None
        };

        QueryResponse::GetClosestPeers {
            target,
            peers,
            signature,
        }
    }
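
    /// Selects peers for a closest-peers response: if a `range` is given, returns
    /// every peer within that distance of the target; otherwise returns the
    /// `num_of_peers` peers closest to the target. The range takes precedence
    /// when both are supplied.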
    fn calculate_get_closest_peers(
        peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
        target: NetworkAddress,
        num_of_peers: Option<usize>,
        range: Option<[u8; 32]>,
    ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
        match (num_of_peers, range) {
            (_, Some(value)) => {
                let distance = U256::from_big_endian(&value);
                peer_addrs
                    .iter()
                    .filter_map(|(peer_id, multi_addrs)| {
                        let addr = NetworkAddress::from(*peer_id);
                        if target.distance(&addr).0 <= distance {
                            Some((addr, multi_addrs.clone()))
                        } else {
                            None
                        }
                    })
                    .collect()
            }
            (Some(num_of_peers), _) => {
                let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
                    .iter()
                    .map(|(peer_id, multi_addrs)| {
                        let addr = NetworkAddress::from(*peer_id);
                        (addr, multi_addrs.clone())
                    })
                    .collect();
                result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
                result.into_iter().take(num_of_peers).collect()
            }
            (None, None) => vec![],
        }
    }
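
    /// Produces the proofs for a storage challenge. With difficulty 1 it proves
    /// only the requested key; with higher difficulty it proves up to
    /// `min(difficulty, CLOSE_GROUP_SIZE)` locally held records closest to the
    /// key (restricted to chunks when `chunk_only` is set).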
    async fn respond_x_closest_record_proof(
        network: &Network,
        key: NetworkAddress,
        nonce: Nonce,
        difficulty: usize,
        chunk_only: bool,
    ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
        let start = Instant::now();
        let mut results = vec![];
        if difficulty == 1 {
            let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
            if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
                let proof = ChunkProof::new(&record.value, nonce);
                debug!("Chunk proof for {key:?} is {proof:?}");
                result = Ok(proof)
            } else {
                debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
            }

            results.push((key.clone(), result));
        } else {
            let all_local_records = network.get_all_local_record_addresses().await;

            if let Ok(all_local_records) = all_local_records {
                let mut all_chunk_addrs: Vec<_> = if chunk_only {
                    all_local_records
                        .iter()
                        .filter_map(|(addr, record_type)| {
                            if *record_type == ValidationType::Chunk {
                                Some(addr.clone())
                            } else {
                                None
                            }
                        })
                        .collect()
                } else {
                    all_local_records.keys().cloned().collect()
                };

                all_chunk_addrs.sort_by_key(|addr| key.distance(addr));

                let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);

                for addr in all_chunk_addrs.iter().take(workload_factor) {
                    if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
                    {
                        let proof = ChunkProof::new(&record.value, nonce);
                        debug!("Chunk proof for {key:?} is {proof:?}");
                        results.push((addr.clone(), Ok(proof)));
                    }
                }
            }

            info!(
                "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
                results.len(),
                start.elapsed()
            );
        }

        results
    }
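
    /// Runs a storage challenge against our close group: picks a locally held
    /// chunk, asks each neighbour for existence proofs of the chunks nearest to
    /// it, scores the answers and reports unhealthy peers to the network layer.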
    async fn storage_challenge(network: Network) {
        let start = Instant::now();
        let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
            network.get_k_closest_local_peers_to_the_target(None).await
        {
            closest_peers
                .into_iter()
                .take(CLOSE_GROUP_SIZE)
                .collect_vec()
        } else {
            error!("Cannot get local neighbours");
            return;
        };
        if closest_peers.len() < CLOSE_GROUP_SIZE {
            debug!(
                "Not enough neighbours ({}/{}) to carry out storage challenge.",
                closest_peers.len(),
                CLOSE_GROUP_SIZE
            );
            return;
        }

        let mut verify_candidates: Vec<NetworkAddress> =
            if let Ok(all_keys) = network.get_all_local_record_addresses().await {
                all_keys
                    .iter()
                    .filter_map(|(addr, record_type)| {
                        if ValidationType::Chunk == *record_type {
                            Some(addr.clone())
                        } else {
                            None
                        }
                    })
                    .collect()
            } else {
                error!("Failed to get local record addresses.");
                return;
            };
        let num_of_targets = verify_candidates.len();
        if num_of_targets < 50 {
            debug!("Not enough candidates ({num_of_targets}/50) to be checked against neighbours.");
            return;
        }

        // Pick a challenge target from the half of our records closest to us, then compute
        // the proofs we expect for the `difficulty` records nearest to that target.
        let self_addr = NetworkAddress::from(network.peer_id());
        verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
        let index: usize = OsRng.gen_range(0..num_of_targets / 2);
        let target = verify_candidates[index].clone();
        let difficulty = CLOSE_GROUP_SIZE;
        verify_candidates.sort_by_key(|addr| target.distance(addr));
        let expected_targets = verify_candidates.into_iter().take(difficulty);
        let nonce: Nonce = thread_rng().r#gen::<u64>();
        let mut expected_proofs = HashMap::new();
        for addr in expected_targets {
            if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
                let expected_proof = ChunkProof::new(&record.value, nonce);
                let _ = expected_proofs.insert(addr, expected_proof);
            } else {
                error!("Local record {addr:?} can't be loaded from disk.");
            }
        }
        let request = Request::Query(Query::GetChunkExistenceProof {
            key: target.clone(),
            nonce,
            difficulty,
        });

        let mut tasks = JoinSet::new();
        for (peer_id, addresses) in closest_peers {
            if peer_id == network.peer_id() {
                continue;
            }
            let network_clone = network.clone();
            let request_clone = request.clone();
            let expected_proofs_clone = expected_proofs.clone();
            let _ = tasks.spawn(async move {
                let res = scoring_peer(
                    network_clone,
                    (peer_id, addresses),
                    request_clone,
                    expected_proofs_clone,
                )
                .await;
                (peer_id, res)
            });
        }

        let mut peer_scores = vec![];
        while let Some(res) = tasks.join_next().await {
            match res {
                Ok((peer_id, score)) => {
                    let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
                    if !is_healthy {
                        info!(
                            "Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."
                        );
                        network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
                    }
                    peer_scores.push((peer_id, is_healthy));
                }
                Err(e) => {
                    info!("StorageChallenge task completed with error {e:?}");
                }
            }
        }
        if !peer_scores.is_empty() {
            network.notify_peer_scores(peer_scores);
        }

        info!(
            "Completed node StorageChallenge against neighbours in {:?}!",
            start.elapsed()
        );
    }

    async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
        for (peer_id, addrs) in peers {
            Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
        }
    }
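
    /// Queries a single peer for its version and notifies the network layer of the
    /// result. A peer that cannot be dialled is removed from the routing table.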
    async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
        let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
        let version = match network.send_request(request, peer, addrs).await {
            Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
                trace!("Fetched peer version {peer:?} as {version:?}");
                version
            }
            Ok(other) => {
                info!("Unexpected response while fetching version of peer {peer:?}: {other:?}");
                "none".to_string()
            }
            Err(err) => {
                info!("Failed to fetch peer version {peer:?} with error {err:?}");
                if let NetworkError::OutboundError(OutboundFailure::DialFailure) = err {
                    // The peer could not even be dialled; drop it from the routing table.
                    network.remove_peer(peer);
                    return;
                }
                "old".to_string()
            }
        };
        network.notify_node_version(peer, version);
    }
}
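
/// Sends the chunk-proof challenge to one peer and converts its reply into a
/// score, combining how quickly it answered with how many proofs were correct.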
async fn scoring_peer(
    network: Network,
    peer: (PeerId, Addresses),
    request: Request,
    expected_proofs: HashMap<NetworkAddress, ChunkProof>,
) -> usize {
    let peer_id = peer.0;
    let start = Instant::now();
    let responses = network
        .send_and_get_responses(&[peer], &request, true)
        .await;

    if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
        responses.get(&peer_id)
    {
        if answers.is_empty() {
            info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
            return 0;
        }
        let elapsed = start.elapsed();

        let mut received_proofs = vec![];
        for (addr, proof) in answers {
            if let Ok(proof) = proof {
                received_proofs.push((addr.clone(), proof.clone()));
            }
        }

        let score = mark_peer(elapsed, received_proofs, &expected_proofs);
        info!(
            "Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.",
            answers.len()
        );
        score
    } else {
        info!("Peer {peer_id:?} didn't reply to the ChunkProofChallenge, or replied with an error.");
        0
    }
}
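
/// Combines the duration score and the challenge score into a single overall
/// score in the range `0..=HIGHEST_SCORE * HIGHEST_SCORE`.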
fn mark_peer(
    duration: Duration,
    answers: Vec<(NetworkAddress, ChunkProof)>,
    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
) -> usize {
    let duration_score = duration_score_scheme(duration);
    let challenge_score = challenge_score_scheme(answers, expected_proofs);

    duration_score * challenge_score
}
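
/// Scores response latency: starts from `HIGHEST_SCORE` and deducts one point per
/// `TIME_STEP` milliseconds taken, bottoming out at zero.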
fn duration_score_scheme(duration: Duration) -> usize {
    let in_ms = if let Some(value) = duration.as_millis().to_usize() {
        value
    } else {
        info!("Cannot get milliseconds from {duration:?}; using a default value of 1000ms.");
        1000
    };

    let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
    HIGHEST_SCORE - step
}
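
/// Scores the correctness of the returned proofs as a fraction of the expected
/// proofs, scaled to `HIGHEST_SCORE`. A single provably false answer zeroes the
/// score.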
fn challenge_score_scheme(
    answers: Vec<(NetworkAddress, ChunkProof)>,
    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
) -> usize {
    let mut correct_answers = 0;
    for (addr, chunk_proof) in answers {
        if let Some(expected_proof) = expected_proofs.get(&addr) {
            if expected_proof.verify(&chunk_proof) {
                correct_answers += 1;
            } else {
                info!("Spotted a false answer to the challenge regarding {addr:?}");
                return 0;
            }
        }
    }
    std::cmp::min(
        HIGHEST_SCORE,
        HIGHEST_SCORE * correct_answers / expected_proofs.len(),
    )
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    #[test]
    fn test_no_local_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(5);
        let range = None;
        let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);

        assert_eq!(result, vec![]);
    }

    #[test]
    fn test_fewer_local_peers_than_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(2);
        let range = None;
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .iter()
            .map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(*peer_id);
                (addr, multi_addrs.clone())
            })
            .collect();
        expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
        let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();

        assert_eq!(expected_result, result);
    }

    #[test]
    fn test_with_range_and_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(0);
        let range_value = [128; 32];
        let range = Some(range_value);
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // The range takes precedence over num_of_peers: expect every peer within
        // `range_value` distance of the target.
        let distance = U256::from_big_endian(&range_value);
        let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .into_iter()
            .filter_map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(peer_id);
                if target.distance(&addr).0 <= distance {
                    Some((addr, multi_addrs.clone()))
                } else {
                    None
                }
            })
            .collect();

        assert_eq!(expected_result, result);
    }
}