1use super::{
10 Marker, NodeEvent, error::Result, event::NodeEventsChannel, quote::quotes_verification,
11};
12#[cfg(feature = "open-metrics")]
13use crate::metrics::NodeMetricsRecorder;
14#[cfg(feature = "open-metrics")]
15use crate::networking::MetricsRegistries;
16use crate::networking::{Addresses, Network, NetworkConfig, NetworkError, NetworkEvent, NodeIssue};
17use crate::{PutValidationError, RunningNode};
18use ant_bootstrap::bootstrap::Bootstrap;
19use ant_evm::EvmNetwork;
20use ant_evm::RewardsAddress;
21use ant_evm::merkle_payments::MERKLE_PAYMENT_EXPIRATION;
22use ant_protocol::{
23 CLOSE_GROUP_SIZE, NetworkAddress, PrettyPrintRecordKey,
24 error::Error as ProtocolError,
25 messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
26 storage::ValidationType,
27};
28use bytes::Bytes;
29use itertools::Itertools;
30use libp2p::{
31 Multiaddr, PeerId,
32 identity::Keypair,
33 kad::{Record, U256},
34 request_response::OutboundFailure,
35};
36use num_traits::cast::ToPrimitive;
37use rand::{
38 Rng, SeedableRng,
39 rngs::{OsRng, StdRng},
40 thread_rng,
41};
42use std::{
43 collections::HashMap,
44 net::SocketAddr,
45 path::PathBuf,
46 sync::{
47 Arc,
48 atomic::{AtomicUsize, Ordering},
49 },
50 time::{Duration, Instant},
51};
52use tokio::sync::watch;
53use tokio::{
54 sync::mpsc::Receiver,
55 task::{JoinSet, spawn},
56};
57
/// Upper bound (seconds) for the randomised periodic replication interval;
/// the actual interval is drawn from `[max/2, max)` in `Node::run`.
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;

/// Upper bound (seconds) for the randomised storage-challenge interval;
/// drawn from `[max/2, max)` in `Node::run`.
const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;

/// Period at which the uptime-metrics branch of the main loop fires
/// (the actual metric update is gated on the `open-metrics` feature).
const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

/// Upper bound (seconds) for the randomised irrelevant-records cleanup
/// interval; drawn from `[max/2, max)` in `Node::run`.
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;

/// Maximum score component in the storage-challenge scoring scheme.
// NOTE(review): consumed by the scoring functions below (partially outside
// this view) — confirm exact usage in duration/challenge score schemes.
const HIGHEST_SCORE: usize = 100;

/// Minimum combined score for a peer to be considered healthy in
/// `storage_challenge`; peers at or below it are reported as
/// `NodeIssue::FailedChunkProofCheck`.
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;

/// Step size used by the duration-based scoring scheme.
// NOTE(review): `duration_score_scheme` is truncated in this view — confirm
// the unit (seconds vs. score buckets).
const TIME_STEP: usize = 20;
82
/// Aggregated configuration used to construct and launch a node via
/// [`NodeBuilder::build_and_run`].
pub struct NodeBuilder {
    // Socket address the node will listen on.
    addr: SocketAddr,
    // Bootstrap flow handed to the network layer to join the network.
    bootstrap: Bootstrap,
    // Wallet address node rewards are paid to.
    evm_address: RewardsAddress,
    // EVM network the node operates against.
    evm_network: EvmNetwork,
    // libp2p identity keypair for this node.
    identity_keypair: Keypair,
    // Passed through as the network layer's `local` flag (defaults to false).
    local: bool,
    // Port for the metrics server; `None` disables the metrics recorder.
    #[cfg(feature = "open-metrics")]
    metrics_server_port: Option<u16>,
    // Disable UPnP port mapping when true (defaults to false).
    no_upnp: bool,
    // Act as a relay client when true (defaults to false).
    relay_client: bool,
    // Node data directory (also handed to the network layer).
    root_dir: PathBuf,
}
98
impl NodeBuilder {
    /// Create a builder with the mandatory settings; optional flags
    /// (`local`, `no_upnp`, `relay_client`, metrics port) start disabled
    /// and can be toggled via the setters below.
    pub fn new(
        identity_keypair: Keypair,
        bootstrap_flow: Bootstrap,
        evm_address: RewardsAddress,
        evm_network: EvmNetwork,
        addr: SocketAddr,
        root_dir: PathBuf,
    ) -> Self {
        Self {
            addr,
            bootstrap: bootstrap_flow,
            evm_address,
            evm_network,
            identity_keypair,
            local: false,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: None,
            no_upnp: false,
            relay_client: false,
            root_dir,
        }
    }

    /// Toggle local (non-public) network mode.
    pub fn local(&mut self, local: bool) {
        self.local = local;
    }

    /// Set the metrics-server port; `None` leaves metrics disabled.
    #[cfg(feature = "open-metrics")]
    pub fn metrics_server_port(&mut self, port: Option<u16>) {
        self.metrics_server_port = port;
    }

    /// Toggle relay-client behaviour.
    pub fn relay_client(&mut self, relay_client: bool) {
        self.relay_client = relay_client;
    }

    /// Disable UPnP port mapping when set to true.
    pub fn no_upnp(&mut self, no_upnp: bool) {
        self.no_upnp = no_upnp;
    }

    /// Build the node, start the network layer and the event loop, and
    /// return a [`RunningNode`] handle (which owns the shutdown sender).
    pub fn build_and_run(self) -> Result<RunningNode> {
        // A metrics recorder is only created when a server port was set;
        // otherwise the registries stay empty.
        #[cfg(feature = "open-metrics")]
        let (metrics_recorder, metrics_registries) = if self.metrics_server_port.is_some() {
            let mut metrics_registries = MetricsRegistries::default();
            let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);

            (Some(metrics_recorder), metrics_registries)
        } else {
            (None, MetricsRegistries::default())
        };

