1use super::{
10 Marker, NodeEvent, error::Result, event::NodeEventsChannel, quote::quotes_verification,
11};
12#[cfg(feature = "open-metrics")]
13use crate::metrics::NodeMetricsRecorder;
14#[cfg(feature = "open-metrics")]
15use crate::networking::MetricsRegistries;
16use crate::networking::{Addresses, Network, NetworkConfig, NetworkEvent, NodeIssue};
17use crate::{PutValidationError, RunningNode};
18use ant_bootstrap::bootstrap::Bootstrap;
19use ant_evm::EvmNetwork;
20use ant_evm::RewardsAddress;
21use ant_evm::merkle_payments::MERKLE_PAYMENT_EXPIRATION;
22use ant_protocol::{
23 CLOSE_GROUP_SIZE, NetworkAddress, PrettyPrintRecordKey,
24 error::Error as ProtocolError,
25 messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
26 storage::{Chunk, ValidationType, try_deserialize_record},
27};
28use bytes::Bytes;
29use futures::stream::{self, StreamExt};
30use itertools::Itertools;
31use libp2p::{
32 Multiaddr, PeerId,
33 identity::Keypair,
34 kad::{KBucketDistance as Distance, Record, U256},
35};
36use num_traits::cast::ToPrimitive;
37use rand::{
38 Rng, SeedableRng,
39 rngs::{OsRng, StdRng},
40 seq::SliceRandom,
41 thread_rng,
42};
43use std::{
44 collections::{BTreeSet, HashMap, HashSet},
45 net::SocketAddr,
46 path::PathBuf,
47 sync::{
48 Arc,
49 atomic::{AtomicUsize, Ordering},
50 },
51 time::{Duration, Instant},
52};
53use tokio::sync::{Mutex, watch};
54use tokio::{
55 sync::mpsc::Receiver,
56 task::{JoinSet, spawn},
57};
58
/// Upper bound, in seconds, of the periodic replication interval.
/// `Node::run` draws the actual interval at random from `[MAX / 2, MAX)`.
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;

/// Upper bound, in seconds, of the periodic storage challenge interval.
/// `Node::run` draws the actual interval at random from `[MAX / 2, MAX)`.
const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;

/// Period at which `Node::run` refreshes the uptime metric.
const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

/// Upper bound, in seconds, of the irrelevant-records cleanup interval.
/// `Node::run` draws the actual interval at random from `[MAX / 2, MAX)`.
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;

// NOTE(review): the constants below are not referenced in the part of the file
// reviewed here; the remarks are inferred from their names only — confirm
// against their call sites.

// Presumably the best score a peer can obtain in a storage challenge — confirm.
const HIGHEST_SCORE: usize = 100;

// Presumably the minimum score for a peer to be regarded as healthy — confirm.
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;

// Presumably a time granularity used when scoring response latency — confirm
// the unit.
const TIME_STEP: usize = 20;

/// 15 minutes; presumably the delay before a replica fetch is retried — confirm.
const REPLICA_FETCH_RETRY_DELAY: Duration = Duration::from_secs(15 * 60);

// Presumably the number of peers asked when fetching replicas — confirm.
const REPLICA_FETCH_PEER_COUNT: usize = 5;

// Presumably the replica count below which a record is seen as unhealthy — confirm.
const MIN_HEALTHY_REPLICA_COUNT: usize = 3;

// Presumably an index into a distance-sorted peer list that defines the
// "close neighbour" range — confirm.
const CLOSE_NEIGHBOUR_DISTANCE_INDEX: usize = 7;

// Presumably the maximum number of peers followed by `CloseGroupTracker` — confirm.
const CLOSE_GROUP_TRACKING_LIMIT: usize = 20;

