1use super::{
10 Marker, NodeEvent, error::Result, event::NodeEventsChannel, quote::quotes_verification,
11};
12#[cfg(feature = "open-metrics")]
13use crate::metrics::NodeMetricsRecorder;
14#[cfg(feature = "open-metrics")]
15use crate::networking::MetricsRegistries;
16use crate::networking::{Addresses, Network, NetworkConfig, NetworkError, NetworkEvent, NodeIssue};
17use crate::{PutValidationError, RunningNode};
18use ant_bootstrap::bootstrap::Bootstrap;
19use ant_evm::EvmNetwork;
20use ant_evm::RewardsAddress;
21use ant_evm::merkle_payments::MERKLE_PAYMENT_EXPIRATION;
22use ant_protocol::{
23 CLOSE_GROUP_SIZE, NetworkAddress, PrettyPrintRecordKey,
24 error::Error as ProtocolError,
25 messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
26 storage::{Chunk, ValidationType, try_deserialize_record},
27};
28use bytes::Bytes;
29use futures::stream::{self, StreamExt};
30use itertools::Itertools;
31use libp2p::{
32 Multiaddr, PeerId,
33 identity::Keypair,
34 kad::{KBucketDistance as Distance, Record, U256},
35 request_response::OutboundFailure,
36};
37use num_traits::cast::ToPrimitive;
38use rand::{
39 Rng, SeedableRng,
40 rngs::{OsRng, StdRng},
41 seq::SliceRandom,
42 thread_rng,
43};
44use std::{
45 collections::{BTreeSet, HashMap, HashSet},
46 net::SocketAddr,
47 path::PathBuf,
48 sync::{
49 Arc,
50 atomic::{AtomicUsize, Ordering},
51 },
52 time::{Duration, Instant},
53};
54use tokio::sync::{Mutex, watch};
55use tokio::{
56 sync::mpsc::Receiver,
57 task::{JoinSet, spawn},
58};
59
/// Upper bound (seconds) for the periodic replication timer; the actual
/// interval is randomised in `[max/2, max)` (see `run`) to de-synchronise nodes.
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;

/// Upper bound (seconds) for the periodic storage-challenge timer; randomised
/// in `[max/2, max)` like the replication interval.
const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;

/// How often the uptime gauge is refreshed (only used with `open-metrics`).
const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

/// Upper bound (seconds) for the irrelevant-record cleanup timer; randomised
/// in `[max/2, max)`. NOTE(review): "UNRELEVANT" is a typo for "IRRELEVANT",
/// kept as-is since the name may be referenced elsewhere.
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;

// Scoring constants for the storage challenge. Usage is outside this chunk;
// semantics below are inferred from names — TODO confirm against scoring_peer.
/// Best score a peer can earn for a single challenge answer (presumably).
const HIGHEST_SCORE: usize = 100;

/// A peer scoring at or below this in `storage_challenge` is reported unhealthy.
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;

/// Step used when discretising challenge response time — TODO confirm units.
const TIME_STEP: usize = 20;

/// Delay before re-attempting a replica fetch (used outside this chunk).
const REPLICA_FETCH_RETRY_DELAY: Duration = Duration::from_secs(15 * 60);

/// Number of peers queried when fetching replicas (used outside this chunk).
const REPLICA_FETCH_PEER_COUNT: usize = 5;

/// Minimum replica copies considered healthy (used outside this chunk).
const MIN_HEALTHY_REPLICA_COUNT: usize = 3;

/// Index into the sorted closest-peer list whose distance bounds the "close
/// neighbourhood" for replica verification (see `verify_local_replication_health`).
const CLOSE_NEIGHBOUR_DISTANCE_INDEX: usize = 7;

/// Max peers tracked by `CloseGroupTracker` — TODO confirm against its impl.
const CLOSE_GROUP_TRACKING_LIMIT: usize = 20;

/// Window in which churn-looking (restart) events suppress replication triggers
/// — presumably consumed by `CloseGroupTracker`; verify against its impl.
const CLOSE_GROUP_RESTART_SUPPRESSION: Duration = Duration::from_secs(90);
/// Staged configuration for constructing and launching a [`Node`].
///
/// Create with [`NodeBuilder::new`], tweak the optional flags via the setter
/// methods, then call [`NodeBuilder::build_and_run`].
pub struct NodeBuilder {
    // Socket address the node listens on.
    addr: SocketAddr,
    // Bootstrap flow used to join the network.
    bootstrap: Bootstrap,
    // Rewards address quotes/payments are attributed to.
    evm_address: RewardsAddress,
    // Which EVM network the node settles payments on.
    evm_network: EvmNetwork,
    // libp2p identity of this node.
    identity_keypair: Keypair,
    // Local/testing mode flag (defaults to false).
    local: bool,
    #[cfg(feature = "open-metrics")]
    // Port for the metrics HTTP server; None disables metrics recording.
    metrics_server_port: Option<u16>,
    // Disable UPnP port mapping when true.
    no_upnp: bool,
    // Act as a relay client when true.
    relay_client: bool,
    // Directory for node state on disk.
    root_dir: PathBuf,
}
131
impl NodeBuilder {
    /// Create a builder from the required parameters; all optional flags
    /// (local mode, metrics, relay, UPnP) start at their defaults.
    pub fn new(
        identity_keypair: Keypair,
        bootstrap_flow: Bootstrap,
        evm_address: RewardsAddress,
        evm_network: EvmNetwork,
        addr: SocketAddr,
        root_dir: PathBuf,
    ) -> Self {
        Self {
            addr,
            bootstrap: bootstrap_flow,
            evm_address,
            evm_network,
            identity_keypair,
            local: false,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: None,
            no_upnp: false,
            relay_client: false,
            root_dir,
        }
    }

    /// Toggle local (testing) network mode — forwarded to `NetworkConfig.local`.
    pub fn local(&mut self, local: bool) {
        self.local = local;
    }

    #[cfg(feature = "open-metrics")]
    /// Set the metrics server port; `Some(_)` enables metrics recording in
    /// `build_and_run` (even `Some(0)`, which presumably means "any port").
    pub fn metrics_server_port(&mut self, port: Option<u16>) {
        self.metrics_server_port = port;
    }

    /// Toggle relay-client behaviour — forwarded to `NetworkConfig.relay_client`.
    pub fn relay_client(&mut self, relay_client: bool) {
        self.relay_client = relay_client;
    }

    /// Disable UPnP port mapping — forwarded to `NetworkConfig.no_upnp`.
    pub fn no_upnp(&mut self, no_upnp: bool) {
        self.no_upnp = no_upnp;
    }

    /// Initialise the network layer, spawn the node's event loop, and return a
    /// [`RunningNode`] handle holding the shutdown sender and event channel.
    ///
    /// # Errors
    /// Propagates any failure from [`Network::init`].
    pub fn build_and_run(self) -> Result<RunningNode> {
        // Metrics recorder/registries are only materialised when a server
        // port was configured; otherwise an empty registry set is passed on.
        #[cfg(feature = "open-metrics")]
        let (metrics_recorder, metrics_registries) = if self.metrics_server_port.is_some() {
            let mut metrics_registries = MetricsRegistries::default();
            let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);

            (Some(metrics_recorder), metrics_registries)
        } else {
            (None, MetricsRegistries::default())
        };

        // Watch channel used to broadcast shutdown to the network driver and
        // the node task; `RunningNode` keeps the sender.
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        let network_config = NetworkConfig {
            keypair: self.identity_keypair,
            local: self.local,
            listen_addr: self.addr,
            root_dir: self.root_dir.clone(),
            shutdown_rx: shutdown_rx.clone(),
            bootstrap: self.bootstrap,
            no_upnp: self.no_upnp,
            relay_client: self.relay_client,
            custom_request_timeout: None,
            #[cfg(feature = "open-metrics")]
            metrics_registries,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: self.metrics_server_port,
        };
        let (network, network_event_receiver) = Network::init(network_config)?;

        let node_events_channel = NodeEventsChannel::default();
        let node = NodeInner {
            network: network.clone(),
            events_channel: node_events_channel.clone(),
            close_group_tracker: Mutex::new(CloseGroupTracker::new(network.peer_id())),
            reward_address: self.evm_address,
            #[cfg(feature = "open-metrics")]
            metrics_recorder,
            evm_network: self.evm_network,
        };
        let node = Node {
            inner: Arc::new(node),
        };

        // Spawns the background event loop; `run` detaches its task.
        node.run(network_event_receiver, shutdown_rx);
        let running_node = RunningNode {
            shutdown_sender: shutdown_tx,
            network,
            node_events_channel,
            root_dir_path: self.root_dir,
            rewards_address: self.evm_address,
        };

