1use super::{
10 Marker, NodeEvent, error::Result, event::NodeEventsChannel, quote::quotes_verification,
11};
12#[cfg(feature = "open-metrics")]
13use crate::metrics::NodeMetricsRecorder;
14#[cfg(feature = "open-metrics")]
15use crate::networking::MetricsRegistries;
16use crate::networking::{Addresses, Network, NetworkConfig, NetworkError, NetworkEvent, NodeIssue};
17use crate::{PutValidationError, RunningNode};
18use ant_bootstrap::bootstrap::Bootstrap;
19use ant_evm::EvmNetwork;
20use ant_evm::RewardsAddress;
21use ant_evm::merkle_payments::MERKLE_PAYMENT_EXPIRATION;
22use ant_protocol::{
23 CLOSE_GROUP_SIZE, NetworkAddress, PrettyPrintRecordKey,
24 error::Error as ProtocolError,
25 messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
26 storage::{try_deserialize_record, Chunk, ValidationType},
27};
28use bytes::Bytes;
29use futures::stream::{self, StreamExt};
30use itertools::Itertools;
31use libp2p::{
32 Multiaddr, PeerId,
33 identity::Keypair,
34 kad::{KBucketDistance as Distance, Record, U256},
35 request_response::OutboundFailure,
36};
37use num_traits::cast::ToPrimitive;
38use rand::{
39 Rng, SeedableRng,
40 rngs::{OsRng, StdRng},
41 seq::SliceRandom,
42 thread_rng,
43};
44use std::{
45 collections::{BTreeSet, HashMap, HashSet},
46 net::SocketAddr,
47 path::PathBuf,
48 sync::{
49 Arc,
50 atomic::{AtomicUsize, Ordering},
51 },
52 time::{Duration, Instant},
53};
54use tokio::sync::{Mutex, watch};
55use tokio::{
56 sync::mpsc::Receiver,
57 task::{JoinSet, spawn},
58};
59
/// Periodic replication runs at a random interval in `[MAX/2, MAX)` seconds,
/// so nodes in a close group do not all replicate in lock-step.
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;

/// Upper bound (seconds) for the randomised storage-challenge interval.
const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;

/// How often the uptime metric is refreshed (open-metrics feature).
const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

/// Upper bound (seconds) for the randomised irrelevant-records cleanup interval.
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;

/// Best score a peer can earn for a storage-challenge answer.
/// NOTE(review): scoring itself happens in `scoring_peer`, outside this view — confirm usage there.
const HIGHEST_SCORE: usize = 100;

/// Peers whose storage-challenge score is at or below this are considered
/// unhealthy (see `storage_challenge`, which checks `score > MIN_ACCEPTABLE_HEALTHY_SCORE`).
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;

/// Granularity used when scoring challenge responses — presumably a time step; confirm in `scoring_peer`.
const TIME_STEP: usize = 20;

/// Delay before retrying a failed replica fetch (used outside this view — TODO confirm).
const REPLICA_FETCH_RETRY_DELAY: Duration = Duration::from_secs(15 * 60);

/// Number of peers contacted when fetching replicas (used outside this view — TODO confirm).
const REPLICA_FETCH_PEER_COUNT: usize = 5;

/// Minimum number of healthy replicas expected per record (used outside this view — TODO confirm).
const MIN_HEALTHY_REPLICA_COUNT: usize = 3;

/// Index into the sorted closest-peer list used as the close-neighbourhood
/// distance threshold (see `verify_local_replication_health`).
const CLOSE_NEIGHBOUR_DISTANCE_INDEX: usize = 7;

/// Capacity of the close-group tracker (tracker type defined outside this view — TODO confirm).
const CLOSE_GROUP_TRACKING_LIMIT: usize = 20;

/// Window during which close-group churn is treated as peer-restart behaviour
/// and replication triggers are suppressed (tracker defined outside this view).
const CLOSE_GROUP_RESTART_SUPPRESSION: Duration = Duration::from_secs(90);
/// Collects all configuration needed to construct and start a node
/// (see [`NodeBuilder::build_and_run`]).
pub struct NodeBuilder {
    /// Socket address the node will listen on.
    addr: SocketAddr,
    /// Bootstrap flow used to join the network.
    bootstrap: Bootstrap,
    /// EVM address node rewards are paid to.
    evm_address: RewardsAddress,
    /// EVM network payments are settled on.
    evm_network: EvmNetwork,
    /// Keypair providing the node's libp2p identity.
    identity_keypair: Keypair,
    /// Local mode flag, forwarded to the network config (defaults to `false`).
    local: bool,
    /// Port for the metrics server; `None` disables the metrics recorder.
    #[cfg(feature = "open-metrics")]
    metrics_server_port: Option<u16>,
    /// When `true`, UPnP is disabled in the network config.
    no_upnp: bool,
    /// Whether the node should act as a relay client.
    relay_client: bool,
    /// Directory where the node persists its data.
    root_dir: PathBuf,
}
131
impl NodeBuilder {
    /// Create a builder with the mandatory configuration; optional flags
    /// (`local`, `no_upnp`, `relay_client`, metrics port) default to off and
    /// can be set via the setter methods below.
    pub fn new(
        identity_keypair: Keypair,
        bootstrap_flow: Bootstrap,
        evm_address: RewardsAddress,
        evm_network: EvmNetwork,
        addr: SocketAddr,
        root_dir: PathBuf,
    ) -> Self {
        Self {
            addr,
            bootstrap: bootstrap_flow,
            evm_address,
            evm_network,
            identity_keypair,
            local: false,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: None,
            no_upnp: false,
            relay_client: false,
            root_dir,
        }
    }

    /// Set local mode (forwarded to the network config).
    pub fn local(&mut self, local: bool) {
        self.local = local;
    }

    /// Set the metrics server port; `None` leaves metrics disabled.
    #[cfg(feature = "open-metrics")]
    pub fn metrics_server_port(&mut self, port: Option<u16>) {
        self.metrics_server_port = port;
    }

    /// Set whether the node acts as a relay client.
    pub fn relay_client(&mut self, relay_client: bool) {
        self.relay_client = relay_client;
    }

    /// Disable UPnP port mapping when `true`.
    pub fn no_upnp(&mut self, no_upnp: bool) {
        self.no_upnp = no_upnp;
    }

    /// Initialise the network layer, assemble the [`Node`], spawn its event
    /// loop, and return a [`RunningNode`] handle the caller can use to
    /// observe events and trigger shutdown.
    pub fn build_and_run(self) -> Result<RunningNode> {
        // The recorder only exists when a metrics port was requested; the
        // registries are handed to the network layer either way.
        #[cfg(feature = "open-metrics")]
        let (metrics_recorder, metrics_registries) = if self.metrics_server_port.is_some() {
            let mut metrics_registries = MetricsRegistries::default();
            let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);

            (Some(metrics_recorder), metrics_registries)
        } else {
            (None, MetricsRegistries::default())
        };

        // Watch channel used to signal shutdown to both the network driver
        // and the node's own event loop.
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        let network_config = NetworkConfig {
            keypair: self.identity_keypair,
            local: self.local,
            listen_addr: self.addr,
            root_dir: self.root_dir.clone(),
            shutdown_rx: shutdown_rx.clone(),
            bootstrap: self.bootstrap,
            no_upnp: self.no_upnp,
            relay_client: self.relay_client,
            custom_request_timeout: None,
            #[cfg(feature = "open-metrics")]
            metrics_registries,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: self.metrics_server_port,
        };
        let (network, network_event_receiver) = Network::init(network_config)?;

        let node_events_channel = NodeEventsChannel::default();
        let node = NodeInner {
            network: network.clone(),
            events_channel: node_events_channel.clone(),
            close_group_tracker: Mutex::new(CloseGroupTracker::new(network.peer_id())),
            reward_address: self.evm_address,
            #[cfg(feature = "open-metrics")]
            metrics_recorder,
            evm_network: self.evm_network,
        };
        let node = Node {
            inner: Arc::new(node),
        };

        // `run` spawns the event loop onto its own task and returns immediately.
        node.run(network_event_receiver, shutdown_rx);
        let running_node = RunningNode {
            shutdown_sender: shutdown_tx,
            network,
            node_events_channel,
            root_dir_path: self.root_dir,
            rewards_address: self.evm_address,
        };