        // Watch channel used to signal shutdown to the event loop and the
        // network layer; the sender ends up inside RunningNode.
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        let network_config = NetworkConfig {
            keypair: self.identity_keypair,
            local: self.local,
            listen_addr: self.addr,
            root_dir: self.root_dir.clone(),
            shutdown_rx: shutdown_rx.clone(),
            bootstrap: self.bootstrap,
            no_upnp: self.no_upnp,
            relay_client: self.relay_client,
            custom_request_timeout: None,
            #[cfg(feature = "open-metrics")]
            metrics_registries,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: self.metrics_server_port,
        };
        let (network, network_event_receiver) = Network::init(network_config)?;

        let node_events_channel = NodeEventsChannel::default();
        let node = NodeInner {
            network: network.clone(),
            events_channel: node_events_channel.clone(),
            reward_address: self.evm_address,
            #[cfg(feature = "open-metrics")]
            metrics_recorder,
            evm_network: self.evm_network,
        };
        let node = Node {
            inner: Arc::new(node),
        };

        // Spawns the background event loop; `run` returns immediately.
        node.run(network_event_receiver, shutdown_rx);
        let running_node = RunningNode {
            shutdown_sender: shutdown_tx,
            network,
            node_events_channel,
            root_dir_path: self.root_dir,
            rewards_address: self.evm_address,
        };

        Ok(running_node)
    }
}
219
/// Cheaply cloneable handle to the running node's shared state; all clones
/// share the same [`NodeInner`] via `Arc`.
#[derive(Clone)]
pub(crate) struct Node {
    inner: Arc<NodeInner>,
}
227
/// State shared by all clones of [`Node`].
struct NodeInner {
    // Channel used to broadcast `NodeEvent`s to subscribers.
    events_channel: NodeEventsChannel,
    // Handle to the network layer.
    network: Network,
    // Present only when a metrics server port was configured at build time.
    #[cfg(feature = "open-metrics")]
    metrics_recorder: Option<NodeMetricsRecorder>,
    // Wallet address node rewards are paid to.
    reward_address: RewardsAddress,
    // EVM network the node operates against.
    evm_network: EvmNetwork,
}
238
239impl Node {
    /// Channel over which node-level events are broadcast.
    pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
        &self.inner.events_channel
    }

    /// Handle to the network layer.
    pub(crate) fn network(&self) -> &Network {
        &self.inner.network
    }

    /// Metrics recorder, if one was configured at build time.
    #[cfg(feature = "open-metrics")]
    pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
        self.inner.metrics_recorder.as_ref()
    }

    /// Wallet address node rewards are paid to.
    pub(crate) fn reward_address(&self) -> &RewardsAddress {
        &self.inner.reward_address
    }

    /// EVM network the node operates against.
    pub(crate) fn evm_network(&self) -> &EvmNetwork {
        &self.inner.evm_network
    }
265
    /// Spawn the node's main event loop as a background task and return
    /// immediately.
    ///
    /// The loop multiplexes, via `tokio::select!`:
    /// * the shutdown watch channel (exits when set or the sender drops),
    /// * the `NetworkEvent` stream (exits when the channel closes),
    /// * three randomised periodic timers: replication, irrelevant-record
    ///   cleanup and storage challenge,
    /// * a fixed-period uptime-metrics timer.
    fn run(
        self,
        mut network_event_receiver: Receiver<NetworkEvent>,
        mut shutdown_rx: watch::Receiver<bool>,
    ) {
        let mut rng = StdRng::from_entropy();

        // Counts PeerAdded events so we can emit ConnectedToNetwork exactly
        // once CLOSE_GROUP_SIZE peers are known.
        let peers_connected = Arc::new(AtomicUsize::new(0));

        let _node_task = spawn(async move {
            // Randomise the replication period within [max/2, max) so nodes
            // don't all replicate in lock-step.
            let replication_interval: u64 = rng.gen_range(
                PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
            );
            let replication_interval_time = Duration::from_secs(replication_interval);
            debug!("Replication interval set to {replication_interval_time:?}");

            let mut replication_interval = tokio::time::interval(replication_interval_time);
            // The first tick of a tokio interval completes immediately;
            // consume it so the timer only fires after a full period.
            let _ = replication_interval.tick().await; let mut uptime_metrics_update_interval =
                tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
            let _ = uptime_metrics_update_interval.tick().await; let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
                UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
                    ..UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
            );
            let irrelevant_records_cleanup_interval_time =
                Duration::from_secs(irrelevant_records_cleanup_interval);
            let mut irrelevant_records_cleanup_interval =
                tokio::time::interval(irrelevant_records_cleanup_interval_time);
            let _ = irrelevant_records_cleanup_interval.tick().await; let storage_challenge_interval: u64 =
                rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
            let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
            debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");

            let mut storage_challenge_interval =
                tokio::time::interval(storage_challenge_interval_time);
            let _ = storage_challenge_interval.tick().await; loop {
                let peers_connected = &peers_connected;

                tokio::select! {
                    // Shutdown: exit on an observed `true` or on sender drop.
                    result = shutdown_rx.changed() => {
                        if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
                            info!("Shutdown signal received or sender dropped. Exiting network events loop.");
                            break;
                        }
                    },
                    net_event = network_event_receiver.recv() => {
                        match net_event {
                            Some(event) => {
                                let start = Instant::now();
                                let event_string = format!("{event:?}");

                                // Handlers spawn their own tasks; this call
                                // is expected to return quickly.
                                self.handle_network_event(event, peers_connected);
                                trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);

                            }
                            None => {
                                error!("The `NetworkEvent` channel is closed");
                                self.events_channel().broadcast(NodeEvent::ChannelClosed);
                                break;
                            }
                        }
                    }
                    // Periodic replication, offloaded to its own task.
                    _ = replication_interval.tick() => {
                        let start = Instant::now();
                        let network = self.network().clone();
                        self.record_metrics(Marker::IntervalReplicationTriggered);

                        let _handle = spawn(async move {
                            Self::try_interval_replication(network);
                            trace!("Periodic replication took {:?}", start.elapsed());
                        });
                    }
                    // Refresh the uptime gauge (open-metrics builds only).
                    _ = uptime_metrics_update_interval.tick() => {
                        #[cfg(feature = "open-metrics")]
                        if let Some(metrics_recorder) = self.metrics_recorder() {
                            let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
                        }
                    }
                    // Periodic cleanup of records we no longer need to hold.
                    _ = irrelevant_records_cleanup_interval.tick() => {
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::trigger_irrelevant_record_cleanup(network);
                        });
                    }
                    // Periodic storage challenge against our neighbours.
                    _ = storage_challenge_interval.tick() => {
                        let start = Instant::now();
                        debug!("Periodic storage challenge triggered");
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::storage_challenge(network).await;
                            trace!("Periodic storage challenge took {:?}", start.elapsed());
                        });
                    }
                }
            }
        });
    }