        Ok(running_node)
    }
}
253
/// Cheaply cloneable handle to the running node; all state lives behind an
/// `Arc<NodeInner>`, so clones share the same node.
#[derive(Clone)]
pub(crate) struct Node {
    inner: Arc<NodeInner>,
}
261
/// Shared node state referenced by every [`Node`] clone.
struct NodeInner {
    // Channel for broadcasting NodeEvents to subscribers.
    events_channel: NodeEventsChannel,
    // Handle to the network/swarm layer.
    network: Network,
    // Tracks close-group membership changes to decide replication triggers
    // (tokio Mutex: it is locked across .await points in the event loop).
    close_group_tracker: Mutex<CloseGroupTracker>,
    #[cfg(feature = "open-metrics")]
    // Present only when a metrics server port was configured at build time.
    metrics_recorder: Option<NodeMetricsRecorder>,
    // Rewards address embedded into quotes issued by this node.
    reward_address: RewardsAddress,
    // EVM network used for payment settlement.
    evm_network: EvmNetwork,
}
273
274impl Node {
    /// Channel on which this node broadcasts [`NodeEvent`]s.
    pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
        &self.inner.events_channel
    }

    /// Handle to the network layer.
    pub(crate) fn network(&self) -> &Network {
        &self.inner.network
    }

    /// Tracker deciding when close-group churn should trigger replication.
    fn close_group_tracker(&self) -> &Mutex<CloseGroupTracker> {
        &self.inner.close_group_tracker
    }

    #[cfg(feature = "open-metrics")]
    /// Metrics recorder, if metrics were enabled at build time.
    pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
        self.inner.metrics_recorder.as_ref()
    }

    /// Rewards address quotes issued by this node pay out to.
    pub(crate) fn reward_address(&self) -> &RewardsAddress {
        &self.inner.reward_address
    }

    /// EVM network this node settles payments on.
    pub(crate) fn evm_network(&self) -> &EvmNetwork {
        &self.inner.evm_network
    }
304
    /// Spawn the node's main event loop as a detached task.
    ///
    /// The loop multiplexes, via `tokio::select!`:
    /// - the shutdown watch channel,
    /// - incoming `NetworkEvent`s,
    /// - periodic replication, uptime-metrics, irrelevant-record cleanup and
    ///   storage-challenge timers (each randomised per node where applicable).
    fn run(
        self,
        mut network_event_receiver: Receiver<NetworkEvent>,
        mut shutdown_rx: watch::Receiver<bool>,
    ) {
        let mut rng = StdRng::from_entropy();

        // Counts PeerAdded events so we can emit ConnectedToNetwork once.
        let peers_connected = Arc::new(AtomicUsize::new(0));

        let _node_task = spawn(async move {
            // Randomise within [max/2, max) so nodes don't replicate in lockstep.
            let replication_interval: u64 = rng.gen_range(
                PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
            );
            let replication_interval_time = Duration::from_secs(replication_interval);
            debug!("Replication interval set to {replication_interval_time:?}");

            let mut replication_interval = tokio::time::interval(replication_interval_time);
            // tokio intervals fire immediately on the first tick; consume it
            // so the first real run happens a full period from now.
            let _ = replication_interval.tick().await;

            let mut uptime_metrics_update_interval =
                tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
            let _ = uptime_metrics_update_interval.tick().await;

            let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
                UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
                    ..UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
            );
            let irrelevant_records_cleanup_interval_time =
                Duration::from_secs(irrelevant_records_cleanup_interval);
            let mut irrelevant_records_cleanup_interval =
                tokio::time::interval(irrelevant_records_cleanup_interval_time);
            let _ = irrelevant_records_cleanup_interval.tick().await;

            let storage_challenge_interval: u64 =
                rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
            let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
            debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");

            let mut storage_challenge_interval =
                tokio::time::interval(storage_challenge_interval_time);
            let _ = storage_challenge_interval.tick().await;

            loop {
                let peers_connected = &peers_connected;

                tokio::select! {
                    // Exit on a true shutdown signal, or if the sender was dropped.
                    result = shutdown_rx.changed() => {
                        if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
                            info!("Shutdown signal received or sender dropped. Exiting network events loop.");
                            break;
                        }
                    },
                    net_event = network_event_receiver.recv() => {
                        match net_event {
                            Some(event) => {
                                let start = Instant::now();
                                let event_string = format!("{event:?}");

                                self.handle_network_event(event, peers_connected).await;
                                trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);

                            }
                            None => {
                                error!("The `NetworkEvent` channel is closed");
                                self.events_channel().broadcast(NodeEvent::ChannelClosed);
                                break;
                            }
                        }
                    }
                    _ = replication_interval.tick() => {
                        let now = Instant::now();

                        // Clear any pending tracker timer; this periodic run
                        // covers whatever replication it was waiting for.
                        {
                            let mut tracker = self.close_group_tracker().lock().await;
                            if tracker.handle_timer_expiry(now) {
                                trace!("Replication timer expired for tracked peers; periodic run will cover it");
                            }
                        }

                        let start = now;
                        let network = self.network().clone();
                        self.record_metrics(Marker::IntervalReplicationTriggered);

                        // Spawned so a slow replication can't stall the select loop.
                        let _handle = spawn(async move {
                            Self::try_interval_replication(network);
                            trace!("Periodic replication took {:?}", start.elapsed());
                        });
                    }
                    _ = uptime_metrics_update_interval.tick() => {
                        #[cfg(feature = "open-metrics")]
                        if let Some(metrics_recorder) = self.metrics_recorder() {
                            let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
                        }
                    }
                    _ = irrelevant_records_cleanup_interval.tick() => {
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::trigger_irrelevant_record_cleanup(network);
                        });
                    }
                    _ = storage_challenge_interval.tick() => {
                        let start = Instant::now();
                        debug!("Periodic storage challenge triggered");
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::storage_challenge(network).await;
                            trace!("Periodic storage challenge took {:?}", start.elapsed());
                        });
                    }
                }
            }
        });
    }
431
    /// Log the marker, and additionally record it in the metrics recorder when
    /// the `open-metrics` feature is enabled and metrics were configured.
    pub(crate) fn record_metrics(&self, marker: Marker) {
        marker.log();
        #[cfg(feature = "open-metrics")]
        if let Some(metrics_recorder) = self.metrics_recorder() {
            metrics_recorder.record(marker)
        }
    }
