1use super::{
10 error::Result, event::NodeEventsChannel, quote::quotes_verification, Marker, NodeEvent,
11};
12#[cfg(feature = "open-metrics")]
13use crate::metrics::NodeMetricsRecorder;
14use crate::RunningNode;
15use ant_bootstrap::BootstrapCacheStore;
16use ant_evm::EvmNetwork;
17use ant_evm::RewardsAddress;
18use ant_networking::Addresses;
19#[cfg(feature = "open-metrics")]
20use ant_networking::MetricsRegistries;
21use ant_networking::{
22 Instant, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue, SwarmDriver,
23};
24use ant_protocol::{
25 error::Error as ProtocolError,
26 messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
27 storage::ValidationType,
28 NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
29};
30use bytes::Bytes;
31use itertools::Itertools;
32use libp2p::{identity::Keypair, kad::U256, request_response::OutboundFailure, Multiaddr, PeerId};
33use num_traits::cast::ToPrimitive;
34use rand::{
35 rngs::{OsRng, StdRng},
36 thread_rng, Rng, SeedableRng,
37};
38use std::{
39 collections::HashMap,
40 net::SocketAddr,
41 path::PathBuf,
42 sync::{
43 atomic::{AtomicUsize, Ordering},
44 Arc,
45 },
46 time::Duration,
47};
48use tokio::sync::watch;
49use tokio::{
50 sync::mpsc::Receiver,
51 task::{spawn, JoinSet},
52};
53
/// Periodic replication runs on a randomised interval within
/// `[PERIODIC_REPLICATION_INTERVAL_MAX_S / 2, PERIODIC_REPLICATION_INTERVAL_MAX_S)` seconds.
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;

/// Upper bound (seconds) for the randomised storage-challenge interval.
const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;

/// How often the uptime metric is refreshed (only used with the `open-metrics` feature).
const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

/// Upper bound (seconds) for the randomised irrelevant-records cleanup interval.
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;

/// Maximum score a peer can earn in each storage-challenge scoring component.
const HIGHEST_SCORE: usize = 100;

/// Minimum combined challenge score (duration score * correctness score)
/// for a peer to be considered healthy.
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;

/// Milliseconds per scoring step when converting response time into a score.
const TIME_STEP: usize = 20;
78
/// Builder used to configure and spawn a new node, yielding a `RunningNode`
/// handle via [`NodeBuilder::build_and_run`].
pub struct NodeBuilder {
    /// Socket address the node will listen on.
    addr: SocketAddr,
    /// Optional cache store for persisting known peers across restarts.
    bootstrap_cache: Option<BootstrapCacheStore>,
    /// Rewards address used for storage quotes and as the node's payout address.
    evm_address: RewardsAddress,
    /// The EVM network this node operates against.
    evm_network: EvmNetwork,
    /// Peers used to join the network on startup.
    initial_peers: Vec<Multiaddr>,
    /// Keypair establishing the node's identity (and hence its `PeerId`).
    identity_keypair: Keypair,
    /// Whether to run in local (testing) mode.
    local: bool,
    #[cfg(feature = "open-metrics")]
    /// Port for the metrics server; `None` disables it.
    metrics_server_port: Option<u16>,
    /// Whether UPnP port mapping is disabled.
    no_upnp: bool,
    /// Whether the node should act as a relay client.
    relay_client: bool,
    /// Root directory for the node's on-disk state.
    root_dir: PathBuf,
}
95
impl NodeBuilder {
    /// Creates a new builder with the mandatory node parameters.
    ///
    /// Optional behaviour (metrics server, bootstrap cache, relay client,
    /// UPnP disabling, local mode) defaults to off and is toggled via the
    /// setter methods below.
    pub fn new(
        identity_keypair: Keypair,
        initial_peers: Vec<Multiaddr>,
        evm_address: RewardsAddress,
        evm_network: EvmNetwork,
        addr: SocketAddr,
        root_dir: PathBuf,
    ) -> Self {
        Self {
            addr,
            bootstrap_cache: None,
            evm_address,
            evm_network,
            initial_peers,
            identity_keypair,
            local: false,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: None,
            no_upnp: false,
            relay_client: false,
            root_dir,
        }
    }

    /// Sets whether the node runs in local (testing) mode.
    pub fn local(&mut self, local: bool) {
        self.local = local;
    }

