1use super::{
10 Marker, NodeEvent, error::Result, event::NodeEventsChannel, quote::quotes_verification,
11};
12#[cfg(feature = "open-metrics")]
13use crate::metrics::NodeMetricsRecorder;
14#[cfg(feature = "open-metrics")]
15use crate::networking::MetricsRegistries;
16use crate::networking::{Addresses, Network, NetworkConfig, NetworkEvent, NodeIssue};
17use crate::{PutValidationError, RunningNode};
18use ant_bootstrap::bootstrap::Bootstrap;
19use ant_evm::EvmNetwork;
20use ant_evm::RewardsAddress;
21use ant_evm::merkle_payments::MERKLE_PAYMENT_EXPIRATION;
22use ant_protocol::{
23 CLOSE_GROUP_SIZE, NetworkAddress, PrettyPrintRecordKey,
24 error::Error as ProtocolError,
25 messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
26 storage::{Chunk, ValidationType, try_deserialize_record},
27};
28use bytes::Bytes;
29use futures::stream::{self, StreamExt};
30use itertools::Itertools;
31use libp2p::{
32 Multiaddr, PeerId,
33 identity::Keypair,
34 kad::{KBucketDistance as Distance, Record, U256},
35};
36use num_traits::cast::ToPrimitive;
37use rand::{
38 Rng, SeedableRng,
39 rngs::{OsRng, StdRng},
40 seq::SliceRandom,
41 thread_rng,
42};
43use std::{
44 collections::{BTreeSet, HashMap, HashSet},
45 net::SocketAddr,
46 path::PathBuf,
47 sync::{
48 Arc,
49 atomic::{AtomicUsize, Ordering},
50 },
51 time::{Duration, Instant},
52};
53use tokio::sync::{Mutex, watch};
54use tokio::{
55 sync::mpsc::Receiver,
56 task::{JoinSet, spawn},
57};
58
/// Upper bound, in seconds, of the periodic replication interval.
/// `Node::run` picks a random value in `[MAX / 2, MAX)` once at startup.
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;

/// Upper bound, in seconds, of the periodic storage challenge interval.
/// `Node::run` picks a random value in `[MAX / 2, MAX)` once at startup.
const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;

/// How often the uptime metric is refreshed by the node's main loop.
const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

/// Upper bound, in seconds, of the irrelevant-records cleanup interval.
/// `Node::run` picks a random value in `[MAX / 2, MAX)` once at startup.
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;

// NOTE(review): not referenced in the visible part of this file; presumably the
// best score a peer can earn for a single storage-challenge answer — confirm.
const HIGHEST_SCORE: usize = 100;

// NOTE(review): not referenced in the visible part of this file; presumably the
// minimum accumulated challenge score for a peer to count as healthy — confirm.
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;

// NOTE(review): not referenced in the visible part of this file; presumably a
// time granularity used when scoring challenge responses (unit unknown) — confirm.
const TIME_STEP: usize = 50;

// NOTE(review): not referenced in the visible part of this file; by its name and
// value, the delay (15 minutes) before a replica fetch is retried — confirm.
const REPLICA_FETCH_RETRY_DELAY: Duration = Duration::from_secs(15 * 60);

// NOTE(review): not referenced in the visible part of this file; presumably the
// number of peers queried when fetching a replica — confirm.
const REPLICA_FETCH_PEER_COUNT: usize = 5;

// NOTE(review): not referenced in the visible part of this file; presumably the
// minimum number of replicas for a record to be considered healthy — confirm.
const MIN_HEALTHY_REPLICA_COUNT: usize = 3;

// NOTE(review): not referenced in the visible part of this file; presumably an
// index into a distance-sorted peer list that bounds the "close neighbour"
// range — confirm.
const CLOSE_NEIGHBOUR_DISTANCE_INDEX: usize = 7;

// NOTE(review): not referenced in the visible part of this file; presumably the
// maximum number of peers tracked by `CloseGroupTracker` — confirm.
const CLOSE_GROUP_TRACKING_LIMIT: usize = 20;

// NOTE(review): not referenced in the visible part of this file; presumably the
// window (90 s) during which a peer that left and re-joined is treated as a
// restart and does not trigger replication — confirm.
const CLOSE_GROUP_RESTART_SUPPRESSION: Duration = Duration::from_secs(90);
121
/// Collects everything needed to start a node; consumed by
/// [`NodeBuilder::build_and_run`].
pub struct NodeBuilder {
    /// Socket address handed to the network layer as its listen address.
    addr: SocketAddr,
    /// Bootstrap flow handed to the network layer.
    bootstrap: Bootstrap,
    /// Rewards address used by the node and exposed on the `RunningNode`.
    evm_address: RewardsAddress,
    /// EVM network the node is configured for.
    evm_network: EvmNetwork,
    /// Keypair giving the node its network identity.
    identity_keypair: Keypair,
    /// Forwarded to `NetworkConfig::local`; defaults to `false`.
    local: bool,
    /// Port for the metrics server; metrics recording is only set up when
    /// this is `Some`.
    #[cfg(feature = "open-metrics")]
    metrics_server_port: Option<u16>,
    /// Forwarded to `NetworkConfig::no_upnp`; defaults to `false`.
    no_upnp: bool,
    /// Forwarded to `NetworkConfig::relay_client`; defaults to `false`.
    relay_client: bool,
    /// Root directory passed to the network layer and kept on the
    /// `RunningNode`.
    root_dir: PathBuf,
}
137
138impl NodeBuilder {
139 pub fn new(
142 identity_keypair: Keypair,
143 bootstrap_flow: Bootstrap,
144 evm_address: RewardsAddress,
145 evm_network: EvmNetwork,
146 addr: SocketAddr,
147 root_dir: PathBuf,
148 ) -> Self {
149 Self {
150 addr,
151 bootstrap: bootstrap_flow,
152 evm_address,
153 evm_network,
154 identity_keypair,
155 local: false,
156 #[cfg(feature = "open-metrics")]
157 metrics_server_port: None,
158 no_upnp: false,
159 relay_client: false,
160 root_dir,
161 }
162 }
163
164 pub fn local(&mut self, local: bool) {
166 self.local = local;
167 }
168
169 #[cfg(feature = "open-metrics")]
170 pub fn metrics_server_port(&mut self, port: Option<u16>) {
172 self.metrics_server_port = port;
173 }
174
175 pub fn relay_client(&mut self, relay_client: bool) {
177 self.relay_client = relay_client;
178 }
179
180 pub fn no_upnp(&mut self, no_upnp: bool) {
182 self.no_upnp = no_upnp;
183 }
184
185 pub fn build_and_run(self) -> Result<RunningNode> {
198 #[cfg(feature = "open-metrics")]
200 let (metrics_recorder, metrics_registries) = if self.metrics_server_port.is_some() {
201 let mut metrics_registries = MetricsRegistries::default();
203 let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);
204
205 (Some(metrics_recorder), metrics_registries)
206 } else {
207 (None, MetricsRegistries::default())
208 };
209
210 let (shutdown_tx, shutdown_rx) = watch::channel(false);
212
213 let network_config = NetworkConfig {
215 keypair: self.identity_keypair,
216 local: self.local,
217 listen_addr: self.addr,
218 root_dir: self.root_dir.clone(),
219 shutdown_rx: shutdown_rx.clone(),
220 bootstrap: self.bootstrap,
221 no_upnp: self.no_upnp,
222 relay_client: self.relay_client,
223 custom_request_timeout: None,
224 #[cfg(feature = "open-metrics")]
225 metrics_registries,
226 #[cfg(feature = "open-metrics")]
227 metrics_server_port: self.metrics_server_port,
228 };
229 let (network, network_event_receiver) = Network::init(network_config)?;
230
231 let node_events_channel = NodeEventsChannel::default();
233 let node = NodeInner {
234 network: network.clone(),
235 events_channel: node_events_channel.clone(),
236 close_group_tracker: Mutex::new(CloseGroupTracker::new(network.peer_id())),
237 reward_address: self.evm_address,
238 #[cfg(feature = "open-metrics")]
239 metrics_recorder,
240 evm_network: self.evm_network,
241 };
242 let node = Node {
243 inner: Arc::new(node),
244 };
245
246 node.run(network_event_receiver, shutdown_rx);
248 let running_node = RunningNode {
249 shutdown_sender: shutdown_tx,
250 network,
251 node_events_channel,
252 root_dir_path: self.root_dir,
253 rewards_address: self.evm_address,
254 };
255
256 Ok(running_node)
257 }
258}
259
/// Cheaply cloneable handle to the node's shared state: cloning only bumps
/// the reference count of the inner `Arc`.
#[derive(Clone)]
pub(crate) struct Node {
    /// Shared state behind an `Arc`, so spawned tasks can hold their own handle.
    inner: Arc<NodeInner>,
}
267
/// State shared by every clone of [`Node`].
struct NodeInner {
    /// Channel used to broadcast `NodeEvent`s.
    events_channel: NodeEventsChannel,
    /// Handle to the network layer.
    network: Network,
    /// Tracker consulted on peer add/remove events and on the replication
    /// timer; guarded by an async mutex.
    close_group_tracker: Mutex<CloseGroupTracker>,
    /// Metrics recorder; `None` when no metrics server port was configured.
    #[cfg(feature = "open-metrics")]
    metrics_recorder: Option<NodeMetricsRecorder>,
    /// Rewards address of this node.
    reward_address: RewardsAddress,
    /// EVM network this node is configured for.
    evm_network: EvmNetwork,
}
279
280impl Node {
    /// Returns the channel on which this node broadcasts `NodeEvent`s.
    pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
        &self.inner.events_channel
    }