441
    /// Dispatch a single `NetworkEvent`.
    ///
    /// Long-running work (version queries, record validation, replication,
    /// quote verification, …) is spawned onto separate tasks so this handler
    /// returns quickly; only tracker bookkeeping happens inline.
    ///
    /// `peers_connected` counts PeerAdded events; hitting `CLOSE_GROUP_SIZE`
    /// broadcasts `ConnectedToNetwork` once.
    async fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
        let start = Instant::now();
        let event_string = format!("{event:?}");
        // Set by every match arm below; used only for the timing trace.
        let event_header;

        // GetVersion queries are frequent/noisy, so they log at trace level.
        if let NetworkEvent::QueryRequestReceived {
            query: Query::GetVersion { .. },
            ..
        } = event
        {
            trace!("Handling NetworkEvent {event_string}");
        } else {
            debug!("Handling NetworkEvent {event_string}");
        }

        match event {
            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
                event_header = "PeerAdded";
                let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
                // Announce network connection exactly when the counter reaches
                // CLOSE_GROUP_SIZE (== comparison, so it fires once).
                if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
                    self.events_channel()
                        .broadcast(NodeEvent::ConnectedToNetwork);
                }

                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));

                // Query the new peer's version off-task.
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::try_query_peer_version(network, peer_id, Default::default()).await;
                });

                // Let the close-group tracker decide whether this addition
                // warrants a replication round (lock released before spawning).
                let replication_decision = {
                    let mut tracker = self.close_group_tracker().lock().await;
                    tracker.record_peer_added(peer_id)
                };

                if replication_decision.should_trigger() {
                    let network = self.network().clone();
                    self.record_metrics(Marker::IntervalReplicationTriggered);
                    let _handle = spawn(async move {
                        Self::try_interval_replication(network);
                    });
                } else if matches!(replication_decision, ReplicationDirective::Skip) {
                    trace!(
                        "Replication skipped for {peer_id:?} addition due to restart behaviour tracking"
                    );
                }
            }
            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
                event_header = "PeerRemoved";
                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));

                let self_id = self.network().peer_id();
                let distance =
                    NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
                info!(
                    "Node {self_id:?} removed peer from routing table: {peer_id:?}. It has a {:?} distance to us.",
                    distance.ilog2()
                );

                // Same tracker-gated replication decision as for additions.
                let replication_decision = {
                    let mut tracker = self.close_group_tracker().lock().await;
                    tracker.record_peer_removed(peer_id)
                };

                if replication_decision.should_trigger() {
                    let network = self.network().clone();
                    self.record_metrics(Marker::IntervalReplicationTriggered);
                    let _handle = spawn(async move {
                        Self::try_interval_replication(network);
                    });
                } else if matches!(replication_decision, ReplicationDirective::Skip) {
                    trace!(
                        "Replication skipped for {peer_id:?} removal due to restart behaviour tracking"
                    );
                }
            }
            NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
                event_header = "PeerWithUnsupportedProtocol";
            }
            NetworkEvent::NewListenAddr(_) => {
                event_header = "NewListenAddr";
            }
            NetworkEvent::ResponseReceived { res } => {
                event_header = "ResponseReceived";
                if let Err(err) = self.handle_response(res) {
                    error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
                }
            }
            NetworkEvent::KeysToFetchForReplication(keys) => {
                event_header = "KeysToFetchForReplication";
                self.record_metrics(Marker::fetching_keys_for_replication(&keys));

                if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
                    error!("Error while trying to fetch replicated data {err:?}");
                }
            }
            NetworkEvent::QueryRequestReceived { query, channel } => {
                event_header = "QueryRequestReceived";
                // Clone the handle so the spawned task owns it ('static future).
                let node = self.clone();
                let payment_address = *self.reward_address();

                let _handle = spawn(async move {
                    let network = node.network().clone();
                    let res = Self::handle_query(node, query, payment_address).await;

                    // GetVersion responses are noisy; trace level only.
                    if let Response::Query(QueryResponse::GetVersion { .. }) = res {
                        trace!("Sending response {res:?}");
                    } else {
                        debug!("Sending response {res:?}");
                    }

                    network.send_response(res, channel);
                });
            }
            NetworkEvent::UnverifiedRecord(record) => {
                event_header = "UnverifiedRecord";
                // Validation/storage is potentially slow — off-task.
                let self_clone = self.clone();
                let _handle = spawn(async move {
                    let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                    match self_clone.validate_and_store_record(record).await {
                        Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
                        Err(err) => {
                            self_clone.record_metrics(Marker::RecordRejected(&key, &err));
                        }
                    }
                });
            }
            NetworkEvent::TerminateNode { reason } => {
                event_header = "TerminateNode";
                error!("Received termination from swarm_driver due to {reason:?}");
                self.events_channel()
                    .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
            }
            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
                event_header = "FailedToFetchHolders";
                let network = self.network().clone();
                let pretty_log: Vec<_> = bad_nodes
                    .iter()
                    .map(|(peer_id, record_key)| {
                        let pretty_key = PrettyPrintRecordKey::from(record_key);
                        (peer_id, pretty_key)
                    })
                    .collect();
                debug!(
                    "Received notification from replication_fetcher, notifying {pretty_log:?} failed to fetch replication copies from."
                );
                let _handle = spawn(async move {
                    for (peer_id, record_key) in bad_nodes {
                        // Only penalise the holder if we still don't have the
                        // record locally (it may have arrived via another peer).
                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
                        {
                            error!(
                                "From peer {peer_id:?}, failed to fetch record {:?}",
                                PrettyPrintRecordKey::from(&record_key)
                            );
                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
                        }
                    }
                });
            }
            NetworkEvent::QuoteVerification { quotes } => {
                event_header = "QuoteVerification";
                let network = self.network().clone();

                let _handle = spawn(async move {
                    quotes_verification(&network, quotes).await;
                });
            }
            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
                event_header = "FreshReplicateToFetch";
                self.fresh_replicate_to_fetch(holder, keys);
            }
            NetworkEvent::PeersForVersionQuery(peers) => {
                event_header = "PeersForVersionQuery";
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::query_peers_version(network, peers).await;
                });
            }
            NetworkEvent::NetworkWideReplication { keys } => {
                event_header = "NetworkWideReplication";
                self.perform_network_wide_replication(keys);
            }
        }

        trace!(
            "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
            start.elapsed()
        );
    }