    /// Sets the metrics-server port; `None` disables the metrics server.
    #[cfg(feature = "open-metrics")]
    pub fn metrics_server_port(&mut self, port: Option<u16>) {
        self.metrics_server_port = port;
    }

    /// Provides a bootstrap cache store for persisting known peers.
    pub fn bootstrap_cache(&mut self, cache: BootstrapCacheStore) {
        self.bootstrap_cache = Some(cache);
    }

    /// Sets whether the node should act as a relay client.
    pub fn relay_client(&mut self, relay_client: bool) {
        self.relay_client = relay_client;
    }

    /// Sets whether UPnP port mapping should be disabled.
    pub fn no_upnp(&mut self, no_upnp: bool) {
        self.no_upnp = no_upnp;
    }

    /// Builds the network stack, spawns the node's background tasks and
    /// returns a [`RunningNode`] handle which can be used to interact with
    /// the node and shut it down.
    pub fn build_and_run(self) -> Result<RunningNode> {
        let mut network_builder =
            NetworkBuilder::new(self.identity_keypair, self.local, self.initial_peers);

        // When metrics are enabled, the registries must be handed to the
        // network builder before the node is built.
        #[cfg(feature = "open-metrics")]
        let metrics_recorder = if self.metrics_server_port.is_some() {
            let mut metrics_registries = MetricsRegistries::default();
            let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);

            network_builder.metrics_registries(metrics_registries);

            Some(metrics_recorder)
        } else {
            None
        };

        network_builder.listen_addr(self.addr);
        #[cfg(feature = "open-metrics")]
        network_builder.metrics_server_port(self.metrics_server_port);
        network_builder.relay_client(self.relay_client);
        if let Some(cache) = self.bootstrap_cache {
            network_builder.bootstrap_cache(cache);
        }

        network_builder.no_upnp(self.no_upnp);

        let (network, network_event_receiver, swarm_driver) =
            network_builder.build_node(self.root_dir.clone())?;

        let node_events_channel = NodeEventsChannel::default();

        let node = NodeInner {
            network: network.clone(),
            events_channel: node_events_channel.clone(),
            reward_address: self.evm_address,
            #[cfg(feature = "open-metrics")]
            metrics_recorder,
            evm_network: self.evm_network,
        };

        let node = Node {
            inner: Arc::new(node),
        };

        // Watch channel used to signal shutdown to the spawned tasks.
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        // Spawns the swarm driver and the node's event loop; returns immediately.
        node.run(swarm_driver, network_event_receiver, shutdown_rx);

        let running_node = RunningNode {
            shutdown_sender: shutdown_tx,
            network,
            node_events_channel,
            root_dir_path: self.root_dir,
            rewards_address: self.evm_address,
        };

        Ok(running_node)
    }
}
223
/// Handle to a running node's shared state.
///
/// Cheap to clone: all state lives behind an [`Arc`], so clones observe the
/// same inner `NodeInner`.
#[derive(Clone)]
pub(crate) struct Node {
    inner: Arc<NodeInner>,
}
231
/// The shared state behind a [`Node`] handle.
struct NodeInner {
    /// Channel used to broadcast `NodeEvent`s to subscribers.
    events_channel: NodeEventsChannel,
    /// Handle to the networking layer.
    network: Network,
    #[cfg(feature = "open-metrics")]
    /// Recorder for node metrics; `None` when metrics were not enabled at build time.
    metrics_recorder: Option<NodeMetricsRecorder>,
    /// Rewards address quoted to clients for storage payments.
    reward_address: RewardsAddress,
    /// The EVM network this node operates against.
    evm_network: EvmNetwork,
}
242
243impl Node {
    /// Returns the channel used to broadcast `NodeEvent`s to subscribers.
    pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
        &self.inner.events_channel
    }
248
    /// Returns a reference to the node's networking handle.
    pub(crate) fn network(&self) -> &Network {
        &self.inner.network
    }
253
    /// Returns the metrics recorder, if metrics were enabled at build time.
    #[cfg(feature = "open-metrics")]
    pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
        self.inner.metrics_recorder.as_ref()
    }
260
    /// Returns the rewards address used when issuing storage quotes.
    pub(crate) fn reward_address(&self) -> &RewardsAddress {
        &self.inner.reward_address
    }
265
    /// Returns the EVM network this node operates against.
    pub(crate) fn evm_network(&self) -> &EvmNetwork {
        &self.inner.evm_network
    }