383
    /// Log a [`Marker`] and, in open-metrics builds with a recorder present,
    /// also feed it into the metrics recorder.
    pub(crate) fn record_metrics(&self, marker: Marker) {
        marker.log();
        #[cfg(feature = "open-metrics")]
        if let Some(metrics_recorder) = self.metrics_recorder() {
            metrics_recorder.record(marker)
        }
    }
393
    /// Dispatch a single [`NetworkEvent`].
    ///
    /// Long-running work (version queries, replication, record validation,
    /// quote verification) is spawned onto separate tasks so this function
    /// returns quickly; `peers_connected` tracks routing-table additions to
    /// emit `ConnectedToNetwork` once `CLOSE_GROUP_SIZE` peers are known.
    fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
        let start = Instant::now();
        let event_string = format!("{event:?}");
        let event_header;

        // GetVersion queries are frequent and noisy: log them at trace, all
        // other events at debug.
        if let NetworkEvent::QueryRequestReceived {
            query: Query::GetVersion { .. },
            ..
        } = event
        {
            trace!("Handling NetworkEvent {event_string}");
        } else {
            debug!("Handling NetworkEvent {event_string}");
        }

        match event {
            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
                event_header = "PeerAdded";
                let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
                // Broadcast ConnectedToNetwork when the counter hits exactly
                // CLOSE_GROUP_SIZE (fires once on the way up).
                if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
                    self.events_channel()
                        .broadcast(NodeEvent::ConnectedToNetwork);
                }

                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));

                // Ask the new peer for its version in the background.
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::try_query_peer_version(network, peer_id, Default::default()).await;
                });

                // A routing-table change also triggers a replication round.
                let network = self.network().clone();
                self.record_metrics(Marker::IntervalReplicationTriggered);
                let _handle = spawn(async move {
                    Self::try_interval_replication(network);
                });
            }
            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
                event_header = "PeerRemoved";
                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));

                let self_id = self.network().peer_id();
                let distance =
                    NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
                info!(
                    "Node {self_id:?} removed peer from routing table: {peer_id:?}. It has a {:?} distance to us.",
                    distance.ilog2()
                );

                // Losing a peer also triggers a replication round.
                let network = self.network().clone();
                self.record_metrics(Marker::IntervalReplicationTriggered);
                let _handle = spawn(async move {
                    Self::try_interval_replication(network);
                });
            }
            NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
                event_header = "PeerWithUnsupportedProtocol";
            }
            NetworkEvent::NewListenAddr(_) => {
                event_header = "NewListenAddr";
            }
            NetworkEvent::ResponseReceived { res } => {
                event_header = "ResponseReceived";
                if let Err(err) = self.handle_response(res) {
                    error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
                }
            }
            NetworkEvent::KeysToFetchForReplication(keys) => {
                event_header = "KeysToFetchForReplication";
                self.record_metrics(Marker::fetching_keys_for_replication(&keys));

                if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
                    error!("Error while trying to fetch replicated data {err:?}");
                }
            }
            NetworkEvent::QueryRequestReceived { query, channel } => {
                event_header = "QueryRequestReceived";
                let node = self.clone();
                let payment_address = *self.reward_address();

                // Answer the query on a separate task and send the response
                // back over the provided channel.
                let _handle = spawn(async move {
                    let network = node.network().clone();
                    let res = Self::handle_query(node, query, payment_address).await;

                    // Same noise reduction as above: GetVersion at trace.
                    if let Response::Query(QueryResponse::GetVersion { .. }) = res {
                        trace!("Sending response {res:?}");
                    } else {
                        debug!("Sending response {res:?}");
                    }

                    network.send_response(res, channel);
                });
            }
            NetworkEvent::UnverifiedRecord(record) => {
                event_header = "UnverifiedRecord";
                // Validate and store off the event loop; rejections are
                // recorded as metrics rather than propagated.
                let self_clone = self.clone();
                let _handle = spawn(async move {
                    let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                    match self_clone.validate_and_store_record(record).await {
                        Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
                        Err(err) => {
                            self_clone.record_metrics(Marker::RecordRejected(&key, &err));
                        }
                    }
                });
            }
            NetworkEvent::TerminateNode { reason } => {
                event_header = "TerminateNode";
                error!("Received termination from swarm_driver due to {reason:?}");
                self.events_channel()
                    .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
            }
            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
                event_header = "FailedToFetchHolders";
                let network = self.network().clone();
                let pretty_log: Vec<_> = bad_nodes
                    .iter()
                    .map(|(peer_id, record_key)| {
                        let pretty_key = PrettyPrintRecordKey::from(record_key);
                        (peer_id, pretty_key)
                    })
                    .collect();
                debug!(
                    "Received notification from replication_fetcher, notifying {pretty_log:?} failed to fetch replication copies from."
                );
                // Only count an issue against a peer if we still don't hold
                // the record locally (it may have arrived via another path).
                let _handle = spawn(async move {
                    for (peer_id, record_key) in bad_nodes {
                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
                        {
                            error!(
                                "From peer {peer_id:?}, failed to fetch record {:?}",
                                PrettyPrintRecordKey::from(&record_key)
                            );
                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
                        }
                    }
                });
            }
            NetworkEvent::QuoteVerification { quotes } => {
                event_header = "QuoteVerification";
                let network = self.network().clone();

                let _handle = spawn(async move {
                    quotes_verification(&network, quotes).await;
                });
            }
            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
                event_header = "FreshReplicateToFetch";
                self.fresh_replicate_to_fetch(holder, keys);
            }
            NetworkEvent::PeersForVersionQuery(peers) => {
                event_header = "PeersForVersionQuery";
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::query_peers_version(network, peers).await;
                });
            }
            NetworkEvent::NetworkWideReplication { keys } => {
                event_header = "NetworkWideReplication";
                self.perform_network_wide_replication(keys);
            }
        }

        trace!(
            "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
            start.elapsed()
        );
    }