285
    /// Returns the node's handle to the network layer.
    pub(crate) fn network(&self) -> &Network {
        &self.inner.network
    }
290
    /// Returns the mutex guarding the close-group tracker.
    fn close_group_tracker(&self) -> &Mutex<CloseGroupTracker> {
        &self.inner.close_group_tracker
    }
294
    /// Returns the metrics recorder, or `None` when the node was built
    /// without a metrics server port.
    #[cfg(feature = "open-metrics")]
    pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
        self.inner.metrics_recorder.as_ref()
    }
301
    /// Returns the rewards address this node was configured with.
    pub(crate) fn reward_address(&self) -> &RewardsAddress {
        &self.inner.reward_address
    }
306
    /// Returns the EVM network this node was configured with.
    pub(crate) fn evm_network(&self) -> &EvmNetwork {
        &self.inner.evm_network
    }
310
    /// Spawns the node's main loop as a background task and returns
    /// immediately.
    ///
    /// The task multiplexes, via `tokio::select!`:
    /// - the shutdown watch channel (exits on `true` or when the sender is dropped),
    /// - incoming `NetworkEvent`s (exits when the channel closes),
    /// - periodic replication,
    /// - uptime metric refresh,
    /// - irrelevant-record cleanup,
    /// - periodic storage challenges.
    ///
    /// The replication, cleanup and challenge periods are each drawn once,
    /// at random, from `[MAX / 2, MAX)` of their respective `*_MAX_S` constant.
    fn run(
        self,
        mut network_event_receiver: Receiver<NetworkEvent>,
        mut shutdown_rx: watch::Receiver<bool>,
    ) {
        let mut rng = StdRng::from_entropy();

        // Number of `PeerAdded` events seen so far; shared by reference with
        // `handle_network_event`.
        let peers_connected = Arc::new(AtomicUsize::new(0));

        let _node_task = spawn(async move {
            // Randomised so that nodes do not all replicate at the same moment.
            let replication_interval: u64 = rng.gen_range(
                PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
            );
            let replication_interval_time = Duration::from_secs(replication_interval);
            debug!("Replication interval set to {replication_interval_time:?}");

            let mut replication_interval = tokio::time::interval(replication_interval_time);
            // A tokio interval's first tick completes immediately; consume it
            // so the first real tick happens one full period from now.
            let _ = replication_interval.tick().await;

            let mut uptime_metrics_update_interval =
                tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
            // Consume the immediate first tick.
            let _ = uptime_metrics_update_interval.tick().await;

            let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
                UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
                    ..UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
            );
            let irrelevant_records_cleanup_interval_time =
                Duration::from_secs(irrelevant_records_cleanup_interval);
            let mut irrelevant_records_cleanup_interval =
                tokio::time::interval(irrelevant_records_cleanup_interval_time);
            // Consume the immediate first tick.
            let _ = irrelevant_records_cleanup_interval.tick().await;

            let storage_challenge_interval: u64 =
                rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
            let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
            debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");

            let mut storage_challenge_interval =
                tokio::time::interval(storage_challenge_interval_time);
            // Consume the immediate first tick.
            let _ = storage_challenge_interval.tick().await;

            loop {
                let peers_connected = &peers_connected;

                tokio::select! {
                    result = shutdown_rx.changed() => {
                        // Stop when the flag was set to `true`, or when the
                        // sender is gone (`changed()` returned an error).
                        if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
                            info!("Shutdown signal received or sender dropped. Exiting network events loop.");
                            break;
                        }
                    },
                    net_event = network_event_receiver.recv() => {
                        match net_event {
                            Some(event) => {
                                let start = Instant::now();
                                // Captured up front because the event is moved
                                // into the handler.
                                let event_string = format!("{event:?}");

                                self.handle_network_event(event, peers_connected).await;
                                trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);

                            }
                            None => {
                                error!("The `NetworkEvent` channel is closed");
                                self.events_channel().broadcast(NodeEvent::ChannelClosed);
                                break;
                            }
                        }
                    }
                    _ = replication_interval.tick() => {
                        let now = Instant::now();

                        // Scoped so the tracker lock is released before the
                        // replication task is spawned.
                        {
                            let mut tracker = self.close_group_tracker().lock().await;
                            if tracker.handle_timer_expiry(now) {
                                trace!("Replication timer expired for tracked peers; periodic run will cover it");
                            }
                        }

                        let start = now;
                        let network = self.network().clone();
                        self.record_metrics(Marker::IntervalReplicationTriggered);

                        // Run in a separate task so the event loop is not held up.
                        let _handle = spawn(async move {
                            Self::try_interval_replication(network);
                            trace!("Periodic replication took {:?}", start.elapsed());
                        });
                    }
                    _ = uptime_metrics_update_interval.tick() => {
                        #[cfg(feature = "open-metrics")]
                        if let Some(metrics_recorder) = self.metrics_recorder() {
                            let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
                        }
                    }
                    _ = irrelevant_records_cleanup_interval.tick() => {
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::trigger_irrelevant_record_cleanup(network);
                        });
                    }
                    _ = storage_challenge_interval.tick() => {
                        let start = Instant::now();
                        debug!("Periodic storage challenge triggered");
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::storage_challenge(network).await;
                            trace!("Periodic storage challenge took {:?}", start.elapsed());
                        });
                    }
                }
            }
        });
    }