269
    /// Spawns the swarm driver and the node's main event loop as background
    /// tasks, then returns immediately.
    ///
    /// The event loop handles incoming `NetworkEvent`s and drives the
    /// periodic jobs (replication, uptime metric updates, irrelevant-record
    /// cleanup, storage challenges). Each periodic interval is randomised
    /// within `[max/2, max)` so that nodes do not act in lock-step. The loop
    /// exits when `shutdown_rx` flips to `true` or its sender is dropped.
    fn run(
        self,
        swarm_driver: SwarmDriver,
        mut network_event_receiver: Receiver<NetworkEvent>,
        mut shutdown_rx: watch::Receiver<bool>,
    ) {
        let mut rng = StdRng::from_entropy();

        // Counts routing-table additions; used to detect when the first full
        // close group of peers has been reached (see handle_network_event).
        let peers_connected = Arc::new(AtomicUsize::new(0));

        let _swarm_driver_task = spawn(swarm_driver.run(shutdown_rx.clone()));
        let _node_task = spawn(async move {
            let replication_interval: u64 = rng.gen_range(
                PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
            );
            let replication_interval_time = Duration::from_secs(replication_interval);
            debug!("Replication interval set to {replication_interval_time:?}");

            let mut replication_interval = tokio::time::interval(replication_interval_time);
            // The first tick of a tokio interval completes immediately; consume it.
            let _ = replication_interval.tick().await;

            let mut uptime_metrics_update_interval =
                tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
            let _ = uptime_metrics_update_interval.tick().await;

            let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
                UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
                    ..UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
            );
            let irrelevant_records_cleanup_interval_time =
                Duration::from_secs(irrelevant_records_cleanup_interval);
            let mut irrelevant_records_cleanup_interval =
                tokio::time::interval(irrelevant_records_cleanup_interval_time);
            let _ = irrelevant_records_cleanup_interval.tick().await;

            let storage_challenge_interval: u64 =
                rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
            let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
            debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");

            let mut storage_challenge_interval =
                tokio::time::interval(storage_challenge_interval_time);
            let _ = storage_challenge_interval.tick().await;

            loop {
                let peers_connected = &peers_connected;

                tokio::select! {
                    // Shutdown: exit when the flag flips to true, or when the
                    // sender is dropped (changed() errors).
                    result = shutdown_rx.changed() => {
                        if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
                            info!("Shutdown signal received or sender dropped. Exiting network events loop.");
                            break;
                        }
                    },
                    net_event = network_event_receiver.recv() => {
                        match net_event {
                            Some(event) => {
                                let start = Instant::now();
                                let event_string = format!("{event:?}");

                                self.handle_network_event(event, peers_connected);
                                trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);

                            }
                            None => {
                                error!("The `NetworkEvent` channel is closed");
                                self.events_channel().broadcast(NodeEvent::ChannelClosed);
                                break;
                            }
                        }
                    }
                    // Periodic work below is spawned so the loop is never blocked.
                    _ = replication_interval.tick() => {
                        let start = Instant::now();
                        let network = self.network().clone();
                        self.record_metrics(Marker::IntervalReplicationTriggered);

                        let _handle = spawn(async move {
                            Self::try_interval_replication(network);
                            trace!("Periodic replication took {:?}", start.elapsed());
                        });
                    }
                    _ = uptime_metrics_update_interval.tick() => {
                        #[cfg(feature = "open-metrics")]
                        if let Some(metrics_recorder) = self.metrics_recorder() {
                            let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
                        }
                    }
                    _ = irrelevant_records_cleanup_interval.tick() => {
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::trigger_irrelevant_record_cleanup(network);
                        });
                    }
                    _ = storage_challenge_interval.tick() => {
                        let start = Instant::now();
                        debug!("Periodic storage challenge triggered");
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::storage_challenge(network).await;
                            trace!("Periodic storage challenge took {:?}", start.elapsed());
                        });
                    }
                }
            }
        });
    }
389
    /// Logs the marker and, when the `open-metrics` feature is enabled,
    /// records it with the metrics recorder.
    pub(crate) fn record_metrics(&self, marker: Marker) {
        marker.log();
        #[cfg(feature = "open-metrics")]
        if let Some(metrics_recorder) = self.metrics_recorder() {
            metrics_recorder.record(marker)
        }
    }