579
580 fn handle_response(&self, response: Response) -> Result<()> {
582 match response {
583 Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
584 warn!("Mishandled replicate response, should be handled earlier");
586 }
587 Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
588 error!(
589 "Response to replication shall be handled by called not by common handler, {resp:?}"
590 );
591 }
592 Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
593 }
595 other => {
596 warn!("handle_response not implemented for {other:?}");
597 }
598 };
599
600 Ok(())
601 }
602
    /// Answer an inbound [`Query`], producing the matching [`QueryResponse`]
    /// wrapped in [`Response::Query`].
    ///
    /// `payment_address` is the reward address embedded in store quotes and
    /// merkle candidate quotes.
    async fn handle_query(node: Self, query: Query, payment_address: RewardsAddress) -> Response {
        let network = node.network();
        let resp: QueryResponse = match query {
            Query::GetStoreQuote {
                key,
                data_type,
                data_size,
                nonce,
                difficulty,
            } => {
                let record_key = key.to_record_key();
                let self_id = network.peer_id();

                let maybe_quoting_metrics = network
                    .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
                    .await;

                // A supplied nonce means the requester also wants storage
                // proofs bundled with the quote.
                let storage_proofs = if let Some(nonce) = nonce {
                    Self::respond_x_closest_record_proof(
                        network,
                        key.clone(),
                        nonce,
                        difficulty,
                        false,
                    )
                    .await
                } else {
                    vec![]
                };

                match maybe_quoting_metrics {
                    Ok((quoting_metrics, is_already_stored)) => {
                        if is_already_stored {
                            // No quote for data we already hold; report it as
                            // existing instead.
                            QueryResponse::GetStoreQuote {
                                quote: Err(ProtocolError::RecordExists(
                                    PrettyPrintRecordKey::from(&record_key).into_owned(),
                                )),
                                peer_address: NetworkAddress::from(self_id),
                                storage_proofs,
                            }
                        } else {
                            QueryResponse::GetStoreQuote {
                                quote: Self::create_quote_for_storecost(
                                    network,
                                    &key,
                                    &quoting_metrics,
                                    &payment_address,
                                ),
                                peer_address: NetworkAddress::from(self_id),
                                storage_proofs,
                            }
                        }
                    }
                    Err(err) => {
                        warn!("GetStoreQuote failed for {key:?}: {err}");
                        QueryResponse::GetStoreQuote {
                            quote: Err(ProtocolError::GetStoreQuoteFailed),
                            peer_address: NetworkAddress::from(self_id),
                            storage_proofs,
                        }
                    }
                }
            }
            Query::GetReplicatedRecord { requester: _, key } => {
                let our_address = NetworkAddress::from(network.peer_id());
                let record_key = key.to_record_key();

                let result = match network.get_local_record(&record_key).await {
                    Ok(Some(record)) => Ok((our_address, Bytes::from(record.value))),
                    Ok(None) => Err(ProtocolError::ReplicatedRecordNotFound {
                        holder: Box::new(our_address),
                        key: Box::new(key.clone()),
                    }),
                    Err(err) => Err(ProtocolError::PutRecordFailed(format!(
                        "Error to fetch local record for GetReplicatedRecord {err:?}"
                    ))),
                };

                QueryResponse::GetReplicatedRecord(result)
            }
            Query::GetChunkExistenceProof {
                key,
                nonce,
                difficulty,
            } => QueryResponse::GetChunkExistenceProof(
                // chunk_only = true: existence proofs are for chunks.
                Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
            ),
            Query::CheckNodeInProblem(target_address) => {
                debug!("Got CheckNodeInProblem for peer {target_address:?}");

                // An error fetching shun status is treated as "not in
                // trouble" rather than propagated.
                let is_in_trouble =
                    if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
                        result
                    } else {
                        debug!("Could not get status of {target_address:?}.");
                        false
                    };

                QueryResponse::CheckNodeInProblem {
                    reporter_address: NetworkAddress::from(network.peer_id()),
                    target_address,
                    is_in_trouble,
                }
            }
            Query::GetClosestPeers {
                key,
                num_of_peers,
                range,
                sign_result,
            } => {
                debug!(
                    "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
                );
                Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
                    .await
            }
            Query::GetVersion(_) => QueryResponse::GetVersion {
                peer: NetworkAddress::from(network.peer_id()),
                version: ant_build_info::package_version(),
            },
            Query::PutRecord {
                holder,
                address,
                serialized_record,
            } => {
                let record = Record {
                    key: address.to_record_key(),
                    value: serialized_record,
                    publisher: None,
                    expires: None,
                };

                let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                // Validation errors are recorded as metrics, then translated
                // into their protocol-level counterparts for the requester.
                let result = match node.validate_and_store_record(record).await {
                    Ok(()) => Ok(()),
                    Err(PutValidationError::OutdatedRecordCounter { counter, expected }) => {
                        node.record_metrics(Marker::RecordRejected(
                            &key,
                            &PutValidationError::OutdatedRecordCounter { counter, expected },
                        ));
                        Err(ProtocolError::OutdatedRecordCounter { counter, expected })
                    }
                    Err(PutValidationError::TopologyVerificationFailed {
                        target_address,
                        valid_count,
                        total_paid,
                        closest_count,
                        node_peers,
                        paid_peers,
                    }) => {
                        node.record_metrics(Marker::RecordRejected(
                            &key,
                            &PutValidationError::TopologyVerificationFailed {
                                target_address: target_address.clone(),
                                valid_count,
                                total_paid,
                                closest_count,
                                node_peers: node_peers.clone(),
                                paid_peers: paid_peers.clone(),
                            },
                        ));
                        Err(ProtocolError::TopologyVerificationFailed {
                            target_address: Box::new(target_address),
                            valid_count,
                            total_paid,
                            closest_count,
                            node_peers,
                            paid_peers,
                        })
                    }
                    Err(err) => {
                        node.record_metrics(Marker::RecordRejected(&key, &err));
                        Err(ProtocolError::PutRecordFailed(format!("{err:?}")))
                    }
                };
                QueryResponse::PutRecord {
                    result,
                    peer_address: holder,
                    record_addr: address,
                }
            }
            Query::GetMerkleCandidateQuote {
                key,
                data_type,
                data_size,
                merkle_payment_timestamp,
            } => {
                Self::respond_merkle_candidate_quote(
                    network,
                    key,
                    data_type,
                    data_size,
                    merkle_payment_timestamp,
                    payment_address,
                )
                .await
            }
        };
        Response::Query(resp)
    }