648
649 fn handle_response(&self, response: Response) -> Result<()> {
651 match response {
652 Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
653 warn!("Mishandled replicate response, should be handled earlier");
655 }
656 Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
657 error!(
658 "Response to replication shall be handled by called not by common handler, {resp:?}"
659 );
660 }
661 Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
662 }
664 other => {
665 warn!("handle_response not implemented for {other:?}");
666 }
667 };
668
669 Ok(())
670 }
671
672 async fn handle_query(node: Self, query: Query, payment_address: RewardsAddress) -> Response {
673 let network = node.network();
674 let resp: QueryResponse = match query {
675 Query::GetStoreQuote {
676 key,
677 data_type,
678 data_size,
679 nonce,
680 difficulty,
681 } => {
682 let record_key = key.to_record_key();
683 let self_id = network.peer_id();
684
685 let maybe_quoting_metrics = network
686 .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
687 .await;
688
689 let storage_proofs = if let Some(nonce) = nonce {
690 Self::respond_x_closest_record_proof(
691 network,
692 key.clone(),
693 nonce,
694 difficulty,
695 false,
696 )
697 .await
698 } else {
699 vec![]
700 };
701
702 match maybe_quoting_metrics {
703 Ok((quoting_metrics, is_already_stored)) => {
704 if is_already_stored {
705 QueryResponse::GetStoreQuote {
706 quote: Err(ProtocolError::RecordExists(
707 PrettyPrintRecordKey::from(&record_key).into_owned(),
708 )),
709 peer_address: NetworkAddress::from(self_id),
710 storage_proofs,
711 }
712 } else {
713 QueryResponse::GetStoreQuote {
714 quote: Self::create_quote_for_storecost(
715 network,
716 &key,
717 "ing_metrics,
718 &payment_address,
719 ),
720 peer_address: NetworkAddress::from(self_id),
721 storage_proofs,
722 }
723 }
724 }
725 Err(err) => {
726 warn!("GetStoreQuote failed for {key:?}: {err}");
727 QueryResponse::GetStoreQuote {
728 quote: Err(ProtocolError::GetStoreQuoteFailed),
729 peer_address: NetworkAddress::from(self_id),
730 storage_proofs,
731 }
732 }
733 }
734 }
735 Query::GetReplicatedRecord { requester: _, key } => {
736 let our_address = NetworkAddress::from(network.peer_id());
737 let record_key = key.to_record_key();
738
739 let result = match network.get_local_record(&record_key).await {
740 Ok(Some(record)) => Ok((our_address, Bytes::from(record.value))),
741 Ok(None) => Err(ProtocolError::ReplicatedRecordNotFound {
742 holder: Box::new(our_address),
743 key: Box::new(key.clone()),
744 }),
745 Err(err) => Err(ProtocolError::PutRecordFailed(format!(
747 "Error to fetch local record for GetReplicatedRecord {err:?}"
748 ))),
749 };
750
751 QueryResponse::GetReplicatedRecord(result)
752 }
753 Query::GetChunkExistenceProof {
754 key,
755 nonce,
756 difficulty,
757 } => QueryResponse::GetChunkExistenceProof(
758 Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
759 ),
760 Query::CheckNodeInProblem(target_address) => {
761 debug!("Got CheckNodeInProblem for peer {target_address:?}");
762
763 let is_in_trouble =
764 if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
765 result
766 } else {
767 debug!("Could not get status of {target_address:?}.");
768 false
769 };
770
771 QueryResponse::CheckNodeInProblem {
772 reporter_address: NetworkAddress::from(network.peer_id()),
773 target_address,
774 is_in_trouble,
775 }
776 }
777 Query::GetClosestPeers {
778 key,
779 num_of_peers,
780 range,
781 sign_result,
782 } => {
783 debug!(
784 "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
785 );
786 Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
787 .await
788 }
789 Query::GetVersion(_) => QueryResponse::GetVersion {
790 peer: NetworkAddress::from(network.peer_id()),
791 version: ant_build_info::package_version(),
792 },
793 Query::PutRecord {
794 holder,
795 address,
796 serialized_record,
797 } => {
798 let record = Record {
799 key: address.to_record_key(),
800 value: serialized_record,
801 publisher: None,
802 expires: None,
803 };
804
805 let key = PrettyPrintRecordKey::from(&record.key).into_owned();
806 let result = match node.validate_and_store_record(record).await {
807 Ok(()) => Ok(()),
808 Err(PutValidationError::OutdatedRecordCounter { counter, expected }) => {
809 node.record_metrics(Marker::RecordRejected(
810 &key,
811 &PutValidationError::OutdatedRecordCounter { counter, expected },
812 ));
813 Err(ProtocolError::OutdatedRecordCounter { counter, expected })
814 }
815 Err(PutValidationError::TopologyVerificationFailed {
816 target_address,
817 valid_count,
818 total_paid,
819 closest_count,
820 node_peers,
821 paid_peers,
822 }) => {
823 node.record_metrics(Marker::RecordRejected(
824 &key,
825 &PutValidationError::TopologyVerificationFailed {
826 target_address: target_address.clone(),
827 valid_count,
828 total_paid,
829 closest_count,
830 node_peers: node_peers.clone(),
831 paid_peers: paid_peers.clone(),
832 },
833 ));
834 Err(ProtocolError::TopologyVerificationFailed {
835 target_address: Box::new(target_address),
836 valid_count,
837 total_paid,
838 closest_count,
839 node_peers,
840 paid_peers,
841 })
842 }
843 Err(err) => {
844 node.record_metrics(Marker::RecordRejected(&key, &err));
845 Err(ProtocolError::PutRecordFailed(format!("{err:?}")))
846 }
847 };
848 QueryResponse::PutRecord {
849 result,
850 peer_address: holder,
851 record_addr: address,
852 }
853 }
854 Query::GetMerkleCandidateQuote {
855 key,
856 data_type,
857 data_size,
858 merkle_payment_timestamp,
859 } => {
860 Self::respond_merkle_candidate_quote(
861 network,
862 key,
863 data_type,
864 data_size,
865 merkle_payment_timestamp,
866 payment_address,
867 )
868 .await
869 }
870 };
871 Response::Query(resp)
872 }
873
874 async fn respond_get_closest_peers(
875 network: &Network,
876 target: NetworkAddress,
877 num_of_peers: Option<usize>,
878 range: Option<[u8; 32]>,
879 sign_result: bool,
880 ) -> QueryResponse {
881 let local_peers = network.get_local_peers_with_multiaddr().await;
882 let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
883 Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
884 } else {
885 vec![]
886 };
887
888 let signature = if sign_result {
889 let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
890 bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
891 network.sign(&bytes).ok()
892 } else {
893 None
894 };
895
896 QueryResponse::GetClosestPeers {
897 target,
898 peers,
899 signature,
900 }
901 }
902
903 fn calculate_get_closest_peers(
904 peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
905 target: NetworkAddress,
906 num_of_peers: Option<usize>,
907 range: Option<[u8; 32]>,
908 ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
909 match (num_of_peers, range) {
910 (_, Some(value)) => {
911 let distance = U256::from_big_endian(&value);
912 peer_addrs
913 .iter()
914 .filter_map(|(peer_id, multi_addrs)| {
915 let addr = NetworkAddress::from(*peer_id);
916 if target.distance(&addr).0 <= distance {
917 Some((addr, multi_addrs.clone()))
918 } else {
919 None
920 }
921 })
922 .collect()
923 }
924 (Some(num_of_peers), _) => {
925 let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
926 .iter()
927 .map(|(peer_id, multi_addrs)| {
928 let addr = NetworkAddress::from(*peer_id);
929 (addr, multi_addrs.clone())
930 })
931 .collect();
932 result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
933 result.into_iter().take(num_of_peers).collect()
934 }
935 (None, None) => vec![],
936 }
937 }
938
939 async fn respond_merkle_candidate_quote(
943 network: &Network,
944 key: NetworkAddress,
945 data_type: u32,
946 data_size: usize,
947 merkle_payment_timestamp: u64,
948 payment_address: RewardsAddress,
949 ) -> QueryResponse {
950 debug!(
951 "merkle payment: GetMerkleCandidateQuote for target {key:?}, timestamp: {merkle_payment_timestamp}, data_type: {data_type}, data_size: {data_size}"
952 );
953
954 const TIMESTAMP_TOLERANCE: u64 = 24 * 60 * 60; let now = std::time::SystemTime::now()
964 .duration_since(std::time::UNIX_EPOCH)
965 .unwrap_or_default()
966 .as_secs();
967
968 let future_threshold = now + TIMESTAMP_TOLERANCE;
970 if merkle_payment_timestamp > future_threshold {
971 let error_msg = format!(
972 "Rejected future timestamp {merkle_payment_timestamp} (current time: {now}, threshold: {future_threshold})"
973 );
974 warn!("{error_msg} for {key:?}");
975 return QueryResponse::GetMerkleCandidateQuote(Err(
976 ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
977 ));
978 }
979
980 let expiration_threshold = MERKLE_PAYMENT_EXPIRATION + TIMESTAMP_TOLERANCE;
982 let age = now.saturating_sub(merkle_payment_timestamp);
983 if age > expiration_threshold {
984 let error_msg = format!(
985 "Rejected expired timestamp {merkle_payment_timestamp} (age: {age}s, max: {expiration_threshold}s)",
986 );
987 warn!("{error_msg} for {key:?}");
988 return QueryResponse::GetMerkleCandidateQuote(Err(
989 ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
990 ));
991 }
992
993 let record_key = key.to_record_key();
995 let (quoting_metrics, _is_already_stored) = match network
996 .get_local_quoting_metrics(record_key, data_type, data_size)
997 .await
998 {
999 Ok(metrics) => metrics,
1000 Err(err) => {
1001 let error_msg = format!("Failed to get quoting metrics for {key:?}: {err}");
1002 warn!("{error_msg}");
1003 return QueryResponse::GetMerkleCandidateQuote(Err(
1004 ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
1005 ));
1006 }
1007 };
1008
1009 let pub_key = network.get_pub_key();
1011 let reward_address = payment_address;
1012 let bytes = ant_evm::merkle_payments::MerklePaymentCandidateNode::bytes_to_sign(
1013 "ing_metrics,
1014 &reward_address,
1015 merkle_payment_timestamp,
1016 );
1017 let signature = match network.sign(&bytes) {
1018 Ok(sig) => sig,
1019 Err(e) => {
1020 let error_msg = format!("Failed to sign candidate node for {key:?}: {e}");
1021 error!("{error_msg}");
1022 return QueryResponse::GetMerkleCandidateQuote(Err(
1023 ProtocolError::FailedToSignMerkleCandidate(error_msg),
1024 ));
1025 }
1026 };
1027
1028 let candidate = ant_evm::merkle_payments::MerklePaymentCandidateNode {
1029 quoting_metrics,
1030 reward_address,
1031 merkle_payment_timestamp,
1032 pub_key,
1033 signature,
1034 };
1035 QueryResponse::GetMerkleCandidateQuote(Ok(candidate))
1036 }
1037
    /// Produce chunk-existence proofs for a storage challenge.
    ///
    /// With `difficulty == 1` the proof targets exactly `key`; otherwise we
    /// prove up to `min(difficulty, CLOSE_GROUP_SIZE)` of our locally held
    /// records closest to `key` (restricted to chunks when `chunk_only`).
    /// Missing records in the multi-proof branch are silently skipped.
    ///
    /// NOTE(review): the summary `info!` only fires in the `difficulty > 1`
    /// branch — possibly intentional (single proofs are the common hot path),
    /// but worth confirming.
    async fn respond_x_closest_record_proof(
        network: &Network,
        key: NetworkAddress,
        nonce: Nonce,
        difficulty: usize,
        chunk_only: bool,
    ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
        let start = Instant::now();
        let mut results = vec![];
        if difficulty == 1 {
            // Single-target proof: answer for `key` itself, or an explicit
            // ChunkDoesNotExist error if we don't hold it.
            let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
            if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
                let proof = ChunkProof::new(&record.value, nonce);
                debug!("Chunk proof for {key:?} is {proof:?}");
                result = Ok(proof)
            } else {
                debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
            }

            results.push((key.clone(), result));
        } else {
            let all_local_records = network.get_all_local_record_addresses().await;

            if let Ok(all_local_records) = all_local_records {
                // Candidate set: chunk records only, or all records.
                let mut all_chunk_addrs: Vec<_> = if chunk_only {
                    all_local_records
                        .iter()
                        .filter_map(|(addr, record_type)| {
                            if *record_type == ValidationType::Chunk {
                                Some(addr.clone())
                            } else {
                                None
                            }
                        })
                        .collect()
                } else {
                    all_local_records.keys().cloned().collect()
                };

                // Prove the records closest to the challenged key.
                all_chunk_addrs.sort_by_key(|addr| key.distance(addr));

                // Cap the work regardless of the requested difficulty.
                let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);

                for addr in all_chunk_addrs.iter().take(workload_factor) {
                    if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
                    {
                        let proof = ChunkProof::new(&record.value, nonce);
                        debug!("Chunk proof for {key:?} is {proof:?}");
                        results.push((addr.clone(), Ok(proof)));
                    }
                }
            }

            info!(
                "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
                results.len(),
                start.elapsed()
            );
        }

        results
    }