399
    /// Dispatches a single `NetworkEvent` from the node's event loop.
    ///
    /// Cheap bookkeeping is done inline; anything that needs to await the
    /// network or disk is spawned onto its own task so this handler never
    /// blocks the loop. `peers_connected` counts routing-table additions so
    /// the `ConnectedToNetwork` event can be broadcast when the close group
    /// is first reached.
    fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
        let start = Instant::now();
        let event_string = format!("{event:?}");
        let event_header;

        // Version queries are frequent; log them at trace level to cut noise.
        if let NetworkEvent::QueryRequestReceived {
            query: Query::GetVersion { .. },
            ..
        } = event
        {
            trace!("Handling NetworkEvent {event_string}");
        } else {
            debug!("Handling NetworkEvent {event_string}");
        }

        match event {
            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
                event_header = "PeerAdded";
                let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
                // Announce network connection once a full close group is reached.
                if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
                    self.events_channel()
                        .broadcast(NodeEvent::ConnectedToNetwork);
                }

                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));

                // Query the new peer's version on a separate task.
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::try_query_peer_version(network, peer_id, Default::default()).await;
                });

                // A routing-table change can shift our data responsibility;
                // trigger a replication round.
                let network = self.network().clone();
                self.record_metrics(Marker::IntervalReplicationTriggered);
                let _handle = spawn(async move {
                    Self::try_interval_replication(network);
                });
            }
            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
                event_header = "PeerRemoved";
                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));

                let self_id = self.network().peer_id();
                let distance =
                    NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
                info!("Node {self_id:?} removed peer from routing table: {peer_id:?}. It has a {:?} distance to us.", distance.ilog2());

                // As with PeerAdded, re-replicate after the topology change.
                let network = self.network().clone();
                self.record_metrics(Marker::IntervalReplicationTriggered);
                let _handle = spawn(async move {
                    Self::try_interval_replication(network);
                });
            }
            NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
                event_header = "PeerWithUnsupportedProtocol";
            }
            NetworkEvent::NewListenAddr(_) => {
                event_header = "NewListenAddr";
            }
            NetworkEvent::ResponseReceived { res } => {
                event_header = "ResponseReceived";
                if let Err(err) = self.handle_response(res) {
                    error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
                }
            }
            NetworkEvent::KeysToFetchForReplication(keys) => {
                event_header = "KeysToFetchForReplication";
                self.record_metrics(Marker::fetching_keys_for_replication(&keys));

                if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
                    error!("Error while trying to fetch replicated data {err:?}");
                }
            }
            NetworkEvent::QueryRequestReceived { query, channel } => {
                event_header = "QueryRequestReceived";
                let network = self.network().clone();
                let payment_address = *self.reward_address();

                let _handle = spawn(async move {
                    let res = Self::handle_query(&network, query, payment_address).await;

                    // Mirror the request logging: version responses at trace level.
                    if let Response::Query(QueryResponse::GetVersion { .. }) = res {
                        trace!("Sending response {res:?}");
                    } else {
                        debug!("Sending response {res:?}");
                    }

                    network.send_response(res, channel);
                });
            }
            NetworkEvent::UnverifiedRecord(record) => {
                event_header = "UnverifiedRecord";
                // Validation and storage may await; run off-task.
                let self_clone = self.clone();
                let _handle = spawn(async move {
                    let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                    match self_clone.validate_and_store_record(record).await {
                        Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
                        Err(err) => {
                            self_clone.record_metrics(Marker::RecordRejected(&key, &err));
                        }
                    }
                });
            }
            NetworkEvent::TerminateNode { reason } => {
                event_header = "TerminateNode";
                error!("Received termination from swarm_driver due to {reason:?}");
                self.events_channel()
                    .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
            }
            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
                event_header = "FailedToFetchHolders";
                let network = self.network().clone();
                let pretty_log: Vec<_> = bad_nodes
                    .iter()
                    .map(|(peer_id, record_key)| {
                        let pretty_key = PrettyPrintRecordKey::from(record_key);
                        (peer_id, pretty_key)
                    })
                    .collect();
                debug!("Received notification from replication_fetcher, notifying {pretty_log:?} failed to fetch replication copies from.");
                let _handle = spawn(async move {
                    for (peer_id, record_key) in bad_nodes {
                        // Only penalise the holder when we still lack the
                        // record locally (i.e. the fetch genuinely failed).
                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
                        {
                            error!(
                                "From peer {peer_id:?}, failed to fetch record {:?}",
                                PrettyPrintRecordKey::from(&record_key)
                            );
                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
                        }
                    }
                });
            }
            NetworkEvent::QuoteVerification { quotes } => {
                event_header = "QuoteVerification";
                let network = self.network().clone();

                let _handle = spawn(async move {
                    quotes_verification(&network, quotes).await;
                });
            }
            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
                event_header = "FreshReplicateToFetch";
                self.fresh_replicate_to_fetch(holder, keys);
            }
            NetworkEvent::PeersForVersionQuery(peers) => {
                event_header = "PeersForVersionQuery";
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::query_peers_version(network, peers).await;
                });
            }
        }

        trace!(
            "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
            start.elapsed()
        );
    }