804
805 async fn respond_get_closest_peers(
806 network: &Network,
807 target: NetworkAddress,
808 num_of_peers: Option<usize>,
809 range: Option<[u8; 32]>,
810 sign_result: bool,
811 ) -> QueryResponse {
812 let local_peers = network.get_local_peers_with_multiaddr().await;
813 let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
814 Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
815 } else {
816 vec![]
817 };
818
819 let signature = if sign_result {
820 let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
821 bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
822 network.sign(&bytes).ok()
823 } else {
824 None
825 };
826
827 QueryResponse::GetClosestPeers {
828 target,
829 peers,
830 signature,
831 }
832 }
833
834 fn calculate_get_closest_peers(
835 peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
836 target: NetworkAddress,
837 num_of_peers: Option<usize>,
838 range: Option<[u8; 32]>,
839 ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
840 match (num_of_peers, range) {
841 (_, Some(value)) => {
842 let distance = U256::from_big_endian(&value);
843 peer_addrs
844 .iter()
845 .filter_map(|(peer_id, multi_addrs)| {
846 let addr = NetworkAddress::from(*peer_id);
847 if target.distance(&addr).0 <= distance {
848 Some((addr, multi_addrs.clone()))
849 } else {
850 None
851 }
852 })
853 .collect()
854 }
855 (Some(num_of_peers), _) => {
856 let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
857 .iter()
858 .map(|(peer_id, multi_addrs)| {
859 let addr = NetworkAddress::from(*peer_id);
860 (addr, multi_addrs.clone())
861 })
862 .collect();
863 result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
864 result.into_iter().take(num_of_peers).collect()
865 }
866 (None, None) => vec![],
867 }
868 }
869
    /// Build a signed `MerklePaymentCandidateNode` for a merkle-payment
    /// quote request, after validating the caller-supplied timestamp.
    ///
    /// The timestamp is rejected when it lies more than
    /// `TIMESTAMP_TOLERANCE` in the future, or is older than
    /// `MERKLE_PAYMENT_EXPIRATION + TIMESTAMP_TOLERANCE`. On success the
    /// candidate bundles our quoting metrics, reward address, public key and
    /// a signature over them.
    async fn respond_merkle_candidate_quote(
        network: &Network,
        key: NetworkAddress,
        data_type: u32,
        data_size: usize,
        merkle_payment_timestamp: u64,
        payment_address: RewardsAddress,
    ) -> QueryResponse {
        debug!(
            "merkle payment: GetMerkleCandidateQuote for target {key:?}, timestamp: {merkle_payment_timestamp}, data_type: {data_type}, data_size: {data_size}"
        );

        // Allowed clock skew (one day, in seconds), applied on both sides of
        // the validity window.
        const TIMESTAMP_TOLERANCE: u64 = 24 * 60 * 60; let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        // Reject timestamps too far in the future.
        let future_threshold = now + TIMESTAMP_TOLERANCE;
        if merkle_payment_timestamp > future_threshold {
            let error_msg = format!(
                "Rejected future timestamp {merkle_payment_timestamp} (current time: {now}, threshold: {future_threshold})"
            );
            warn!("{error_msg} for {key:?}");
            return QueryResponse::GetMerkleCandidateQuote(Err(
                ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
            ));
        }

        // Reject timestamps older than the payment expiration window plus
        // tolerance.
        let expiration_threshold = MERKLE_PAYMENT_EXPIRATION + TIMESTAMP_TOLERANCE;
        let age = now.saturating_sub(merkle_payment_timestamp);
        if age > expiration_threshold {
            let error_msg = format!(
                "Rejected expired timestamp {merkle_payment_timestamp} (age: {age}s, max: {expiration_threshold}s)",
            );
            warn!("{error_msg} for {key:?}");
            return QueryResponse::GetMerkleCandidateQuote(Err(
                ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
            ));
        }

        let record_key = key.to_record_key();
        let (quoting_metrics, _is_already_stored) = match network
            .get_local_quoting_metrics(record_key, data_type, data_size)
            .await
        {
            Ok(metrics) => metrics,
            Err(err) => {
                let error_msg = format!("Failed to get quoting metrics for {key:?}: {err}");
                warn!("{error_msg}");
                return QueryResponse::GetMerkleCandidateQuote(Err(
                    ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
                ));
            }
        };

        // Sign the canonical byte representation of (metrics, reward
        // address, timestamp) so the payer can verify the candidate.
        let pub_key = network.get_pub_key();
        let reward_address = payment_address;
        let bytes = ant_evm::merkle_payments::MerklePaymentCandidateNode::bytes_to_sign(
            &quoting_metrics,
            &reward_address,
            merkle_payment_timestamp,
        );
        let signature = match network.sign(&bytes) {
            Ok(sig) => sig,
            Err(e) => {
                let error_msg = format!("Failed to sign candidate node for {key:?}: {e}");
                error!("{error_msg}");
                return QueryResponse::GetMerkleCandidateQuote(Err(
                    ProtocolError::FailedToSignMerkleCandidate(error_msg),
                ));
            }
        };

        let candidate = ant_evm::merkle_payments::MerklePaymentCandidateNode {
            quoting_metrics,
            reward_address,
            merkle_payment_timestamp,
            pub_key,
            signature,
        };
        QueryResponse::GetMerkleCandidateQuote(Ok(candidate))
    }