1105
    /// Run one storage-challenge round against our close-group neighbours.
    ///
    /// Picks a random locally held chunk from the closer half of our records,
    /// computes the proofs we expect honest holders to return, challenges each
    /// neighbour concurrently via `scoring_peer`, reports unhealthy scorers as
    /// `FailedChunkProofCheck`, and finally verifies our own replica health.
    /// Bails out early if we lack neighbours (< CLOSE_GROUP_SIZE) or hold
    /// fewer than 50 candidate chunks.
    async fn storage_challenge(network: Network) {
        let start = Instant::now();
        let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
            network.get_k_closest_local_peers_to_the_target(None).await
        {
            closest_peers
                .into_iter()
                .take(CLOSE_GROUP_SIZE)
                .collect_vec()
        } else {
            error!("Cannot get local neighbours");
            return;
        };
        if closest_peers.len() < CLOSE_GROUP_SIZE {
            debug!(
                "Not enough neighbours ({}/{}) to carry out storage challenge.",
                closest_peers.len(),
                CLOSE_GROUP_SIZE
            );
            return;
        }

        // Only chunk-type records are eligible challenge targets.
        let mut verify_candidates: Vec<NetworkAddress> =
            if let Ok(all_keys) = network.get_all_local_record_addresses().await {
                all_keys
                    .iter()
                    .filter_map(|(addr, record_type)| {
                        if ValidationType::Chunk == *record_type {
                            Some(addr.clone())
                        } else {
                            None
                        }
                    })
                    .collect()
            } else {
                error!("Failed to get local record addresses.");
                return;
            };
        let num_of_targets = verify_candidates.len();
        if num_of_targets < 50 {
            debug!("Not enough candidates({num_of_targets}/50) to be checked against neighbours.");
            return;
        }

        // Pick the challenge target at random from the half of our records
        // closest to us — those are the ones neighbours should also hold.
        let self_addr = NetworkAddress::from(network.peer_id());
        verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
        let index: usize = OsRng.gen_range(0..num_of_targets / 2);
        let target = verify_candidates[index].clone();
        let difficulty = CLOSE_GROUP_SIZE;
        // Re-sort around the target: the `difficulty` closest records are the
        // proofs an honest close neighbour is expected to produce.
        verify_candidates.sort_by_key(|addr| target.distance(addr));
        let expected_targets = verify_candidates.into_iter().take(difficulty);
        let nonce: Nonce = thread_rng().r#gen::<u64>();
        let mut expected_proofs = HashMap::new();
        for addr in expected_targets {
            if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
                let expected_proof = ChunkProof::new(&record.value, nonce);
                let _ = expected_proofs.insert(addr, expected_proof);
            } else {
                error!("Local record {addr:?} cann't be loaded from disk.");
            }
        }
        let request = Request::Query(Query::GetChunkExistenceProof {
            key: target.clone(),
            nonce,
            difficulty,
        });

        // Challenge every neighbour (except ourselves) concurrently.
        let mut tasks = JoinSet::new();
        for (peer_id, addresses) in closest_peers {
            if peer_id == network.peer_id() {
                continue;
            }
            let network_clone = network.clone();
            let request_clone = request.clone();
            let expected_proofs_clone = expected_proofs.clone();
            let _ = tasks.spawn(async move {
                let res = scoring_peer(
                    network_clone,
                    (peer_id, addresses),
                    request_clone,
                    expected_proofs_clone,
                )
                .await;
                (peer_id, res)
            });
        }

        // Collect scores; low scorers get flagged as a node issue.
        let mut peer_scores = vec![];
        while let Some(res) = tasks.join_next().await {
            match res {
                Ok((peer_id, score)) => {
                    let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
                    if !is_healthy {
                        info!(
                            "Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."
                        );
                        network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
                    }
                    peer_scores.push((peer_id, is_healthy));
                }
                Err(e) => {
                    // Task join error (panic/cancellation) — log and move on.
                    info!("StorageChallenge task completed with error {e:?}");
                }
            }
        }
        if !peer_scores.is_empty() {
            network.notify_peer_scores(peer_scores);
        }

        // Piggyback a local replica health check on the challenge cadence.
        Self::verify_local_replication_health(network.clone()).await;

        info!(
            "Completed node StorageChallenge against neighbours in {:?}!",
            start.elapsed()
        );
    }