575
    /// Handles responses that were not consumed by their original requester.
    ///
    /// The explicitly matched variants are either handled elsewhere (so
    /// arriving here indicates mishandling, which is logged) or need no
    /// follow-up. Always returns `Ok(())`.
    fn handle_response(&self, response: Response) -> Result<()> {
        match response {
            Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
                warn!("Mishandled replicate response, should be handled earlier");
            }
            Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
                error!("Response to replication shall be handled by called not by common handler, {resp:?}");
            }
            Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
                // Nothing to do: fresh-replicate acknowledgements need no follow-up.
            }
            other => {
                warn!("handle_response not implemented for {other:?}");
            }
        };

        Ok(())
    }
596
597 async fn handle_query(
598 network: &Network,
599 query: Query,
600 payment_address: RewardsAddress,
601 ) -> Response {
602 let resp: QueryResponse = match query {
603 Query::GetStoreQuote {
604 key,
605 data_type,
606 data_size,
607 nonce,
608 difficulty,
609 } => {
610 let record_key = key.to_record_key();
611 let self_id = network.peer_id();
612
613 let maybe_quoting_metrics = network
614 .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
615 .await;
616
617 let storage_proofs = if let Some(nonce) = nonce {
618 Self::respond_x_closest_record_proof(
619 network,
620 key.clone(),
621 nonce,
622 difficulty,
623 false,
624 )
625 .await
626 } else {
627 vec![]
628 };
629
630 match maybe_quoting_metrics {
631 Ok((quoting_metrics, is_already_stored)) => {
632 if is_already_stored {
633 QueryResponse::GetStoreQuote {
634 quote: Err(ProtocolError::RecordExists(
635 PrettyPrintRecordKey::from(&record_key).into_owned(),
636 )),
637 peer_address: NetworkAddress::from(self_id),
638 storage_proofs,
639 }
640 } else {
641 QueryResponse::GetStoreQuote {
642 quote: Self::create_quote_for_storecost(
643 network,
644 &key,
645 "ing_metrics,
646 &payment_address,
647 ),
648 peer_address: NetworkAddress::from(self_id),
649 storage_proofs,
650 }
651 }
652 }
653 Err(err) => {
654 warn!("GetStoreQuote failed for {key:?}: {err}");
655 QueryResponse::GetStoreQuote {
656 quote: Err(ProtocolError::GetStoreQuoteFailed),
657 peer_address: NetworkAddress::from(self_id),
658 storage_proofs,
659 }
660 }
661 }
662 }
663 Query::GetReplicatedRecord { requester: _, key } => {
664 let our_address = NetworkAddress::from(network.peer_id());
665 let mut result = Err(ProtocolError::ReplicatedRecordNotFound {
666 holder: Box::new(our_address.clone()),
667 key: Box::new(key.clone()),
668 });
669 let record_key = key.as_record_key();
670
671 if let Some(record_key) = record_key {
672 if let Ok(Some(record)) = network.get_local_record(&record_key).await {
673 result = Ok((our_address, Bytes::from(record.value)));
674 }
675 }
676
677 QueryResponse::GetReplicatedRecord(result)
678 }
679 Query::GetChunkExistenceProof {
680 key,
681 nonce,
682 difficulty,
683 } => QueryResponse::GetChunkExistenceProof(
684 Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
685 ),
686 Query::CheckNodeInProblem(target_address) => {
687 debug!("Got CheckNodeInProblem for peer {target_address:?}");
688
689 let is_in_trouble =
690 if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
691 result
692 } else {
693 debug!("Could not get status of {target_address:?}.");
694 false
695 };
696
697 QueryResponse::CheckNodeInProblem {
698 reporter_address: NetworkAddress::from(network.peer_id()),
699 target_address,
700 is_in_trouble,
701 }
702 }
703 Query::GetClosestPeers {
704 key,
705 num_of_peers,
706 range,
707 sign_result,
708 } => {
709 debug!(
710 "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
711 );
712 Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
713 .await
714 }
715 Query::GetVersion(_) => QueryResponse::GetVersion {
716 peer: NetworkAddress::from(network.peer_id()),
717 version: ant_build_info::package_version(),
718 },
719 };
720 Response::Query(resp)
721 }
722
    /// Answers a `GetClosestPeers` query from our local routing table.
    ///
    /// When `sign_result` is set, the serialized target and peer list are
    /// signed so the requester can verify the answer's origin. A failed
    /// local-peers lookup yields an empty peer list rather than an error.
    async fn respond_get_closest_peers(
        network: &Network,
        target: NetworkAddress,
        num_of_peers: Option<usize>,
        range: Option<[u8; 32]>,
        sign_result: bool,
    ) -> QueryResponse {
        let local_peers = network.get_local_peers_with_multiaddr().await;
        let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
            Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
        } else {
            vec![]
        };

        let signature = if sign_result {
            // Sign over target bytes followed by the serialized peer list.
            let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
            bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
            network.sign(&bytes).ok()
        } else {
            None
        };

        QueryResponse::GetClosestPeers {
            target,
            peers,
            signature,
        }
    }