968
    /// Produce chunk-existence proofs for a storage challenge.
    ///
    /// With `difficulty == 1` only the record at `key` itself is proven (or
    /// a `ChunkDoesNotExist` error is returned for it). With a higher
    /// difficulty, proofs are produced for up to
    /// `min(difficulty, CLOSE_GROUP_SIZE)` locally-held records closest to
    /// `key`; `chunk_only` restricts the candidates to chunk records.
    async fn respond_x_closest_record_proof(
        network: &Network,
        key: NetworkAddress,
        nonce: Nonce,
        difficulty: usize,
        chunk_only: bool,
    ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
        let start = Instant::now();
        let mut results = vec![];
        if difficulty == 1 {
            // Single-record case: prove the queried key directly.
            let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
            if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
                let proof = ChunkProof::new(&record.value, nonce);
                debug!("Chunk proof for {key:?} is {proof:?}");
                result = Ok(proof)
            } else {
                debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
            }

            results.push((key.clone(), result));
        } else {
            let all_local_records = network.get_all_local_record_addresses().await;

            if let Ok(all_local_records) = all_local_records {
                // Candidate set: chunk records only, or everything we hold.
                let mut all_chunk_addrs: Vec<_> = if chunk_only {
                    all_local_records
                        .iter()
                        .filter_map(|(addr, record_type)| {
                            if *record_type == ValidationType::Chunk {
                                Some(addr.clone())
                            } else {
                                None
                            }
                        })
                        .collect()
                } else {
                    all_local_records.keys().cloned().collect()
                };

                all_chunk_addrs.sort_by_key(|addr| key.distance(addr));

                // Cap the proof work regardless of the requested difficulty.
                let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);

                for addr in all_chunk_addrs.iter().take(workload_factor) {
                    if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
                    {
                        let proof = ChunkProof::new(&record.value, nonce);
                        debug!("Chunk proof for {key:?} is {proof:?}");
                        results.push((addr.clone(), Ok(proof)));
                    }
                }
            }

            // NOTE(review): this summary is only emitted on the multi-proof
            // path; the difficulty == 1 branch above never logs it — confirm
            // that is intentional.
            info!(
                "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
                results.len(),
                start.elapsed()
            );
        }

        results
    }