        Ok(running_node)
    }
}
253
/// Cheap-to-clone handle to the running node: all state lives behind an
/// `Arc`, so every clone shares the same [`NodeInner`].
#[derive(Clone)]
pub(crate) struct Node {
    inner: Arc<NodeInner>,
}
261
/// Shared state behind the [`Node`] handle.
struct NodeInner {
    /// Channel used to broadcast `NodeEvent`s to subscribers.
    events_channel: NodeEventsChannel,
    /// Handle to the network layer.
    network: Network,
    /// Tracks close-group membership changes to decide when replication
    /// should trigger; guarded by an async mutex as it is touched from the
    /// event loop and its timers.
    close_group_tracker: Mutex<CloseGroupTracker>,
    /// Metrics recorder; `None` when metrics were not enabled at build time.
    #[cfg(feature = "open-metrics")]
    metrics_recorder: Option<NodeMetricsRecorder>,
    /// Address this node's rewards are paid to.
    reward_address: RewardsAddress,
    /// EVM network used for payment/quote operations.
    evm_network: EvmNetwork,
}
273
274impl Node {
275 pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
277 &self.inner.events_channel
278 }
279
280 pub(crate) fn network(&self) -> &Network {
282 &self.inner.network
283 }
284
285 fn close_group_tracker(&self) -> &Mutex<CloseGroupTracker> {
286 &self.inner.close_group_tracker
287 }
288
289 #[cfg(feature = "open-metrics")]
290 pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
293 self.inner.metrics_recorder.as_ref()
294 }
295
296 pub(crate) fn reward_address(&self) -> &RewardsAddress {
298 &self.inner.reward_address
299 }
300
301 pub(crate) fn evm_network(&self) -> &EvmNetwork {
302 &self.inner.evm_network
303 }
304
    /// Spawn the node's long-running event loop onto its own task.
    ///
    /// The loop multiplexes, via `tokio::select!`:
    /// - the shutdown watch channel,
    /// - incoming `NetworkEvent`s,
    /// - randomised periodic timers for replication, uptime metrics,
    ///   irrelevant-record cleanup, and storage challenges.
    fn run(
        self,
        mut network_event_receiver: Receiver<NetworkEvent>,
        mut shutdown_rx: watch::Receiver<bool>,
    ) {
        let mut rng = StdRng::from_entropy();

        // Counts routing-table additions; used to detect when we first become
        // connected to the network (see handle_network_event).
        let peers_connected = Arc::new(AtomicUsize::new(0));

        let _node_task = spawn(async move {
            // Randomise each node's replication interval within [MAX/2, MAX)
            // so peers do not replicate in lock-step.
            let replication_interval: u64 = rng.gen_range(
                PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
            );
            let replication_interval_time = Duration::from_secs(replication_interval);
            debug!("Replication interval set to {replication_interval_time:?}");

            let mut replication_interval = tokio::time::interval(replication_interval_time);
            // The first tick of a tokio interval completes immediately — consume it.
            let _ = replication_interval.tick().await;

            let mut uptime_metrics_update_interval =
                tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
            let _ = uptime_metrics_update_interval.tick().await;

            // Cleanup interval is randomised for the same de-synchronisation reason.
            let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
                UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
                    ..UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
            );
            let irrelevant_records_cleanup_interval_time =
                Duration::from_secs(irrelevant_records_cleanup_interval);
            let mut irrelevant_records_cleanup_interval =
                tokio::time::interval(irrelevant_records_cleanup_interval_time);
            let _ = irrelevant_records_cleanup_interval.tick().await;

            let storage_challenge_interval: u64 =
                rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
            let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
            debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");

            let mut storage_challenge_interval =
                tokio::time::interval(storage_challenge_interval_time);
            let _ = storage_challenge_interval.tick().await;

            loop {
                let peers_connected = &peers_connected;

                tokio::select! {
                    // Exit on explicit shutdown or when the sender is dropped.
                    result = shutdown_rx.changed() => {
                        if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
                            info!("Shutdown signal received or sender dropped. Exiting network events loop.");
                            break;
                        }
                    },
                    net_event = network_event_receiver.recv() => {
                        match net_event {
                            Some(event) => {
                                let start = Instant::now();
                                let event_string = format!("{event:?}");

                                self.handle_network_event(event, peers_connected).await;
                                trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);

                            }
                            None => {
                                // Channel closed means the network driver is gone.
                                error!("The `NetworkEvent` channel is closed");
                                self.events_channel().broadcast(NodeEvent::ChannelClosed);
                                break;
                            }
                        }
                    }
                    _ = replication_interval.tick() => {
                        let now = Instant::now();

                        // Keep the tracker lock scope minimal.
                        {
                            let mut tracker = self.close_group_tracker().lock().await;
                            if tracker.handle_timer_expiry(now) {
                                trace!("Replication timer expired for tracked peers; periodic run will cover it");
                            }
                        }

                        let start = now;
                        let network = self.network().clone();
                        self.record_metrics(Marker::IntervalReplicationTriggered);

                        // Run replication on its own task so a slow pass does
                        // not stall event handling.
                        let _handle = spawn(async move {
                            Self::try_interval_replication(network);
                            trace!("Periodic replication took {:?}", start.elapsed());
                        });
                    }
                    _ = uptime_metrics_update_interval.tick() => {
                        #[cfg(feature = "open-metrics")]
                        if let Some(metrics_recorder) = self.metrics_recorder() {
                            let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
                        }
                    }
                    _ = irrelevant_records_cleanup_interval.tick() => {
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::trigger_irrelevant_record_cleanup(network);
                        });
                    }
                    _ = storage_challenge_interval.tick() => {
                        let start = Instant::now();
                        debug!("Periodic storage challenge triggered");
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::storage_challenge(network).await;
                            trace!("Periodic storage challenge took {:?}", start.elapsed());
                        });
                    }
                }
            }
        });
    }