1229
1230 async fn verify_local_replication_health(network: Network) {
1232 let closest_peers = match network.get_k_closest_local_peers_to_the_target(None).await {
1233 Ok(peers) => peers,
1234 Err(err) => {
1235 warn!("Cannot fetch closest peers for replica verification: {err:?}");
1236 return;
1237 }
1238 };
1239
1240 if closest_peers.len() <= CLOSE_NEIGHBOUR_DISTANCE_INDEX {
1241 debug!(
1242 "Skipping replica verification as we only know {} neighbours (need > {}).",
1243 closest_peers.len(),
1244 CLOSE_NEIGHBOUR_DISTANCE_INDEX
1245 );
1246 return;
1247 }
1248
1249 let self_address = NetworkAddress::from(network.peer_id());
1250 let Some((threshold_peer, _)) = closest_peers.get(CLOSE_NEIGHBOUR_DISTANCE_INDEX) else {
1251 debug!("Unable to determine distance threshold for replica verification.");
1252 return;
1253 };
1254
1255 let threshold_distance = self_address.distance(&NetworkAddress::from(*threshold_peer));
1256 let Some(threshold_ilog2) = threshold_distance.ilog2() else {
1257 debug!("Threshold distance lacks ilog2; cannot proceed with replica verification.");
1258 return;
1259 };
1260
1261 let local_records = match network.get_all_local_record_addresses().await {
1262 Ok(records) => records,
1263 Err(err) => {
1264 warn!("Failed to list local records for replica verification: {err:?}");
1265 return;
1266 }
1267 };
1268
1269 let mut nearby_records: Vec<NetworkAddress> = local_records
1270 .into_iter()
1271 .filter_map(|(address, record_type)| {
1272 if record_type != ValidationType::Chunk {
1273 return None;
1274 }
1275 let distance = self_address.distance(&address);
1276 distance.ilog2().and_then(|record_ilog2| {
1277 if record_ilog2 <= threshold_ilog2 {
1278 Some(address)
1279 } else {
1280 None
1281 }
1282 })
1283 })
1284 .collect();
1285
1286 nearby_records.shuffle(&mut thread_rng());
1287 let target_record = if let Some(entry) = nearby_records.first().cloned() {
1288 entry
1289 } else {
1290 debug!("No nearby chunk records available for replica verification.");
1291 return;
1292 };
1293
1294 let pretty_key = PrettyPrintRecordKey::from(&target_record.to_record_key()).into_owned();
1295
1296 let candidate_peers = match network
1297 .get_k_closest_local_peers_to_the_target(Some(target_record.clone()))
1298 .await
1299 {
1300 Ok(peers) => peers
1301 .into_iter()
1302 .filter(|(peer_id, _)| peer_id != &network.peer_id())
1303 .take(REPLICA_FETCH_PEER_COUNT)
1304 .collect::<Vec<_>>(),
1305 Err(err) => {
1306 warn!(
1307 "Cannot fetch record-specific closest peers for replica verification: {err:?}"
1308 );
1309 return;
1310 }
1311 };
1312
1313 if candidate_peers.len() < REPLICA_FETCH_PEER_COUNT {
1314 debug!(
1315 "Only {} peers available for replica verification (need at least {}).",
1316 candidate_peers.len(),
1317 REPLICA_FETCH_PEER_COUNT
1318 );
1319 return;
1320 }
1321
1322 debug!(
1323 "Verifying replicated record {pretty_key:?} against {} closest peers.",
1324 candidate_peers.len()
1325 );
1326
1327 let (successful_peers, failed_peers) = Self::fetch_record_from_peers_with_addresses(
1328 network.clone(),
1329 target_record.clone(),
1330 candidate_peers,
1331 )
1332 .await;
1333
1334 if failed_peers.is_empty() {
1335 debug!("All peers returned record {pretty_key:?} during replica verification.");
1336 return;
1337 }
1338
1339 if successful_peers.len() < MIN_HEALTHY_REPLICA_COUNT {
1340 warn!(
1341 "Replica verification fetched only {} copies of {pretty_key:?}. Record is unhealthy; skipping peer classification.",
1342 successful_peers.len()
1343 );
1344 return;
1345 }
1346
1347 if !failed_peers.is_empty() {
1348 info!(
1349 "Scheduling replica verification retry for {} peers on record {pretty_key:?}.",
1350 failed_peers.len()
1351 );
1352 Self::schedule_record_fetch_retry(network.clone(), target_record.clone(), failed_peers)
1353 .await;
1354 }
1355 }
1356
1357 async fn fetch_record_from_peers_with_addresses(
1359 network: Network,
1360 record_address: NetworkAddress,
1361 peers: Vec<(PeerId, Addresses)>,
1362 ) -> (Vec<PeerId>, Vec<PeerId>) {
1363 let request = Request::Query(Query::GetReplicatedRecord {
1364 requester: NetworkAddress::from(network.peer_id()),
1365 key: record_address.clone(),
1366 });
1367 let expected_key = record_address.to_record_key();
1368 let pretty_key = PrettyPrintRecordKey::from(&expected_key).into_owned();
1369
1370 let mut successes = Vec::new();
1371 let mut failures = Vec::new();
1372 let concurrency = peers.len();
1373 let results = stream::iter(peers.into_iter().map(|(peer_id, addrs)| {
1374 let network_clone = network.clone();
1375 let request_clone = request.clone();
1376 async move {
1377 let result = network_clone
1378 .send_request(request_clone, peer_id, addrs.clone())
1379 .await;
1380 (peer_id, addrs, result)
1381 }
1382 }))
1383 .buffer_unordered(concurrency)
1384 .collect::<Vec<_>>()
1385 .await;
1386
1387 for res in results {
1388 match res {
1389 (
1390 peer_id,
1391 _addrs,
1392 Ok((Response::Query(QueryResponse::GetReplicatedRecord(result)), _)),
1393 ) => {
1394 match result {
1395 Ok((_holder, value)) => {
1396 let record =
1398 Record::new(record_address.to_record_key(), value.to_vec());
1399 if let Ok(chunk) = try_deserialize_record::<Chunk>(&record)
1400 && chunk.network_address().to_record_key() == expected_key
1401 {
1402 successes.push(peer_id);
1403 } else {
1404 warn!(
1405 "Peer {peer_id:?} responded with an incorrect chunk copy of {pretty_key:?}."
1406 );
1407 failures.push(peer_id);
1408 }
1409 }
1410 Err(err) => {
1411 info!(
1412 "Peer {peer_id:?} responded with error {err:?} for replicated record {pretty_key:?}."
1413 );
1414 failures.push(peer_id);
1415 }
1416 }
1417 }
1418 (peer_id, _addrs, Ok((other_response, _))) => {
1419 warn!(
1420 "Peer {peer_id:?} responded with unexpected message {other_response:?} for {pretty_key:?}."
1421 );
1422 failures.push(peer_id);
1423 }
1424 (peer_id, _addrs, Err(err)) => {
1425 info!(
1426 "Failed to reach peer {peer_id:?} for replicated record {pretty_key:?}: {err:?}"
1427 );
1428 failures.push(peer_id);
1429 }
1430 }
1431 }
1432
1433 (successes, failures)
1434 }
1435
1436 async fn schedule_record_fetch_retry(
1438 network: Network,
1439 record_address: NetworkAddress,
1440 failed_peers: Vec<PeerId>,
1441 ) {
1442 let retry_peers: HashSet<_> = failed_peers.into_iter().collect();
1443 if retry_peers.is_empty() {
1444 return;
1445 }
1446
1447 let record_clone = record_address.clone();
1448 let network_clone = network.clone();
1449
1450 tokio::time::sleep(REPLICA_FETCH_RETRY_DELAY).await;
1451 let pretty_key = PrettyPrintRecordKey::from(&record_clone.to_record_key()).into_owned();
1452
1453 let refreshed_candidates =
1454 Self::refresh_retry_candidate_addresses(&network_clone, &record_clone, &retry_peers)
1455 .await;
1456
1457 if refreshed_candidates.is_empty() {
1458 info!(
1459 "Skipping replica retry for {pretty_key:?}; no tracked peers remain close to the record."
1460 );
1461 return;
1462 }
1463
1464 let (_, still_failed) = Self::fetch_record_from_peers_with_addresses(
1465 network_clone.clone(),
1466 record_clone.clone(),
1467 refreshed_candidates,
1468 )
1469 .await;
1470
1471 if still_failed.is_empty() {
1472 info!("All peers successfully returned {pretty_key:?} during replica retry.");
1473 return;
1474 }
1475
1476 warn!(
1477 "{} peers still failed to provide {pretty_key:?}; evicting and blacklisting.",
1478 still_failed.len()
1479 );
1480 for peer_id in still_failed {
1481 Self::evict_and_blacklist_peer(network_clone.clone(), peer_id);
1482 }
1483 }
1484
1485 async fn refresh_retry_candidate_addresses(
1487 network: &Network,
1488 record_address: &NetworkAddress,
1489 retry_peers: &HashSet<PeerId>,
1490 ) -> Vec<(PeerId, Addresses)> {
1491 let pretty_key = PrettyPrintRecordKey::from(&record_address.to_record_key()).into_owned();
1492 let Ok(closest_peers) = network.get_closest_peers(record_address).await else {
1493 warn!(
1494 "Failed to refresh peer addresses for {pretty_key:?}; unable to retry replica fetch."
1495 );
1496 return Vec::new();
1497 };
1498
1499 let closest_map: HashMap<PeerId, Addresses> = closest_peers.into_iter().collect();
1500
1501 retry_peers
1502 .iter()
1503 .filter_map(|peer_id| {
1504 closest_map
1505 .get(peer_id)
1506 .cloned()
1507 .map(|addrs| (*peer_id, addrs))
1508 })
1509 .collect()
1510 }
1511
    /// Removes `peer_id` from the routing table and then adds it to the
    /// blocklist; used for peers that repeatedly fail replica fetches.
    fn evict_and_blacklist_peer(network: Network, peer_id: PeerId) {
        network.remove_peer(peer_id);
        network.add_peer_to_blocklist(peer_id);
    }
1517
    /// Queries the software version of each peer in `peers`, one request at a
    /// time (sequential awaits, not concurrent).
    async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
        for (peer_id, addrs) in peers {
            Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
        }
    }
1525
1526 async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
1528 let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
1529 let version = match network.send_request(request, peer, addrs).await {
1531 Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
1532 trace!("Fetched peer version {peer:?} as {version:?}");
1533 version
1534 }
1535 Ok(other) => {
1536 info!("Not a fetched peer version {peer:?}, {other:?}");
1537 "none".to_string()
1538 }
1539 Err(err) => {
1540 info!("Failed to fetch peer version {peer:?} with error {err:?}");
1541 if let NetworkError::OutboundError(OutboundFailure::DialFailure) = err {
1545 network.remove_peer(peer);
1546 return;
1547 }
1548 "old".to_string()
1549 }
1550 };
1551 network.notify_node_version(peer, version);
1552 }
1553}
1554
/// Tracks the peers closest to us together with their churn behaviour, so a
/// caller can decide whether a membership change warrants a replication round.
#[derive(Debug)]
struct CloseGroupTracker {
    // Our own address; all distances below are measured from here.
    self_address: NetworkAddress,
    // Closest known peers ordered by distance; the BTreeSet keeps the
    // farthest entry at the back for cheap eviction.
    close_up_peers: BTreeSet<(Distance, PeerId)>,
    // Per-peer churn state, retained even after a peer leaves the close group
    // so a quick restart can be told apart from a genuine departure.
    tracked_entries: HashMap<PeerId, BehaviourEntry>,
}
1564
impl CloseGroupTracker {
    /// Creates an empty tracker centred on `self_peer_id`.
    fn new(self_peer_id: PeerId) -> Self {
        Self {
            self_address: NetworkAddress::from(self_peer_id),
            close_up_peers: BTreeSet::new(),
            tracked_entries: HashMap::new(),
        }
    }