437
438 pub(crate) fn record_metrics(&self, marker: Marker) {
441 marker.log();
442 #[cfg(feature = "open-metrics")]
443 if let Some(metrics_recorder) = self.metrics_recorder() {
444 metrics_recorder.record(marker)
445 }
446 }
447
    /// Dispatches a single `NetworkEvent`.
    ///
    /// Work that may take long (queries, record validation, replication,
    /// quote verification, ...) is moved onto spawned tasks; this function
    /// itself only awaits the close-group tracker lock.
    ///
    /// `peers_connected` counts `PeerAdded` events; it is incremented here
    /// and never decremented in this function.
    async fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
        let start = Instant::now();
        // Captured up front because `event` is consumed by the `match` below.
        let event_string = format!("{event:?}");
        // Short event name for the timing log at the end; set by every arm.
        let event_header;

        // Version queries are logged at trace level, everything else at debug.
        if let NetworkEvent::QueryRequestReceived {
            query: Query::GetVersion { .. },
            ..
        } = event
        {
            trace!("Handling NetworkEvent {event_string}");
        } else {
            debug!("Handling NetworkEvent {event_string}");
        }

        match event {
            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
                event_header = "PeerAdded";
                let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
                // Announce connectivity exactly once: when the counter hits
                // CLOSE_GROUP_SIZE (it only ever grows).
                if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
                    self.events_channel()
                        .broadcast(NodeEvent::ConnectedToNetwork);
                }

                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));

                let network = self.network().clone();
                // Query the new peer's version in the background.
                let _handle = spawn(async move {
                    Self::try_query_peer_version(network, peer_id, Default::default()).await;
                });

                // Hold the tracker lock only while recording the change.
                let replication_decision = {
                    let mut tracker = self.close_group_tracker().lock().await;
                    tracker.record_peer_added(peer_id)
                };

                if replication_decision.should_trigger() {
                    let network = self.network().clone();
                    self.record_metrics(Marker::IntervalReplicationTriggered);
                    let _handle = spawn(async move {
                        Self::try_interval_replication(network);
                    });
                } else if matches!(replication_decision, ReplicationDirective::Skip) {
                    trace!(
                        "Replication skipped for {peer_id:?} addition due to restart behaviour tracking"
                    );
                }
            }
            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
                event_header = "PeerRemoved";
                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));

                let self_id = self.network().peer_id();
                let distance =
                    NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
                info!(
                    "Node {self_id:?} removed peer from routing table: {peer_id:?}. It has a {:?} distance to us.",
                    distance.ilog2()
                );

                // Hold the tracker lock only while recording the change.
                let replication_decision = {
                    let mut tracker = self.close_group_tracker().lock().await;
                    tracker.record_peer_removed(peer_id)
                };

                if replication_decision.should_trigger() {
                    let network = self.network().clone();
                    self.record_metrics(Marker::IntervalReplicationTriggered);
                    let _handle = spawn(async move {
                        Self::try_interval_replication(network);
                    });
                } else if matches!(replication_decision, ReplicationDirective::Skip) {
                    trace!(
                        "Replication skipped for {peer_id:?} removal due to restart behaviour tracking"
                    );
                }
            }
            // No action beyond the generic logging.
            NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
                event_header = "PeerWithUnsupportedProtocol";
            }
            // No action beyond the generic logging.
            NetworkEvent::NewListenAddr(_) => {
                event_header = "NewListenAddr";
            }
            NetworkEvent::ResponseReceived { res } => {
                event_header = "ResponseReceived";
                if let Err(err) = self.handle_response(res) {
                    error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
                }
            }
            NetworkEvent::KeysToFetchForReplication(keys) => {
                event_header = "KeysToFetchForReplication";
                self.record_metrics(Marker::fetching_keys_for_replication(&keys));

                if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
                    error!("Error while trying to fetch replicated data {err:?}");
                }
            }
            NetworkEvent::QueryRequestReceived { query, channel } => {
                event_header = "QueryRequestReceived";
                let node = self.clone();
                let payment_address = *self.reward_address();

                // Answer the query on its own task and send the response back
                // through the channel supplied with the request.
                let _handle = spawn(async move {
                    let network = node.network().clone();
                    let res = Self::handle_query(node, query, payment_address).await;

                    // Version responses are logged at trace level, the rest at debug.
                    if let Response::Query(QueryResponse::GetVersion { .. }) = res {
                        trace!("Sending response {res:?}");
                    } else {
                        debug!("Sending response {res:?}");
                    }

                    network.send_response(res, channel);
                });
            }
            NetworkEvent::UnverifiedRecord(record) => {
                event_header = "UnverifiedRecord";
                let self_clone = self.clone();
                // Validation and storage happen on a separate task.
                let _handle = spawn(async move {
                    let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                    match self_clone.validate_and_store_record(record).await {
                        Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
                        Err(err) => {
                            self_clone.record_metrics(Marker::RecordRejected(&key, &err));
                        }
                    }
                });
            }
            NetworkEvent::TerminateNode { reason } => {
                event_header = "TerminateNode";
                error!("Received termination from swarm_driver due to {reason:?}");
                self.events_channel()
                    .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
            }
            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
                event_header = "FailedToFetchHolders";
                let network = self.network().clone();
                let pretty_log: Vec<_> = bad_nodes
                    .iter()
                    .map(|(peer_id, record_key)| {
                        let pretty_key = PrettyPrintRecordKey::from(record_key);
                        (peer_id, pretty_key)
                    })
                    .collect();
                debug!(
                    "Received notification from replication_fetcher, notifying {pretty_log:?} failed to fetch replication copies from."
                );
                let _handle = spawn(async move {
                    for (peer_id, record_key) in bad_nodes {
                        // Only report the peer when the record is confirmed to
                        // be absent locally; lookup errors and records that
                        // are present are ignored.
                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
                        {
                            error!(
                                "From peer {peer_id:?}, failed to fetch record {:?}",
                                PrettyPrintRecordKey::from(&record_key)
                            );
                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
                        }
                    }
                });
            }
            NetworkEvent::QuoteVerification { quotes } => {
                event_header = "QuoteVerification";
                let network = self.network().clone();

                let _handle = spawn(async move {
                    quotes_verification(&network, quotes).await;
                });
            }
            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
                event_header = "FreshReplicateToFetch";
                self.fresh_replicate_to_fetch(holder, keys);
            }
            NetworkEvent::PeersForVersionQuery(peers) => {
                event_header = "PeersForVersionQuery";
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::query_peers_version(network, peers).await;
                });
            }
            NetworkEvent::NetworkWideReplication { keys } => {
                event_header = "NetworkWideReplication";
                self.perform_network_wide_replication(keys);
            }
            // Logged only; no further action is taken here.
            NetworkEvent::PeerVersionChecked {
                peer_id,
                peer_type,
                result,
            } => {
                event_header = "PeerVersionChecked";
                trace!(
                    target: "version_gate",
                    peer_id = %peer_id,
                    peer_type = %peer_type,
                    result = ?result,
                    "Peer version check completed"
                );
            }
            // Logged only; no further action is taken here.
            NetworkEvent::PeerVersionRejected {
                peer_id,
                peer_type,
                result,
                min_version,
            } => {
                event_header = "PeerVersionRejected";
                warn!(
                    target: "version_gate",
                    peer_id = %peer_id,
                    peer_type = %peer_type,
                    result = ?result,
                    min_version = %min_version,
                    "Peer rejected due to version requirements and disconnected"
                );
            }
        }

        // Covers only the synchronous part of the handling; spawned tasks may
        // still be running.
        trace!(
            "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
            start.elapsed()
        );
    }