751
752 fn calculate_get_closest_peers(
753 peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
754 target: NetworkAddress,
755 num_of_peers: Option<usize>,
756 range: Option<[u8; 32]>,
757 ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
758 match (num_of_peers, range) {
759 (_, Some(value)) => {
760 let distance = U256::from_big_endian(&value);
761 peer_addrs
762 .iter()
763 .filter_map(|(peer_id, multi_addrs)| {
764 let addr = NetworkAddress::from(*peer_id);
765 if target.distance(&addr).0 <= distance {
766 Some((addr, multi_addrs.clone()))
767 } else {
768 None
769 }
770 })
771 .collect()
772 }
773 (Some(num_of_peers), _) => {
774 let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
775 .iter()
776 .map(|(peer_id, multi_addrs)| {
777 let addr = NetworkAddress::from(*peer_id);
778 (addr, multi_addrs.clone())
779 })
780 .collect();
781 result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
782 result.into_iter().take(num_of_peers).collect()
783 }
784 (None, None) => vec![],
785 }
786 }
787
    /// Produces chunk-existence proofs in answer to a storage challenge.
    ///
    /// With `difficulty == 1` only the record at `key` is proved (a missing
    /// record yields an error entry). Otherwise proofs are produced for up
    /// to `min(difficulty, CLOSE_GROUP_SIZE)` locally-held records closest
    /// to `key`, restricted to chunks when `chunk_only` is set; records we
    /// cannot load are skipped.
    async fn respond_x_closest_record_proof(
        network: &Network,
        key: NetworkAddress,
        nonce: Nonce,
        difficulty: usize,
        chunk_only: bool,
    ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
        let start = Instant::now();
        let mut results = vec![];
        if difficulty == 1 {
            // Single-target challenge: prove only the requested key.
            let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
            if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
                let proof = ChunkProof::new(&record.value, nonce);
                debug!("Chunk proof for {key:?} is {proof:?}");
                result = Ok(proof)
            } else {
                debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
            }

            results.push((key.clone(), result));
        } else {
            let all_local_records = network.get_all_local_record_addresses().await;

            if let Ok(all_local_records) = all_local_records {
                let mut all_chunk_addrs: Vec<_> = if chunk_only {
                    all_local_records
                        .iter()
                        .filter_map(|(addr, record_type)| {
                            if *record_type == ValidationType::Chunk {
                                Some(addr.clone())
                            } else {
                                None
                            }
                        })
                        .collect()
                } else {
                    all_local_records.keys().cloned().collect()
                };

                // Prove the records closest to the challenged key.
                all_chunk_addrs.sort_by_key(|addr| key.distance(addr));

                // Cap the workload at the close-group size.
                let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);

                for addr in all_chunk_addrs.iter().take(workload_factor) {
                    if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
                    {
                        let proof = ChunkProof::new(&record.value, nonce);
                        debug!("Chunk proof for {key:?} is {proof:?}");
                        results.push((addr.clone(), Ok(proof)));
                    }
                }
            }

            info!(
                "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
                results.len(), start.elapsed()
            );
        }

        results
    }