431
432 pub(crate) fn record_metrics(&self, marker: Marker) {
435 marker.log();
436 #[cfg(feature = "open-metrics")]
437 if let Some(metrics_recorder) = self.metrics_recorder() {
438 metrics_recorder.record(marker)
439 }
440 }
441
    /// Dispatch a single [`NetworkEvent`]. Potentially slow work (queries,
    /// record validation, replication) is spawned onto separate tasks so this
    /// handler stays non-blocking.
    ///
    /// `peers_connected` counts routing-table additions across calls and is
    /// used to detect the moment the node first becomes connected.
    async fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
        let start = Instant::now();
        let event_string = format!("{event:?}");
        let event_header;

        // GetVersion queries are frequent and noisy — log those at trace only.
        if let NetworkEvent::QueryRequestReceived {
            query: Query::GetVersion { .. },
            ..
        } = event
        {
            trace!("Handling NetworkEvent {event_string}");
        } else {
            debug!("Handling NetworkEvent {event_string}");
        }

        match event {
            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
                event_header = "PeerAdded";
                let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
                // Declare ourselves connected once CLOSE_GROUP_SIZE peers have joined.
                if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
                    self.events_channel()
                        .broadcast(NodeEvent::ConnectedToNetwork);
                }

                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));

                // Query the new peer's version off the event loop.
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::try_query_peer_version(network, peer_id, Default::default()).await;
                });

                // Ask the tracker whether this membership change warrants
                // replication; keep the lock scope minimal.
                let replication_decision = {
                    let mut tracker = self.close_group_tracker().lock().await;
                    tracker.record_peer_added(peer_id)
                };

                if replication_decision.should_trigger() {
                    let network = self.network().clone();
                    self.record_metrics(Marker::IntervalReplicationTriggered);
                    let _handle = spawn(async move {
                        Self::try_interval_replication(network);
                    });
                } else if matches!(replication_decision, ReplicationDirective::Skip) {
                    trace!(
                        "Replication skipped for {peer_id:?} addition due to restart behaviour tracking"
                    );
                }
            }
            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
                event_header = "PeerRemoved";
                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));

                let self_id = self.network().peer_id();
                let distance =
                    NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
                info!(
                    "Node {self_id:?} removed peer from routing table: {peer_id:?}. It has a {:?} distance to us.",
                    distance.ilog2()
                );

                let replication_decision = {
                    let mut tracker = self.close_group_tracker().lock().await;
                    tracker.record_peer_removed(peer_id)
                };

                if replication_decision.should_trigger() {
                    let network = self.network().clone();
                    self.record_metrics(Marker::IntervalReplicationTriggered);
                    let _handle = spawn(async move {
                        Self::try_interval_replication(network);
                    });
                } else if matches!(replication_decision, ReplicationDirective::Skip) {
                    trace!(
                        "Replication skipped for {peer_id:?} removal due to restart behaviour tracking"
                    );
                }
            }
            NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
                event_header = "PeerWithUnsupportedProtocol";
            }
            NetworkEvent::NewListenAddr(_) => {
                event_header = "NewListenAddr";
            }
            NetworkEvent::ResponseReceived { res } => {
                event_header = "ResponseReceived";
                if let Err(err) = self.handle_response(res) {
                    error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
                }
            }
            NetworkEvent::KeysToFetchForReplication(keys) => {
                event_header = "KeysToFetchForReplication";
                self.record_metrics(Marker::fetching_keys_for_replication(&keys));

                if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
                    error!("Error while trying to fetch replicated data {err:?}");
                }
            }
            NetworkEvent::QueryRequestReceived { query, channel } => {
                event_header = "QueryRequestReceived";
                let node = self.clone();
                let payment_address = *self.reward_address();

                // Answer queries on a separate task to keep the loop responsive.
                let _handle = spawn(async move {
                    let network = node.network().clone();
                    let res = Self::handle_query(node, query, payment_address).await;

                    // Version responses are noisy — trace level only.
                    if let Response::Query(QueryResponse::GetVersion { .. }) = res {
                        trace!("Sending response {res:?}");
                    } else {
                        debug!("Sending response {res:?}");
                    }

                    network.send_response(res, channel);
                });
            }
            NetworkEvent::UnverifiedRecord(record) => {
                event_header = "UnverifiedRecord";
                // Validate and store off the event loop; rejections are only
                // recorded as metrics, not propagated.
                let self_clone = self.clone();
                let _handle = spawn(async move {
                    let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                    match self_clone.validate_and_store_record(record).await {
                        Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
                        Err(err) => {
                            self_clone.record_metrics(Marker::RecordRejected(&key, &err));
                        }
                    }
                });
            }
            NetworkEvent::TerminateNode { reason } => {
                event_header = "TerminateNode";
                error!("Received termination from swarm_driver due to {reason:?}");
                self.events_channel()
                    .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
            }
            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
                event_header = "FailedToFetchHolders";
                let network = self.network().clone();
                let pretty_log: Vec<_> = bad_nodes
                    .iter()
                    .map(|(peer_id, record_key)| {
                        let pretty_key = PrettyPrintRecordKey::from(record_key);
                        (peer_id, pretty_key)
                    })
                    .collect();
                debug!(
                    "Received notification from replication_fetcher, notifying {pretty_log:?} failed to fetch replication copies from."
                );
                // Only count an issue against a holder if we still lack the record.
                let _handle = spawn(async move {
                    for (peer_id, record_key) in bad_nodes {
                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
                        {
                            error!(
                                "From peer {peer_id:?}, failed to fetch record {:?}",
                                PrettyPrintRecordKey::from(&record_key)
                            );
                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
                        }
                    }
                });
            }
            NetworkEvent::QuoteVerification { quotes } => {
                event_header = "QuoteVerification";
                let network = self.network().clone();

                let _handle = spawn(async move {
                    quotes_verification(&network, quotes).await;
                });
            }
            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
                event_header = "FreshReplicateToFetch";
                self.fresh_replicate_to_fetch(holder, keys);
            }
            NetworkEvent::PeersForVersionQuery(peers) => {
                event_header = "PeersForVersionQuery";
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::query_peers_version(network, peers).await;
                });
            }
            NetworkEvent::NetworkWideReplication { keys } => {
                event_header = "NetworkWideReplication";
                self.perform_network_wide_replication(keys);
            }
        }

        trace!(
            "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
            start.elapsed()
        );
    }