687
688 fn handle_response(&self, response: Response) -> Result<()> {
690 match response {
691 Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
692 warn!("Mishandled replicate response, should be handled earlier");
694 }
695 Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
696 error!(
697 "Response to replication shall be handled by called not by common handler, {resp:?}"
698 );
699 }
700 Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
701 }
703 other => {
704 warn!("handle_response not implemented for {other:?}");
705 }
706 };
707
708 Ok(())
709 }
710
711 async fn handle_query(node: Self, query: Query, payment_address: RewardsAddress) -> Response {
712 let network = node.network();
713 let resp: QueryResponse = match query {
714 Query::GetStoreQuote {
715 key,
716 data_type,
717 data_size,
718 nonce,
719 difficulty,
720 } => {
721 let record_key = key.to_record_key();
722 let self_id = network.peer_id();
723
724 let maybe_quoting_metrics = network
725 .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
726 .await;
727
728 let storage_proofs = if let Some(nonce) = nonce {
729 Self::respond_x_closest_record_proof(
730 network,
731 key.clone(),
732 nonce,
733 difficulty,
734 false,
735 )
736 .await
737 } else {
738 vec![]
739 };
740
741 match maybe_quoting_metrics {
742 Ok((quoting_metrics, is_already_stored)) => {
743 if is_already_stored {
744 QueryResponse::GetStoreQuote {
745 quote: Err(ProtocolError::RecordExists(
746 PrettyPrintRecordKey::from(&record_key).into_owned(),
747 )),
748 peer_address: NetworkAddress::from(self_id),
749 storage_proofs,
750 }
751 } else {
752 QueryResponse::GetStoreQuote {
753 quote: Self::create_quote_for_storecost(
754 network,
755 &key,
756 "ing_metrics,
757 &payment_address,
758 ),
759 peer_address: NetworkAddress::from(self_id),
760 storage_proofs,
761 }
762 }
763 }
764 Err(err) => {
765 warn!("GetStoreQuote failed for {key:?}: {err}");
766 QueryResponse::GetStoreQuote {
767 quote: Err(ProtocolError::GetStoreQuoteFailed),
768 peer_address: NetworkAddress::from(self_id),
769 storage_proofs,
770 }
771 }
772 }
773 }
774 Query::GetReplicatedRecord { requester: _, key } => {
775 let our_address = NetworkAddress::from(network.peer_id());
776 let record_key = key.to_record_key();
777
778 let result = match network.get_local_record(&record_key).await {
779 Ok(Some(record)) => Ok((our_address, Bytes::from(record.value))),
780 Ok(None) => Err(ProtocolError::ReplicatedRecordNotFound {
781 holder: Box::new(our_address),
782 key: Box::new(key.clone()),
783 }),
784 Err(err) => Err(ProtocolError::PutRecordFailed(format!(
786 "Error to fetch local record for GetReplicatedRecord {err:?}"
787 ))),
788 };
789
790 QueryResponse::GetReplicatedRecord(result)
791 }
792 Query::GetChunkExistenceProof {
793 key,
794 nonce,
795 difficulty,
796 } => QueryResponse::GetChunkExistenceProof(
797 Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
798 ),
799 Query::CheckNodeInProblem(target_address) => {
800 debug!("Got CheckNodeInProblem for peer {target_address:?}");
801
802 let is_in_trouble =
803 if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
804 result
805 } else {
806 debug!("Could not get status of {target_address:?}.");
807 false
808 };
809
810 QueryResponse::CheckNodeInProblem {
811 reporter_address: NetworkAddress::from(network.peer_id()),
812 target_address,
813 is_in_trouble,
814 }
815 }
816 Query::GetClosestPeers {
817 key,
818 num_of_peers,
819 range,
820 sign_result,
821 } => {
822 debug!(
823 "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
824 );
825 Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
826 .await
827 }
828 Query::GetVersion(_) => QueryResponse::GetVersion {
829 peer: NetworkAddress::from(network.peer_id()),
830 version: ant_build_info::package_version(),
831 },
832 Query::PutRecord {
833 holder,
834 address,
835 serialized_record,
836 } => {
837 let record = Record {
838 key: address.to_record_key(),
839 value: serialized_record,
840 publisher: None,
841 expires: None,
842 };
843
844 let key = PrettyPrintRecordKey::from(&record.key).into_owned();
845 let result = match node.validate_and_store_record(record).await {
846 Ok(()) => Ok(()),
847 Err(PutValidationError::OutdatedRecordCounter { counter, expected }) => {
848 node.record_metrics(Marker::RecordRejected(
849 &key,
850 &PutValidationError::OutdatedRecordCounter { counter, expected },
851 ));
852 Err(ProtocolError::OutdatedRecordCounter { counter, expected })
853 }
854 Err(PutValidationError::TopologyVerificationFailed {
855 target_address,
856 valid_count,
857 total_paid,
858 closest_count,
859 node_peers,
860 paid_peers,
861 }) => {
862 node.record_metrics(Marker::RecordRejected(
863 &key,
864 &PutValidationError::TopologyVerificationFailed {
865 target_address: target_address.clone(),
866 valid_count,
867 total_paid,
868 closest_count,
869 node_peers: node_peers.clone(),
870 paid_peers: paid_peers.clone(),
871 },
872 ));
873 Err(ProtocolError::TopologyVerificationFailed {
874 target_address: Box::new(target_address),
875 valid_count,
876 total_paid,
877 closest_count,
878 node_peers,
879 paid_peers,
880 })
881 }
882 Err(err) => {
883 node.record_metrics(Marker::RecordRejected(&key, &err));
884 Err(ProtocolError::PutRecordFailed(format!("{err:?}")))
885 }
886 };
887 QueryResponse::PutRecord {
888 result,
889 peer_address: holder,
890 record_addr: address,
891 }
892 }
893 Query::GetMerkleCandidateQuote {
894 key,
895 data_type,
896 data_size,
897 merkle_payment_timestamp,
898 } => {
899 Self::respond_merkle_candidate_quote(
900 network,
901 key,
902 data_type,
903 data_size,
904 merkle_payment_timestamp,
905 payment_address,
906 )
907 .await
908 }
909 #[cfg(feature = "developer")]
910 Query::DevGetClosestPeersFromNetwork { key, num_of_peers } => {
911 debug!(
912 "Got DevGetClosestPeersFromNetwork targeting {key:?} with {num_of_peers:?} peers"
913 );
914 Self::respond_dev_get_closest_peers_from_network(network, key, num_of_peers).await
915 }
916 };
917 Response::Query(resp)
918 }
919
920 async fn respond_get_closest_peers(
921 network: &Network,
922 target: NetworkAddress,
923 num_of_peers: Option<usize>,
924 range: Option<[u8; 32]>,
925 sign_result: bool,
926 ) -> QueryResponse {
927 let local_peers = network.get_local_peers_with_multiaddr().await;
928 let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
929 Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
930 } else {
931 vec![]
932 };
933
934 let signature = if sign_result {
935 let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
936 bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
937 network.sign(&bytes).ok()
938 } else {
939 None
940 };
941
942 QueryResponse::GetClosestPeers {
943 target,
944 peers,
945 signature,
946 }
947 }
948
949 #[cfg(feature = "developer")]
954 async fn respond_dev_get_closest_peers_from_network(
955 network: &Network,
956 target: NetworkAddress,
957 num_of_peers: Option<usize>,
958 ) -> QueryResponse {
959 use ant_protocol::messages::QueryResponse;
960
961 let queried_node = NetworkAddress::from(network.peer_id());
962 debug!(
963 "DevGetClosestPeersFromNetwork: node {queried_node:?} querying network for closest peers to {target:?}"
964 );
965
966 let result = network.get_closest_peers(&target).await;
968
969 let peers = match result {
970 Ok(peers) => {
971 let mut converted: Vec<(NetworkAddress, Vec<Multiaddr>)> = peers
972 .into_iter()
973 .map(|(peer_id, addrs)| (NetworkAddress::from(peer_id), addrs.0))
974 .collect();
975
976 if let Some(n) = num_of_peers {
978 converted.sort_by_key(|(addr, _)| target.distance(addr));
979 converted.truncate(n);
980 }
981
982 debug!(
983 "DevGetClosestPeersFromNetwork: found {} peers closest to {target:?}",
984 converted.len()
985 );
986 converted
987 }
988 Err(err) => {
989 warn!(
990 "DevGetClosestPeersFromNetwork: failed to query network for {target:?}: {err:?}"
991 );
992 vec![]
993 }
994 };
995
996 QueryResponse::DevGetClosestPeersFromNetwork {
997 target,
998 queried_node,
999 peers,
1000 }
1001 }
1002
1003 fn calculate_get_closest_peers(
1004 peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
1005 target: NetworkAddress,
1006 num_of_peers: Option<usize>,
1007 range: Option<[u8; 32]>,
1008 ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
1009 match (num_of_peers, range) {
1010 (_, Some(value)) => {
1011 let distance = U256::from_big_endian(&value);
1012 peer_addrs
1013 .iter()
1014 .filter_map(|(peer_id, multi_addrs)| {
1015 let addr = NetworkAddress::from(*peer_id);
1016 if target.distance(&addr).0 <= distance {
1017 Some((addr, multi_addrs.clone()))
1018 } else {
1019 None
1020 }
1021 })
1022 .collect()
1023 }
1024 (Some(num_of_peers), _) => {
1025 let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
1026 .iter()
1027 .map(|(peer_id, multi_addrs)| {
1028 let addr = NetworkAddress::from(*peer_id);
1029 (addr, multi_addrs.clone())
1030 })
1031 .collect();
1032 result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
1033 result.into_iter().take(num_of_peers).collect()
1034 }
1035 (None, None) => vec![],
1036 }
1037 }
1038
1039 async fn respond_merkle_candidate_quote(
1043 network: &Network,
1044 key: NetworkAddress,
1045 data_type: u32,
1046 data_size: usize,
1047 merkle_payment_timestamp: u64,
1048 payment_address: RewardsAddress,
1049 ) -> QueryResponse {
1050 debug!(
1051 "merkle payment: GetMerkleCandidateQuote for target {key:?}, timestamp: {merkle_payment_timestamp}, data_type: {data_type}, data_size: {data_size}"
1052 );
1053
1054 const TIMESTAMP_TOLERANCE: u64 = 24 * 60 * 60; let now = std::time::SystemTime::now()
1064 .duration_since(std::time::UNIX_EPOCH)
1065 .unwrap_or_default()
1066 .as_secs();
1067
1068 let future_threshold = now + TIMESTAMP_TOLERANCE;
1070 if merkle_payment_timestamp > future_threshold {
1071 let error_msg = format!(
1072 "Rejected future timestamp {merkle_payment_timestamp} (current time: {now}, threshold: {future_threshold})"
1073 );
1074 warn!("{error_msg} for {key:?}");
1075 return QueryResponse::GetMerkleCandidateQuote(Err(
1076 ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
1077 ));
1078 }
1079
1080 let expiration_threshold = MERKLE_PAYMENT_EXPIRATION + TIMESTAMP_TOLERANCE;
1082 let age = now.saturating_sub(merkle_payment_timestamp);
1083 if age > expiration_threshold {
1084 let error_msg = format!(
1085 "Rejected expired timestamp {merkle_payment_timestamp} (age: {age}s, max: {expiration_threshold}s)",
1086 );
1087 warn!("{error_msg} for {key:?}");
1088 return QueryResponse::GetMerkleCandidateQuote(Err(
1089 ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
1090 ));
1091 }
1092
1093 let record_key = key.to_record_key();
1095 let (quoting_metrics, _is_already_stored) = match network
1096 .get_local_quoting_metrics(record_key, data_type, data_size)
1097 .await
1098 {
1099 Ok(metrics) => metrics,
1100 Err(err) => {
1101 let error_msg = format!("Failed to get quoting metrics for {key:?}: {err}");
1102 warn!("{error_msg}");
1103 return QueryResponse::GetMerkleCandidateQuote(Err(
1104 ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
1105 ));
1106 }
1107 };
1108
1109 let pub_key = network.get_pub_key();
1111 let reward_address = payment_address;
1112 let bytes = ant_evm::merkle_payments::MerklePaymentCandidateNode::bytes_to_sign(
1113 "ing_metrics,
1114 &reward_address,
1115 merkle_payment_timestamp,
1116 );
1117 let signature = match network.sign(&bytes) {
1118 Ok(sig) => sig,
1119 Err(e) => {
1120 let error_msg = format!("Failed to sign candidate node for {key:?}: {e}");
1121 error!("{error_msg}");
1122 return QueryResponse::GetMerkleCandidateQuote(Err(
1123 ProtocolError::FailedToSignMerkleCandidate(error_msg),
1124 ));
1125 }
1126 };
1127
1128 let candidate = ant_evm::merkle_payments::MerklePaymentCandidateNode {
1129 quoting_metrics,
1130 reward_address,
1131 merkle_payment_timestamp,
1132 pub_key,
1133 signature,
1134 };
1135 QueryResponse::GetMerkleCandidateQuote(Ok(candidate))
1136 }
1137
1138 async fn respond_x_closest_record_proof(
1141 network: &Network,
1142 key: NetworkAddress,
1143 nonce: Nonce,
1144 difficulty: usize,
1145 chunk_only: bool,
1146 ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
1147 let start = Instant::now();
1148 let mut results = vec![];
1149 if difficulty == 1 {
1150 let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
1152 if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
1153 let proof = ChunkProof::new(&record.value, nonce);
1154 debug!("Chunk proof for {key:?} is {proof:?}");
1155 result = Ok(proof)
1156 } else {
1157 debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
1158 }
1159
1160 results.push((key.clone(), result));
1161 } else {
1162 let all_local_records = network.get_all_local_record_addresses().await;
1163
1164 if let Ok(all_local_records) = all_local_records {
1165 let mut all_chunk_addrs: Vec<_> = if chunk_only {
1166 all_local_records
1167 .iter()
1168 .filter_map(|(addr, record_type)| {
1169 if *record_type == ValidationType::Chunk {
1170 Some(addr.clone())
1171 } else {
1172 None
1173 }
1174 })
1175 .collect()
1176 } else {
1177 all_local_records.keys().cloned().collect()
1178 };
1179
1180 all_chunk_addrs.sort_by_key(|addr| key.distance(addr));
1182
1183 let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);
1185
1186 for addr in all_chunk_addrs.iter().take(workload_factor) {
1187 if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
1188 {
1189 let proof = ChunkProof::new(&record.value, nonce);
1190 debug!("Chunk proof for {key:?} is {proof:?}");
1191 results.push((addr.clone(), Ok(proof)));
1192 }
1193 }
1194 }
1195
1196 info!(
1197 "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
1198 results.len(),
1199 start.elapsed()
1200 );
1201 }
1202
1203 results
1204 }
1205
1206 async fn storage_challenge(network: Network) {
1210 let start = Instant::now();
1211 let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
1212 network.get_k_closest_local_peers_to_the_target(None).await
1213 {
1214 closest_peers
1215 .into_iter()
1216 .take(CLOSE_GROUP_SIZE)
1217 .collect_vec()
1218 } else {
1219 error!("Cannot get local neighbours");
1220 return;
1221 };
1222 if closest_peers.len() < CLOSE_GROUP_SIZE {
1223 debug!(
1224 "Not enough neighbours ({}/{}) to carry out storage challenge.",
1225 closest_peers.len(),
1226 CLOSE_GROUP_SIZE
1227 );
1228 return;
1229 }
1230
1231 let mut verify_candidates: Vec<NetworkAddress> =
1232 if let Ok(all_keys) = network.get_all_local_record_addresses().await {
1233 all_keys
1234 .iter()
1235 .filter_map(|(addr, record_type)| {
1236 if ValidationType::Chunk == *record_type {
1237 Some(addr.clone())
1238 } else {
1239 None
1240 }
1241 })
1242 .collect()
1243 } else {
1244 error!("Failed to get local record addresses.");
1245 return;
1246 };
1247 let num_of_targets = verify_candidates.len();
1248 if num_of_targets < 50 {
1249 debug!("Not enough candidates({num_of_targets}/50) to be checked against neighbours.");
1250 return;
1251 }
1252
1253 let self_addr = NetworkAddress::from(network.peer_id());
1256 verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
1257 let index: usize = OsRng.gen_range(0..num_of_targets / 2);
1258 let target = verify_candidates[index].clone();
1259 let difficulty = CLOSE_GROUP_SIZE;
1261 verify_candidates.sort_by_key(|addr| target.distance(addr));
1262 let expected_targets = verify_candidates.into_iter().take(difficulty);
1263 let nonce: Nonce = thread_rng().r#gen::<u64>();
1264 let mut expected_proofs = HashMap::new();
1265 for addr in expected_targets {
1266 if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
1267 let expected_proof = ChunkProof::new(&record.value, nonce);
1268 let _ = expected_proofs.insert(addr, expected_proof);
1269 } else {
1270 error!("Local record {addr:?} cann't be loaded from disk.");
1271 }
1272 }
1273 let request = Request::Query(Query::GetChunkExistenceProof {
1274 key: target.clone(),
1275 nonce,
1276 difficulty,
1277 });
1278
1279 let mut tasks = JoinSet::new();
1280 for (peer_id, addresses) in closest_peers {
1281 if peer_id == network.peer_id() {
1282 continue;
1283 }
1284 let network_clone = network.clone();
1285 let request_clone = request.clone();
1286 let expected_proofs_clone = expected_proofs.clone();
1287 let _ = tasks.spawn(async move {
1288 let res = scoring_peer(
1289 network_clone,
1290 (peer_id, addresses),
1291 request_clone,
1292 expected_proofs_clone,
1293 )
1294 .await;
1295 (peer_id, res)
1296 });
1297 }
1298
1299 let mut peer_scores = vec![];
1300 while let Some(res) = tasks.join_next().await {
1301 match res {
1302 Ok((peer_id, score)) => {
1303 let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
1304 if !is_healthy {
1305 info!(
1306 "Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."
1307 );
1308 network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
1310 }
1311 peer_scores.push((peer_id, is_healthy));
1312 }
1313 Err(e) => {
1314 info!("StorageChallenge task completed with error {e:?}");
1315 }
1316 }
1317 }
1318 if !peer_scores.is_empty() {
1319 network.notify_peer_scores(peer_scores);
1320 }
1321
1322 Self::verify_local_replication_health(network.clone()).await;
1323
1324 info!(
1325 "Completed node StorageChallenge against neighbours in {:?}!",
1326 start.elapsed()
1327 );
1328 }
1329
1330 async fn verify_local_replication_health(network: Network) {
1332 let closest_peers = match network.get_k_closest_local_peers_to_the_target(None).await {
1333 Ok(peers) => peers,
1334 Err(err) => {
1335 warn!("Cannot fetch closest peers for replica verification: {err:?}");
1336 return;
1337 }
1338 };
1339
1340 if closest_peers.len() <= CLOSE_NEIGHBOUR_DISTANCE_INDEX {
1341 debug!(
1342 "Skipping replica verification as we only know {} neighbours (need > {}).",
1343 closest_peers.len(),
1344 CLOSE_NEIGHBOUR_DISTANCE_INDEX
1345 );
1346 return;
1347 }
1348
1349 let self_address = NetworkAddress::from(network.peer_id());
1350 let Some((threshold_peer, _)) = closest_peers.get(CLOSE_NEIGHBOUR_DISTANCE_INDEX) else {
1351 debug!("Unable to determine distance threshold for replica verification.");
1352 return;
1353 };
1354
1355 let threshold_distance = self_address.distance(&NetworkAddress::from(*threshold_peer));
1356 let Some(threshold_ilog2) = threshold_distance.ilog2() else {
1357 debug!("Threshold distance lacks ilog2; cannot proceed with replica verification.");
1358 return;
1359 };
1360
1361 let local_records = match network.get_all_local_record_addresses().await {
1362 Ok(records) => records,
1363 Err(err) => {
1364 warn!("Failed to list local records for replica verification: {err:?}");
1365 return;
1366 }
1367 };
1368
1369 let mut nearby_records: Vec<NetworkAddress> = local_records
1370 .into_iter()
1371 .filter_map(|(address, record_type)| {
1372 if record_type != ValidationType::Chunk {
1373 return None;
1374 }
1375 let distance = self_address.distance(&address);
1376 distance.ilog2().and_then(|record_ilog2| {
1377 if record_ilog2 <= threshold_ilog2 {
1378 Some(address)
1379 } else {
1380 None
1381 }
1382 })
1383 })
1384 .collect();
1385
1386 nearby_records.shuffle(&mut thread_rng());
1387 let target_record = if let Some(entry) = nearby_records.first().cloned() {
1388 entry
1389 } else {
1390 debug!("No nearby chunk records available for replica verification.");
1391 return;
1392 };
1393
1394 let pretty_key = PrettyPrintRecordKey::from(&target_record.to_record_key()).into_owned();
1395
1396 let candidate_peers = match network
1397 .get_k_closest_local_peers_to_the_target(Some(target_record.clone()))
1398 .await
1399 {
1400 Ok(peers) => peers
1401 .into_iter()
1402 .filter(|(peer_id, _)| peer_id != &network.peer_id())
1403 .take(REPLICA_FETCH_PEER_COUNT)
1404 .collect::<Vec<_>>(),
1405 Err(err) => {
1406 warn!(
1407 "Cannot fetch record-specific closest peers for replica verification: {err:?}"
1408 );
1409 return;
1410 }
1411 };
1412
1413 if candidate_peers.len() < REPLICA_FETCH_PEER_COUNT {
1414 debug!(
1415 "Only {} peers available for replica verification (need at least {}).",
1416 candidate_peers.len(),
1417 REPLICA_FETCH_PEER_COUNT
1418 );
1419 return;
1420 }
1421
1422 debug!(
1423 "Verifying replicated record {pretty_key:?} against {} closest peers.",
1424 candidate_peers.len()
1425 );
1426
1427 let (successful_peers, failed_peers) = Self::fetch_record_from_peers_with_addresses(
1428 network.clone(),
1429 target_record.clone(),
1430 candidate_peers,
1431 )
1432 .await;
1433
1434 if failed_peers.is_empty() {
1435 debug!("All peers returned record {pretty_key:?} during replica verification.");
1436 return;
1437 }
1438
1439 if successful_peers.len() < MIN_HEALTHY_REPLICA_COUNT {
1440 warn!(
1441 "Replica verification fetched only {} copies of {pretty_key:?}. Record is unhealthy; skipping peer classification.",
1442 successful_peers.len()
1443 );
1444 return;
1445 }
1446
1447 if !failed_peers.is_empty() {
1448 info!(
1449 "Scheduling replica verification retry for {} peers on record {pretty_key:?}.",
1450 failed_peers.len()
1451 );
1452 Self::schedule_record_fetch_retry(network.clone(), target_record.clone(), failed_peers)
1453 .await;
1454 }
1455 }
1456
1457 async fn fetch_record_from_peers_with_addresses(
1459 network: Network,
1460 record_address: NetworkAddress,
1461 peers: Vec<(PeerId, Addresses)>,
1462 ) -> (Vec<PeerId>, Vec<PeerId>) {
1463 let request = Request::Query(Query::GetReplicatedRecord {
1464 requester: NetworkAddress::from(network.peer_id()),
1465 key: record_address.clone(),
1466 });
1467 let expected_key = record_address.to_record_key();
1468 let pretty_key = PrettyPrintRecordKey::from(&expected_key).into_owned();
1469
1470 let mut successes = Vec::new();
1471 let mut failures = Vec::new();
1472 let concurrency = peers.len();
1473 let results = stream::iter(peers.into_iter().map(|(peer_id, addrs)| {
1474 let network_clone = network.clone();
1475 let request_clone = request.clone();
1476 async move {
1477 let result = network_clone
1478 .send_request(request_clone, peer_id, addrs.clone())
1479 .await;
1480 (peer_id, addrs, result)
1481 }
1482 }))
1483 .buffer_unordered(concurrency)
1484 .collect::<Vec<_>>()
1485 .await;
1486
1487 for res in results {
1488 match res {
1489 (
1490 peer_id,
1491 _addrs,
1492 Ok((Response::Query(QueryResponse::GetReplicatedRecord(result)), _)),
1493 ) => {
1494 match result {
1495 Ok((_holder, value)) => {
1496 let record =
1498 Record::new(record_address.to_record_key(), value.to_vec());
1499 if let Ok(chunk) = try_deserialize_record::<Chunk>(&record)
1500 && chunk.network_address().to_record_key() == expected_key
1501 {
1502 successes.push(peer_id);
1503 } else {
1504 warn!(
1505 "Peer {peer_id:?} responded with an incorrect chunk copy of {pretty_key:?}."
1506 );
1507 failures.push(peer_id);
1508 }
1509 }
1510 Err(err) => {
1511 info!(
1512 "Peer {peer_id:?} responded with error {err:?} for replicated record {pretty_key:?}."
1513 );
1514 failures.push(peer_id);
1515 }
1516 }
1517 }
1518 (peer_id, _addrs, Ok((other_response, _))) => {
1519 warn!(
1520 "Peer {peer_id:?} responded with unexpected message {other_response:?} for {pretty_key:?}."
1521 );
1522 failures.push(peer_id);
1523 }
1524 (peer_id, _addrs, Err(err)) => {
1525 info!(
1526 "Failed to reach peer {peer_id:?} for replicated record {pretty_key:?}: {err:?}"
1527 );
1528 failures.push(peer_id);
1529 }
1530 }
1531 }
1532
1533 (successes, failures)
1534 }
1535
1536 async fn schedule_record_fetch_retry(
1538 network: Network,
1539 record_address: NetworkAddress,
1540 failed_peers: Vec<PeerId>,
1541 ) {
1542 let retry_peers: HashSet<_> = failed_peers.into_iter().collect();
1543 if retry_peers.is_empty() {
1544 return;
1545 }
1546
1547 let record_clone = record_address.clone();
1548 let network_clone = network.clone();
1549
1550 tokio::time::sleep(REPLICA_FETCH_RETRY_DELAY).await;
1551 let pretty_key = PrettyPrintRecordKey::from(&record_clone.to_record_key()).into_owned();
1552
1553 let refreshed_candidates =
1554 Self::refresh_retry_candidate_addresses(&network_clone, &record_clone, &retry_peers)
1555 .await;
1556
1557 if refreshed_candidates.is_empty() {
1558 info!(
1559 "Skipping replica retry for {pretty_key:?}; no tracked peers remain close to the record."
1560 );
1561 return;
1562 }
1563
1564 let (_, still_failed) = Self::fetch_record_from_peers_with_addresses(
1565 network_clone.clone(),
1566 record_clone.clone(),
1567 refreshed_candidates,
1568 )
1569 .await;
1570
1571 if still_failed.is_empty() {
1572 info!("All peers successfully returned {pretty_key:?} during replica retry.");
1573 return;
1574 }
1575
1576 warn!(
1577 "{} peers still failed to provide {pretty_key:?}; evicting and blacklisting.",
1578 still_failed.len()
1579 );
1580 for peer_id in still_failed {
1581 Self::evict_and_blacklist_peer(network_clone.clone(), peer_id);
1582 }
1583 }
1584
1585 async fn refresh_retry_candidate_addresses(
1587 network: &Network,
1588 record_address: &NetworkAddress,
1589 retry_peers: &HashSet<PeerId>,
1590 ) -> Vec<(PeerId, Addresses)> {
1591 let pretty_key = PrettyPrintRecordKey::from(&record_address.to_record_key()).into_owned();
1592 let Ok(closest_peers) = network.get_closest_peers(record_address).await else {
1593 warn!(
1594 "Failed to refresh peer addresses for {pretty_key:?}; unable to retry replica fetch."
1595 );
1596 return Vec::new();
1597 };
1598
1599 let closest_map: HashMap<PeerId, Addresses> = closest_peers.into_iter().collect();
1600
1601 retry_peers
1602 .iter()
1603 .filter_map(|peer_id| {
1604 closest_map
1605 .get(peer_id)
1606 .cloned()
1607 .map(|addrs| (*peer_id, addrs))
1608 })
1609 .collect()
1610 }
1611
/// Removes `peer_id` through `Network::remove_peer` and then adds it to the blocklist through
/// `Network::add_peer_to_blocklist`.
fn evict_and_blacklist_peer(network: Network, peer_id: PeerId) {
    network.remove_peer(peer_id);
    network.add_peer_to_blocklist(peer_id);
}
1617
/// Queries the version of every peer in `peers`, one peer at a time: each query is awaited
/// before the next one starts.
async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
    for (peer_id, addrs) in peers {
        Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
    }
}
1625
1626 async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
1628 let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
1629 let version = match network.send_request(request, peer, addrs).await {
1631 Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
1632 trace!("Fetched peer version {peer:?} as {version:?}");
1633 version
1634 }
1635 Ok(other) => {
1636 info!("Not a fetched peer version {peer:?}, {other:?}");
1637 "none".to_string()
1638 }
1639 Err(err) => {
1640 info!("Failed to fetch peer version {peer:?} with error {err:?}");
1641 network.remove_peer(peer);
1645 return;
1646 }
1647 };
1648 network.notify_node_version(peer, version);
1649 }
1650}
1651
1652#[derive(Debug)]
1653struct CloseGroupTracker {
1654 self_address: NetworkAddress,
1655 close_up_peers: BTreeSet<(Distance, PeerId)>,
1659 tracked_entries: HashMap<PeerId, BehaviourEntry>,
1660}
1661
1662impl CloseGroupTracker {
1663 fn new(self_peer_id: PeerId) -> Self {
1664 Self {
1665 self_address: NetworkAddress::from(self_peer_id),
1666 close_up_peers: BTreeSet::new(),
1667 tracked_entries: HashMap::new(),
1668 }
1669 }
1670
1671 fn record_peer_added(&mut self, peer_id: PeerId) -> ReplicationDirective {
1672 let distance = self.distance_to_peer(peer_id);
1673
1674 let was_present = self.close_up_peers.contains(&(distance, peer_id));
1676
1677 let is_close_enough = self.is_distance_within_close_range(distance);
1679
1680 if is_close_enough {
1681 let _ = self.close_up_peers.insert((distance, peer_id));
1683
1684 if self.close_up_peers.len() > CLOSE_GROUP_TRACKING_LIMIT
1686 && let Some(&(farthest_distance, farthest_peer)) =
1687 self.close_up_peers.iter().next_back()
1688 {
1689 let _ = self
1690 .close_up_peers
1691 .remove(&(farthest_distance, farthest_peer));
1692 }
1693 } else {
1694 let _ = self.tracked_entries.remove(&peer_id);
1696 return ReplicationDirective::Ignore;
1697 }
1698
1699 use std::collections::hash_map::Entry;
1700 match self.tracked_entries.entry(peer_id) {
1701 Entry::Vacant(vacant_entry) => {
1702 let _ = vacant_entry.insert(BehaviourEntry::default());
1703 if !was_present {
1704 ReplicationDirective::Trigger
1705 } else {
1706 ReplicationDirective::Ignore
1707 }
1708 }
1709 Entry::Occupied(mut occupied_entry) => {
1710 let entry = occupied_entry.get_mut();
1711 if entry.awaiting_rejoin || entry.timer_deadline.is_some() {
1712 entry.restart_detected = true;
1713 entry.awaiting_rejoin = false;
1714 entry.timer_deadline = None;
1715 ReplicationDirective::Skip
1716 } else if !was_present {
1717 entry.restart_detected = false;
1718 ReplicationDirective::Trigger
1719 } else {
1720 ReplicationDirective::Ignore
1721 }
1722 }
1723 }
1724 }
1725
1726 fn record_peer_removed(&mut self, peer_id: PeerId) -> ReplicationDirective {
1727 let distance = self.distance_to_peer(peer_id);
1728
1729 let was_tracked = self.close_up_peers.contains(&(distance, peer_id));
1731
1732 if !was_tracked {
1733 return ReplicationDirective::Ignore;
1735 }
1736
1737 let _ = self.close_up_peers.remove(&(distance, peer_id));
1739
1740 let entry = self.tracked_entries.entry(peer_id).or_default();
1741
1742 if entry.restart_detected {
1743 entry.timer_deadline = Some(Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION);
1744 entry.awaiting_rejoin = false;
1745 ReplicationDirective::Skip
1746 } else {
1747 entry.awaiting_rejoin = true;
1748 entry.timer_deadline = None;
1749 ReplicationDirective::Trigger
1750 }
1751 }
1752
1753 fn handle_timer_expiry(&mut self, now: Instant) -> bool {
1754 let mut expired = false;
1755 for entry in self.tracked_entries.values_mut() {
1756 if let Some(deadline) = entry.timer_deadline
1757 && now >= deadline
1758 {
1759 entry.timer_deadline = None;
1760 entry.restart_detected = false;
1761 entry.awaiting_rejoin = false;
1762 expired = true;
1763 }
1764 }
1765
1766 self.tracked_entries.retain(|peer_id, entry| {
1768 let is_currently_present = self
1769 .close_up_peers
1770 .iter()
1771 .any(|(_, present_peer)| present_peer == peer_id);
1772
1773 entry.awaiting_rejoin
1774 || entry.restart_detected
1775 || entry.timer_deadline.is_some()
1776 || is_currently_present
1777 });
1778
1779 expired
1780 }
1781
1782 fn is_distance_within_close_range(&self, distance: Distance) -> bool {
1783 if self.close_up_peers.len() < CLOSE_GROUP_TRACKING_LIMIT {
1785 return true;
1786 }
1787
1788 if let Some(&(farthest_distance, _)) = self.close_up_peers.iter().next_back() {
1790 distance < farthest_distance
1791 } else {
1792 true
1793 }
1794 }
1795
1796 fn distance_to_peer(&self, peer_id: PeerId) -> Distance {
1797 self.self_address.distance(&NetworkAddress::from(peer_id))
1798 }
1799}
1800
/// Churn state kept by `CloseGroupTracker` for a single peer.
#[derive(Debug, Default)]
struct BehaviourEntry {
    /// Set when the peer was removed without a prior restart detection; cleared when it is
    /// added again or when its timer expires.
    awaiting_rejoin: bool,
    /// Set when the peer re-appeared while awaiting rejoin or while a timer was running.
    restart_detected: bool,
    /// Deadline of the suppression timer started when a peer with a detected restart is
    /// removed; `None` when no timer is running.
    timer_deadline: Option<Instant>,
}
1807
/// Verdict returned by `CloseGroupTracker` for a peer added/removed event.
#[derive(Debug, PartialEq, Eq)]
enum ReplicationDirective {
    /// Replication should be carried out.
    Trigger,
    /// Replication is held back because the event is attributed to a peer restart.
    Skip,
    /// The event is not relevant to the tracked close group.
    Ignore,
}