854
855 async fn storage_challenge(network: Network) {
859 let start = Instant::now();
860 let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
861 network.get_k_closest_local_peers_to_the_target(None).await
862 {
863 closest_peers
864 .into_iter()
865 .take(CLOSE_GROUP_SIZE)
866 .collect_vec()
867 } else {
868 error!("Cannot get local neighbours");
869 return;
870 };
871 if closest_peers.len() < CLOSE_GROUP_SIZE {
872 debug!(
873 "Not enough neighbours ({}/{}) to carry out storage challenge.",
874 closest_peers.len(),
875 CLOSE_GROUP_SIZE
876 );
877 return;
878 }
879
880 let mut verify_candidates: Vec<NetworkAddress> =
881 if let Ok(all_keys) = network.get_all_local_record_addresses().await {
882 all_keys
883 .iter()
884 .filter_map(|(addr, record_type)| {
885 if ValidationType::Chunk == *record_type {
886 Some(addr.clone())
887 } else {
888 None
889 }
890 })
891 .collect()
892 } else {
893 error!("Failed to get local record addresses.");
894 return;
895 };
896 let num_of_targets = verify_candidates.len();
897 if num_of_targets < 50 {
898 debug!("Not enough candidates({num_of_targets}/50) to be checked against neighbours.");
899 return;
900 }
901
902 let self_addr = NetworkAddress::from(network.peer_id());
905 verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
906 let index: usize = OsRng.gen_range(0..num_of_targets / 2);
907 let target = verify_candidates[index].clone();
908 let difficulty = CLOSE_GROUP_SIZE;
910 verify_candidates.sort_by_key(|addr| target.distance(addr));
911 let expected_targets = verify_candidates.into_iter().take(difficulty);
912 let nonce: Nonce = thread_rng().gen::<u64>();
913 let mut expected_proofs = HashMap::new();
914 for addr in expected_targets {
915 if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
916 let expected_proof = ChunkProof::new(&record.value, nonce);
917 let _ = expected_proofs.insert(addr, expected_proof);
918 } else {
919 error!("Local record {addr:?} cann't be loaded from disk.");
920 }
921 }
922 let request = Request::Query(Query::GetChunkExistenceProof {
923 key: target.clone(),
924 nonce,
925 difficulty,
926 });
927
928 let mut tasks = JoinSet::new();
929 for (peer_id, addresses) in closest_peers {
930 if peer_id == network.peer_id() {
931 continue;
932 }
933 let network_clone = network.clone();
934 let request_clone = request.clone();
935 let expected_proofs_clone = expected_proofs.clone();
936 let _ = tasks.spawn(async move {
937 let res = scoring_peer(
938 network_clone,
939 (peer_id, addresses),
940 request_clone,
941 expected_proofs_clone,
942 )
943 .await;
944 (peer_id, res)
945 });
946 }
947
948 let mut peer_scores = vec![];
949 while let Some(res) = tasks.join_next().await {
950 match res {
951 Ok((peer_id, score)) => {
952 let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
953 if !is_healthy {
954 info!("Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}.");
955 network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
957 }
958 peer_scores.push((peer_id, is_healthy));
959 }
960 Err(e) => {
961 info!("StorageChallenge task completed with error {e:?}");
962 }
963 }
964 }
965 if !peer_scores.is_empty() {
966 network.notify_peer_scores(peer_scores);
967 }
968
969 info!(
970 "Completed node StorageChallenge against neighbours in {:?}!",
971 start.elapsed()
972 );
973 }
974
    /// Queries the version of each given peer.
    ///
    /// Queries run sequentially: each one is awaited before the next starts.
    async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
        for (peer_id, addrs) in peers {
            Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
        }
    }
982
    /// Asks `peer` for its version and reports the result to the network layer.
    ///
    /// Undialable peers are removed from the routing table instead of being
    /// reported. Other failures map to the sentinel version "old", and
    /// unexpected response types to "none".
    async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
        let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
        let version = match network.send_request(request, peer, addrs).await {
            Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
                trace!("Fetched peer version {peer:?} as {version:?}");
                version
            }
            Ok(other) => {
                info!("Not a fetched peer version {peer:?}, {other:?}");
                "none".to_string()
            }
            Err(err) => {
                info!("Failed to fetch peer version {peer:?} with error {err:?}");
                if let NetworkError::OutboundError(OutboundFailure::DialFailure) = err {
                    // Could not even dial the peer: drop it from the routing
                    // table and skip the version notification entirely.
                    network.remove_peer(peer);
                    return;
                }
                "old".to_string()
            }
        };
        network.notify_node_version(peer, version);
    }