/// 90 seconds; presumably the window during which a peer restart does not
/// trigger replication — confirm against `CloseGroupTracker`.
const CLOSE_GROUP_RESTART_SUPPRESSION: Duration = Duration::from_secs(90);
114
/// Collects the settings needed to start a node; consumed by
/// [`NodeBuilder::build_and_run`].
pub struct NodeBuilder {
    /// Socket address handed to the network layer as its listen address.
    addr: SocketAddr,
    /// Bootstrap flow forwarded to the network configuration.
    bootstrap: Bootstrap,
    /// Rewards address stored in the node and exposed by the `RunningNode`.
    evm_address: RewardsAddress,
    /// EVM network stored in the node.
    evm_network: EvmNetwork,
    /// Keypair handed to the network layer as the node's identity.
    identity_keypair: Keypair,
    /// Forwarded to the network configuration's `local` flag; defaults to `false`.
    local: bool,
    /// When `Some`, a metrics recorder is created at build time; the value is
    /// also forwarded to the network configuration. Defaults to `None`.
    #[cfg(feature = "open-metrics")]
    metrics_server_port: Option<u16>,
    /// Forwarded to the network configuration's `no_upnp` flag; defaults to `false`.
    no_upnp: bool,
    /// Forwarded to the network configuration's `relay_client` flag; defaults to `false`.
    relay_client: bool,
    /// Root directory handed to the network layer and kept by the `RunningNode`.
    root_dir: PathBuf,
}
130
131impl NodeBuilder {
132 pub fn new(
135 identity_keypair: Keypair,
136 bootstrap_flow: Bootstrap,
137 evm_address: RewardsAddress,
138 evm_network: EvmNetwork,
139 addr: SocketAddr,
140 root_dir: PathBuf,
141 ) -> Self {
142 Self {
143 addr,
144 bootstrap: bootstrap_flow,
145 evm_address,
146 evm_network,
147 identity_keypair,
148 local: false,
149 #[cfg(feature = "open-metrics")]
150 metrics_server_port: None,
151 no_upnp: false,
152 relay_client: false,
153 root_dir,
154 }
155 }
156
157 pub fn local(&mut self, local: bool) {
159 self.local = local;
160 }
161
162 #[cfg(feature = "open-metrics")]
163 pub fn metrics_server_port(&mut self, port: Option<u16>) {
165 self.metrics_server_port = port;
166 }
167
168 pub fn relay_client(&mut self, relay_client: bool) {
170 self.relay_client = relay_client;
171 }
172
173 pub fn no_upnp(&mut self, no_upnp: bool) {
175 self.no_upnp = no_upnp;
176 }
177
178 pub fn build_and_run(self) -> Result<RunningNode> {
191 #[cfg(feature = "open-metrics")]
193 let (metrics_recorder, metrics_registries) = if self.metrics_server_port.is_some() {
194 let mut metrics_registries = MetricsRegistries::default();
196 let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);
197
198 (Some(metrics_recorder), metrics_registries)
199 } else {
200 (None, MetricsRegistries::default())
201 };
202
203 let (shutdown_tx, shutdown_rx) = watch::channel(false);
205
206 let network_config = NetworkConfig {
208 keypair: self.identity_keypair,
209 local: self.local,
210 listen_addr: self.addr,
211 root_dir: self.root_dir.clone(),
212 shutdown_rx: shutdown_rx.clone(),
213 bootstrap: self.bootstrap,
214 no_upnp: self.no_upnp,
215 relay_client: self.relay_client,
216 custom_request_timeout: None,
217 #[cfg(feature = "open-metrics")]
218 metrics_registries,
219 #[cfg(feature = "open-metrics")]
220 metrics_server_port: self.metrics_server_port,
221 };
222 let (network, network_event_receiver) = Network::init(network_config)?;
223
224 let node_events_channel = NodeEventsChannel::default();
226 let node = NodeInner {
227 network: network.clone(),
228 events_channel: node_events_channel.clone(),
229 close_group_tracker: Mutex::new(CloseGroupTracker::new(network.peer_id())),
230 reward_address: self.evm_address,
231 #[cfg(feature = "open-metrics")]
232 metrics_recorder,
233 evm_network: self.evm_network,
234 };
235 let node = Node {
236 inner: Arc::new(node),
237 };
238
239 node.run(network_event_receiver, shutdown_rx);
241 let running_node = RunningNode {
242 shutdown_sender: shutdown_tx,
243 network,
244 node_events_channel,
245 root_dir_path: self.root_dir,
246 rewards_address: self.evm_address,
247 };
248
249 Ok(running_node)
250 }
251}
252
/// Handle to the node's shared state.
///
/// The only field is an [`Arc`], so cloning a `Node` yields another handle to
/// the same [`NodeInner`].
#[derive(Clone)]
pub(crate) struct Node {
    /// State shared by every clone of this handle.
    inner: Arc<NodeInner>,
}
260
/// State shared between all clones of a [`Node`].
struct NodeInner {
    /// Channel on which `NodeEvent`s are broadcast.
    events_channel: NodeEventsChannel,
    /// Handle to the network layer.
    network: Network,
    /// Tracker updated on peer additions/removals and on the replication timer;
    /// guarded by an async mutex because it is locked across `.await` points.
    close_group_tracker: Mutex<CloseGroupTracker>,
    /// Metrics recorder; `None` when no metrics server port was configured.
    #[cfg(feature = "open-metrics")]
    metrics_recorder: Option<NodeMetricsRecorder>,
    /// Rewards address the node was built with.
    reward_address: RewardsAddress,
    /// EVM network the node was built with.
    evm_network: EvmNetwork,
}
272
273impl Node {
274 pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
276 &self.inner.events_channel
277 }
278
279 pub(crate) fn network(&self) -> &Network {
281 &self.inner.network
282 }
283
284 fn close_group_tracker(&self) -> &Mutex<CloseGroupTracker> {
285 &self.inner.close_group_tracker
286 }
287
/// Returns the metrics recorder, or `None` when metrics were not enabled at
/// build time.
#[cfg(feature = "open-metrics")]
pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
    let inner: &NodeInner = &self.inner;
    inner.metrics_recorder.as_ref()
}
294
295 pub(crate) fn reward_address(&self) -> &RewardsAddress {
297 &self.inner.reward_address
298 }
299
300 pub(crate) fn evm_network(&self) -> &EvmNetwork {
301 &self.inner.evm_network
302 }
303
304 fn run(
307 self,
308 mut network_event_receiver: Receiver<NetworkEvent>,
309 mut shutdown_rx: watch::Receiver<bool>,
310 ) {
311 let mut rng = StdRng::from_entropy();
312
313 let peers_connected = Arc::new(AtomicUsize::new(0));
314
315 let _node_task = spawn(async move {
316 let replication_interval: u64 = rng.gen_range(
319 PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
320 );
321 let replication_interval_time = Duration::from_secs(replication_interval);
322 debug!("Replication interval set to {replication_interval_time:?}");
323
324 let mut replication_interval = tokio::time::interval(replication_interval_time);
325 let _ = replication_interval.tick().await; let mut uptime_metrics_update_interval =
328 tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
329 let _ = uptime_metrics_update_interval.tick().await; let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
334 UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
335 ..UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
336 );
337 let irrelevant_records_cleanup_interval_time =
338 Duration::from_secs(irrelevant_records_cleanup_interval);
339 let mut irrelevant_records_cleanup_interval =
340 tokio::time::interval(irrelevant_records_cleanup_interval_time);
341 let _ = irrelevant_records_cleanup_interval.tick().await; let storage_challenge_interval: u64 =
346 rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
347 let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
348 debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");
349
350 let mut storage_challenge_interval =
351 tokio::time::interval(storage_challenge_interval_time);
352 let _ = storage_challenge_interval.tick().await; loop {
355 let peers_connected = &peers_connected;
356
357 tokio::select! {
358 result = shutdown_rx.changed() => {
360 if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
361 info!("Shutdown signal received or sender dropped. Exiting network events loop.");
362 break;
363 }
364 },
365 net_event = network_event_receiver.recv() => {
366 match net_event {
367 Some(event) => {
368 let start = Instant::now();
369 let event_string = format!("{event:?}");
370
371 self.handle_network_event(event, peers_connected).await;
372 trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);
373
374 }
375 None => {
376 error!("The `NetworkEvent` channel is closed");
377 self.events_channel().broadcast(NodeEvent::ChannelClosed);
378 break;
379 }
380 }
381 }
382 _ = replication_interval.tick() => {
384 let now = Instant::now();
385
386 {
387 let mut tracker = self.close_group_tracker().lock().await;
388 if tracker.handle_timer_expiry(now) {
389 trace!("Replication timer expired for tracked peers; periodic run will cover it");
390 }
391 }
392
393 let start = now;
394 let network = self.network().clone();
395 self.record_metrics(Marker::IntervalReplicationTriggered);
396
397 let _handle = spawn(async move {
398 Self::try_interval_replication(network);
399 trace!("Periodic replication took {:?}", start.elapsed());
400 });
401 }
402 _ = uptime_metrics_update_interval.tick() => {
403 #[cfg(feature = "open-metrics")]
404 if let Some(metrics_recorder) = self.metrics_recorder() {
405 let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
406 }
407 }
408 _ = irrelevant_records_cleanup_interval.tick() => {
409 let network = self.network().clone();
410
411 let _handle = spawn(async move {
412 Self::trigger_irrelevant_record_cleanup(network);
413 });
414 }
415 _ = storage_challenge_interval.tick() => {
417 let start = Instant::now();
418 debug!("Periodic storage challenge triggered");
419 let network = self.network().clone();
420
421 let _handle = spawn(async move {
422 Self::storage_challenge(network).await;
423 trace!("Periodic storage challenge took {:?}", start.elapsed());
424 });
425 }
426 }
427 }
428 });
429 }
430
431 pub(crate) fn record_metrics(&self, marker: Marker) {
434 marker.log();
435 #[cfg(feature = "open-metrics")]
436 if let Some(metrics_recorder) = self.metrics_recorder() {
437 metrics_recorder.record(marker)
438 }
439 }
440
441 async fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
446 let start = Instant::now();
447 let event_string = format!("{event:?}");
448 let event_header;
449
450 if let NetworkEvent::QueryRequestReceived {
452 query: Query::GetVersion { .. },
453 ..
454 } = event
455 {
456 trace!("Handling NetworkEvent {event_string}");
457 } else {
458 debug!("Handling NetworkEvent {event_string}");
459 }
460
461 match event {
462 NetworkEvent::PeerAdded(peer_id, connected_peers) => {
463 event_header = "PeerAdded";
464 let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
466 if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
467 self.events_channel()
468 .broadcast(NodeEvent::ConnectedToNetwork);
469 }
470
471 self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
472 self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));
473
474 let network = self.network().clone();
476 let _handle = spawn(async move {
477 Self::try_query_peer_version(network, peer_id, Default::default()).await;
478 });
479
480 let replication_decision = {
481 let mut tracker = self.close_group_tracker().lock().await;
482 tracker.record_peer_added(peer_id)
483 };
484
485 if replication_decision.should_trigger() {
486 let network = self.network().clone();
487 self.record_metrics(Marker::IntervalReplicationTriggered);
488 let _handle = spawn(async move {
489 Self::try_interval_replication(network);
490 });
491 } else if matches!(replication_decision, ReplicationDirective::Skip) {
492 trace!(
493 "Replication skipped for {peer_id:?} addition due to restart behaviour tracking"
494 );
495 }
496 }
497 NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
498 event_header = "PeerRemoved";
499 self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
500 self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));
501
502 let self_id = self.network().peer_id();
503 let distance =
504 NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
505 info!(
506 "Node {self_id:?} removed peer from routing table: {peer_id:?}. It has a {:?} distance to us.",
507 distance.ilog2()
508 );
509
510 let replication_decision = {
511 let mut tracker = self.close_group_tracker().lock().await;
512 tracker.record_peer_removed(peer_id)
513 };
514
515 if replication_decision.should_trigger() {
516 let network = self.network().clone();
517 self.record_metrics(Marker::IntervalReplicationTriggered);
518 let _handle = spawn(async move {
519 Self::try_interval_replication(network);
520 });
521 } else if matches!(replication_decision, ReplicationDirective::Skip) {
522 trace!(
523 "Replication skipped for {peer_id:?} removal due to restart behaviour tracking"
524 );
525 }
526 }
527 NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
528 event_header = "PeerWithUnsupportedProtocol";
529 }
530 NetworkEvent::NewListenAddr(_) => {
531 event_header = "NewListenAddr";
532 }
533 NetworkEvent::ResponseReceived { res } => {
534 event_header = "ResponseReceived";
535 if let Err(err) = self.handle_response(res) {
536 error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
537 }
538 }
539 NetworkEvent::KeysToFetchForReplication(keys) => {
540 event_header = "KeysToFetchForReplication";
541 self.record_metrics(Marker::fetching_keys_for_replication(&keys));
542
543 if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
544 error!("Error while trying to fetch replicated data {err:?}");
545 }
546 }
547 NetworkEvent::QueryRequestReceived { query, channel } => {
548 event_header = "QueryRequestReceived";
549 let node = self.clone();
550 let payment_address = *self.reward_address();
551
552 let _handle = spawn(async move {
553 let network = node.network().clone();
554 let res = Self::handle_query(node, query, payment_address).await;
555
556 if let Response::Query(QueryResponse::GetVersion { .. }) = res {
558 trace!("Sending response {res:?}");
559 } else {
560 debug!("Sending response {res:?}");
561 }
562
563 network.send_response(res, channel);
564 });
565 }
566 NetworkEvent::UnverifiedRecord(record) => {
567 event_header = "UnverifiedRecord";
568 let self_clone = self.clone();
570 let _handle = spawn(async move {
571 let key = PrettyPrintRecordKey::from(&record.key).into_owned();
572 match self_clone.validate_and_store_record(record).await {
573 Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
574 Err(err) => {
575 self_clone.record_metrics(Marker::RecordRejected(&key, &err));
576 }
577 }
578 });
579 }
580 NetworkEvent::TerminateNode { reason } => {
581 event_header = "TerminateNode";
582 error!("Received termination from swarm_driver due to {reason:?}");
583 self.events_channel()
584 .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
585 }
586 NetworkEvent::FailedToFetchHolders(bad_nodes) => {
587 event_header = "FailedToFetchHolders";
588 let network = self.network().clone();
589 let pretty_log: Vec<_> = bad_nodes
590 .iter()
591 .map(|(peer_id, record_key)| {
592 let pretty_key = PrettyPrintRecordKey::from(record_key);
593 (peer_id, pretty_key)
594 })
595 .collect();
596 debug!(
600 "Received notification from replication_fetcher, notifying {pretty_log:?} failed to fetch replication copies from."
601 );
602 let _handle = spawn(async move {
603 for (peer_id, record_key) in bad_nodes {
604 if let Ok(false) = network.is_record_key_present_locally(&record_key).await
607 {
608 error!(
609 "From peer {peer_id:?}, failed to fetch record {:?}",
610 PrettyPrintRecordKey::from(&record_key)
611 );
612 network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
613 }
614 }
615 });
616 }
617 NetworkEvent::QuoteVerification { quotes } => {
618 event_header = "QuoteVerification";
619 let network = self.network().clone();
620
621 let _handle = spawn(async move {
622 quotes_verification(&network, quotes).await;
623 });
624 }
625 NetworkEvent::FreshReplicateToFetch { holder, keys } => {
626 event_header = "FreshReplicateToFetch";
627 self.fresh_replicate_to_fetch(holder, keys);
628 }
629 NetworkEvent::PeersForVersionQuery(peers) => {
630 event_header = "PeersForVersionQuery";
631 let network = self.network().clone();
632 let _handle = spawn(async move {
633 Self::query_peers_version(network, peers).await;
634 });
635 }
636 NetworkEvent::NetworkWideReplication { keys } => {
637 event_header = "NetworkWideReplication";
638 self.perform_network_wide_replication(keys);
639 }
640 }
641
642 trace!(
643 "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
644 start.elapsed()
645 );
646 }
647
648 fn handle_response(&self, response: Response) -> Result<()> {
650 match response {
651 Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
652 warn!("Mishandled replicate response, should be handled earlier");
654 }
655 Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
656 error!(
657 "Response to replication shall be handled by called not by common handler, {resp:?}"
658 );
659 }
660 Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
661 }
663 other => {
664 warn!("handle_response not implemented for {other:?}");
665 }
666 };
667
668 Ok(())
669 }
670
671 async fn handle_query(node: Self, query: Query, payment_address: RewardsAddress) -> Response {
672 let network = node.network();
673 let resp: QueryResponse = match query {
674 Query::GetStoreQuote {
675 key,
676 data_type,
677 data_size,
678 nonce,
679 difficulty,
680 } => {
681 let record_key = key.to_record_key();
682 let self_id = network.peer_id();
683
684 let maybe_quoting_metrics = network
685 .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
686 .await;
687
688 let storage_proofs = if let Some(nonce) = nonce {
689 Self::respond_x_closest_record_proof(
690 network,
691 key.clone(),
692 nonce,
693 difficulty,
694 false,
695 )
696 .await
697 } else {
698 vec![]
699 };
700
701 match maybe_quoting_metrics {
702 Ok((quoting_metrics, is_already_stored)) => {
703 if is_already_stored {
704 QueryResponse::GetStoreQuote {
705 quote: Err(ProtocolError::RecordExists(
706 PrettyPrintRecordKey::from(&record_key).into_owned(),
707 )),
708 peer_address: NetworkAddress::from(self_id),
709 storage_proofs,
710 }
711 } else {
712 QueryResponse::GetStoreQuote {
713 quote: Self::create_quote_for_storecost(
714 network,
715 &key,
716 "ing_metrics,
717 &payment_address,
718 ),
719 peer_address: NetworkAddress::from(self_id),
720 storage_proofs,
721 }
722 }
723 }
724 Err(err) => {
725 warn!("GetStoreQuote failed for {key:?}: {err}");
726 QueryResponse::GetStoreQuote {
727 quote: Err(ProtocolError::GetStoreQuoteFailed),
728 peer_address: NetworkAddress::from(self_id),
729 storage_proofs,
730 }
731 }
732 }
733 }
734 Query::GetReplicatedRecord { requester: _, key } => {
735 let our_address = NetworkAddress::from(network.peer_id());
736 let record_key = key.to_record_key();
737
738 let result = match network.get_local_record(&record_key).await {
739 Ok(Some(record)) => Ok((our_address, Bytes::from(record.value))),
740 Ok(None) => Err(ProtocolError::ReplicatedRecordNotFound {
741 holder: Box::new(our_address),
742 key: Box::new(key.clone()),
743 }),
744 Err(err) => Err(ProtocolError::PutRecordFailed(format!(
746 "Error to fetch local record for GetReplicatedRecord {err:?}"
747 ))),
748 };
749
750 QueryResponse::GetReplicatedRecord(result)
751 }
752 Query::GetChunkExistenceProof {
753 key,
754 nonce,
755 difficulty,
756 } => QueryResponse::GetChunkExistenceProof(
757 Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
758 ),
759 Query::CheckNodeInProblem(target_address) => {
760 debug!("Got CheckNodeInProblem for peer {target_address:?}");
761
762 let is_in_trouble =
763 if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
764 result
765 } else {
766 debug!("Could not get status of {target_address:?}.");
767 false
768 };
769
770 QueryResponse::CheckNodeInProblem {
771 reporter_address: NetworkAddress::from(network.peer_id()),
772 target_address,
773 is_in_trouble,
774 }
775 }
776 Query::GetClosestPeers {
777 key,
778 num_of_peers,
779 range,
780 sign_result,
781 } => {
782 debug!(
783 "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
784 );
785 Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
786 .await
787 }
788 Query::GetVersion(_) => QueryResponse::GetVersion {
789 peer: NetworkAddress::from(network.peer_id()),
790 version: ant_build_info::package_version(),
791 },
792 Query::PutRecord {
793 holder,
794 address,
795 serialized_record,
796 } => {
797 let record = Record {
798 key: address.to_record_key(),
799 value: serialized_record,
800 publisher: None,
801 expires: None,
802 };
803
804 let key = PrettyPrintRecordKey::from(&record.key).into_owned();
805 let result = match node.validate_and_store_record(record).await {
806 Ok(()) => Ok(()),
807 Err(PutValidationError::OutdatedRecordCounter { counter, expected }) => {
808 node.record_metrics(Marker::RecordRejected(
809 &key,
810 &PutValidationError::OutdatedRecordCounter { counter, expected },
811 ));
812 Err(ProtocolError::OutdatedRecordCounter { counter, expected })
813 }
814 Err(PutValidationError::TopologyVerificationFailed {
815 target_address,
816 valid_count,
817 total_paid,
818 closest_count,
819 node_peers,
820 paid_peers,
821 }) => {
822 node.record_metrics(Marker::RecordRejected(
823 &key,
824 &PutValidationError::TopologyVerificationFailed {
825 target_address: target_address.clone(),
826 valid_count,
827 total_paid,
828 closest_count,
829 node_peers: node_peers.clone(),
830 paid_peers: paid_peers.clone(),
831 },
832 ));
833 Err(ProtocolError::TopologyVerificationFailed {
834 target_address: Box::new(target_address),
835 valid_count,
836 total_paid,
837 closest_count,
838 node_peers,
839 paid_peers,
840 })
841 }
842 Err(err) => {
843 node.record_metrics(Marker::RecordRejected(&key, &err));
844 Err(ProtocolError::PutRecordFailed(format!("{err:?}")))
845 }
846 };
847 QueryResponse::PutRecord {
848 result,
849 peer_address: holder,
850 record_addr: address,
851 }
852 }
853 Query::GetMerkleCandidateQuote {
854 key,
855 data_type,
856 data_size,
857 merkle_payment_timestamp,
858 } => {
859 Self::respond_merkle_candidate_quote(
860 network,
861 key,
862 data_type,
863 data_size,
864 merkle_payment_timestamp,
865 payment_address,
866 )
867 .await
868 }
869 #[cfg(feature = "developer")]
870 Query::DevGetClosestPeersFromNetwork { key, num_of_peers } => {
871 debug!(
872 "Got DevGetClosestPeersFromNetwork targeting {key:?} with {num_of_peers:?} peers"
873 );
874 Self::respond_dev_get_closest_peers_from_network(network, key, num_of_peers).await
875 }
876 };
877 Response::Query(resp)
878 }
879
880 async fn respond_get_closest_peers(
881 network: &Network,
882 target: NetworkAddress,
883 num_of_peers: Option<usize>,
884 range: Option<[u8; 32]>,
885 sign_result: bool,
886 ) -> QueryResponse {
887 let local_peers = network.get_local_peers_with_multiaddr().await;
888 let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
889 Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
890 } else {
891 vec![]
892 };
893
894 let signature = if sign_result {
895 let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
896 bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
897 network.sign(&bytes).ok()
898 } else {
899 None
900 };
901
902 QueryResponse::GetClosestPeers {
903 target,
904 peers,
905 signature,
906 }
907 }
908
/// Developer-only: queries the network (not just the local view) for the
/// peers closest to `target`.
///
/// When `num_of_peers` is given, the peers are sorted by distance to `target`
/// and cut to that many entries. A failed network query yields an empty list.
#[cfg(feature = "developer")]
async fn respond_dev_get_closest_peers_from_network(
    network: &Network,
    target: NetworkAddress,
    num_of_peers: Option<usize>,
) -> QueryResponse {
    use ant_protocol::messages::QueryResponse;

    let queried_node = NetworkAddress::from(network.peer_id());
    debug!(
        "DevGetClosestPeersFromNetwork: node {queried_node:?} querying network for closest peers to {target:?}"
    );

    let peers = match network.get_closest_peers(&target).await {
        Err(err) => {
            warn!(
                "DevGetClosestPeersFromNetwork: failed to query network for {target:?}: {err:?}"
            );
            vec![]
        }
        Ok(found) => {
            let mut converted: Vec<(NetworkAddress, Vec<Multiaddr>)> = Vec::new();
            for (peer_id, addrs) in found {
                converted.push((NetworkAddress::from(peer_id), addrs.0));
            }

            if let Some(limit) = num_of_peers {
                converted.sort_by_key(|(addr, _)| target.distance(addr));
                converted.truncate(limit);
            }

            debug!(
                "DevGetClosestPeersFromNetwork: found {} peers closest to {target:?}",
                converted.len()
            );
            converted
        }
    };

    QueryResponse::DevGetClosestPeersFromNetwork {
        target,
        queried_node,
        peers,
    }
}
962
963 fn calculate_get_closest_peers(
964 peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
965 target: NetworkAddress,
966 num_of_peers: Option<usize>,
967 range: Option<[u8; 32]>,
968 ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
969 match (num_of_peers, range) {
970 (_, Some(value)) => {
971 let distance = U256::from_big_endian(&value);
972 peer_addrs
973 .iter()
974 .filter_map(|(peer_id, multi_addrs)| {
975 let addr = NetworkAddress::from(*peer_id);
976 if target.distance(&addr).0 <= distance {
977 Some((addr, multi_addrs.clone()))
978 } else {
979 None
980 }
981 })
982 .collect()
983 }
984 (Some(num_of_peers), _) => {
985 let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
986 .iter()
987 .map(|(peer_id, multi_addrs)| {
988 let addr = NetworkAddress::from(*peer_id);
989 (addr, multi_addrs.clone())
990 })
991 .collect();
992 result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
993 result.into_iter().take(num_of_peers).collect()
994 }
995 (None, None) => vec![],
996 }
997 }
998
999 async fn respond_merkle_candidate_quote(
1003 network: &Network,
1004 key: NetworkAddress,
1005 data_type: u32,
1006 data_size: usize,
1007 merkle_payment_timestamp: u64,
1008 payment_address: RewardsAddress,
1009 ) -> QueryResponse {
1010 debug!(
1011 "merkle payment: GetMerkleCandidateQuote for target {key:?}, timestamp: {merkle_payment_timestamp}, data_type: {data_type}, data_size: {data_size}"
1012 );
1013
1014 const TIMESTAMP_TOLERANCE: u64 = 24 * 60 * 60; let now = std::time::SystemTime::now()
1024 .duration_since(std::time::UNIX_EPOCH)
1025 .unwrap_or_default()
1026 .as_secs();
1027
1028 let future_threshold = now + TIMESTAMP_TOLERANCE;
1030 if merkle_payment_timestamp > future_threshold {
1031 let error_msg = format!(
1032 "Rejected future timestamp {merkle_payment_timestamp} (current time: {now}, threshold: {future_threshold})"
1033 );
1034 warn!("{error_msg} for {key:?}");
1035 return QueryResponse::GetMerkleCandidateQuote(Err(
1036 ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
1037 ));
1038 }
1039
1040 let expiration_threshold = MERKLE_PAYMENT_EXPIRATION + TIMESTAMP_TOLERANCE;
1042 let age = now.saturating_sub(merkle_payment_timestamp);
1043 if age > expiration_threshold {
1044 let error_msg = format!(
1045 "Rejected expired timestamp {merkle_payment_timestamp} (age: {age}s, max: {expiration_threshold}s)",
1046 );
1047 warn!("{error_msg} for {key:?}");
1048 return QueryResponse::GetMerkleCandidateQuote(Err(
1049 ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
1050 ));
1051 }
1052
1053 let record_key = key.to_record_key();
1055 let (quoting_metrics, _is_already_stored) = match network
1056 .get_local_quoting_metrics(record_key, data_type, data_size)
1057 .await
1058 {
1059 Ok(metrics) => metrics,
1060 Err(err) => {
1061 let error_msg = format!("Failed to get quoting metrics for {key:?}: {err}");
1062 warn!("{error_msg}");
1063 return QueryResponse::GetMerkleCandidateQuote(Err(
1064 ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
1065 ));
1066 }
1067 };
1068
1069 let pub_key = network.get_pub_key();
1071 let reward_address = payment_address;
1072 let bytes = ant_evm::merkle_payments::MerklePaymentCandidateNode::bytes_to_sign(
1073 "ing_metrics,
1074 &reward_address,
1075 merkle_payment_timestamp,
1076 );
1077 let signature = match network.sign(&bytes) {
1078 Ok(sig) => sig,
1079 Err(e) => {
1080 let error_msg = format!("Failed to sign candidate node for {key:?}: {e}");
1081 error!("{error_msg}");
1082 return QueryResponse::GetMerkleCandidateQuote(Err(
1083 ProtocolError::FailedToSignMerkleCandidate(error_msg),
1084 ));
1085 }
1086 };
1087
1088 let candidate = ant_evm::merkle_payments::MerklePaymentCandidateNode {
1089 quoting_metrics,
1090 reward_address,
1091 merkle_payment_timestamp,
1092 pub_key,
1093 signature,
1094 };
1095 QueryResponse::GetMerkleCandidateQuote(Ok(candidate))
1096 }
1097
1098 async fn respond_x_closest_record_proof(
1101 network: &Network,
1102 key: NetworkAddress,
1103 nonce: Nonce,
1104 difficulty: usize,
1105 chunk_only: bool,
1106 ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
1107 let start = Instant::now();
1108 let mut results = vec![];
1109 if difficulty == 1 {
1110 let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
1112 if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
1113 let proof = ChunkProof::new(&record.value, nonce);
1114 debug!("Chunk proof for {key:?} is {proof:?}");
1115 result = Ok(proof)
1116 } else {
1117 debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
1118 }
1119
1120 results.push((key.clone(), result));
1121 } else {
1122 let all_local_records = network.get_all_local_record_addresses().await;
1123
1124 if let Ok(all_local_records) = all_local_records {
1125 let mut all_chunk_addrs: Vec<_> = if chunk_only {
1126 all_local_records
1127 .iter()
1128 .filter_map(|(addr, record_type)| {
1129 if *record_type == ValidationType::Chunk {
1130 Some(addr.clone())
1131 } else {
1132 None
1133 }
1134 })
1135 .collect()
1136 } else {
1137 all_local_records.keys().cloned().collect()
1138 };
1139
1140 all_chunk_addrs.sort_by_key(|addr| key.distance(addr));
1142
1143 let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);
1145
1146 for addr in all_chunk_addrs.iter().take(workload_factor) {
1147 if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
1148 {
1149 let proof = ChunkProof::new(&record.value, nonce);
1150 debug!("Chunk proof for {key:?} is {proof:?}");
1151 results.push((addr.clone(), Ok(proof)));
1152 }
1153 }
1154 }
1155
1156 info!(
1157 "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
1158 results.len(),
1159 start.elapsed()
1160 );
1161 }
1162
1163 results
1164 }
1165
1166 async fn storage_challenge(network: Network) {
1170 let start = Instant::now();
1171 let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
1172 network.get_k_closest_local_peers_to_the_target(None).await
1173 {
1174 closest_peers
1175 .into_iter()
1176 .take(CLOSE_GROUP_SIZE)
1177 .collect_vec()
1178 } else {
1179 error!("Cannot get local neighbours");
1180 return;
1181 };
1182 if closest_peers.len() < CLOSE_GROUP_SIZE {
1183 debug!(
1184 "Not enough neighbours ({}/{}) to carry out storage challenge.",
1185 closest_peers.len(),
1186 CLOSE_GROUP_SIZE
1187 );
1188 return;
1189 }
1190
1191 let mut verify_candidates: Vec<NetworkAddress> =
1192 if let Ok(all_keys) = network.get_all_local_record_addresses().await {
1193 all_keys
1194 .iter()
1195 .filter_map(|(addr, record_type)| {
1196 if ValidationType::Chunk == *record_type {
1197 Some(addr.clone())
1198 } else {
1199 None
1200 }
1201 })
1202 .collect()
1203 } else {
1204 error!("Failed to get local record addresses.");
1205 return;
1206 };
1207 let num_of_targets = verify_candidates.len();
1208 if num_of_targets < 50 {
1209 debug!("Not enough candidates({num_of_targets}/50) to be checked against neighbours.");
1210 return;
1211 }
1212
1213 let self_addr = NetworkAddress::from(network.peer_id());
1216 verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
1217 let index: usize = OsRng.gen_range(0..num_of_targets / 2);
1218 let target = verify_candidates[index].clone();
1219 let difficulty = CLOSE_GROUP_SIZE;
1221 verify_candidates.sort_by_key(|addr| target.distance(addr));
1222 let expected_targets = verify_candidates.into_iter().take(difficulty);
1223 let nonce: Nonce = thread_rng().r#gen::<u64>();
1224 let mut expected_proofs = HashMap::new();
1225 for addr in expected_targets {
1226 if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
1227 let expected_proof = ChunkProof::new(&record.value, nonce);
1228 let _ = expected_proofs.insert(addr, expected_proof);
1229 } else {
1230 error!("Local record {addr:?} cann't be loaded from disk.");
1231 }
1232 }
1233 let request = Request::Query(Query::GetChunkExistenceProof {
1234 key: target.clone(),
1235 nonce,
1236 difficulty,
1237 });
1238
1239 let mut tasks = JoinSet::new();
1240 for (peer_id, addresses) in closest_peers {
1241 if peer_id == network.peer_id() {
1242 continue;
1243 }
1244 let network_clone = network.clone();
1245 let request_clone = request.clone();
1246 let expected_proofs_clone = expected_proofs.clone();
1247 let _ = tasks.spawn(async move {
1248 let res = scoring_peer(
1249 network_clone,
1250 (peer_id, addresses),
1251 request_clone,
1252 expected_proofs_clone,
1253 )
1254 .await;
1255 (peer_id, res)
1256 });
1257 }
1258
1259 let mut peer_scores = vec![];
1260 while let Some(res) = tasks.join_next().await {
1261 match res {
1262 Ok((peer_id, score)) => {
1263 let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
1264 if !is_healthy {
1265 info!(
1266 "Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."
1267 );
1268 network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
1270 }
1271 peer_scores.push((peer_id, is_healthy));
1272 }
1273 Err(e) => {
1274 info!("StorageChallenge task completed with error {e:?}");
1275 }
1276 }
1277 }
1278 if !peer_scores.is_empty() {
1279 network.notify_peer_scores(peer_scores);
1280 }
1281
1282 Self::verify_local_replication_health(network.clone()).await;
1283
1284 info!(
1285 "Completed node StorageChallenge against neighbours in {:?}!",
1286 start.elapsed()
1287 );
1288 }
1289
1290 async fn verify_local_replication_health(network: Network) {
1292 let closest_peers = match network.get_k_closest_local_peers_to_the_target(None).await {
1293 Ok(peers) => peers,
1294 Err(err) => {
1295 warn!("Cannot fetch closest peers for replica verification: {err:?}");
1296 return;
1297 }
1298 };
1299
1300 if closest_peers.len() <= CLOSE_NEIGHBOUR_DISTANCE_INDEX {
1301 debug!(
1302 "Skipping replica verification as we only know {} neighbours (need > {}).",
1303 closest_peers.len(),
1304 CLOSE_NEIGHBOUR_DISTANCE_INDEX
1305 );
1306 return;
1307 }
1308
1309 let self_address = NetworkAddress::from(network.peer_id());
1310 let Some((threshold_peer, _)) = closest_peers.get(CLOSE_NEIGHBOUR_DISTANCE_INDEX) else {
1311 debug!("Unable to determine distance threshold for replica verification.");
1312 return;
1313 };
1314
1315 let threshold_distance = self_address.distance(&NetworkAddress::from(*threshold_peer));
1316 let Some(threshold_ilog2) = threshold_distance.ilog2() else {
1317 debug!("Threshold distance lacks ilog2; cannot proceed with replica verification.");
1318 return;
1319 };
1320
1321 let local_records = match network.get_all_local_record_addresses().await {
1322 Ok(records) => records,
1323 Err(err) => {
1324 warn!("Failed to list local records for replica verification: {err:?}");
1325 return;
1326 }
1327 };
1328
1329 let mut nearby_records: Vec<NetworkAddress> = local_records
1330 .into_iter()
1331 .filter_map(|(address, record_type)| {
1332 if record_type != ValidationType::Chunk {
1333 return None;
1334 }
1335 let distance = self_address.distance(&address);
1336 distance.ilog2().and_then(|record_ilog2| {
1337 if record_ilog2 <= threshold_ilog2 {
1338 Some(address)
1339 } else {
1340 None
1341 }
1342 })
1343 })
1344 .collect();
1345
1346 nearby_records.shuffle(&mut thread_rng());
1347 let target_record = if let Some(entry) = nearby_records.first().cloned() {
1348 entry
1349 } else {
1350 debug!("No nearby chunk records available for replica verification.");
1351 return;
1352 };
1353
1354 let pretty_key = PrettyPrintRecordKey::from(&target_record.to_record_key()).into_owned();
1355
1356 let candidate_peers = match network
1357 .get_k_closest_local_peers_to_the_target(Some(target_record.clone()))
1358 .await
1359 {
1360 Ok(peers) => peers
1361 .into_iter()
1362 .filter(|(peer_id, _)| peer_id != &network.peer_id())
1363 .take(REPLICA_FETCH_PEER_COUNT)
1364 .collect::<Vec<_>>(),
1365 Err(err) => {
1366 warn!(
1367 "Cannot fetch record-specific closest peers for replica verification: {err:?}"
1368 );
1369 return;
1370 }
1371 };
1372
1373 if candidate_peers.len() < REPLICA_FETCH_PEER_COUNT {
1374 debug!(
1375 "Only {} peers available for replica verification (need at least {}).",
1376 candidate_peers.len(),
1377 REPLICA_FETCH_PEER_COUNT
1378 );
1379 return;
1380 }
1381
1382 debug!(
1383 "Verifying replicated record {pretty_key:?} against {} closest peers.",
1384 candidate_peers.len()
1385 );
1386
1387 let (successful_peers, failed_peers) = Self::fetch_record_from_peers_with_addresses(
1388 network.clone(),
1389 target_record.clone(),
1390 candidate_peers,
1391 )
1392 .await;
1393
1394 if failed_peers.is_empty() {
1395 debug!("All peers returned record {pretty_key:?} during replica verification.");
1396 return;
1397 }
1398
1399 if successful_peers.len() < MIN_HEALTHY_REPLICA_COUNT {
1400 warn!(
1401 "Replica verification fetched only {} copies of {pretty_key:?}. Record is unhealthy; skipping peer classification.",
1402 successful_peers.len()
1403 );
1404 return;
1405 }
1406
1407 if !failed_peers.is_empty() {
1408 info!(
1409 "Scheduling replica verification retry for {} peers on record {pretty_key:?}.",
1410 failed_peers.len()
1411 );
1412 Self::schedule_record_fetch_retry(network.clone(), target_record.clone(), failed_peers)
1413 .await;
1414 }
1415 }
1416
1417 async fn fetch_record_from_peers_with_addresses(
1419 network: Network,
1420 record_address: NetworkAddress,
1421 peers: Vec<(PeerId, Addresses)>,
1422 ) -> (Vec<PeerId>, Vec<PeerId>) {
1423 let request = Request::Query(Query::GetReplicatedRecord {
1424 requester: NetworkAddress::from(network.peer_id()),
1425 key: record_address.clone(),
1426 });
1427 let expected_key = record_address.to_record_key();
1428 let pretty_key = PrettyPrintRecordKey::from(&expected_key).into_owned();
1429
1430 let mut successes = Vec::new();
1431 let mut failures = Vec::new();
1432 let concurrency = peers.len();
1433 let results = stream::iter(peers.into_iter().map(|(peer_id, addrs)| {
1434 let network_clone = network.clone();
1435 let request_clone = request.clone();
1436 async move {
1437 let result = network_clone
1438 .send_request(request_clone, peer_id, addrs.clone())
1439 .await;
1440 (peer_id, addrs, result)
1441 }
1442 }))
1443 .buffer_unordered(concurrency)
1444 .collect::<Vec<_>>()
1445 .await;
1446
1447 for res in results {
1448 match res {
1449 (
1450 peer_id,
1451 _addrs,
1452 Ok((Response::Query(QueryResponse::GetReplicatedRecord(result)), _)),
1453 ) => {
1454 match result {
1455 Ok((_holder, value)) => {
1456 let record =
1458 Record::new(record_address.to_record_key(), value.to_vec());
1459 if let Ok(chunk) = try_deserialize_record::<Chunk>(&record)
1460 && chunk.network_address().to_record_key() == expected_key
1461 {
1462 successes.push(peer_id);
1463 } else {
1464 warn!(
1465 "Peer {peer_id:?} responded with an incorrect chunk copy of {pretty_key:?}."
1466 );
1467 failures.push(peer_id);
1468 }
1469 }
1470 Err(err) => {
1471 info!(
1472 "Peer {peer_id:?} responded with error {err:?} for replicated record {pretty_key:?}."
1473 );
1474 failures.push(peer_id);
1475 }
1476 }
1477 }
1478 (peer_id, _addrs, Ok((other_response, _))) => {
1479 warn!(
1480 "Peer {peer_id:?} responded with unexpected message {other_response:?} for {pretty_key:?}."
1481 );
1482 failures.push(peer_id);
1483 }
1484 (peer_id, _addrs, Err(err)) => {
1485 info!(
1486 "Failed to reach peer {peer_id:?} for replicated record {pretty_key:?}: {err:?}"
1487 );
1488 failures.push(peer_id);
1489 }
1490 }
1491 }
1492
1493 (successes, failures)
1494 }
1495
1496 async fn schedule_record_fetch_retry(
1498 network: Network,
1499 record_address: NetworkAddress,
1500 failed_peers: Vec<PeerId>,
1501 ) {
1502 let retry_peers: HashSet<_> = failed_peers.into_iter().collect();
1503 if retry_peers.is_empty() {
1504 return;
1505 }
1506
1507 let record_clone = record_address.clone();
1508 let network_clone = network.clone();
1509
1510 tokio::time::sleep(REPLICA_FETCH_RETRY_DELAY).await;
1511 let pretty_key = PrettyPrintRecordKey::from(&record_clone.to_record_key()).into_owned();
1512
1513 let refreshed_candidates =
1514 Self::refresh_retry_candidate_addresses(&network_clone, &record_clone, &retry_peers)
1515 .await;
1516
1517 if refreshed_candidates.is_empty() {
1518 info!(
1519 "Skipping replica retry for {pretty_key:?}; no tracked peers remain close to the record."
1520 );
1521 return;
1522 }
1523
1524 let (_, still_failed) = Self::fetch_record_from_peers_with_addresses(
1525 network_clone.clone(),
1526 record_clone.clone(),
1527 refreshed_candidates,
1528 )
1529 .await;
1530
1531 if still_failed.is_empty() {
1532 info!("All peers successfully returned {pretty_key:?} during replica retry.");
1533 return;
1534 }
1535
1536 warn!(
1537 "{} peers still failed to provide {pretty_key:?}; evicting and blacklisting.",
1538 still_failed.len()
1539 );
1540 for peer_id in still_failed {
1541 Self::evict_and_blacklist_peer(network_clone.clone(), peer_id);
1542 }
1543 }
1544
1545 async fn refresh_retry_candidate_addresses(
1547 network: &Network,
1548 record_address: &NetworkAddress,
1549 retry_peers: &HashSet<PeerId>,
1550 ) -> Vec<(PeerId, Addresses)> {
1551 let pretty_key = PrettyPrintRecordKey::from(&record_address.to_record_key()).into_owned();
1552 let Ok(closest_peers) = network.get_closest_peers(record_address).await else {
1553 warn!(
1554 "Failed to refresh peer addresses for {pretty_key:?}; unable to retry replica fetch."
1555 );
1556 return Vec::new();
1557 };
1558
1559 let closest_map: HashMap<PeerId, Addresses> = closest_peers.into_iter().collect();
1560
1561 retry_peers
1562 .iter()
1563 .filter_map(|peer_id| {
1564 closest_map
1565 .get(peer_id)
1566 .cloned()
1567 .map(|addrs| (*peer_id, addrs))
1568 })
1569 .collect()
1570 }
1571
1572 fn evict_and_blacklist_peer(network: Network, peer_id: PeerId) {
1574 network.remove_peer(peer_id);
1575 network.add_peer_to_blocklist(peer_id);
1576 }
1577
1578 async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
1580 for (peer_id, addrs) in peers {
1582 Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
1583 }
1584 }
1585
1586 async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
1588 let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
1589 let version = match network.send_request(request, peer, addrs).await {
1591 Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
1592 trace!("Fetched peer version {peer:?} as {version:?}");
1593 version
1594 }
1595 Ok(other) => {
1596 info!("Not a fetched peer version {peer:?}, {other:?}");
1597 "none".to_string()
1598 }
1599 Err(err) => {
1600 info!("Failed to fetch peer version {peer:?} with error {err:?}");
1601 network.remove_peer(peer);
1605 return;
1606 }
1607 };
1608 network.notify_node_version(peer, version);
1609 }
1610}
1611
1612#[derive(Debug)]
1613struct CloseGroupTracker {
1614 self_address: NetworkAddress,
1615 close_up_peers: BTreeSet<(Distance, PeerId)>,
1619 tracked_entries: HashMap<PeerId, BehaviourEntry>,
1620}
1621
1622impl CloseGroupTracker {
1623 fn new(self_peer_id: PeerId) -> Self {
1624 Self {
1625 self_address: NetworkAddress::from(self_peer_id),
1626 close_up_peers: BTreeSet::new(),
1627 tracked_entries: HashMap::new(),
1628 }
1629 }
1630
1631 fn record_peer_added(&mut self, peer_id: PeerId) -> ReplicationDirective {
1632 let distance = self.distance_to_peer(peer_id);
1633
1634 let was_present = self.close_up_peers.contains(&(distance, peer_id));
1636
1637 let is_close_enough = self.is_distance_within_close_range(distance);
1639
1640 if is_close_enough {
1641 let _ = self.close_up_peers.insert((distance, peer_id));
1643
1644 if self.close_up_peers.len() > CLOSE_GROUP_TRACKING_LIMIT
1646 && let Some(&(farthest_distance, farthest_peer)) =
1647 self.close_up_peers.iter().next_back()
1648 {
1649 let _ = self
1650 .close_up_peers
1651 .remove(&(farthest_distance, farthest_peer));
1652 }
1653 } else {
1654 let _ = self.tracked_entries.remove(&peer_id);
1656 return ReplicationDirective::Ignore;
1657 }
1658
1659 use std::collections::hash_map::Entry;
1660 match self.tracked_entries.entry(peer_id) {
1661 Entry::Vacant(vacant_entry) => {
1662 let _ = vacant_entry.insert(BehaviourEntry::default());
1663 if !was_present {
1664 ReplicationDirective::Trigger
1665 } else {
1666 ReplicationDirective::Ignore
1667 }
1668 }
1669 Entry::Occupied(mut occupied_entry) => {
1670 let entry = occupied_entry.get_mut();
1671 if entry.awaiting_rejoin || entry.timer_deadline.is_some() {
1672 entry.restart_detected = true;
1673 entry.awaiting_rejoin = false;
1674 entry.timer_deadline = None;
1675 ReplicationDirective::Skip
1676 } else if !was_present {
1677 entry.restart_detected = false;
1678 ReplicationDirective::Trigger
1679 } else {
1680 ReplicationDirective::Ignore
1681 }
1682 }
1683 }
1684 }
1685
1686 fn record_peer_removed(&mut self, peer_id: PeerId) -> ReplicationDirective {
1687 let distance = self.distance_to_peer(peer_id);
1688
1689 let was_tracked = self.close_up_peers.contains(&(distance, peer_id));
1691
1692 if !was_tracked {
1693 return ReplicationDirective::Ignore;
1695 }
1696
1697 let _ = self.close_up_peers.remove(&(distance, peer_id));
1699
1700 let entry = self.tracked_entries.entry(peer_id).or_default();
1701
1702 if entry.restart_detected {
1703 entry.timer_deadline = Some(Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION);
1704 entry.awaiting_rejoin = false;
1705 ReplicationDirective::Skip
1706 } else {
1707 entry.awaiting_rejoin = true;
1708 entry.timer_deadline = None;
1709 ReplicationDirective::Trigger
1710 }
1711 }
1712
1713 fn handle_timer_expiry(&mut self, now: Instant) -> bool {
1714 let mut expired = false;
1715 for entry in self.tracked_entries.values_mut() {
1716 if let Some(deadline) = entry.timer_deadline
1717 && now >= deadline
1718 {
1719 entry.timer_deadline = None;
1720 entry.restart_detected = false;
1721 entry.awaiting_rejoin = false;
1722 expired = true;
1723 }
1724 }
1725
1726 self.tracked_entries.retain(|peer_id, entry| {
1728 let is_currently_present = self
1729 .close_up_peers
1730 .iter()
1731 .any(|(_, present_peer)| present_peer == peer_id);
1732
1733 entry.awaiting_rejoin
1734 || entry.restart_detected
1735 || entry.timer_deadline.is_some()
1736 || is_currently_present
1737 });
1738
1739 expired
1740 }
1741
1742 fn is_distance_within_close_range(&self, distance: Distance) -> bool {
1743 if self.close_up_peers.len() < CLOSE_GROUP_TRACKING_LIMIT {
1745 return true;
1746 }
1747
1748 if let Some(&(farthest_distance, _)) = self.close_up_peers.iter().next_back() {
1750 distance < farthest_distance
1751 } else {
1752 true
1753 }
1754 }
1755
1756 fn distance_to_peer(&self, peer_id: PeerId) -> Distance {
1757 self.self_address.distance(&NetworkAddress::from(peer_id))
1758 }
1759}
1760
/// Behaviour state kept per peer by `CloseGroupTracker`. All fields start cleared.
#[derive(Debug, Default)]
struct BehaviourEntry {
    /// Set when a tracked peer was removed and has not been seen again yet.
    awaiting_rejoin: bool,
    /// Set when the peer reappeared after a removal, i.e. it looks like a restart.
    restart_detected: bool,
    /// Deadline of the suppression timer armed when a restarting peer is removed again.
    timer_deadline: Option<Instant>,
}
1767
/// Decision returned by `CloseGroupTracker` for a peer membership change.
#[derive(Debug, PartialEq, Eq)]
enum ReplicationDirective {
    /// The change is relevant: replication should be triggered.
    Trigger,
    /// The change belongs to a detected restart and is deliberately not acted upon.
    Skip,
    /// The change does not concern the tracked close group.
    Ignore,
}
1774
1775impl ReplicationDirective {
1776 fn should_trigger(&self) -> bool {
1777 matches!(self, Self::Trigger)
1778 }
1779}
1780
#[cfg(test)]
mod close_group_tracker_tests {
    use super::*;
    use std::time::Duration;