impl ReplicationDirective {
    /// Returns `true` only for [`ReplicationDirective::Trigger`].
    fn should_trigger(&self) -> bool {
        *self == Self::Trigger
    }
}
1820
#[cfg(test)]
mod close_group_tracker_tests {
    use super::*;
    use std::time::Duration;

    /// Creates a peer id from a freshly generated ed25519 keypair.
    fn random_peer() -> PeerId {
        PeerId::from(libp2p::identity::Keypair::generate_ed25519().public())
    }

    /// An instant one second past the restart suppression window, counted from now.
    fn past_suppression_window() -> Instant {
        Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1)
    }

    /// A peer seen for the first time triggers replication both when joining and leaving.
    #[test]
    fn new_peer_triggers_replication() {
        let peer = random_peer();
        let mut tracker = CloseGroupTracker::new(random_peer());

        let on_join = tracker.record_peer_added(peer);
        assert_eq!(on_join, ReplicationDirective::Trigger);

        let on_leave = tracker.record_peer_removed(peer);
        assert_eq!(on_leave, ReplicationDirective::Trigger);
    }

    /// A peer that leaves and comes back is treated as restarting until the timer expires.
    #[test]
    fn restart_detection_skips_replication() {
        let peer = random_peer();
        let mut tracker = CloseGroupTracker::new(random_peer());

        // First appearance and departure behave like any new peer.
        assert_eq!(
            ReplicationDirective::Trigger,
            tracker.record_peer_added(peer)
        );
        assert_eq!(
            ReplicationDirective::Trigger,
            tracker.record_peer_removed(peer)
        );

        // Coming back flags a restart; the following departure is suppressed.
        assert_eq!(ReplicationDirective::Skip, tracker.record_peer_added(peer));
        assert_eq!(
            ReplicationDirective::Skip,
            tracker.record_peer_removed(peer)
        );

        // Once the suppression window has passed the peer is a normal peer again.
        assert!(tracker.handle_timer_expiry(past_suppression_window()));
        assert_eq!(
            ReplicationDirective::Trigger,
            tracker.record_peer_added(peer)
        );
    }

    /// The tracker never holds more than `CLOSE_GROUP_TRACKING_LIMIT` peers.
    #[test]
    fn peer_outside_close_group_is_ignored() {
        let mut tracker = CloseGroupTracker::new(random_peer());

        // Fill the tracker up to its capacity; every peer is accepted while there is room.
        for _ in 0..CLOSE_GROUP_TRACKING_LIMIT {
            assert_eq!(
                tracker.record_peer_added(random_peer()),
                ReplicationDirective::Trigger
            );
        }
        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);

        // Further random peers are either too far (ignored) or closer (accepted).
        let outcomes: Vec<ReplicationDirective> = (0..50)
            .map(|_| tracker.record_peer_added(random_peer()))
            .collect();
        let ignored_count = outcomes
            .iter()
            .filter(|outcome| **outcome == ReplicationDirective::Ignore)
            .count();
        let trigger_count = outcomes
            .iter()
            .filter(|outcome| **outcome == ReplicationDirective::Trigger)
            .count();

        assert!(
            ignored_count > 0 || trigger_count > 0,
            "Expected some peers to be processed"
        );

        // Capacity is still respected.
        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
    }

    /// Removing a peer the tracker never saw has no effect.
    #[test]
    fn removal_of_untracked_peer_is_ignored() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let stranger = random_peer();

        let outcome = tracker.record_peer_removed(stranger);
        assert_eq!(outcome, ReplicationDirective::Ignore);
    }

    /// The suppression timer only fires after its deadline and then resets the peer state.
    #[test]
    fn timer_expiry_resets_restart_state() {
        let peer = random_peer();
        let mut tracker = CloseGroupTracker::new(random_peer());

        // join, leave, rejoin (restart detected), leave again (timer started)
        assert_eq!(
            ReplicationDirective::Trigger,
            tracker.record_peer_added(peer)
        );
        assert_eq!(
            ReplicationDirective::Trigger,
            tracker.record_peer_removed(peer)
        );
        assert_eq!(ReplicationDirective::Skip, tracker.record_peer_added(peer));
        assert_eq!(
            ReplicationDirective::Skip,
            tracker.record_peer_removed(peer)
        );

        // Too early: nothing expires yet.
        let expired_now = tracker.handle_timer_expiry(Instant::now());
        assert!(!expired_now);

        // After the window the timer fires.
        let expired_later = tracker.handle_timer_expiry(past_suppression_window());
        assert!(expired_later);

        // And the peer triggers replication again.
        assert_eq!(
            ReplicationDirective::Trigger,
            tracker.record_peer_added(peer)
        );
    }

    /// A peer closer than the current farthest one is accepted without exceeding capacity.
    #[test]
    fn eviction_maintains_closest_peers() {
        let mut tracker = CloseGroupTracker::new(random_peer());

        // Fill to capacity, remembering how far each accepted peer is.
        let mut distances: Vec<Distance> = Vec::new();
        for _ in 0..CLOSE_GROUP_TRACKING_LIMIT {
            let peer = random_peer();
            distances.push(tracker.distance_to_peer(peer));
            let _ = tracker.record_peer_added(peer);
        }
        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);

        let farthest_distance = distances.into_iter().max().unwrap();

        // Look for a random peer that is closer than the farthest tracked one.
        let mut found_closer = false;
        for _ in 0..100 {
            let candidate = random_peer();
            if tracker.distance_to_peer(candidate) >= farthest_distance {
                continue;
            }

            assert_eq!(
                tracker.record_peer_added(candidate),
                ReplicationDirective::Trigger
            );
            assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
            found_closer = true;
            break;
        }

        assert!(
            found_closer,
            "Could not find a peer closer than the farthest in 100 attempts"
        );
    }

    /// Entries of departed peers are dropped once their timer has expired.
    #[test]
    fn tracked_entries_cleaned_up_on_timer_expiry() {
        let peer = random_peer();
        let mut tracker = CloseGroupTracker::new(random_peer());

        // Two join/leave cycles leave the peer absent with a running suppression timer.
        for _ in 0..2 {
            let _ = tracker.record_peer_added(peer);
            let _ = tracker.record_peer_removed(peer);
        }
        assert!(tracker.tracked_entries.contains_key(&peer));

        let _ = tracker.handle_timer_expiry(past_suppression_window());

        assert!(!tracker.tracked_entries.contains_key(&peer));
    }
}
2037
2038async fn scoring_peer(
2039 network: Network,
2040 peer: (PeerId, Addresses),
2041 request: Request,
2042 expected_proofs: HashMap<NetworkAddress, ChunkProof>,
2043) -> usize {
2044 let peer_id = peer.0;
2045 let start = Instant::now();
2046 let responses = network
2047 .send_and_get_responses(&[peer], &request, true)
2048 .await;
2049
2050 if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
2051 responses.get(&peer_id)
2052 {
2053 if answers.is_empty() {
2054 info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
2055 return 0;
2056 }
2057 let elapsed = start.elapsed();
2058
2059 let mut received_proofs = vec![];
2060 for (addr, proof) in answers {
2061 if let Ok(proof) = proof {
2062 received_proofs.push((addr.clone(), proof.clone()));
2063 }
2064 }
2065
2066 let score = mark_peer(elapsed, received_proofs, &expected_proofs);
2067 info!(
2068 "Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.",
2069 answers.len()
2070 );
2071 score
2072 } else {
2073 info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error.");
2074 0
2075 }
2076}
2077
2078fn mark_peer(
2167 duration: Duration,
2168 answers: Vec<(NetworkAddress, ChunkProof)>,
2169 expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
2170) -> usize {
2171 let duration_score = duration_score_scheme(duration);
2172 let challenge_score = challenge_score_scheme(answers, expected_proofs);
2173
2174 duration_score * challenge_score
2175}
2176
2177fn duration_score_scheme(duration: Duration) -> usize {
2194 let in_ms = if let Some(value) = duration.as_millis().to_usize() {
2195 value
2196 } else {
2197 info!("Cannot get milli seconds from {duration:?}, using a default value of 1000ms.");
2198 HIGHEST_SCORE * TIME_STEP
2199 };
2200
2201 let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
2202 HIGHEST_SCORE - step
2203}
2204
2205fn challenge_score_scheme(
2217 answers: Vec<(NetworkAddress, ChunkProof)>,
2218 expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
2219) -> usize {
2220 let mut correct_answers = 0;
2221 for (addr, chunk_proof) in answers {
2222 if let Some(expected_proof) = expected_proofs.get(&addr) {
2223 if expected_proof.verify(&chunk_proof) {
2224 correct_answers += 1;
2225 } else {
2226 info!("Spot a false answer to the challenge regarding {addr:?}");
2227 return 0;
2230 }
2231 }
2232 }
2233 std::cmp::min(
2236 HIGHEST_SCORE,
2237 HIGHEST_SCORE * correct_answers / expected_proofs.len(),
2238 )
2239}
2240
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    /// Three random peers, each with a single hard-coded multiaddr.
    fn sample_local_peers() -> Vec<(PeerId, Vec<Multiaddr>)> {
        [
            "/ip4/192.168.1.1/tcp/8080",
            "/ip4/192.168.1.2/tcp/8080",
            "/ip4/192.168.1.2/tcp/8080",
        ]
        .into_iter()
        .map(|raw| (PeerId::random(), vec![Multiaddr::from_str(raw).unwrap()]))
        .collect()
    }

    /// Without any local peer the result is empty.
    #[test]
    fn test_no_local_peers() {
        let target = NetworkAddress::from(PeerId::random());
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = Vec::new();

        let result = Node::calculate_get_closest_peers(local_peers, target, Some(5), None);

        assert_eq!(result, vec![]);
    }

    /// With a peer count and no range, the closest peers to the target are returned in
    /// distance order.
    #[test]
    fn test_fewer_local_peers_than_num_of_peers() {
        let local_peers = sample_local_peers();
        let target = NetworkAddress::from(PeerId::random());

        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            Some(2),
            None,
        );

        // Expected: all peers sorted by distance to the target, truncated to two.
        let mut by_distance: Vec<(NetworkAddress, Vec<Multiaddr>)> = Vec::new();
        for (peer_id, multi_addrs) in &local_peers {
            by_distance.push((NetworkAddress::from(*peer_id), multi_addrs.clone()));
        }
        by_distance.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
        by_distance.truncate(2);

        assert_eq!(by_distance, result);
    }

    /// With a range, exactly the peers within that distance of the target are returned.
    #[test]
    fn test_with_range_and_num_of_peers() {
        let local_peers = sample_local_peers();
        let target = NetworkAddress::from(PeerId::random());
        let range_value = [128; 32];

        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            Some(0),
            Some(range_value),
        );

        // Expected: every peer whose distance to the target does not exceed the range.
        let distance = U256::from_big_endian(&range_value);
        let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .into_iter()
            .map(|(peer_id, multi_addrs)| (NetworkAddress::from(peer_id), multi_addrs))
            .filter(|(addr, _)| target.distance(addr).0 <= distance)
            .collect();

        assert_eq!(expected_result, result);
    }

    mod merkle_payment_tests {
        use super::*;

        /// Current time as seconds since the Unix epoch.
        fn unix_now_secs() -> u64 {
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs()
        }

        /// A timestamp one hour old is neither in the future nor expired.
        #[test]
        fn test_timestamp_validation_accepts_valid_timestamp() {
            let now = unix_now_secs();
            let valid_timestamp = now - 3600;
            let age = now.saturating_sub(valid_timestamp);

            assert!(
                valid_timestamp <= now,
                "Valid timestamp should not be in the future"
            );
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Valid timestamp should not be expired"
            );
        }

        /// A timestamp one hour ahead lies in the future.
        #[test]
        fn test_timestamp_validation_rejects_future_timestamp() {
            let now = unix_now_secs();
            let future_timestamp = now + 3600;

            assert!(
                future_timestamp > now,
                "Future timestamp should be rejected"
            );
        }

        /// A timestamp one day past the expiration window is expired.
        #[test]
        fn test_timestamp_validation_rejects_expired_timestamp() {
            let now = unix_now_secs();
            let expired_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 86400);
            let age = now.saturating_sub(expired_timestamp);

            assert!(
                age > MERKLE_PAYMENT_EXPIRATION,
                "Expired timestamp should be rejected"
            );
        }

        /// A timestamp exactly at the expiration boundary is still acceptable.
        #[test]
        fn test_timestamp_validation_at_expiration_boundary() {
            let now = unix_now_secs();
            let boundary_timestamp = now - MERKLE_PAYMENT_EXPIRATION;
            let age = now.saturating_sub(boundary_timestamp);

            assert_eq!(age, MERKLE_PAYMENT_EXPIRATION);
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Timestamp exactly at boundary should not be rejected"
            );
        }

        /// One second beyond the expiration boundary is expired.
        #[test]
        fn test_timestamp_validation_beyond_expiration_boundary() {
            let now = unix_now_secs();
            let beyond_boundary_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 1);
            let age = now.saturating_sub(beyond_boundary_timestamp);

            assert!(
                age > MERKLE_PAYMENT_EXPIRATION,
                "Timestamp beyond boundary should be rejected"
            );
        }

        /// The current time itself has age zero and is acceptable.
        #[test]
        fn test_timestamp_validation_at_current_time() {
            let now = unix_now_secs();
            let current_timestamp = now;
            let age = now.saturating_sub(current_timestamp);

            assert!(
                current_timestamp <= now,
                "Current timestamp should not be in future"
            );
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Current timestamp should not be expired"
            );
            assert_eq!(age, 0, "Age should be 0 for current timestamp");
        }

        /// Even one second ahead counts as the future.
        #[test]
        fn test_timestamp_validation_near_future_boundary() {
            let now = unix_now_secs();
            let near_future_timestamp = now + 1;

            assert!(
                near_future_timestamp > now,
                "Near-future timestamp should be rejected"
            );
        }

        /// The expiration window is pinned to seven days.
        #[test]
        fn test_merkle_payment_expiration_constant() {
            const SEVEN_DAYS_IN_SECONDS: u64 = 7 * 24 * 60 * 60;
            assert_eq!(
                MERKLE_PAYMENT_EXPIRATION, SEVEN_DAYS_IN_SECONDS,
                "MERKLE_PAYMENT_EXPIRATION should be 7 days"
            );
        }
    }
}