    /// Records that `peer_id` appeared in the routing table and returns what
    /// the caller should do about replication: `Trigger` for a genuinely new
    /// close peer, `Skip` when the peer looks like it just restarted, and
    /// `Ignore` for peers outside the close range or already known.
    fn record_peer_added(&mut self, peer_id: PeerId) -> ReplicationDirective {
        let distance = self.distance_to_peer(peer_id);

        let was_present = self.close_up_peers.contains(&(distance, peer_id));

        let is_close_enough = self.is_distance_within_close_range(distance);

        if is_close_enough {
            let _ = self.close_up_peers.insert((distance, peer_id));

            // Keep the set bounded: once over the tracking limit, evict the
            // farthest entry (the back of the ordered set).
            if self.close_up_peers.len() > CLOSE_GROUP_TRACKING_LIMIT
                && let Some(&(farthest_distance, farthest_peer)) =
                    self.close_up_peers.iter().next_back()
            {
                let _ = self
                    .close_up_peers
                    .remove(&(farthest_distance, farthest_peer));
            }
        } else {
            // Too far away to matter: drop any stale behaviour state too.
            let _ = self.tracked_entries.remove(&peer_id);
            return ReplicationDirective::Ignore;
        }

        use std::collections::hash_map::Entry;
        match self.tracked_entries.entry(peer_id) {
            Entry::Vacant(vacant_entry) => {
                // First sighting: replicate only if it was not already in the
                // close set.
                let _ = vacant_entry.insert(BehaviourEntry::default());
                if !was_present {
                    ReplicationDirective::Trigger
                } else {
                    ReplicationDirective::Ignore
                }
            }
            Entry::Occupied(mut occupied_entry) => {
                let entry = occupied_entry.get_mut();
                if entry.awaiting_rejoin || entry.timer_deadline.is_some() {
                    // The peer left recently and came straight back: treat it
                    // as a restart and suppress replication.
                    entry.restart_detected = true;
                    entry.awaiting_rejoin = false;
                    entry.timer_deadline = None;
                    ReplicationDirective::Skip
                } else if !was_present {
                    entry.restart_detected = false;
                    ReplicationDirective::Trigger
                } else {
                    ReplicationDirective::Ignore
                }
            }
        }
    }

    /// Records that `peer_id` left the routing table. Returns `Skip` while a
    /// known restarter is within its suppression window, `Trigger` for a
    /// genuine departure, and `Ignore` if the peer was never tracked close.
    fn record_peer_removed(&mut self, peer_id: PeerId) -> ReplicationDirective {
        let distance = self.distance_to_peer(peer_id);

        let was_tracked = self.close_up_peers.contains(&(distance, peer_id));

        if !was_tracked {
            return ReplicationDirective::Ignore;
        }

        let _ = self.close_up_peers.remove(&(distance, peer_id));

        let entry = self.tracked_entries.entry(peer_id).or_default();

        if entry.restart_detected {
            // A restarting peer leaving again starts the suppression timer
            // instead of triggering replication.
            entry.timer_deadline = Some(Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION);
            entry.awaiting_rejoin = false;
            ReplicationDirective::Skip
        } else {
            entry.awaiting_rejoin = true;
            entry.timer_deadline = None;
            ReplicationDirective::Trigger
        }
    }

    /// Clears restart state for every entry whose suppression deadline has
    /// passed (relative to `now`), then prunes entries that carry no state
    /// and are no longer in the close set. Returns true if any timer expired.
    fn handle_timer_expiry(&mut self, now: Instant) -> bool {
        let mut expired = false;
        for entry in self.tracked_entries.values_mut() {
            if let Some(deadline) = entry.timer_deadline
                && now >= deadline
            {
                entry.timer_deadline = None;
                entry.restart_detected = false;
                entry.awaiting_rejoin = false;
                expired = true;
            }
        }

        // Garbage-collect entries with no remaining reason to exist.
        self.tracked_entries.retain(|peer_id, entry| {
            let is_currently_present = self
                .close_up_peers
                .iter()
                .any(|(_, present_peer)| present_peer == peer_id);

            entry.awaiting_rejoin
                || entry.restart_detected
                || entry.timer_deadline.is_some()
                || is_currently_present
        });

        expired
    }

    /// True if `distance` would qualify for the close set: either the set is
    /// not yet full, or the distance beats the current farthest member.
    fn is_distance_within_close_range(&self, distance: Distance) -> bool {
        if self.close_up_peers.len() < CLOSE_GROUP_TRACKING_LIMIT {
            return true;
        }

        if let Some(&(farthest_distance, _)) = self.close_up_peers.iter().next_back() {
            distance < farthest_distance
        } else {
            true
        }
    }

    /// XOR-metric distance from our own address to `peer_id`.
    fn distance_to_peer(&self, peer_id: PeerId) -> Distance {
        self.self_address.distance(&NetworkAddress::from(peer_id))
    }
}
1703
/// Per-peer churn bookkeeping used by `CloseGroupTracker`.
#[derive(Debug, Default)]
struct BehaviourEntry {
    // Peer left the close group; if it comes straight back, that re-add is
    // treated as a restart.
    awaiting_rejoin: bool,
    // Peer re-appeared while flagged, i.e. it likely just restarted.
    restart_detected: bool,
    // Suppression deadline set when a restarting peer leaves again.
    timer_deadline: Option<Instant>,
}
1710
/// What the caller should do about replication after a close-group change.
#[derive(Debug, PartialEq, Eq)]
enum ReplicationDirective {
    /// Start a replication round.
    Trigger,
    /// Suppress replication (the peer appears to be restarting).
    Skip,
    /// No action needed.
    Ignore,
}
1717
1718impl ReplicationDirective {
1719 fn should_trigger(&self) -> bool {
1720 matches!(self, Self::Trigger)
1721 }
1722}
1723
#[cfg(test)]
mod close_group_tracker_tests {
    use super::*;
    use std::time::Duration;

    /// Generates a fresh random peer id for each test case.
    fn random_peer() -> PeerId {
        let keypair = libp2p::identity::Keypair::generate_ed25519();
        PeerId::from(keypair.public())
    }

    // A brand-new peer joining and then leaving should both trigger replication.
    #[test]
    fn new_peer_triggers_replication() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let peer = random_peer();

        assert_eq!(
            tracker.record_peer_added(peer),
            ReplicationDirective::Trigger
        );