1036
1037 async fn storage_challenge(network: Network) {
1041 let start = Instant::now();
1042 let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
1043 network.get_k_closest_local_peers_to_the_target(None).await
1044 {
1045 closest_peers
1046 .into_iter()
1047 .take(CLOSE_GROUP_SIZE)
1048 .collect_vec()
1049 } else {
1050 error!("Cannot get local neighbours");
1051 return;
1052 };
1053 if closest_peers.len() < CLOSE_GROUP_SIZE {
1054 debug!(
1055 "Not enough neighbours ({}/{}) to carry out storage challenge.",
1056 closest_peers.len(),
1057 CLOSE_GROUP_SIZE
1058 );
1059 return;
1060 }
1061
1062 let mut verify_candidates: Vec<NetworkAddress> =
1063 if let Ok(all_keys) = network.get_all_local_record_addresses().await {
1064 all_keys
1065 .iter()
1066 .filter_map(|(addr, record_type)| {
1067 if ValidationType::Chunk == *record_type {
1068 Some(addr.clone())
1069 } else {
1070 None
1071 }
1072 })
1073 .collect()
1074 } else {
1075 error!("Failed to get local record addresses.");
1076 return;
1077 };
1078 let num_of_targets = verify_candidates.len();
1079 if num_of_targets < 50 {
1080 debug!("Not enough candidates({num_of_targets}/50) to be checked against neighbours.");
1081 return;
1082 }
1083
1084 let self_addr = NetworkAddress::from(network.peer_id());
1087 verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
1088 let index: usize = OsRng.gen_range(0..num_of_targets / 2);
1089 let target = verify_candidates[index].clone();
1090 let difficulty = CLOSE_GROUP_SIZE;
1092 verify_candidates.sort_by_key(|addr| target.distance(addr));
1093 let expected_targets = verify_candidates.into_iter().take(difficulty);
1094 let nonce: Nonce = thread_rng().r#gen::<u64>();
1095 let mut expected_proofs = HashMap::new();
1096 for addr in expected_targets {
1097 if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
1098 let expected_proof = ChunkProof::new(&record.value, nonce);
1099 let _ = expected_proofs.insert(addr, expected_proof);
1100 } else {
1101 error!("Local record {addr:?} cann't be loaded from disk.");
1102 }
1103 }
1104 let request = Request::Query(Query::GetChunkExistenceProof {
1105 key: target.clone(),
1106 nonce,
1107 difficulty,
1108 });
1109
1110 let mut tasks = JoinSet::new();
1111 for (peer_id, addresses) in closest_peers {
1112 if peer_id == network.peer_id() {
1113 continue;
1114 }
1115 let network_clone = network.clone();
1116 let request_clone = request.clone();
1117 let expected_proofs_clone = expected_proofs.clone();
1118 let _ = tasks.spawn(async move {
1119 let res = scoring_peer(
1120 network_clone,
1121 (peer_id, addresses),
1122 request_clone,
1123 expected_proofs_clone,
1124 )
1125 .await;
1126 (peer_id, res)
1127 });
1128 }
1129
1130 let mut peer_scores = vec![];
1131 while let Some(res) = tasks.join_next().await {
1132 match res {
1133 Ok((peer_id, score)) => {
1134 let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
1135 if !is_healthy {
1136 info!(
1137 "Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."
1138 );
1139 network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
1141 }
1142 peer_scores.push((peer_id, is_healthy));
1143 }
1144 Err(e) => {
1145 info!("StorageChallenge task completed with error {e:?}");
1146 }
1147 }
1148 }
1149 if !peer_scores.is_empty() {
1150 network.notify_peer_scores(peer_scores);
1151 }
1152
1153 info!(
1154 "Completed node StorageChallenge against neighbours in {:?}!",
1155 start.elapsed()
1156 );
1157 }
1158
1159 async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
1161 for (peer_id, addrs) in peers {
1163 Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
1164 }
1165 }
1166
    /// Ask a single peer for its software version and report the outcome via
    /// `notify_node_version`.
    ///
    /// Version strings recorded:
    /// * the reported version on a successful `GetVersion` response;
    /// * `"none"` when the peer answered with an unexpected response;
    /// * `"old"` when the request failed for any reason other than a dial
    ///   failure.
    /// A dial failure instead removes the peer from the routing table and
    /// records nothing.
    async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
        let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
        let version = match network.send_request(request, peer, addrs).await {
            Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
                trace!("Fetched peer version {peer:?} as {version:?}");
                version
            }
            Ok(other) => {
                info!("Not a fetched peer version {peer:?}, {other:?}");
                "none".to_string()
            }
            Err(err) => {
                info!("Failed to fetch peer version {peer:?} with error {err:?}");
                // Unreachable peer: drop it from the routing table rather
                // than recording a version for it.
                if let NetworkError::OutboundError(OutboundFailure::DialFailure) = err {
                    network.remove_peer(peer);
                    return;
                }
                "old".to_string()
            }
        };
        network.notify_node_version(peer, version);
    }