648
649 fn handle_response(&self, response: Response) -> Result<()> {
651 match response {
652 Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
653 warn!("Mishandled replicate response, should be handled earlier");
655 }
656 Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
657 error!(
658 "Response to replication shall be handled by called not by common handler, {resp:?}"
659 );
660 }
661 Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
662 }
664 other => {
665 warn!("handle_response not implemented for {other:?}");
666 }
667 };
668
669 Ok(())
670 }
671
672 async fn handle_query(node: Self, query: Query, payment_address: RewardsAddress) -> Response {
673 let network = node.network();
674 let resp: QueryResponse = match query {
675 Query::GetStoreQuote {
676 key,
677 data_type,
678 data_size,
679 nonce,
680 difficulty,
681 } => {
682 let record_key = key.to_record_key();
683 let self_id = network.peer_id();
684
685 let maybe_quoting_metrics = network
686 .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
687 .await;
688
689 let storage_proofs = if let Some(nonce) = nonce {
690 Self::respond_x_closest_record_proof(
691 network,
692 key.clone(),
693 nonce,
694 difficulty,
695 false,
696 )
697 .await
698 } else {
699 vec![]
700 };
701
702 match maybe_quoting_metrics {
703 Ok((quoting_metrics, is_already_stored)) => {
704 if is_already_stored {
705 QueryResponse::GetStoreQuote {
706 quote: Err(ProtocolError::RecordExists(
707 PrettyPrintRecordKey::from(&record_key).into_owned(),
708 )),
709 peer_address: NetworkAddress::from(self_id),
710 storage_proofs,
711 }
712 } else {
713 QueryResponse::GetStoreQuote {
714 quote: Self::create_quote_for_storecost(
715 network,
716 &key,
717 "ing_metrics,
718 &payment_address,
719 ),
720 peer_address: NetworkAddress::from(self_id),
721 storage_proofs,
722 }
723 }
724 }
725 Err(err) => {
726 warn!("GetStoreQuote failed for {key:?}: {err}");
727 QueryResponse::GetStoreQuote {
728 quote: Err(ProtocolError::GetStoreQuoteFailed),
729 peer_address: NetworkAddress::from(self_id),
730 storage_proofs,
731 }
732 }
733 }
734 }
735 Query::GetReplicatedRecord { requester: _, key } => {
736 let our_address = NetworkAddress::from(network.peer_id());
737 let record_key = key.to_record_key();
738
739 let result = match network.get_local_record(&record_key).await {
740 Ok(Some(record)) => Ok((our_address, Bytes::from(record.value))),
741 Ok(None) => Err(ProtocolError::ReplicatedRecordNotFound {
742 holder: Box::new(our_address),
743 key: Box::new(key.clone()),
744 }),
745 Err(err) => Err(ProtocolError::PutRecordFailed(format!(
747 "Error to fetch local record for GetReplicatedRecord {err:?}"
748 ))),
749 };
750
751 QueryResponse::GetReplicatedRecord(result)
752 }
753 Query::GetChunkExistenceProof {
754 key,
755 nonce,
756 difficulty,
757 } => QueryResponse::GetChunkExistenceProof(
758 Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
759 ),
760 Query::CheckNodeInProblem(target_address) => {
761 debug!("Got CheckNodeInProblem for peer {target_address:?}");
762
763 let is_in_trouble =
764 if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
765 result
766 } else {
767 debug!("Could not get status of {target_address:?}.");
768 false
769 };
770
771 QueryResponse::CheckNodeInProblem {
772 reporter_address: NetworkAddress::from(network.peer_id()),
773 target_address,
774 is_in_trouble,
775 }
776 }
777 Query::GetClosestPeers {
778 key,
779 num_of_peers,
780 range,
781 sign_result,
782 } => {
783 debug!(
784 "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
785 );
786 Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
787 .await
788 }
789 Query::GetVersion(_) => QueryResponse::GetVersion {
790 peer: NetworkAddress::from(network.peer_id()),
791 version: ant_build_info::package_version(),
792 },
793 Query::PutRecord {
794 holder,
795 address,
796 serialized_record,
797 } => {
798 let record = Record {
799 key: address.to_record_key(),
800 value: serialized_record,
801 publisher: None,
802 expires: None,
803 };
804
805 let key = PrettyPrintRecordKey::from(&record.key).into_owned();
806 let result = match node.validate_and_store_record(record).await {
807 Ok(()) => Ok(()),
808 Err(PutValidationError::OutdatedRecordCounter { counter, expected }) => {
809 node.record_metrics(Marker::RecordRejected(
810 &key,
811 &PutValidationError::OutdatedRecordCounter { counter, expected },
812 ));
813 Err(ProtocolError::OutdatedRecordCounter { counter, expected })
814 }
815 Err(PutValidationError::TopologyVerificationFailed {
816 target_address,
817 valid_count,
818 total_paid,
819 closest_count,
820 node_peers,
821 paid_peers,
822 }) => {
823 node.record_metrics(Marker::RecordRejected(
824 &key,
825 &PutValidationError::TopologyVerificationFailed {
826 target_address: target_address.clone(),
827 valid_count,
828 total_paid,
829 closest_count,
830 node_peers: node_peers.clone(),
831 paid_peers: paid_peers.clone(),
832 },
833 ));
834 Err(ProtocolError::TopologyVerificationFailed {
835 target_address: Box::new(target_address),
836 valid_count,
837 total_paid,
838 closest_count,
839 node_peers,
840 paid_peers,
841 })
842 }
843 Err(err) => {
844 node.record_metrics(Marker::RecordRejected(&key, &err));
845 Err(ProtocolError::PutRecordFailed(format!("{err:?}")))
846 }
847 };
848 QueryResponse::PutRecord {
849 result,
850 peer_address: holder,
851 record_addr: address,
852 }
853 }
854 Query::GetMerkleCandidateQuote {
855 key,
856 data_type,
857 data_size,
858 merkle_payment_timestamp,
859 } => {
860 Self::respond_merkle_candidate_quote(
861 network,
862 key,
863 data_type,
864 data_size,
865 merkle_payment_timestamp,
866 payment_address,
867 )
868 .await
869 }
870 };
871 Response::Query(resp)
872 }
873
874 async fn respond_get_closest_peers(
875 network: &Network,
876 target: NetworkAddress,
877 num_of_peers: Option<usize>,
878 range: Option<[u8; 32]>,
879 sign_result: bool,
880 ) -> QueryResponse {
881 let local_peers = network.get_local_peers_with_multiaddr().await;
882 let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
883 Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
884 } else {
885 vec![]
886 };
887
888 let signature = if sign_result {
889 let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
890 bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
891 network.sign(&bytes).ok()
892 } else {
893 None
894 };
895
896 QueryResponse::GetClosestPeers {
897 target,
898 peers,
899 signature,
900 }
901 }
902
903 fn calculate_get_closest_peers(
904 peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
905 target: NetworkAddress,
906 num_of_peers: Option<usize>,
907 range: Option<[u8; 32]>,
908 ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
909 match (num_of_peers, range) {
910 (_, Some(value)) => {
911 let distance = U256::from_big_endian(&value);
912 peer_addrs
913 .iter()
914 .filter_map(|(peer_id, multi_addrs)| {
915 let addr = NetworkAddress::from(*peer_id);
916 if target.distance(&addr).0 <= distance {
917 Some((addr, multi_addrs.clone()))
918 } else {
919 None
920 }
921 })
922 .collect()
923 }
924 (Some(num_of_peers), _) => {
925 let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
926 .iter()
927 .map(|(peer_id, multi_addrs)| {
928 let addr = NetworkAddress::from(*peer_id);
929 (addr, multi_addrs.clone())
930 })
931 .collect();
932 result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
933 result.into_iter().take(num_of_peers).collect()
934 }
935 (None, None) => vec![],
936 }
937 }
938
939 async fn respond_merkle_candidate_quote(
943 network: &Network,
944 key: NetworkAddress,
945 data_type: u32,
946 data_size: usize,
947 merkle_payment_timestamp: u64,
948 payment_address: RewardsAddress,
949 ) -> QueryResponse {
950 debug!(
951 "merkle payment: GetMerkleCandidateQuote for target {key:?}, timestamp: {merkle_payment_timestamp}, data_type: {data_type}, data_size: {data_size}"
952 );
953
954 const TIMESTAMP_TOLERANCE: u64 = 24 * 60 * 60; let now = std::time::SystemTime::now()
964 .duration_since(std::time::UNIX_EPOCH)
965 .unwrap_or_default()
966 .as_secs();
967
968 let future_threshold = now + TIMESTAMP_TOLERANCE;
970 if merkle_payment_timestamp > future_threshold {
971 let error_msg = format!(
972 "Rejected future timestamp {merkle_payment_timestamp} (current time: {now}, threshold: {future_threshold})"
973 );
974 warn!("{error_msg} for {key:?}");
975 return QueryResponse::GetMerkleCandidateQuote(Err(
976 ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
977 ));
978 }
979
980 let expiration_threshold = MERKLE_PAYMENT_EXPIRATION + TIMESTAMP_TOLERANCE;
982 let age = now.saturating_sub(merkle_payment_timestamp);
983 if age > expiration_threshold {
984 let error_msg = format!(
985 "Rejected expired timestamp {merkle_payment_timestamp} (age: {age}s, max: {expiration_threshold}s)",
986 );
987 warn!("{error_msg} for {key:?}");
988 return QueryResponse::GetMerkleCandidateQuote(Err(
989 ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
990 ));
991 }
992
993 let record_key = key.to_record_key();
995 let (quoting_metrics, _is_already_stored) = match network
996 .get_local_quoting_metrics(record_key, data_type, data_size)
997 .await
998 {
999 Ok(metrics) => metrics,
1000 Err(err) => {
1001 let error_msg = format!("Failed to get quoting metrics for {key:?}: {err}");
1002 warn!("{error_msg}");
1003 return QueryResponse::GetMerkleCandidateQuote(Err(
1004 ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
1005 ));
1006 }
1007 };
1008
1009 let pub_key = network.get_pub_key();
1011 let reward_address = payment_address;
1012 let bytes = ant_evm::merkle_payments::MerklePaymentCandidateNode::bytes_to_sign(
1013 "ing_metrics,
1014 &reward_address,
1015 merkle_payment_timestamp,
1016 );
1017 let signature = match network.sign(&bytes) {
1018 Ok(sig) => sig,
1019 Err(e) => {
1020 let error_msg = format!("Failed to sign candidate node for {key:?}: {e}");
1021 error!("{error_msg}");
1022 return QueryResponse::GetMerkleCandidateQuote(Err(
1023 ProtocolError::FailedToSignMerkleCandidate(error_msg),
1024 ));
1025 }
1026 };
1027
1028 let candidate = ant_evm::merkle_payments::MerklePaymentCandidateNode {
1029 quoting_metrics,
1030 reward_address,
1031 merkle_payment_timestamp,
1032 pub_key,
1033 signature,
1034 };
1035 QueryResponse::GetMerkleCandidateQuote(Ok(candidate))
1036 }
1037
    /// Build chunk-existence proofs answering a storage challenge.
    ///
    /// With `difficulty == 1`, only the requested `key` is proved (or its
    /// absence reported). Otherwise proofs are produced for up to
    /// `min(difficulty, CLOSE_GROUP_SIZE)` local records closest to `key`;
    /// when `chunk_only` is set, only chunk-type records are considered.
    async fn respond_x_closest_record_proof(
        network: &Network,
        key: NetworkAddress,
        nonce: Nonce,
        difficulty: usize,
        chunk_only: bool,
    ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
        let start = Instant::now();
        let mut results = vec![];
        if difficulty == 1 {
            // Single-record proof: answer for the exact key, or report absence.
            let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
            if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
                let proof = ChunkProof::new(&record.value, nonce);
                debug!("Chunk proof for {key:?} is {proof:?}");
                result = Ok(proof)
            } else {
                debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
            }

            results.push((key.clone(), result));
        } else {
            let all_local_records = network.get_all_local_record_addresses().await;

            if let Ok(all_local_records) = all_local_records {
                // Optionally restrict candidates to chunk-type records.
                let mut all_chunk_addrs: Vec<_> = if chunk_only {
                    all_local_records
                        .iter()
                        .filter_map(|(addr, record_type)| {
                            if *record_type == ValidationType::Chunk {
                                Some(addr.clone())
                            } else {
                                None
                            }
                        })
                        .collect()
                } else {
                    all_local_records.keys().cloned().collect()
                };

                // Prove the records closest to the challenged key.
                all_chunk_addrs.sort_by_key(|addr| key.distance(addr));

                // Cap the workload regardless of the requested difficulty.
                let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);

                for addr in all_chunk_addrs.iter().take(workload_factor) {
                    if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
                    {
                        let proof = ChunkProof::new(&record.value, nonce);
                        debug!("Chunk proof for {key:?} is {proof:?}");
                        results.push((addr.clone(), Ok(proof)));
                    }
                }
            }

            // NOTE: this summary is only logged on the multi-record path.
            info!(
                "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
                results.len(),
                start.elapsed()
            );
        }

        results
    }