    /// Produces a peer id derived from a freshly generated ed25519 keypair.
    fn random_peer() -> PeerId {
        PeerId::from(libp2p::identity::Keypair::generate_ed25519().public())
    }

    /// An instant one second past the end of a suppression window that starts now.
    fn after_suppression_window() -> Instant {
        Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1)
    }

    /// A brand-new peer triggers replication both when it joins and when it leaves.
    #[test]
    fn new_peer_triggers_replication() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let peer = random_peer();

        let on_add = tracker.record_peer_added(peer);
        assert_eq!(on_add, ReplicationDirective::Trigger);

        let on_remove = tracker.record_peer_removed(peer);
        assert_eq!(on_remove, ReplicationDirective::Trigger);
    }

    /// A peer that leaves and comes back is treated as restarting until its timer expires.
    #[test]
    fn restart_detection_skips_replication() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let peer = random_peer();

        // First appearance and departure are both acted upon.
        let first_add = tracker.record_peer_added(peer);
        assert_eq!(first_add, ReplicationDirective::Trigger);
        let first_remove = tracker.record_peer_removed(peer);
        assert_eq!(first_remove, ReplicationDirective::Trigger);

        // The rejoin and the following departure are suppressed.
        let rejoin = tracker.record_peer_added(peer);
        assert_eq!(rejoin, ReplicationDirective::Skip);
        let second_remove = tracker.record_peer_removed(peer);
        assert_eq!(second_remove, ReplicationDirective::Skip);

        assert!(tracker.handle_timer_expiry(after_suppression_window()));

        // Once the timer has expired the peer counts as new again.
        let late_add = tracker.record_peer_added(peer);
        assert_eq!(late_add, ReplicationDirective::Trigger);
    }

    /// Once the tracker is full, further peers never grow the close group.
    #[test]
    fn peer_outside_close_group_is_ignored() {
        let mut tracker = CloseGroupTracker::new(random_peer());

        // Fill the tracker up to its limit; every peer fits and triggers.
        for _ in 0..CLOSE_GROUP_TRACKING_LIMIT {
            let directive = tracker.record_peer_added(random_peer());
            assert_eq!(directive, ReplicationDirective::Trigger);
        }
        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);

        // Offer 50 more peers and tally how they were classified.
        let directives: Vec<ReplicationDirective> = (0..50)
            .map(|_| tracker.record_peer_added(random_peer()))
            .collect();
        let ignored_count = directives
            .iter()
            .filter(|directive| **directive == ReplicationDirective::Ignore)
            .count();
        let trigger_count = directives
            .iter()
            .filter(|directive| **directive == ReplicationDirective::Trigger)
            .count();

        assert!(
            ignored_count > 0 || trigger_count > 0,
            "Expected some peers to be processed"
        );

        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
    }

    /// Removing a peer that was never added has no effect.
    #[test]
    fn removal_of_untracked_peer_is_ignored() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let unknown_peer = random_peer();

        let directive = tracker.record_peer_removed(unknown_peer);
        assert_eq!(directive, ReplicationDirective::Ignore);
    }

    /// The suppression timer only fires after its deadline and then resets the peer.
    #[test]
    fn timer_expiry_resets_restart_state() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let peer = random_peer();

        // Join, leave, rejoin, leave: the last two steps are suppressed.
        let expected_sequence = [
            ReplicationDirective::Trigger,
            ReplicationDirective::Trigger,
            ReplicationDirective::Skip,
            ReplicationDirective::Skip,
        ];
        for (step, expected) in expected_sequence.into_iter().enumerate() {
            let directive = if step % 2 == 0 {
                tracker.record_peer_added(peer)
            } else {
                tracker.record_peer_removed(peer)
            };
            assert_eq!(directive, expected);
        }

        // Nothing expires before the deadline...
        assert!(!tracker.handle_timer_expiry(Instant::now()));

        // ...but the timer fires once the suppression window has passed.
        let after_expiry = after_suppression_window();
        assert!(tracker.handle_timer_expiry(after_expiry));

        let directive = tracker.record_peer_added(peer);
        assert_eq!(directive, ReplicationDirective::Trigger);
    }

    /// Adding a closer peer to a full tracker keeps the tracker at its size limit.
    #[test]
    fn eviction_maintains_closest_peers() {
        let self_peer = random_peer();
        let mut tracker = CloseGroupTracker::new(self_peer);

        // Fill the tracker and remember how far away each peer is.
        let mut known_distances: Vec<Distance> = Vec::new();
        for _ in 0..CLOSE_GROUP_TRACKING_LIMIT {
            let peer = random_peer();
            known_distances.push(tracker.distance_to_peer(peer));
            let _ = tracker.record_peer_added(peer);
        }
        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);

        let farthest_distance = known_distances.iter().copied().max().unwrap();

        // Look for a random peer that is closer than the current farthest one.
        let closer_peer = (0..100)
            .map(|_| random_peer())
            .find(|candidate| tracker.distance_to_peer(*candidate) < farthest_distance);
        let found_closer = closer_peer.is_some();

        if let Some(new_peer) = closer_peer {
            let result = tracker.record_peer_added(new_peer);
            assert_eq!(result, ReplicationDirective::Trigger);
            assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
        }

        assert!(
            found_closer,
            "Could not find a peer closer than the farthest in 100 attempts"
        );
    }

    /// After the suppression timer expires, the absent peer's entry is pruned.
    #[test]
    fn tracked_entries_cleaned_up_on_timer_expiry() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let peer = random_peer();

        // Two join/leave cycles leave the peer absent with an armed timer.
        for _ in 0..2 {
            let _ = tracker.record_peer_added(peer);
            let _ = tracker.record_peer_removed(peer);
        }
        assert!(tracker.tracked_entries.contains_key(&peer));

        let after_expiry = after_suppression_window();
        let _ = tracker.handle_timer_expiry(after_expiry);

        assert!(!tracker.tracked_entries.contains_key(&peer));
    }
}
1997
1998async fn scoring_peer(
1999 network: Network,
2000 peer: (PeerId, Addresses),
2001 request: Request,
2002 expected_proofs: HashMap<NetworkAddress, ChunkProof>,
2003) -> usize {
2004 let peer_id = peer.0;
2005 let start = Instant::now();
2006 let responses = network
2007 .send_and_get_responses(&[peer], &request, true)
2008 .await;
2009
2010 if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
2011 responses.get(&peer_id)
2012 {
2013 if answers.is_empty() {
2014 info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
2015 return 0;
2016 }
2017 let elapsed = start.elapsed();
2018
2019 let mut received_proofs = vec![];
2020 for (addr, proof) in answers {
2021 if let Ok(proof) = proof {
2022 received_proofs.push((addr.clone(), proof.clone()));
2023 }
2024 }
2025
2026 let score = mark_peer(elapsed, received_proofs, &expected_proofs);
2027 info!(
2028 "Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.",
2029 answers.len()
2030 );
2031 score
2032 } else {
2033 info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error.");
2034 0
2035 }
2036}
2037
2038fn mark_peer(
2044 duration: Duration,
2045 answers: Vec<(NetworkAddress, ChunkProof)>,
2046 expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
2047) -> usize {
2048 let duration_score = duration_score_scheme(duration);
2049 let challenge_score = challenge_score_scheme(answers, expected_proofs);
2050
2051 duration_score * challenge_score
2052}
2053
2054fn duration_score_scheme(duration: Duration) -> usize {
2056 let in_ms = if let Some(value) = duration.as_millis().to_usize() {
2058 value
2059 } else {
2060 info!("Cannot get milli seconds from {duration:?}, using a default value of 1000ms.");
2061 1000
2062 };
2063
2064 let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
2065 HIGHEST_SCORE - step
2066}
2067
2068fn challenge_score_scheme(
2070 answers: Vec<(NetworkAddress, ChunkProof)>,
2071 expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
2072) -> usize {
2073 let mut correct_answers = 0;
2074 for (addr, chunk_proof) in answers {
2075 if let Some(expected_proof) = expected_proofs.get(&addr) {
2076 if expected_proof.verify(&chunk_proof) {
2077 correct_answers += 1;
2078 } else {
2079 info!("Spot a false answer to the challenge regarding {addr:?}");
2080 return 0;
2082 }
2083 }
2084 }
2085 std::cmp::min(
2092 HIGHEST_SCORE,
2093 HIGHEST_SCORE * correct_answers / expected_proofs.len(),
2094 )
2095}
2096
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    /// Three random peers, each with a single fixed multiaddress.
    fn sample_local_peers() -> Vec<(PeerId, Vec<Multiaddr>)> {
        [
            "/ip4/192.168.1.1/tcp/8080",
            "/ip4/192.168.1.2/tcp/8080",
            "/ip4/192.168.1.2/tcp/8080",
        ]
        .into_iter()
        .map(|addr| {
            (
                PeerId::random(),
                vec![Multiaddr::from_str(addr).unwrap()],
            )
        })
        .collect()
    }

    /// Without any local peers the result is empty.
    #[test]
    fn test_no_local_peers() {
        let target = NetworkAddress::from(PeerId::random());
        let result = Node::calculate_get_closest_peers(Vec::new(), target, Some(5), None);

        assert_eq!(result, vec![]);
    }

    /// With a peer count and no range, the closest `num_of_peers` peers are returned in
    /// order of distance to the target.
    #[test]
    fn test_fewer_local_peers_than_num_of_peers() {
        let local_peers = sample_local_peers();
        let target = NetworkAddress::from(PeerId::random());
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            Some(2),
            None,
        );

        // Build the expectation by hand: sort by distance and keep the first two.
        let mut by_distance: Vec<(NetworkAddress, Vec<Multiaddr>)> = Vec::new();
        for (peer_id, multi_addrs) in &local_peers {
            by_distance.push((NetworkAddress::from(*peer_id), multi_addrs.clone()));
        }
        by_distance.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
        by_distance.truncate(2);

        assert_eq!(by_distance, result);
    }

    /// With a range given, exactly the peers within that distance are returned.
    #[test]
    fn test_with_range_and_num_of_peers() {
        let local_peers = sample_local_peers();
        let target = NetworkAddress::from(PeerId::random());
        let range_value = [128; 32];
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            Some(0),
            Some(range_value),
        );

        let distance = U256::from_big_endian(&range_value);
        let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = Vec::new();
        for (peer_id, multi_addrs) in local_peers {
            let addr = NetworkAddress::from(peer_id);
            if target.distance(&addr).0 <= distance {
                expected_result.push((addr, multi_addrs));
            }
        }

        assert_eq!(expected_result, result);
    }

    mod merkle_payment_tests {
        use super::*;

        /// Current wall-clock time as seconds since the Unix epoch.
        fn unix_now() -> u64 {
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs()
        }

        /// Age of `timestamp` relative to `now`, saturating at zero.
        fn age_at(now: u64, timestamp: u64) -> u64 {
            now.saturating_sub(timestamp)
        }

        /// A timestamp one hour in the past is neither in the future nor expired.
        #[test]
        fn test_timestamp_validation_accepts_valid_timestamp() {
            let now = unix_now();
            let valid_timestamp = now - 3600;
            let age = age_at(now, valid_timestamp);

            assert!(
                valid_timestamp <= now,
                "Valid timestamp should not be in the future"
            );
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Valid timestamp should not be expired"
            );
        }

        /// A timestamp one hour ahead lies in the future.
        #[test]
        fn test_timestamp_validation_rejects_future_timestamp() {
            let now = unix_now();
            let future_timestamp = now + 3600;

            assert!(
                future_timestamp > now,
                "Future timestamp should be rejected"
            );
        }

        /// A timestamp one day past the expiration window is expired.
        #[test]
        fn test_timestamp_validation_rejects_expired_timestamp() {
            let now = unix_now();
            let expired_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 86400);
            let age = age_at(now, expired_timestamp);

            assert!(
                age > MERKLE_PAYMENT_EXPIRATION,
                "Expired timestamp should be rejected"
            );
        }

        /// A timestamp exactly at the expiration boundary is still acceptable.
        #[test]
        fn test_timestamp_validation_at_expiration_boundary() {
            let now = unix_now();
            let boundary_timestamp = now - MERKLE_PAYMENT_EXPIRATION;
            let age = age_at(now, boundary_timestamp);

            assert_eq!(age, MERKLE_PAYMENT_EXPIRATION);
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Timestamp exactly at boundary should not be rejected"
            );
        }

        /// A timestamp one second beyond the boundary is expired.
        #[test]
        fn test_timestamp_validation_beyond_expiration_boundary() {
            let now = unix_now();
            let beyond_boundary_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 1);
            let age = age_at(now, beyond_boundary_timestamp);

            assert!(
                age > MERKLE_PAYMENT_EXPIRATION,
                "Timestamp beyond boundary should be rejected"
            );
        }

        /// The current time itself has age zero and is valid.
        #[test]
        fn test_timestamp_validation_at_current_time() {
            let now = unix_now();
            let current_timestamp = now;
            let age = age_at(now, current_timestamp);

            assert!(
                current_timestamp <= now,
                "Current timestamp should not be in future"
            );
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Current timestamp should not be expired"
            );
            assert_eq!(age, 0, "Age should be 0 for current timestamp");
        }

        /// Even one second ahead counts as the future.
        #[test]
        fn test_timestamp_validation_near_future_boundary() {
            let now = unix_now();
            let near_future_timestamp = now + 1;

            assert!(
                near_future_timestamp > now,
                "Near-future timestamp should be rejected"
            );
        }

        /// The expiration constant equals seven days expressed in seconds.
        #[test]
        fn test_merkle_payment_expiration_constant() {
            const SEVEN_DAYS_IN_SECONDS: u64 = 7 * 24 * 60 * 60;
            assert_eq!(
                MERKLE_PAYMENT_EXPIRATION, SEVEN_DAYS_IN_SECONDS,
                "MERKLE_PAYMENT_EXPIRATION should be 7 days"
            );
        }
    }
}