1194}
1195
1196async fn scoring_peer(
1197 network: Network,
1198 peer: (PeerId, Addresses),
1199 request: Request,
1200 expected_proofs: HashMap<NetworkAddress, ChunkProof>,
1201) -> usize {
1202 let peer_id = peer.0;
1203 let start = Instant::now();
1204 let responses = network
1205 .send_and_get_responses(&[peer], &request, true)
1206 .await;
1207
1208 if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
1209 responses.get(&peer_id)
1210 {
1211 if answers.is_empty() {
1212 info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
1213 return 0;
1214 }
1215 let elapsed = start.elapsed();
1216
1217 let mut received_proofs = vec![];
1218 for (addr, proof) in answers {
1219 if let Ok(proof) = proof {
1220 received_proofs.push((addr.clone(), proof.clone()));
1221 }
1222 }
1223
1224 let score = mark_peer(elapsed, received_proofs, &expected_proofs);
1225 info!(
1226 "Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.",
1227 answers.len()
1228 );
1229 score
1230 } else {
1231 info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error.");
1232 0
1233 }
1234}
1235
1236fn mark_peer(
1242 duration: Duration,
1243 answers: Vec<(NetworkAddress, ChunkProof)>,
1244 expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
1245) -> usize {
1246 let duration_score = duration_score_scheme(duration);
1247 let challenge_score = challenge_score_scheme(answers, expected_proofs);
1248
1249 duration_score * challenge_score
1250}
1251
1252fn duration_score_scheme(duration: Duration) -> usize {
1254 let in_ms = if let Some(value) = duration.as_millis().to_usize() {
1256 value
1257 } else {
1258 info!("Cannot get milli seconds from {duration:?}, using a default value of 1000ms.");
1259 1000
1260 };
1261
1262 let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
1263 HIGHEST_SCORE - step
1264}
1265
1266fn challenge_score_scheme(
1268 answers: Vec<(NetworkAddress, ChunkProof)>,
1269 expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
1270) -> usize {
1271 let mut correct_answers = 0;
1272 for (addr, chunk_proof) in answers {
1273 if let Some(expected_proof) = expected_proofs.get(&addr) {
1274 if expected_proof.verify(&chunk_proof) {
1275 correct_answers += 1;
1276 } else {
1277 info!("Spot a false answer to the challenge regarding {addr:?}");
1278 return 0;
1280 }
1281 }
1282 }
1283 std::cmp::min(
1290 HIGHEST_SCORE,
1291 HIGHEST_SCORE * correct_answers / expected_proofs.len(),
1292 )
1293}
1294
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    /// With no local peers there is nothing to return, whatever count is asked for.
    #[test]
    fn test_no_local_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(5);
        let range = None;
        let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);

        assert_eq!(result, vec![]);
    }

    /// Asking for fewer peers than are known returns the `num_of_peers`
    /// closest to the target, ordered by XOR distance.
    #[test]
    fn test_fewer_local_peers_than_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(2);
        let range = None;
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Recompute the expected answer independently: sort all peers by
        // distance to the target and keep the closest two.
        let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .iter()
            .map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(*peer_id);
                (addr, multi_addrs.clone())
            })
            .collect();
        expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
        let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();

        assert_eq!(expected_result, result);
    }

    /// With a distance range supplied, all peers within the range are
    /// returned — presumably the range filter takes precedence over the
    /// (zero) `num_of_peers` here; confirm against
    /// `calculate_get_closest_peers` if this test starts failing.
    #[test]
    fn test_with_range_and_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(0);
        // Mid-range distance threshold (top bit of the first byte set).
        let range_value = [128; 32];
        let range = Some(range_value);
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Expected: exactly those peers whose XOR distance to the target is
        // within the threshold.
        let distance = U256::from_big_endian(&range_value);
        let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .into_iter()
            .filter_map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(peer_id);
                if target.distance(&addr).0 <= distance {
                    Some((addr, multi_addrs.clone()))
                } else {
                    None
                }
            })
            .collect();

        assert_eq!(expected_result, result);
    }

    // These tests replicate the node's merkle-payment timestamp checks as
    // inline arithmetic (rather than invoking the validation code directly),
    // pinning the age/future-time rules around MERKLE_PAYMENT_EXPIRATION.
    mod merkle_payment_tests {
        use super::*;

        /// A timestamp one hour in the past is neither future-dated nor expired.
        #[test]
        fn test_timestamp_validation_accepts_valid_timestamp() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let valid_timestamp = now - 3600;

            let age = now.saturating_sub(valid_timestamp);

            assert!(
                valid_timestamp <= now,
                "Valid timestamp should not be in the future"
            );
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Valid timestamp should not be expired"
            );
        }

        /// A timestamp an hour in the future fails the not-in-the-future check.
        #[test]
        fn test_timestamp_validation_rejects_future_timestamp() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let future_timestamp = now + 3600;

            assert!(
                future_timestamp > now,
                "Future timestamp should be rejected"
            );
        }

        /// A timestamp a full day past the expiration window is expired.
        #[test]
        fn test_timestamp_validation_rejects_expired_timestamp() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let expired_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 86400);

            let age = now.saturating_sub(expired_timestamp);

            assert!(
                age > MERKLE_PAYMENT_EXPIRATION,
                "Expired timestamp should be rejected"
            );
        }

        /// Exactly at the expiration boundary the timestamp is still accepted
        /// (the check is inclusive: `age <= MERKLE_PAYMENT_EXPIRATION`).
        #[test]
        fn test_timestamp_validation_at_expiration_boundary() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let boundary_timestamp = now - MERKLE_PAYMENT_EXPIRATION;

            let age = now.saturating_sub(boundary_timestamp);

            assert_eq!(age, MERKLE_PAYMENT_EXPIRATION);
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Timestamp exactly at boundary should not be rejected"
            );
        }

        /// One second past the boundary flips the result to expired.
        #[test]
        fn test_timestamp_validation_beyond_expiration_boundary() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let beyond_boundary_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 1);

            let age = now.saturating_sub(beyond_boundary_timestamp);

            assert!(
                age > MERKLE_PAYMENT_EXPIRATION,
                "Timestamp beyond boundary should be rejected"
            );
        }

        /// A timestamp of exactly "now" has zero age and passes both checks.
        #[test]
        fn test_timestamp_validation_at_current_time() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let current_timestamp = now;

            let age = now.saturating_sub(current_timestamp);

            assert!(
                current_timestamp <= now,
                "Current timestamp should not be in future"
            );
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Current timestamp should not be expired"
            );
            assert_eq!(age, 0, "Age should be 0 for current timestamp");
        }

        /// Even one second in the future counts as future-dated.
        #[test]
        fn test_timestamp_validation_near_future_boundary() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let near_future_timestamp = now + 1;

            assert!(
                near_future_timestamp > now,
                "Near-future timestamp should be rejected"
            );
        }

        /// Guards against accidental changes to the expiration constant.
        #[test]
        fn test_merkle_payment_expiration_constant() {
            const SEVEN_DAYS_IN_SECONDS: u64 = 7 * 24 * 60 * 60;
            assert_eq!(
                MERKLE_PAYMENT_EXPIRATION, SEVEN_DAYS_IN_SECONDS,
                "MERKLE_PAYMENT_EXPIRATION should be 7 days"
            );
        }
    }
}