1105
1106 async fn storage_challenge(network: Network) {
1110 let start = Instant::now();
1111 let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
1112 network.get_k_closest_local_peers_to_the_target(None).await
1113 {
1114 closest_peers
1115 .into_iter()
1116 .take(CLOSE_GROUP_SIZE)
1117 .collect_vec()
1118 } else {
1119 error!("Cannot get local neighbours");
1120 return;
1121 };
1122 if closest_peers.len() < CLOSE_GROUP_SIZE {
1123 debug!(
1124 "Not enough neighbours ({}/{}) to carry out storage challenge.",
1125 closest_peers.len(),
1126 CLOSE_GROUP_SIZE
1127 );
1128 return;
1129 }
1130
1131 let mut verify_candidates: Vec<NetworkAddress> =
1132 if let Ok(all_keys) = network.get_all_local_record_addresses().await {
1133 all_keys
1134 .iter()
1135 .filter_map(|(addr, record_type)| {
1136 if ValidationType::Chunk == *record_type {
1137 Some(addr.clone())
1138 } else {
1139 None
1140 }
1141 })
1142 .collect()
1143 } else {
1144 error!("Failed to get local record addresses.");
1145 return;
1146 };
1147 let num_of_targets = verify_candidates.len();
1148 if num_of_targets < 50 {
1149 debug!("Not enough candidates({num_of_targets}/50) to be checked against neighbours.");
1150 return;
1151 }
1152
1153 let self_addr = NetworkAddress::from(network.peer_id());
1156 verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
1157 let index: usize = OsRng.gen_range(0..num_of_targets / 2);
1158 let target = verify_candidates[index].clone();
1159 let difficulty = CLOSE_GROUP_SIZE;
1161 verify_candidates.sort_by_key(|addr| target.distance(addr));
1162 let expected_targets = verify_candidates.into_iter().take(difficulty);
1163 let nonce: Nonce = thread_rng().r#gen::<u64>();
1164 let mut expected_proofs = HashMap::new();
1165 for addr in expected_targets {
1166 if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
1167 let expected_proof = ChunkProof::new(&record.value, nonce);
1168 let _ = expected_proofs.insert(addr, expected_proof);
1169 } else {
1170 error!("Local record {addr:?} cann't be loaded from disk.");
1171 }
1172 }
1173 let request = Request::Query(Query::GetChunkExistenceProof {
1174 key: target.clone(),
1175 nonce,
1176 difficulty,
1177 });
1178
1179 let mut tasks = JoinSet::new();
1180 for (peer_id, addresses) in closest_peers {
1181 if peer_id == network.peer_id() {
1182 continue;
1183 }
1184 let network_clone = network.clone();
1185 let request_clone = request.clone();
1186 let expected_proofs_clone = expected_proofs.clone();
1187 let _ = tasks.spawn(async move {
1188 let res = scoring_peer(
1189 network_clone,
1190 (peer_id, addresses),
1191 request_clone,
1192 expected_proofs_clone,
1193 )
1194 .await;
1195 (peer_id, res)
1196 });
1197 }
1198
1199 let mut peer_scores = vec![];
1200 while let Some(res) = tasks.join_next().await {
1201 match res {
1202 Ok((peer_id, score)) => {
1203 let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
1204 if !is_healthy {
1205 info!(
1206 "Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."
1207 );
1208 network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
1210 }
1211 peer_scores.push((peer_id, is_healthy));
1212 }
1213 Err(e) => {
1214 info!("StorageChallenge task completed with error {e:?}");
1215 }
1216 }
1217 }
1218 if !peer_scores.is_empty() {
1219 network.notify_peer_scores(peer_scores);
1220 }
1221
1222 Self::verify_local_replication_health(network.clone()).await;
1223
1224 info!(
1225 "Completed node StorageChallenge against neighbours in {:?}!",
1226 start.elapsed()
1227 );
1228 }
1229
1230 async fn verify_local_replication_health(network: Network) {
1232 let closest_peers = match network
1233 .get_k_closest_local_peers_to_the_target(None)
1234 .await
1235 {
1236 Ok(peers) => peers,
1237 Err(err) => {
1238 warn!("Cannot fetch closest peers for replica verification: {err:?}");
1239 return;
1240 }
1241 };
1242
1243 if closest_peers.len() <= CLOSE_NEIGHBOUR_DISTANCE_INDEX {
1244 debug!(
1245 "Skipping replica verification as we only know {} neighbours (need > {}).",
1246 closest_peers.len(),
1247 CLOSE_NEIGHBOUR_DISTANCE_INDEX
1248 );
1249 return;
1250 }
1251
1252 let self_address = NetworkAddress::from(network.peer_id());
1253 let Some((threshold_peer, _)) = closest_peers.get(CLOSE_NEIGHBOUR_DISTANCE_INDEX) else {
1254 debug!("Unable to determine distance threshold for replica verification.");
1255 return;
1256 };
1257
1258 let threshold_distance =
1259 self_address.distance(&NetworkAddress::from(*threshold_peer));
1260 let Some(threshold_ilog2) = threshold_distance.ilog2() else {
1261 debug!("Threshold distance lacks ilog2; cannot proceed with replica verification.");
1262 return;
1263 };
1264
1265 let local_records = match network.get_all_local_record_addresses().await {
1266 Ok(records) => records,
1267 Err(err) => {
1268 warn!("Failed to list local records for replica verification: {err:?}");
1269 return;
1270 }
1271 };
1272
1273 let mut nearby_records: Vec<NetworkAddress> = local_records
1274 .into_iter()
1275 .filter_map(|(address, record_type)| {
1276 if record_type != ValidationType::Chunk {
1277 return None;
1278 }
1279 let distance = self_address.distance(&address);
1280 distance
1281 .ilog2()
1282 .and_then(|record_ilog2| {
1283 if record_ilog2 <= threshold_ilog2 {
1284 Some(address)
1285 } else {
1286 None
1287 }
1288 })
1289 })
1290 .collect();
1291
1292 nearby_records.shuffle(&mut thread_rng());
1293 let target_record = if let Some(entry) = nearby_records.first().cloned() {
1294 entry
1295 } else {
1296 debug!("No nearby chunk records available for replica verification.");
1297 return;
1298 };
1299
1300 let pretty_key =
1301 PrettyPrintRecordKey::from(&target_record.to_record_key()).into_owned();
1302
1303 let candidate_peers = match network
1304 .get_k_closest_local_peers_to_the_target(Some(target_record.clone()))
1305 .await
1306 {
1307 Ok(peers) => peers
1308 .into_iter()
1309 .filter(|(peer_id, _)| peer_id != &network.peer_id())
1310 .take(REPLICA_FETCH_PEER_COUNT)
1311 .collect::<Vec<_>>(),
1312 Err(err) => {
1313 warn!(
1314 "Cannot fetch record-specific closest peers for replica verification: {err:?}"
1315 );
1316 return;
1317 }
1318 };
1319
1320 if candidate_peers.len() < REPLICA_FETCH_PEER_COUNT {
1321 debug!(
1322 "Only {} peers available for replica verification (need at least {}).",
1323 candidate_peers.len(),
1324 REPLICA_FETCH_PEER_COUNT
1325 );
1326 return;
1327 }
1328
1329 debug!(
1330 "Verifying replicated record {pretty_key:?} against {} closest peers.",
1331 candidate_peers.len()
1332 );
1333
1334 let (successful_peers, failed_peers) = Self::fetch_record_from_peers_with_addresses(
1335 network.clone(),
1336 target_record.clone(),
1337 candidate_peers,
1338 )
1339 .await;
1340
1341 if failed_peers.is_empty() {
1342 debug!(
1343 "All peers returned record {pretty_key:?} during replica verification."
1344 );
1345 return;
1346 }
1347
1348 if successful_peers.len() < MIN_HEALTHY_REPLICA_COUNT {
1349 warn!(
1350 "Replica verification fetched only {} copies of {pretty_key:?}. Record is unhealthy; skipping peer classification.",
1351 successful_peers.len()
1352 );
1353 return;
1354 }
1355
1356 if !failed_peers.is_empty() {
1357 info!(
1358 "Scheduling replica verification retry for {} peers on record {pretty_key:?}.",
1359 failed_peers.len()
1360 );
1361 Self::schedule_record_fetch_retry(network.clone(), target_record.clone(), failed_peers).await;
1362 }
1363 }
1364
1365 async fn fetch_record_from_peers_with_addresses(
1367 network: Network,
1368 record_address: NetworkAddress,
1369 peers: Vec<(PeerId, Addresses)>,
1370 ) -> (Vec<PeerId>, Vec<PeerId>) {
1371 let request = Request::Query(Query::GetReplicatedRecord {
1372 requester: NetworkAddress::from(network.peer_id()),
1373 key: record_address.clone(),
1374 });
1375 let expected_key = record_address.to_record_key();
1376 let pretty_key = PrettyPrintRecordKey::from(&expected_key).into_owned();
1377
1378 let mut successes = Vec::new();
1379 let mut failures = Vec::new();
1380 let concurrency = peers.len();
1381 let results = stream::iter(
1382 peers
1383 .into_iter()
1384 .map(|(peer_id, addrs)| {
1385 let network_clone = network.clone();
1386 let request_clone = request.clone();
1387 async move {
1388 let result = network_clone
1389 .send_request(request_clone, peer_id, addrs.clone())
1390 .await;
1391 (peer_id, addrs, result)
1392 }
1393 }),
1394 )
1395 .buffer_unordered(concurrency)
1396 .collect::<Vec<_>>()
1397 .await;
1398
1399 for res in results {
1400 match res {
1401 (peer_id, _addrs, Ok((Response::Query(QueryResponse::GetReplicatedRecord(result)), _))) => {
1402 match result {
1403 Ok((_holder, value)) => {
1404 let record = Record::new(record_address.to_record_key(), value.to_vec());
1406 if let Ok(chunk) = try_deserialize_record::<Chunk>(&record)
1407 && chunk.network_address().to_record_key() == expected_key {
1408 successes.push(peer_id);
1409 } else {
1410 warn!(
1411 "Peer {peer_id:?} responded with an incorrect chunk copy of {pretty_key:?}."
1412 );
1413 failures.push(peer_id);
1414 }
1415 }
1416 Err(err) => {
1417 info!(
1418 "Peer {peer_id:?} responded with error {err:?} for replicated record {pretty_key:?}."
1419 );
1420 failures.push(peer_id);
1421 }
1422 }
1423 }
1424 (peer_id, _addrs, Ok((other_response, _))) => {
1425 warn!(
1426 "Peer {peer_id:?} responded with unexpected message {other_response:?} for {pretty_key:?}."
1427 );
1428 failures.push(peer_id);
1429 }
1430 (peer_id, _addrs, Err(err)) => {
1431 info!(
1432 "Failed to reach peer {peer_id:?} for replicated record {pretty_key:?}: {err:?}"
1433 );
1434 failures.push(peer_id);
1435 }
1436 }
1437 }
1438
1439 (successes, failures)
1440 }
1441
1442 async fn schedule_record_fetch_retry(
1444 network: Network,
1445 record_address: NetworkAddress,
1446 failed_peers: Vec<PeerId>,
1447 ) {
1448 let retry_peers: HashSet<_> = failed_peers.into_iter().collect();
1449 if retry_peers.is_empty() {
1450 return;
1451 }
1452
1453 let record_clone = record_address.clone();
1454 let network_clone = network.clone();
1455
1456 tokio::time::sleep(REPLICA_FETCH_RETRY_DELAY).await;
1457 let pretty_key =
1458 PrettyPrintRecordKey::from(&record_clone.to_record_key()).into_owned();
1459
1460 let refreshed_candidates = Self::refresh_retry_candidate_addresses(
1461 &network_clone,
1462 &record_clone,
1463 &retry_peers,
1464 )
1465 .await;
1466
1467 if refreshed_candidates.is_empty() {
1468 info!(
1469 "Skipping replica retry for {pretty_key:?}; no tracked peers remain close to the record."
1470 );
1471 return;
1472 }
1473
1474 let (_, still_failed) = Self::fetch_record_from_peers_with_addresses(
1475 network_clone.clone(),
1476 record_clone.clone(),
1477 refreshed_candidates,
1478 )
1479 .await;
1480
1481 if still_failed.is_empty() {
1482 info!(
1483 "All peers successfully returned {pretty_key:?} during replica retry."
1484 );
1485 return;
1486 }
1487
1488 warn!(
1489 "{} peers still failed to provide {pretty_key:?}; evicting and blacklisting.",
1490 still_failed.len()
1491 );
1492 for peer_id in still_failed {
1493 Self::evict_and_blacklist_peer(network_clone.clone(), peer_id);
1494 }
1495 }
1496
1497 async fn refresh_retry_candidate_addresses(
1499 network: &Network,
1500 record_address: &NetworkAddress,
1501 retry_peers: &HashSet<PeerId>,
1502 ) -> Vec<(PeerId, Addresses)> {
1503 let pretty_key =
1504 PrettyPrintRecordKey::from(&record_address.to_record_key()).into_owned();
1505 let Ok(closest_peers) = network.get_closest_peers(record_address).await else {
1506 warn!(
1507 "Failed to refresh peer addresses for {pretty_key:?}; unable to retry replica fetch."
1508 );
1509 return Vec::new();
1510 };
1511
1512 let closest_map: HashMap<PeerId, Addresses> = closest_peers
1513 .into_iter()
1514 .collect();
1515
1516 retry_peers
1517 .iter()
1518 .filter_map(|peer_id| closest_map.get(peer_id).cloned().map(|addrs| (*peer_id, addrs)))
1519 .collect()
1520 }
1521
    /// Removes `peer_id` from the local routing table and then adds it to the
    /// blocklist so it cannot simply reconnect.
    fn evict_and_blacklist_peer(network: Network, peer_id: PeerId) {
        network.remove_peer(peer_id);
        network.add_peer_to_blocklist(peer_id);
    }