        assert_eq!(
            tracker.record_peer_removed(peer),
            ReplicationDirective::Trigger
        );
    }

    // A quick leave/rejoin cycle is classified as a restart and suppressed
    // until the suppression timer expires.
    #[test]
    fn restart_detection_skips_replication() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let peer = random_peer();

        assert_eq!(
            tracker.record_peer_added(peer),
            ReplicationDirective::Trigger
        );

        assert_eq!(
            tracker.record_peer_removed(peer),
            ReplicationDirective::Trigger
        );

        // Re-adding a peer that is awaiting rejoin counts as a restart.
        assert_eq!(tracker.record_peer_added(peer), ReplicationDirective::Skip);

        assert_eq!(
            tracker.record_peer_removed(peer),
            ReplicationDirective::Skip
        );

        // Advancing past the suppression window clears the restart state.
        assert!(tracker.handle_timer_expiry(
            Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1)
        ));

        assert_eq!(
            tracker.record_peer_added(peer),
            ReplicationDirective::Trigger
        );
    }

    // Once the tracking limit is reached, additional peers either displace a
    // farther member (Trigger) or are ignored; the set stays bounded.
    #[test]
    fn peer_outside_close_group_is_ignored() {
        let mut tracker = CloseGroupTracker::new(random_peer());

        let mut added_peers = Vec::new();
        for _ in 0..CLOSE_GROUP_TRACKING_LIMIT {
            let peer = random_peer();
            let result = tracker.record_peer_added(peer);
            assert_eq!(result, ReplicationDirective::Trigger);
            added_peers.push(peer);
        }

        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);

        let mut ignored_count = 0;
        let mut trigger_count = 0;
        for _ in 0..50 {
            let peer = random_peer();
            match tracker.record_peer_added(peer) {
                ReplicationDirective::Ignore => ignored_count += 1,
                ReplicationDirective::Trigger => trigger_count += 1,
                _ => {}
            }
        }

        assert!(
            ignored_count > 0 || trigger_count > 0,
            "Expected some peers to be processed"
        );

        // The bound must hold regardless of how the random peers landed.
        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
    }

    #[test]
    fn removal_of_untracked_peer_is_ignored() {
        let mut tracker = CloseGroupTracker::new(random_peer());

        let unknown_peer = random_peer();
        assert_eq!(
            tracker.record_peer_removed(unknown_peer),
            ReplicationDirective::Ignore
        );
    }

    // A timer that has not yet expired returns false; once past the deadline
    // the restart state resets and the peer triggers replication again.
    #[test]
    fn timer_expiry_resets_restart_state() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let peer = random_peer();

        assert_eq!(
            tracker.record_peer_added(peer),
            ReplicationDirective::Trigger
        );
        assert_eq!(
            tracker.record_peer_removed(peer),
            ReplicationDirective::Trigger
        );

        assert_eq!(tracker.record_peer_added(peer), ReplicationDirective::Skip);

        assert_eq!(
            tracker.record_peer_removed(peer),
            ReplicationDirective::Skip
        );

        assert!(!tracker.handle_timer_expiry(Instant::now()));

        let after_expiry =
            Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1);
        assert!(tracker.handle_timer_expiry(after_expiry));

        assert_eq!(
            tracker.record_peer_added(peer),
            ReplicationDirective::Trigger
        );
    }

    // Inserting a peer closer than the current farthest must evict the
    // farthest and keep the set at the tracking limit.
    #[test]
    fn eviction_maintains_closest_peers() {
        let self_peer = random_peer();
        let mut tracker = CloseGroupTracker::new(self_peer);

        let mut peers_with_distances: Vec<(PeerId, Distance)> = Vec::new();
        for _ in 0..CLOSE_GROUP_TRACKING_LIMIT {
            let peer = random_peer();
            let distance = tracker.distance_to_peer(peer);
            let _ = tracker.record_peer_added(peer);
            peers_with_distances.push((peer, distance));
        }

        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);

        peers_with_distances.sort_by_key(|(_, d)| *d);
        let farthest_distance = peers_with_distances.last().map(|(_, d)| *d).unwrap();

        // Random peer ids make distances random; probe until one lands closer
        // than the farthest tracked member.
        let mut found_closer = false;
        for _ in 0..100 {
            let new_peer = random_peer();
            let new_distance = tracker.distance_to_peer(new_peer);
            if new_distance < farthest_distance {
                let result = tracker.record_peer_added(new_peer);
                assert_eq!(result, ReplicationDirective::Trigger);
                assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
                found_closer = true;
                break;
            }
        }

        assert!(
            found_closer,
            "Could not find a peer closer than the farthest in 100 attempts"
        );
    }

    // Entries with no remaining churn state and no presence in the close set
    // are pruned when the suppression timer fires.
    #[test]
    fn tracked_entries_cleaned_up_on_timer_expiry() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let peer = random_peer();

        let _ = tracker.record_peer_added(peer);
        let _ = tracker.record_peer_removed(peer);
        let _ = tracker.record_peer_added(peer);
        let _ = tracker.record_peer_removed(peer);

        assert!(tracker.tracked_entries.contains_key(&peer));

        let after_expiry =
            Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1);
        let _ = tracker.handle_timer_expiry(after_expiry);

        assert!(!tracker.tracked_entries.contains_key(&peer));
    }
}
1940
1941async fn scoring_peer(
1942 network: Network,
1943 peer: (PeerId, Addresses),
1944 request: Request,
1945 expected_proofs: HashMap<NetworkAddress, ChunkProof>,
1946) -> usize {
1947 let peer_id = peer.0;
1948 let start = Instant::now();
1949 let responses = network
1950 .send_and_get_responses(&[peer], &request, true)
1951 .await;
1952
1953 if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
1954 responses.get(&peer_id)
1955 {
1956 if answers.is_empty() {
1957 info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
1958 return 0;
1959 }
1960 let elapsed = start.elapsed();
1961
1962 let mut received_proofs = vec![];
1963 for (addr, proof) in answers {
1964 if let Ok(proof) = proof {
1965 received_proofs.push((addr.clone(), proof.clone()));
1966 }
1967 }
1968
1969 let score = mark_peer(elapsed, received_proofs, &expected_proofs);
1970 info!(
1971 "Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.",
1972 answers.len()
1973 );
1974 score
1975 } else {
1976 info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error.");
1977 0
1978 }
1979}
1980
1981fn mark_peer(
1987 duration: Duration,
1988 answers: Vec<(NetworkAddress, ChunkProof)>,
1989 expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
1990) -> usize {
1991 let duration_score = duration_score_scheme(duration);
1992 let challenge_score = challenge_score_scheme(answers, expected_proofs);
1993
1994 duration_score * challenge_score
1995}
1996
1997fn duration_score_scheme(duration: Duration) -> usize {
1999 let in_ms = if let Some(value) = duration.as_millis().to_usize() {
2001 value
2002 } else {
2003 info!("Cannot get milli seconds from {duration:?}, using a default value of 1000ms.");
2004 1000
2005 };
2006
2007 let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
2008 HIGHEST_SCORE - step
2009}
2010
2011fn challenge_score_scheme(
2013 answers: Vec<(NetworkAddress, ChunkProof)>,
2014 expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
2015) -> usize {
2016 let mut correct_answers = 0;
2017 for (addr, chunk_proof) in answers {
2018 if let Some(expected_proof) = expected_proofs.get(&addr) {
2019 if expected_proof.verify(&chunk_proof) {
2020 correct_answers += 1;
2021 } else {
2022 info!("Spot a false answer to the challenge regarding {addr:?}");
2023 return 0;
2025 }
2026 }
2027 }
2028 std::cmp::min(
2035 HIGHEST_SCORE,
2036 HIGHEST_SCORE * correct_answers / expected_proofs.len(),
2037 )
2038}
2039
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    // An empty local peer table must yield an empty closest-peers result.
    #[test]
    fn test_no_local_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(5);
        let range = None;
        let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);

        assert_eq!(result, vec![]);
    }

    // With fewer peers than requested, the result is all peers sorted by
    // distance to the target, truncated to the requested count.
    #[test]
    fn test_fewer_local_peers_than_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(2);
        let range = None;
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Expected: peers sorted by XOR distance to target, take 2.
        let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .iter()
            .map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(*peer_id);
                (addr, multi_addrs.clone())
            })
            .collect();
        expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
        let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();

        assert_eq!(expected_result, result);
    }

    // When a distance range is supplied, only peers within that range of the
    // target are returned (num_of_peers = 0 exercises the range path).
    #[test]
    fn test_with_range_and_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(0);
        let range_value = [128; 32];
        let range = Some(range_value);
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Expected: filter peers whose distance to target is within the range.
        let distance = U256::from_big_endian(&range_value);
        let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .into_iter()
            .filter_map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(peer_id);
                if target.distance(&addr).0 <= distance {
                    Some((addr, multi_addrs.clone()))
                } else {
                    None
                }
            })
            .collect();

        assert_eq!(expected_result, result);
    }

    // Unit checks on the merkle-payment timestamp validation arithmetic
    // (age = now - timestamp, compared against MERKLE_PAYMENT_EXPIRATION).
    mod merkle_payment_tests {
        use super::*;

        #[test]
        fn test_timestamp_validation_accepts_valid_timestamp() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // One hour old: within the expiration window.
            let valid_timestamp = now - 3600;

            let age = now.saturating_sub(valid_timestamp);

            assert!(
                valid_timestamp <= now,
                "Valid timestamp should not be in the future"
            );
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Valid timestamp should not be expired"
            );
        }

        #[test]
        fn test_timestamp_validation_rejects_future_timestamp() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let future_timestamp = now + 3600;

            assert!(
                future_timestamp > now,
                "Future timestamp should be rejected"
            );
        }

        #[test]
        fn test_timestamp_validation_rejects_expired_timestamp() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // One full day beyond the expiration window.
            let expired_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 86400);

            let age = now.saturating_sub(expired_timestamp);

            assert!(
                age > MERKLE_PAYMENT_EXPIRATION,
                "Expired timestamp should be rejected"
            );
        }

        // Boundary case: age exactly equal to the expiration is still valid.
        #[test]
        fn test_timestamp_validation_at_expiration_boundary() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let boundary_timestamp = now - MERKLE_PAYMENT_EXPIRATION;

            let age = now.saturating_sub(boundary_timestamp);

            assert_eq!(age, MERKLE_PAYMENT_EXPIRATION);
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Timestamp exactly at boundary should not be rejected"
            );
        }

        // Boundary case: one second past the expiration is invalid.
        #[test]
        fn test_timestamp_validation_beyond_expiration_boundary() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let beyond_boundary_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 1);

            let age = now.saturating_sub(beyond_boundary_timestamp);

            assert!(
                age > MERKLE_PAYMENT_EXPIRATION,
                "Timestamp beyond boundary should be rejected"
            );
        }

        #[test]
        fn test_timestamp_validation_at_current_time() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let current_timestamp = now;

            let age = now.saturating_sub(current_timestamp);

            assert!(
                current_timestamp <= now,
                "Current timestamp should not be in future"
            );
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Current timestamp should not be expired"
            );
            assert_eq!(age, 0, "Age should be 0 for current timestamp");
        }

        #[test]
        fn test_timestamp_validation_near_future_boundary() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let near_future_timestamp = now + 1;

            assert!(
                near_future_timestamp > now,
                "Near-future timestamp should be rejected"
            );
        }

        // Pins the constant so an accidental change to the expiration window
        // fails loudly.
        #[test]
        fn test_merkle_payment_expiration_constant() {
            const SEVEN_DAYS_IN_SECONDS: u64 = 7 * 24 * 60 * 60;
            assert_eq!(
                MERKLE_PAYMENT_EXPIRATION, SEVEN_DAYS_IN_SECONDS,
                "MERKLE_PAYMENT_EXPIRATION should be 7 days"
            );
        }
    }
}