1010}
1011
/// Sends the chunk-existence-proof challenge to a single peer and scores the
/// reply.
///
/// Returns 0 when the peer does not reply, replies with an error, or answers
/// with an empty list; otherwise the score combines response time and answer
/// correctness via [`mark_peer`].
async fn scoring_peer(
    network: Network,
    peer: (PeerId, Addresses),
    request: Request,
    expected_proofs: HashMap<NetworkAddress, ChunkProof>,
) -> usize {
    let peer_id = peer.0;
    let start = Instant::now();
    let responses = network
        .send_and_get_responses(&[peer], &request, true)
        .await;

    if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
        responses.get(&peer_id)
    {
        if answers.is_empty() {
            info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
            return 0;
        }
        // Latency measured up to the point the response arrived.
        let elapsed = start.elapsed();

        // Keep only the successful proofs; error entries carry no evidence.
        let mut received_proofs = vec![];
        for (addr, proof) in answers {
            if let Ok(proof) = proof {
                received_proofs.push((addr.clone(), proof.clone()));
            }
        }

        let score = mark_peer(elapsed, received_proofs, &expected_proofs);
        info!(
            "Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.",
            answers.len()
        );
        score
    } else {
        info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error.");
        0
    }
}
1051
/// Combines a peer's response-time score and challenge-correctness score
/// into a single mark (their product), so a zero in either component zeroes
/// the overall mark.
fn mark_peer(
    duration: Duration,
    answers: Vec<(NetworkAddress, ChunkProof)>,
    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
) -> usize {
    let duration_score = duration_score_scheme(duration);
    let challenge_score = challenge_score_scheme(answers, expected_proofs);

    duration_score * challenge_score
}
1067
1068fn duration_score_scheme(duration: Duration) -> usize {
1070 let in_ms = if let Some(value) = duration.as_millis().to_usize() {
1072 value
1073 } else {
1074 info!("Cannot get milli seconds from {duration:?}, using a default value of 1000ms.");
1075 1000
1076 };
1077
1078 let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
1079 HIGHEST_SCORE - step
1080}
1081
1082fn challenge_score_scheme(
1084 answers: Vec<(NetworkAddress, ChunkProof)>,
1085 expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
1086) -> usize {
1087 let mut correct_answers = 0;
1088 for (addr, chunk_proof) in answers {
1089 if let Some(expected_proof) = expected_proofs.get(&addr) {
1090 if expected_proof.verify(&chunk_proof) {
1091 correct_answers += 1;
1092 } else {
1093 info!("Spot a false answer to the challenge regarding {addr:?}");
1094 return 0;
1096 }
1097 }
1098 }
1099 std::cmp::min(
1106 HIGHEST_SCORE,
1107 HIGHEST_SCORE * correct_answers / expected_proofs.len(),
1108 )
1109}
1110
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    // With no local peers, the closest-peers calculation must return an
    // empty list.
    #[test]
    fn test_no_local_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(5);
        let range = None;
        let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);

        assert_eq!(result, vec![]);
    }

    // Requesting 2 of 3 peers: the two closest (by XOR distance to the
    // target) must be returned, ordered by distance.
    #[test]
    fn test_fewer_local_peers_than_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(2);
        let range = None;
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Independently compute the expected distance ordering for comparison.
        let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .iter()
            .map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(*peer_id);
                (addr, multi_addrs.clone())
            })
            .collect();
        expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
        let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();

        assert_eq!(expected_result, result);
    }

    // When both a range and a count are supplied, the range wins: every
    // peer within the distance bound is returned regardless of the count.
    #[test]
    fn test_with_range_and_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(0);
        let range_value = [128; 32];
        let range = Some(range_value);
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Independently compute the in-range subset for comparison.
        let distance = U256::from_big_endian(&range_value);
        let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .into_iter()
            .filter_map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(peer_id);
                if target.distance(&addr).0 <= distance {
                    Some((addr, multi_addrs.clone()))
                } else {
                    None
                }
            })
            .collect();

        assert_eq!(expected_result, result);
    }
}