1527
1528 async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
1530 for (peer_id, addrs) in peers {
1532 Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
1533 }
1534 }
1535
1536 async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
1538 let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
1539 let version = match network.send_request(request, peer, addrs).await {
1541 Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
1542 trace!("Fetched peer version {peer:?} as {version:?}");
1543 version
1544 }
1545 Ok(other) => {
1546 info!("Not a fetched peer version {peer:?}, {other:?}");
1547 "none".to_string()
1548 }
1549 Err(err) => {
1550 info!("Failed to fetch peer version {peer:?} with error {err:?}");
1551 if let NetworkError::OutboundError(OutboundFailure::DialFailure) = err {
1555 network.remove_peer(peer);
1556 return;
1557 }
1558 "old".to_string()
1559 }
1560 };
1561 network.notify_node_version(peer, version);
1562 }
1563}
1564
/// Tracks this node's closest peers and their churn behaviour so replication
/// is only triggered for genuine close-group membership changes.
#[derive(Debug)]
struct CloseGroupTracker {
    /// Our own address; all distances below are measured from here.
    self_address: NetworkAddress,
    /// Closest known peers ordered by distance; `record_peer_added` caps this
    /// set at `CLOSE_GROUP_TRACKING_LIMIT` entries.
    close_up_peers: BTreeSet<(Distance, PeerId)>,
    /// Per-peer churn state used to detect restarts and suppress redundant
    /// replication rounds.
    tracked_entries: HashMap<PeerId, BehaviourEntry>,
}
1574
impl CloseGroupTracker {
    /// Creates an empty tracker anchored at `self_peer_id`.
    fn new(self_peer_id: PeerId) -> Self {
        Self {
            self_address: NetworkAddress::from(self_peer_id),
            close_up_peers: BTreeSet::new(),
            tracked_entries: HashMap::new(),
        }
    }

    /// Handles a peer appearing in the routing table and decides whether the
    /// event warrants a replication round.
    ///
    /// Returns `Trigger` for a genuinely new close peer, `Skip` when the
    /// arrival looks like a restart (the peer was recently removed or has a
    /// suppression timer armed), and `Ignore` for peers outside the close
    /// range or already present.
    fn record_peer_added(&mut self, peer_id: PeerId) -> ReplicationDirective {
        let distance = self.distance_to_peer(peer_id);

        // Whether this exact peer was already in the close set.
        let was_present = self.close_up_peers.contains(&(distance, peer_id));

        let is_close_enough = self.is_distance_within_close_range(distance);

        if is_close_enough {
            let _ = self.close_up_peers.insert((distance, peer_id));

            // Keep the set bounded: evict the farthest entry once over the
            // limit. The new peer cannot be the evictee, since it passed the
            // close-range check against the current farthest member.
            if self.close_up_peers.len() > CLOSE_GROUP_TRACKING_LIMIT
                && let Some(&(farthest_distance, farthest_peer)) =
                    self.close_up_peers.iter().next_back()
            {
                let _ = self.close_up_peers.remove(&(farthest_distance, farthest_peer));
            }
        } else {
            // Out of range: drop any stale behaviour state for this peer.
            let _ = self.tracked_entries.remove(&peer_id);
            return ReplicationDirective::Ignore;
        }

        use std::collections::hash_map::Entry;
        match self.tracked_entries.entry(peer_id) {
            Entry::Vacant(vacant_entry) => {
                // First sighting of this peer: start tracking and replicate
                // unless it was somehow already in the close set.
                let _ = vacant_entry.insert(BehaviourEntry::default());
                if !was_present {
                    ReplicationDirective::Trigger
                } else {
                    ReplicationDirective::Ignore
                }
            }
            Entry::Occupied(mut occupied_entry) => {
                let entry = occupied_entry.get_mut();
                if entry.awaiting_rejoin || entry.timer_deadline.is_some() {
                    // Peer came back shortly after leaving: classify as a
                    // restart and suppress replication for this churn cycle.
                    entry.restart_detected = true;
                    entry.awaiting_rejoin = false;
                    entry.timer_deadline = None;
                    ReplicationDirective::Skip
                } else if !was_present {
                    entry.restart_detected = false;
                    ReplicationDirective::Trigger
                } else {
                    ReplicationDirective::Ignore
                }
            }
        }
    }

    /// Handles a peer disappearing from the routing table.
    ///
    /// Returns `Trigger` when a tracked close peer genuinely left, `Skip`
    /// when the departure belongs to a detected restart cycle (a suppression
    /// timer is armed instead), and `Ignore` for untracked peers.
    fn record_peer_removed(&mut self, peer_id: PeerId) -> ReplicationDirective {
        let distance = self.distance_to_peer(peer_id);

        let was_tracked = self.close_up_peers.contains(&(distance, peer_id));

        if !was_tracked {
            return ReplicationDirective::Ignore;
        }

        let _ = self.close_up_peers.remove(&(distance, peer_id));

        let entry = self.tracked_entries.entry(peer_id).or_default();

        if entry.restart_detected {
            // Restarting peer: arm the suppression timer instead of
            // replicating; expiry is handled in `handle_timer_expiry`.
            entry.timer_deadline = Some(Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION);
            entry.awaiting_rejoin = false;
            ReplicationDirective::Skip
        } else {
            entry.awaiting_rejoin = true;
            entry.timer_deadline = None;
            ReplicationDirective::Trigger
        }
    }

    /// Clears suppression timers whose deadline has passed at `now` and prunes
    /// behaviour entries that carry no remaining state.
    ///
    /// Returns `true` if at least one timer expired.
    fn handle_timer_expiry(&mut self, now: Instant) -> bool {
        let mut expired = false;
        for entry in self.tracked_entries.values_mut() {
            if let Some(deadline) = entry.timer_deadline
                && now >= deadline
            {
                // Restart cycle is over; reset all churn flags for this peer.
                entry.timer_deadline = None;
                entry.restart_detected = false;
                entry.awaiting_rejoin = false;
                expired = true;
            }
        }

        // Keep an entry only while it still encodes pending state or its peer
        // is currently in the close set.
        self.tracked_entries.retain(|peer_id, entry| {
            let is_currently_present = self
                .close_up_peers
                .iter()
                .any(|(_, present_peer)| present_peer == peer_id);

            entry.awaiting_rejoin
                || entry.restart_detected
                || entry.timer_deadline.is_some()
                || is_currently_present
        });

        expired
    }

    /// True when `distance` qualifies for the close set: either the set is
    /// not yet full, or the distance beats the current farthest member.
    fn is_distance_within_close_range(&self, distance: Distance) -> bool {
        if self.close_up_peers.len() < CLOSE_GROUP_TRACKING_LIMIT {
            return true;
        }

        if let Some(&(farthest_distance, _)) = self.close_up_peers.iter().next_back() {
            distance < farthest_distance
        } else {
            true
        }
    }

    /// Distance from this node's own address to `peer_id`'s address.
    fn distance_to_peer(&self, peer_id: PeerId) -> Distance {
        self.self_address.distance(&NetworkAddress::from(peer_id))
    }
}
1711
/// Per-peer churn bookkeeping used by `CloseGroupTracker`.
#[derive(Debug, Default)]
struct BehaviourEntry {
    /// Peer left the close group; a prompt return will be treated as a restart.
    awaiting_rejoin: bool,
    /// Peer re-appeared while `awaiting_rejoin` or under a timer — a restart.
    restart_detected: bool,
    /// Suppression deadline armed after a detected restart; cleared by
    /// `handle_timer_expiry`.
    timer_deadline: Option<Instant>,
}
1718
/// Outcome of a close-group membership event: what the caller should do about
/// replication.
#[derive(Debug, PartialEq, Eq)]
enum ReplicationDirective {
    Trigger,
    Skip,
    Ignore,
}

impl ReplicationDirective {
    /// Returns `true` only for `Trigger`; both `Skip` and `Ignore` suppress
    /// replication.
    fn should_trigger(&self) -> bool {
        match self {
            Self::Trigger => true,
            Self::Skip | Self::Ignore => false,
        }
    }
}
1731
#[cfg(test)]
mod close_group_tracker_tests {
    use super::*;
    use std::time::Duration;

    // Generates a fresh random peer id for each scenario.
    fn random_peer() -> PeerId {
        let keypair = libp2p::identity::Keypair::generate_ed25519();
        PeerId::from(keypair.public())
    }

    // A brand-new close peer triggers replication on both arrival and departure.
    #[test]
    fn new_peer_triggers_replication() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let peer = random_peer();

        assert_eq!(
            tracker.record_peer_added(peer),
            ReplicationDirective::Trigger
        );

        assert_eq!(
            tracker.record_peer_removed(peer),
            ReplicationDirective::Trigger
        );
    }

    // A leave followed by a quick rejoin is classified as a restart and
    // suppresses replication until the suppression timer expires.
    #[test]
    fn restart_detection_skips_replication() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let peer = random_peer();

        assert_eq!(
            tracker.record_peer_added(peer),
            ReplicationDirective::Trigger
        );

        assert_eq!(
            tracker.record_peer_removed(peer),
            ReplicationDirective::Trigger
        );

        // Rejoin while awaiting_rejoin is set -> restart detected.
        assert_eq!(tracker.record_peer_added(peer), ReplicationDirective::Skip);

        assert_eq!(
            tracker.record_peer_removed(peer),
            ReplicationDirective::Skip
        );

        // Once past the suppression window the timer expires...
        assert!(tracker.handle_timer_expiry(
            Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1)
        ));

        // ...and the peer is treated as new again.
        assert_eq!(
            tracker.record_peer_added(peer),
            ReplicationDirective::Trigger
        );
    }

    // Once the tracked set is full, far-away peers are ignored and the set
    // size never exceeds the tracking limit.
    #[test]
    fn peer_outside_close_group_is_ignored() {
        let mut tracker = CloseGroupTracker::new(random_peer());

        let mut added_peers = Vec::new();
        for _ in 0..CLOSE_GROUP_TRACKING_LIMIT {
            let peer = random_peer();
            let result = tracker.record_peer_added(peer);
            assert_eq!(result, ReplicationDirective::Trigger);
            added_peers.push(peer);
        }

        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);

        // Random peers may land closer (Trigger) or farther (Ignore) than the
        // current farthest member; count both outcomes.
        let mut ignored_count = 0;
        let mut trigger_count = 0;
        for _ in 0..50 {
            let peer = random_peer();
            match tracker.record_peer_added(peer) {
                ReplicationDirective::Ignore => ignored_count += 1,
                ReplicationDirective::Trigger => trigger_count += 1,
                _ => {}
            }
        }

        assert!(
            ignored_count > 0 || trigger_count > 0,
            "Expected some peers to be processed"
        );

        // The cap must hold regardless of how many peers were offered.
        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
    }

    // Removing a peer we never tracked is a no-op.
    #[test]
    fn removal_of_untracked_peer_is_ignored() {
        let mut tracker = CloseGroupTracker::new(random_peer());

        let unknown_peer = random_peer();
        assert_eq!(
            tracker.record_peer_removed(unknown_peer),
            ReplicationDirective::Ignore
        );
    }

    // Timer expiry resets restart state: no expiry before the deadline, one
    // after it, and the peer then behaves like a fresh arrival.
    #[test]
    fn timer_expiry_resets_restart_state() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let peer = random_peer();

        assert_eq!(
            tracker.record_peer_added(peer),
            ReplicationDirective::Trigger
        );
        assert_eq!(
            tracker.record_peer_removed(peer),
            ReplicationDirective::Trigger
        );

        assert_eq!(tracker.record_peer_added(peer), ReplicationDirective::Skip);

        assert_eq!(
            tracker.record_peer_removed(peer),
            ReplicationDirective::Skip
        );

        // Deadline not yet reached: nothing expires.
        assert!(!tracker.handle_timer_expiry(Instant::now()));

        let after_expiry = Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1);
        assert!(tracker.handle_timer_expiry(after_expiry));

        assert_eq!(
            tracker.record_peer_added(peer),
            ReplicationDirective::Trigger
        );
    }

    // When full, adding a strictly closer peer evicts the farthest member and
    // keeps the set at exactly the tracking limit.
    #[test]
    fn eviction_maintains_closest_peers() {
        let self_peer = random_peer();
        let mut tracker = CloseGroupTracker::new(self_peer);

        let mut peers_with_distances: Vec<(PeerId, Distance)> = Vec::new();
        for _ in 0..CLOSE_GROUP_TRACKING_LIMIT {
            let peer = random_peer();
            let distance = tracker.distance_to_peer(peer);
            let _ = tracker.record_peer_added(peer);
            peers_with_distances.push((peer, distance));
        }

        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);

        peers_with_distances.sort_by_key(|(_, d)| *d);
        let farthest_distance = peers_with_distances.last().map(|(_, d)| *d).unwrap();

        // Sample random peers until one lands inside the current range.
        let mut found_closer = false;
        for _ in 0..100 {
            let new_peer = random_peer();
            let new_distance = tracker.distance_to_peer(new_peer);
            if new_distance < farthest_distance {
                let result = tracker.record_peer_added(new_peer);
                assert_eq!(result, ReplicationDirective::Trigger);
                assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
                found_closer = true;
                break;
            }
        }

        assert!(
            found_closer,
            "Could not find a peer closer than the farthest in 100 attempts"
        );
    }

    // After the timer fires, entries for peers that left the close set are
    // pruned from the behaviour map.
    #[test]
    fn tracked_entries_cleaned_up_on_timer_expiry() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let peer = random_peer();

        // Drive the peer through a full restart cycle (leave, rejoin, leave).
        let _ = tracker.record_peer_added(peer);
        let _ = tracker.record_peer_removed(peer);
        let _ = tracker.record_peer_added(peer);
        let _ = tracker.record_peer_removed(peer);

        assert!(tracker.tracked_entries.contains_key(&peer));

        let after_expiry = Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1);
        let _ = tracker.handle_timer_expiry(after_expiry);

        assert!(!tracker.tracked_entries.contains_key(&peer));
    }
}
1946
1947async fn scoring_peer(
1948 network: Network,
1949 peer: (PeerId, Addresses),
1950 request: Request,
1951 expected_proofs: HashMap<NetworkAddress, ChunkProof>,
1952) -> usize {
1953 let peer_id = peer.0;
1954 let start = Instant::now();
1955 let responses = network
1956 .send_and_get_responses(&[peer], &request, true)
1957 .await;
1958
1959 if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
1960 responses.get(&peer_id)
1961 {
1962 if answers.is_empty() {
1963 info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
1964 return 0;
1965 }
1966 let elapsed = start.elapsed();
1967
1968 let mut received_proofs = vec![];
1969 for (addr, proof) in answers {
1970 if let Ok(proof) = proof {
1971 received_proofs.push((addr.clone(), proof.clone()));
1972 }
1973 }
1974
1975 let score = mark_peer(elapsed, received_proofs, &expected_proofs);
1976 info!(
1977 "Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.",
1978 answers.len()
1979 );
1980 score
1981 } else {
1982 info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error.");
1983 0
1984 }
1985}
1986
1987fn mark_peer(
1993 duration: Duration,
1994 answers: Vec<(NetworkAddress, ChunkProof)>,
1995 expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
1996) -> usize {
1997 let duration_score = duration_score_scheme(duration);
1998 let challenge_score = challenge_score_scheme(answers, expected_proofs);
1999
2000 duration_score * challenge_score
2001}
2002
2003fn duration_score_scheme(duration: Duration) -> usize {
2005 let in_ms = if let Some(value) = duration.as_millis().to_usize() {
2007 value
2008 } else {
2009 info!("Cannot get milli seconds from {duration:?}, using a default value of 1000ms.");
2010 1000
2011 };
2012
2013 let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
2014 HIGHEST_SCORE - step
2015}
2016
2017fn challenge_score_scheme(
2019 answers: Vec<(NetworkAddress, ChunkProof)>,
2020 expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
2021) -> usize {
2022 let mut correct_answers = 0;
2023 for (addr, chunk_proof) in answers {
2024 if let Some(expected_proof) = expected_proofs.get(&addr) {
2025 if expected_proof.verify(&chunk_proof) {
2026 correct_answers += 1;
2027 } else {
2028 info!("Spot a false answer to the challenge regarding {addr:?}");
2029 return 0;
2031 }
2032 }
2033 }
2034 std::cmp::min(
2041 HIGHEST_SCORE,
2042 HIGHEST_SCORE * correct_answers / expected_proofs.len(),
2043 )
2044}
2045
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    // With no known local peers, the closest-peers calculation yields nothing.
    #[test]
    fn test_no_local_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(5);
        let range = None;
        let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);

        assert_eq!(result, vec![]);
    }

    // When fewer peers exist than requested, the result is the full set
    // truncated to `num_of_peers`, sorted by distance to the target.
    #[test]
    fn test_fewer_local_peers_than_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(2);
        let range = None;
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Independently compute the expected answer: all peers sorted by
        // distance to the target, truncated to the requested count.
        let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .iter()
            .map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(*peer_id);
                (addr, multi_addrs.clone())
            })
            .collect();
        expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
        let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();

        assert_eq!(expected_result, result);
    }

    // With a distance range supplied (and num_of_peers set to 0), the range
    // filter determines the result set.
    #[test]
    fn test_with_range_and_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(0);
        // Mid-range distance bound (every byte 0x80).
        let range_value = [128; 32];
        let range = Some(range_value);
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Expected: exactly those peers whose distance to the target is
        // within the provided bound.
        let distance = U256::from_big_endian(&range_value);
        let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .into_iter()
            .filter_map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(peer_id);
                if target.distance(&addr).0 <= distance {
                    Some((addr, multi_addrs.clone()))
                } else {
                    None
                }
            })
            .collect();

        assert_eq!(expected_result, result);
    }

    // These tests exercise the timestamp arithmetic used by merkle-payment
    // validation (age and future checks against MERKLE_PAYMENT_EXPIRATION).
    mod merkle_payment_tests {
        use super::*;

        // A timestamp one hour in the past is neither future nor expired.
        #[test]
        fn test_timestamp_validation_accepts_valid_timestamp() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let valid_timestamp = now - 3600;

            let age = now.saturating_sub(valid_timestamp);

            assert!(
                valid_timestamp <= now,
                "Valid timestamp should not be in the future"
            );
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Valid timestamp should not be expired"
            );
        }

        // A timestamp an hour ahead of now must be classified as future.
        #[test]
        fn test_timestamp_validation_rejects_future_timestamp() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let future_timestamp = now + 3600;

            assert!(
                future_timestamp > now,
                "Future timestamp should be rejected"
            );
        }

        // A timestamp a full day past the expiration window is expired.
        #[test]
        fn test_timestamp_validation_rejects_expired_timestamp() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let expired_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 86400);

            let age = now.saturating_sub(expired_timestamp);

            assert!(
                age > MERKLE_PAYMENT_EXPIRATION,
                "Expired timestamp should be rejected"
            );
        }

        // Age exactly equal to the expiration window is still accepted.
        #[test]
        fn test_timestamp_validation_at_expiration_boundary() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let boundary_timestamp = now - MERKLE_PAYMENT_EXPIRATION;

            let age = now.saturating_sub(boundary_timestamp);

            assert_eq!(age, MERKLE_PAYMENT_EXPIRATION);
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Timestamp exactly at boundary should not be rejected"
            );
        }

        // One second beyond the expiration window tips over into rejection.
        #[test]
        fn test_timestamp_validation_beyond_expiration_boundary() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let beyond_boundary_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 1);

            let age = now.saturating_sub(beyond_boundary_timestamp);

            assert!(
                age > MERKLE_PAYMENT_EXPIRATION,
                "Timestamp beyond boundary should be rejected"
            );
        }

        // A timestamp equal to "now" has age 0 and is accepted.
        #[test]
        fn test_timestamp_validation_at_current_time() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let current_timestamp = now;

            let age = now.saturating_sub(current_timestamp);

            assert!(
                current_timestamp <= now,
                "Current timestamp should not be in future"
            );
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Current timestamp should not be expired"
            );
            assert_eq!(age, 0, "Age should be 0 for current timestamp");
        }

        // Even one second in the future counts as a future timestamp.
        #[test]
        fn test_timestamp_validation_near_future_boundary() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let near_future_timestamp = now + 1;

            assert!(
                near_future_timestamp > now,
                "Near-future timestamp should be rejected"
            );
        }

        // Pins the upstream constant to its documented value of 7 days.
        #[test]
        fn test_merkle_payment_expiration_constant() {
            const SEVEN_DAYS_IN_SECONDS: u64 = 7 * 24 * 60 * 60;
            assert_eq!(
                MERKLE_PAYMENT_EXPIRATION, SEVEN_DAYS_IN_SECONDS,
                "MERKLE_PAYMENT_EXPIRATION should be 7 days"
            );
        }
    }
}