ant_node/
node.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use super::{
10    Marker, NodeEvent, error::Result, event::NodeEventsChannel, quote::quotes_verification,
11};
12#[cfg(feature = "open-metrics")]
13use crate::metrics::NodeMetricsRecorder;
14#[cfg(feature = "open-metrics")]
15use crate::networking::MetricsRegistries;
16use crate::networking::{Addresses, Network, NetworkConfig, NetworkError, NetworkEvent, NodeIssue};
17use crate::{PutValidationError, RunningNode};
18use ant_bootstrap::bootstrap::Bootstrap;
19use ant_evm::EvmNetwork;
20use ant_evm::RewardsAddress;
21use ant_evm::merkle_payments::MERKLE_PAYMENT_EXPIRATION;
22use ant_protocol::{
23    CLOSE_GROUP_SIZE, NetworkAddress, PrettyPrintRecordKey,
24    error::Error as ProtocolError,
25    messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
26    storage::ValidationType,
27};
28use bytes::Bytes;
29use itertools::Itertools;
30use libp2p::{
31    Multiaddr, PeerId,
32    identity::Keypair,
33    kad::{Record, U256},
34    request_response::OutboundFailure,
35};
36use num_traits::cast::ToPrimitive;
37use rand::{
38    Rng, SeedableRng,
39    rngs::{OsRng, StdRng},
40    thread_rng,
41};
42use std::{
43    collections::HashMap,
44    net::SocketAddr,
45    path::PathBuf,
46    sync::{
47        Arc,
48        atomic::{AtomicUsize, Ordering},
49    },
50    time::{Duration, Instant},
51};
52use tokio::sync::watch;
53use tokio::{
54    sync::mpsc::Receiver,
55    task::{JoinSet, spawn},
56};
57
/// Interval to trigger replication of all records to all peers.
/// This is the max time it should take; the interval actually used by a node is
/// randomised within `[max / 2, max)` (see `Node::run`) so peers don't sync up.
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;

/// Interval to trigger a storage challenge.
/// This is the max time it should take; the per-node interval is randomised
/// within `[max / 2, max)` (see `Node::run`).
const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;

/// Interval at which the node's uptime metric is refreshed
/// (only used under the `open-metrics` feature).
const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

/// Interval to clean up irrelevant records.
/// This is the max time it should take; the per-node interval is randomised
/// within `[max / 2, max)` (see `Node::run`) to avoid a network-wide CPU spike.
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;

/// Highest score to achieve from each metric sub-sector during StorageChallenge.
const HIGHEST_SCORE: usize = 100;

/// Any nodes bearing a score below this shall be considered as bad.
/// NOTE(review): the stated theoretical maximum is 100 * 100 — confirm against
/// the scoring code that consumes this threshold.
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;

/// In ms; the average StorageChallenge is expected to complete in around 250 ms.
const TIME_STEP: usize = 20;
82
/// Helper to build and run a Node
pub struct NodeBuilder {
    /// Socket address the node will listen on.
    addr: SocketAddr,
    /// Bootstrap flow supplying the initial peers.
    bootstrap: Bootstrap,
    /// EVM address that receives this node's rewards.
    evm_address: RewardsAddress,
    /// EVM network the node is configured against.
    evm_network: EvmNetwork,
    /// Keypair providing the node's network identity.
    identity_keypair: Keypair,
    /// Whether the node runs in local mode (off by default).
    local: bool,
    #[cfg(feature = "open-metrics")]
    /// Set to Some to enable the metrics server
    metrics_server_port: Option<u16>,
    /// Whether UPnP is disabled for this node (off by default).
    no_upnp: bool,
    /// Whether the node acts as a relay client (off by default).
    relay_client: bool,
    /// Root directory for the node's on-disk state.
    root_dir: PathBuf,
}
98
99impl NodeBuilder {
100    /// Instantiate the builder. The initial peers can either be supplied via the `initial_peers` method
101    /// or fetched from the bootstrap cache set using `bootstrap_cache` method.
102    pub fn new(
103        identity_keypair: Keypair,
104        bootstrap_flow: Bootstrap,
105        evm_address: RewardsAddress,
106        evm_network: EvmNetwork,
107        addr: SocketAddr,
108        root_dir: PathBuf,
109    ) -> Self {
110        Self {
111            addr,
112            bootstrap: bootstrap_flow,
113            evm_address,
114            evm_network,
115            identity_keypair,
116            local: false,
117            #[cfg(feature = "open-metrics")]
118            metrics_server_port: None,
119            no_upnp: false,
120            relay_client: false,
121            root_dir,
122        }
123    }
124
125    /// Set the flag to indicate if the node is running in local mode
126    pub fn local(&mut self, local: bool) {
127        self.local = local;
128    }
129
130    #[cfg(feature = "open-metrics")]
131    /// Set the port for the OpenMetrics server. Defaults to a random port if not set
132    pub fn metrics_server_port(&mut self, port: Option<u16>) {
133        self.metrics_server_port = port;
134    }
135
136    /// Set the flag to make the node act as a relay client
137    pub fn relay_client(&mut self, relay_client: bool) {
138        self.relay_client = relay_client;
139    }
140
141    /// Set the flag to disable UPnP for the node
142    pub fn no_upnp(&mut self, no_upnp: bool) {
143        self.no_upnp = no_upnp;
144    }
145
146    /// Asynchronously runs a new node instance, setting up the swarm driver,
147    /// creating a data storage, and handling network events. Returns the
148    /// created `RunningNode` which contains a `NodeEventsChannel` for listening
149    /// to node-related events.
150    ///
151    /// # Returns
152    ///
153    /// A `RunningNode` instance.
154    ///
155    /// # Errors
156    ///
157    /// Returns an error if there is a problem initializing the Network.
158    pub fn build_and_run(self) -> Result<RunningNode> {
159        // setup metrics
160        #[cfg(feature = "open-metrics")]
161        let (metrics_recorder, metrics_registries) = if self.metrics_server_port.is_some() {
162            // metadata registry
163            let mut metrics_registries = MetricsRegistries::default();
164            let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);
165
166            (Some(metrics_recorder), metrics_registries)
167        } else {
168            (None, MetricsRegistries::default())
169        };
170
171        // create a shutdown signal channel
172        let (shutdown_tx, shutdown_rx) = watch::channel(false);
173
174        // init network
175        let network_config = NetworkConfig {
176            keypair: self.identity_keypair,
177            local: self.local,
178            listen_addr: self.addr,
179            root_dir: self.root_dir.clone(),
180            shutdown_rx: shutdown_rx.clone(),
181            bootstrap: self.bootstrap,
182            no_upnp: self.no_upnp,
183            relay_client: self.relay_client,
184            custom_request_timeout: None,
185            #[cfg(feature = "open-metrics")]
186            metrics_registries,
187            #[cfg(feature = "open-metrics")]
188            metrics_server_port: self.metrics_server_port,
189        };
190        let (network, network_event_receiver) = Network::init(network_config)?;
191
192        // init node
193        let node_events_channel = NodeEventsChannel::default();
194        let node = NodeInner {
195            network: network.clone(),
196            events_channel: node_events_channel.clone(),
197            reward_address: self.evm_address,
198            #[cfg(feature = "open-metrics")]
199            metrics_recorder,
200            evm_network: self.evm_network,
201        };
202        let node = Node {
203            inner: Arc::new(node),
204        };
205
206        // Run the node
207        node.run(network_event_receiver, shutdown_rx);
208        let running_node = RunningNode {
209            shutdown_sender: shutdown_tx,
210            network,
211            node_events_channel,
212            root_dir_path: self.root_dir,
213            rewards_address: self.evm_address,
214        };
215
216        Ok(running_node)
217    }
218}
219
/// `Node` represents a single node in the distributed network. It handles
/// network events, processes incoming requests, interacts with the data
/// storage, and broadcasts node-related events.
///
/// Cloning is cheap: all state lives behind a shared `Arc`.
#[derive(Clone)]
pub(crate) struct Node {
    // Shared state; see `NodeInner` for the fields.
    inner: Arc<NodeInner>,
}
227
/// The actual implementation of the Node. The other is just a wrapper around this, so that we don't expose
/// the Arc from the interface.
struct NodeInner {
    /// Channel used to broadcast `NodeEvent`s to subscribers.
    events_channel: NodeEventsChannel,
    /// Handle onto the underlying network layer.
    network: Network,
    #[cfg(feature = "open-metrics")]
    /// `None` when the metrics server was not enabled at build time.
    metrics_recorder: Option<NodeMetricsRecorder>,
    /// EVM address that store-cost quotes direct payments to.
    reward_address: RewardsAddress,
    /// EVM network the node was configured with.
    evm_network: EvmNetwork,
}
238
239impl Node {
240    /// Returns the NodeEventsChannel
241    pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
242        &self.inner.events_channel
243    }
244
245    /// Returns the instance of Network
246    pub(crate) fn network(&self) -> &Network {
247        &self.inner.network
248    }
249
250    #[cfg(feature = "open-metrics")]
251    /// Returns a reference to the NodeMetricsRecorder if the `open-metrics` feature flag is enabled
252    /// This is used to record various metrics for the node.
253    pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
254        self.inner.metrics_recorder.as_ref()
255    }
256
257    /// Returns the reward address of the node
258    pub(crate) fn reward_address(&self) -> &RewardsAddress {
259        &self.inner.reward_address
260    }
261
262    pub(crate) fn evm_network(&self) -> &EvmNetwork {
263        &self.inner.evm_network
264    }
265
    /// Spawns a detached background task that drives the node: it processes
    /// incoming `NetworkEvent`s and fires the periodic maintenance timers
    /// (replication, uptime metric refresh, irrelevant-record cleanup and
    /// storage challenges).
    ///
    /// The spawned task's handle is intentionally dropped; the loop exits when
    /// `shutdown_rx` signals `true` (or its sender is dropped), or when the
    /// `NetworkEvent` channel closes.
    fn run(
        self,
        mut network_event_receiver: Receiver<NetworkEvent>,
        mut shutdown_rx: watch::Receiver<bool>,
    ) {
        let mut rng = StdRng::from_entropy();

        // Count of peers added to the routing table; shared with the event
        // handler so it can broadcast `ConnectedToNetwork` once the close
        // group size is reached.
        let peers_connected = Arc::new(AtomicUsize::new(0));

        let _node_task = spawn(async move {
            // use a random activity timeout to ensure that the nodes do not sync when messages
            // are being transmitted.
            let replication_interval: u64 = rng.gen_range(
                PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
            );
            let replication_interval_time = Duration::from_secs(replication_interval);
            debug!("Replication interval set to {replication_interval_time:?}");

            let mut replication_interval = tokio::time::interval(replication_interval_time);
            let _ = replication_interval.tick().await; // first tick completes immediately

            let mut uptime_metrics_update_interval =
                tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
            let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately

            // use a random activity timeout to ensure that the nodes do not sync on work,
            // causing an overall CPU spike.
            let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
                UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
                    ..UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
            );
            let irrelevant_records_cleanup_interval_time =
                Duration::from_secs(irrelevant_records_cleanup_interval);
            let mut irrelevant_records_cleanup_interval =
                tokio::time::interval(irrelevant_records_cleanup_interval_time);
            let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately

            // use a random neighbour storage challenge ticker to ensure
            // neighbours do not carry out challenges at the same time
            let storage_challenge_interval: u64 =
                rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
            let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
            debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");

            let mut storage_challenge_interval =
                tokio::time::interval(storage_challenge_interval_time);
            let _ = storage_challenge_interval.tick().await; // first tick completes immediately

            loop {
                let peers_connected = &peers_connected;

                tokio::select! {
                    // Check for a shutdown command.
                    result = shutdown_rx.changed() => {
                        // Exit on an explicit `true`, or when the sender side is gone.
                        if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
                            info!("Shutdown signal received or sender dropped. Exiting network events loop.");
                            break;
                        }
                    },
                    net_event = network_event_receiver.recv() => {
                        match net_event {
                            Some(event) => {
                                let start = Instant::now();
                                let event_string = format!("{event:?}");

                                self.handle_network_event(event, peers_connected);
                                trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);

                            }
                            None => {
                                error!("The `NetworkEvent` channel is closed");
                                self.events_channel().broadcast(NodeEvent::ChannelClosed);
                                break;
                            }
                        }
                    }
                    // runs every replication_interval time
                    _ = replication_interval.tick() => {
                        let start = Instant::now();
                        let network = self.network().clone();
                        self.record_metrics(Marker::IntervalReplicationTriggered);

                        let _handle = spawn(async move {
                            Self::try_interval_replication(network);
                            trace!("Periodic replication took {:?}", start.elapsed());
                        });
                    }
                    _ = uptime_metrics_update_interval.tick() => {
                        #[cfg(feature = "open-metrics")]
                        if let Some(metrics_recorder) = self.metrics_recorder() {
                            let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
                        }
                    }
                    _ = irrelevant_records_cleanup_interval.tick() => {
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::trigger_irrelevant_record_cleanup(network);
                        });
                    }
                    // runs every storage_challenge_interval time
                    _ = storage_challenge_interval.tick() => {
                        let start = Instant::now();
                        debug!("Periodic storage challenge triggered");
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::storage_challenge(network).await;
                            trace!("Periodic storage challenge took {:?}", start.elapsed());
                        });
                    }
                }
            }
        });
    }
383
384    /// Calls Marker::log() to insert the marker into the log files.
385    /// Also calls NodeMetrics::record() to record the metric if the `open-metrics` feature flag is enabled.
386    pub(crate) fn record_metrics(&self, marker: Marker) {
387        marker.log();
388        #[cfg(feature = "open-metrics")]
389        if let Some(metrics_recorder) = self.metrics_recorder() {
390            metrics_recorder.record(marker)
391        }
392    }
393
394    // **** Private helpers *****
395
    /// Handle a single network event.
    /// Spawns a tokio task for any likely long-running work (queries, record
    /// validation, replication fetches) so this handler stays non-blocking.
    fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
        let start = Instant::now();
        let event_string = format!("{event:?}");
        // Set by every match arm below; used in the closing trace statistics.
        let event_header;

        // Reducing non-mandatory logging
        if let NetworkEvent::QueryRequestReceived {
            query: Query::GetVersion { .. },
            ..
        } = event
        {
            trace!("Handling NetworkEvent {event_string}");
        } else {
            debug!("Handling NetworkEvent {event_string}");
        }

        match event {
            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
                event_header = "PeerAdded";
                // increment peers_connected and send ConnectedToNetwork event once
                // exactly CLOSE_GROUP_SIZE peers have been connected
                let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
                if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
                    self.events_channel()
                        .broadcast(NodeEvent::ConnectedToNetwork);
                }

                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));

                // try query peer version
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::try_query_peer_version(network, peer_id, Default::default()).await;
                });

                // try replication here
                let network = self.network().clone();
                self.record_metrics(Marker::IntervalReplicationTriggered);
                let _handle = spawn(async move {
                    Self::try_interval_replication(network);
                });
            }
            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
                event_header = "PeerRemoved";
                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));

                let self_id = self.network().peer_id();
                let distance =
                    NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
                info!(
                    "Node {self_id:?} removed peer from routing table: {peer_id:?}. It has a {:?} distance to us.",
                    distance.ilog2()
                );

                // Membership changed, so re-trigger replication.
                let network = self.network().clone();
                self.record_metrics(Marker::IntervalReplicationTriggered);
                let _handle = spawn(async move {
                    Self::try_interval_replication(network);
                });
            }
            NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
                event_header = "PeerWithUnsupportedProtocol";
            }
            NetworkEvent::NewListenAddr(_) => {
                event_header = "NewListenAddr";
            }
            NetworkEvent::ResponseReceived { res } => {
                event_header = "ResponseReceived";
                if let Err(err) = self.handle_response(res) {
                    error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
                }
            }
            NetworkEvent::KeysToFetchForReplication(keys) => {
                event_header = "KeysToFetchForReplication";
                self.record_metrics(Marker::fetching_keys_for_replication(&keys));

                if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
                    error!("Error while trying to fetch replicated data {err:?}");
                }
            }
            NetworkEvent::QueryRequestReceived { query, channel } => {
                event_header = "QueryRequestReceived";
                let node = self.clone();
                let payment_address = *self.reward_address();

                // queries can be long running, so answer on a separate task
                let _handle = spawn(async move {
                    let network = node.network().clone();
                    let res = Self::handle_query(node, query, payment_address).await;

                    // Reducing non-mandatory logging
                    if let Response::Query(QueryResponse::GetVersion { .. }) = res {
                        trace!("Sending response {res:?}");
                    } else {
                        debug!("Sending response {res:?}");
                    }

                    network.send_response(res, channel);
                });
            }
            NetworkEvent::UnverifiedRecord(record) => {
                event_header = "UnverifiedRecord";
                // queries can be long running and require validation, so we spawn a task to handle them
                let self_clone = self.clone();
                let _handle = spawn(async move {
                    let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                    match self_clone.validate_and_store_record(record).await {
                        Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
                        Err(err) => {
                            self_clone.record_metrics(Marker::RecordRejected(&key, &err));
                        }
                    }
                });
            }
            NetworkEvent::TerminateNode { reason } => {
                event_header = "TerminateNode";
                error!("Received termination from swarm_driver due to {reason:?}");
                self.events_channel()
                    .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
            }
            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
                event_header = "FailedToFetchHolders";
                let network = self.network().clone();
                let pretty_log: Vec<_> = bad_nodes
                    .iter()
                    .map(|(peer_id, record_key)| {
                        let pretty_key = PrettyPrintRecordKey::from(record_key);
                        (peer_id, pretty_key)
                    })
                    .collect();
                // Note: this log will be checked in CI, and expecting `not appear`.
                //       any change to the keyword `failed to fetch` shall incur
                //       correspondent CI script change as well.
                debug!(
                    "Received notification from replication_fetcher, notifying {pretty_log:?} failed to fetch replication copies from."
                );
                let _handle = spawn(async move {
                    for (peer_id, record_key) in bad_nodes {
                        // Obsoleted fetch request (due to flooded in fresh replicates) could result
                        // in peer to be claimed as bad, as local copy blocks the entry to be cleared.
                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
                        {
                            error!(
                                "From peer {peer_id:?}, failed to fetch record {:?}",
                                PrettyPrintRecordKey::from(&record_key)
                            );
                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
                        }
                    }
                });
            }
            NetworkEvent::QuoteVerification { quotes } => {
                event_header = "QuoteVerification";
                let network = self.network().clone();

                let _handle = spawn(async move {
                    quotes_verification(&network, quotes).await;
                });
            }
            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
                event_header = "FreshReplicateToFetch";
                self.fresh_replicate_to_fetch(holder, keys);
            }
            NetworkEvent::PeersForVersionQuery(peers) => {
                event_header = "PeersForVersionQuery";
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::query_peers_version(network, peers).await;
                });
            }
            NetworkEvent::NetworkWideReplication { keys } => {
                event_header = "NetworkWideReplication";
                self.perform_network_wide_replication(keys);
            }
        }

        trace!(
            "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
            start.elapsed()
        );
    }
579
580    // Handle the response that was not awaited at the call site
581    fn handle_response(&self, response: Response) -> Result<()> {
582        match response {
583            Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
584                // This should actually have been short-circuted when received
585                warn!("Mishandled replicate response, should be handled earlier");
586            }
587            Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
588                error!(
589                    "Response to replication shall be handled by called not by common handler, {resp:?}"
590                );
591            }
592            Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
593                // No need to handle
594            }
595            other => {
596                warn!("handle_response not implemented for {other:?}");
597            }
598        };
599
600        Ok(())
601    }
602
603    async fn handle_query(node: Self, query: Query, payment_address: RewardsAddress) -> Response {
604        let network = node.network();
605        let resp: QueryResponse = match query {
606            Query::GetStoreQuote {
607                key,
608                data_type,
609                data_size,
610                nonce,
611                difficulty,
612            } => {
613                let record_key = key.to_record_key();
614                let self_id = network.peer_id();
615
616                let maybe_quoting_metrics = network
617                    .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
618                    .await;
619
620                let storage_proofs = if let Some(nonce) = nonce {
621                    Self::respond_x_closest_record_proof(
622                        network,
623                        key.clone(),
624                        nonce,
625                        difficulty,
626                        false,
627                    )
628                    .await
629                } else {
630                    vec![]
631                };
632
633                match maybe_quoting_metrics {
634                    Ok((quoting_metrics, is_already_stored)) => {
635                        if is_already_stored {
636                            QueryResponse::GetStoreQuote {
637                                quote: Err(ProtocolError::RecordExists(
638                                    PrettyPrintRecordKey::from(&record_key).into_owned(),
639                                )),
640                                peer_address: NetworkAddress::from(self_id),
641                                storage_proofs,
642                            }
643                        } else {
644                            QueryResponse::GetStoreQuote {
645                                quote: Self::create_quote_for_storecost(
646                                    network,
647                                    &key,
648                                    &quoting_metrics,
649                                    &payment_address,
650                                ),
651                                peer_address: NetworkAddress::from(self_id),
652                                storage_proofs,
653                            }
654                        }
655                    }
656                    Err(err) => {
657                        warn!("GetStoreQuote failed for {key:?}: {err}");
658                        QueryResponse::GetStoreQuote {
659                            quote: Err(ProtocolError::GetStoreQuoteFailed),
660                            peer_address: NetworkAddress::from(self_id),
661                            storage_proofs,
662                        }
663                    }
664                }
665            }
666            Query::GetReplicatedRecord { requester: _, key } => {
667                let our_address = NetworkAddress::from(network.peer_id());
668                let record_key = key.to_record_key();
669
670                let result = match network.get_local_record(&record_key).await {
671                    Ok(Some(record)) => Ok((our_address, Bytes::from(record.value))),
672                    Ok(None) => Err(ProtocolError::ReplicatedRecordNotFound {
673                        holder: Box::new(our_address),
674                        key: Box::new(key.clone()),
675                    }),
676                    // Use `PutRecordFailed` as place holder
677                    Err(err) => Err(ProtocolError::PutRecordFailed(format!(
678                        "Error to fetch local record for GetReplicatedRecord {err:?}"
679                    ))),
680                };
681
682                QueryResponse::GetReplicatedRecord(result)
683            }
684            Query::GetChunkExistenceProof {
685                key,
686                nonce,
687                difficulty,
688            } => QueryResponse::GetChunkExistenceProof(
689                Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
690            ),
691            Query::CheckNodeInProblem(target_address) => {
692                debug!("Got CheckNodeInProblem for peer {target_address:?}");
693
694                let is_in_trouble =
695                    if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
696                        result
697                    } else {
698                        debug!("Could not get status of {target_address:?}.");
699                        false
700                    };
701
702                QueryResponse::CheckNodeInProblem {
703                    reporter_address: NetworkAddress::from(network.peer_id()),
704                    target_address,
705                    is_in_trouble,
706                }
707            }
708            Query::GetClosestPeers {
709                key,
710                num_of_peers,
711                range,
712                sign_result,
713            } => {
714                debug!(
715                    "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
716                );
717                Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
718                    .await
719            }
720            Query::GetVersion(_) => QueryResponse::GetVersion {
721                peer: NetworkAddress::from(network.peer_id()),
722                version: ant_build_info::package_version(),
723            },
724            Query::PutRecord {
725                holder,
726                address,
727                serialized_record,
728            } => {
729                let record = Record {
730                    key: address.to_record_key(),
731                    value: serialized_record,
732                    publisher: None,
733                    expires: None,
734                };
735
736                let key = PrettyPrintRecordKey::from(&record.key).into_owned();
737                let result = match node.validate_and_store_record(record).await {
738                    Ok(()) => Ok(()),
739                    Err(PutValidationError::OutdatedRecordCounter { counter, expected }) => {
740                        node.record_metrics(Marker::RecordRejected(
741                            &key,
742                            &PutValidationError::OutdatedRecordCounter { counter, expected },
743                        ));
744                        Err(ProtocolError::OutdatedRecordCounter { counter, expected })
745                    }
746                    Err(PutValidationError::TopologyVerificationFailed {
747                        target_address,
748                        valid_count,
749                        total_paid,
750                        closest_count,
751                        node_peers,
752                        paid_peers,
753                    }) => {
754                        node.record_metrics(Marker::RecordRejected(
755                            &key,
756                            &PutValidationError::TopologyVerificationFailed {
757                                target_address: target_address.clone(),
758                                valid_count,
759                                total_paid,
760                                closest_count,
761                                node_peers: node_peers.clone(),
762                                paid_peers: paid_peers.clone(),
763                            },
764                        ));
765                        Err(ProtocolError::TopologyVerificationFailed {
766                            target_address: Box::new(target_address),
767                            valid_count,
768                            total_paid,
769                            closest_count,
770                            node_peers,
771                            paid_peers,
772                        })
773                    }
774                    Err(err) => {
775                        node.record_metrics(Marker::RecordRejected(&key, &err));
776                        Err(ProtocolError::PutRecordFailed(format!("{err:?}")))
777                    }
778                };
779                QueryResponse::PutRecord {
780                    result,
781                    peer_address: holder,
782                    record_addr: address,
783                }
784            }
785            Query::GetMerkleCandidateQuote {
786                key,
787                data_type,
788                data_size,
789                merkle_payment_timestamp,
790            } => {
791                Self::respond_merkle_candidate_quote(
792                    network,
793                    key,
794                    data_type,
795                    data_size,
796                    merkle_payment_timestamp,
797                    payment_address,
798                )
799                .await
800            }
801        };
802        Response::Query(resp)
803    }
804
805    async fn respond_get_closest_peers(
806        network: &Network,
807        target: NetworkAddress,
808        num_of_peers: Option<usize>,
809        range: Option<[u8; 32]>,
810        sign_result: bool,
811    ) -> QueryResponse {
812        let local_peers = network.get_local_peers_with_multiaddr().await;
813        let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
814            Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
815        } else {
816            vec![]
817        };
818
819        let signature = if sign_result {
820            let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
821            bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
822            network.sign(&bytes).ok()
823        } else {
824            None
825        };
826
827        QueryResponse::GetClosestPeers {
828            target,
829            peers,
830            signature,
831        }
832    }
833
834    fn calculate_get_closest_peers(
835        peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
836        target: NetworkAddress,
837        num_of_peers: Option<usize>,
838        range: Option<[u8; 32]>,
839    ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
840        match (num_of_peers, range) {
841            (_, Some(value)) => {
842                let distance = U256::from_big_endian(&value);
843                peer_addrs
844                    .iter()
845                    .filter_map(|(peer_id, multi_addrs)| {
846                        let addr = NetworkAddress::from(*peer_id);
847                        if target.distance(&addr).0 <= distance {
848                            Some((addr, multi_addrs.clone()))
849                        } else {
850                            None
851                        }
852                    })
853                    .collect()
854            }
855            (Some(num_of_peers), _) => {
856                let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
857                    .iter()
858                    .map(|(peer_id, multi_addrs)| {
859                        let addr = NetworkAddress::from(*peer_id);
860                        (addr, multi_addrs.clone())
861                    })
862                    .collect();
863                result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
864                result.into_iter().take(num_of_peers).collect()
865            }
866            (None, None) => vec![],
867        }
868    }
869
    /// Handle GetMerkleCandidateQuote query
    /// Returns a signed MerklePaymentCandidateNode containing the node's current quoting metrics,
    /// reward address, and timestamp commitment
    ///
    /// Errors are returned inside the response as `GetMerkleCandidateQuoteFailed`
    /// (future/expired timestamp, or quoting metrics unavailable) or
    /// `FailedToSignMerkleCandidate` (signing failure).
    async fn respond_merkle_candidate_quote(
        network: &Network,
        key: NetworkAddress,
        data_type: u32,
        data_size: usize,
        merkle_payment_timestamp: u64,
        payment_address: RewardsAddress,
    ) -> QueryResponse {
        debug!(
            "merkle payment: GetMerkleCandidateQuote for target {key:?}, timestamp: {merkle_payment_timestamp}, data_type: {data_type}, data_size: {data_size}"
        );

        // Validate timestamp before signing to prevent committing to invalid times.
        // Nodes will reject proofs with expired/future timestamps during payment verification,
        // so signing such timestamps would create useless quotes that can't be used.
        //
        // Allow ±24 hours tolerance to handle timezone differences and clock skew between
        // clients and nodes. This prevents valid payments from being rejected due to minor
        // time differences while still catching truly expired/invalid timestamps.
        const TIMESTAMP_TOLERANCE: u64 = 24 * 60 * 60; // 24 hours

        // NOTE(review): a pre-epoch system clock degrades `now` to 0 here, which
        // would make any timestamp beyond the tolerance look "future" — benign,
        // but worth knowing when debugging clock issues.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        // Reject future timestamps (with 24h tolerance for clock skew)
        let future_threshold = now + TIMESTAMP_TOLERANCE;
        if merkle_payment_timestamp > future_threshold {
            let error_msg = format!(
                "Rejected future timestamp {merkle_payment_timestamp} (current time: {now}, threshold: {future_threshold})"
            );
            warn!("{error_msg} for {key:?}");
            return QueryResponse::GetMerkleCandidateQuote(Err(
                ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
            ));
        }

        // Reject expired timestamps (with 24h tolerance for clock skew)
        let expiration_threshold = MERKLE_PAYMENT_EXPIRATION + TIMESTAMP_TOLERANCE;
        // saturating_sub: timestamps slightly in the future (within tolerance)
        // yield age 0 rather than underflowing.
        let age = now.saturating_sub(merkle_payment_timestamp);
        if age > expiration_threshold {
            let error_msg = format!(
                "Rejected expired timestamp {merkle_payment_timestamp} (age: {age}s, max: {expiration_threshold}s)",
            );
            warn!("{error_msg} for {key:?}");
            return QueryResponse::GetMerkleCandidateQuote(Err(
                ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
            ));
        }

        // Get node's current quoting metrics
        let record_key = key.to_record_key();
        let (quoting_metrics, _is_already_stored) = match network
            .get_local_quoting_metrics(record_key, data_type, data_size)
            .await
        {
            Ok(metrics) => metrics,
            Err(err) => {
                let error_msg = format!("Failed to get quoting metrics for {key:?}: {err}");
                warn!("{error_msg}");
                return QueryResponse::GetMerkleCandidateQuote(Err(
                    ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
                ));
            }
        };

        // Create the MerklePaymentCandidateNode with node's signed commitment
        let pub_key = network.get_pub_key();
        let reward_address = payment_address;
        // The signature covers (quoting_metrics, reward_address, timestamp),
        // binding this node's quote to the exact payment window requested.
        let bytes = ant_evm::merkle_payments::MerklePaymentCandidateNode::bytes_to_sign(
            &quoting_metrics,
            &reward_address,
            merkle_payment_timestamp,
        );
        let signature = match network.sign(&bytes) {
            Ok(sig) => sig,
            Err(e) => {
                let error_msg = format!("Failed to sign candidate node for {key:?}: {e}");
                error!("{error_msg}");
                return QueryResponse::GetMerkleCandidateQuote(Err(
                    ProtocolError::FailedToSignMerkleCandidate(error_msg),
                ));
            }
        };

        let candidate = ant_evm::merkle_payments::MerklePaymentCandidateNode {
            quoting_metrics,
            reward_address,
            merkle_payment_timestamp,
            pub_key,
            signature,
        };
        QueryResponse::GetMerkleCandidateQuote(Ok(candidate))
    }
968
    // Nodes only check ChunkProof each other, to avoid `multi-version` issue
    // Client check proof against all records, as have to fetch from network anyway.
    /// Answer a `GetChunkExistenceProof` query.
    ///
    /// * `difficulty == 1`: client existence check of `key` itself — returns a
    ///   single entry, `Err(ChunkDoesNotExist)` when the record is not held
    ///   locally.
    /// * `difficulty > 1`: storage challenge — produces proofs for up to
    ///   `min(difficulty, CLOSE_GROUP_SIZE)` locally-held record addresses
    ///   closest to `key` (chunks only when `chunk_only` is set). Records that
    ///   fail to load are silently omitted from the result.
    async fn respond_x_closest_record_proof(
        network: &Network,
        key: NetworkAddress,
        nonce: Nonce,
        difficulty: usize,
        chunk_only: bool,
    ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
        let start = Instant::now();
        let mut results = vec![];
        if difficulty == 1 {
            // Client checking existence of published chunk.
            let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
            if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
                let proof = ChunkProof::new(&record.value, nonce);
                debug!("Chunk proof for {key:?} is {proof:?}");
                result = Ok(proof)
            } else {
                debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
            }

            results.push((key.clone(), result));
        } else {
            let all_local_records = network.get_all_local_record_addresses().await;

            // If the local address listing itself fails, reply with an empty
            // result set rather than an error.
            if let Ok(all_local_records) = all_local_records {
                let mut all_chunk_addrs: Vec<_> = if chunk_only {
                    all_local_records
                        .iter()
                        .filter_map(|(addr, record_type)| {
                            if *record_type == ValidationType::Chunk {
                                Some(addr.clone())
                            } else {
                                None
                            }
                        })
                        .collect()
                } else {
                    all_local_records.keys().cloned().collect()
                };

                // Sort by distance and only take first X closest entries
                all_chunk_addrs.sort_by_key(|addr| key.distance(addr));

                // TODO: this shall be deduced from resource usage dynamically
                let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);

                // Records are loaded one at a time; each proof is keyed on the
                // challenger-supplied nonce so answers cannot be precomputed.
                for addr in all_chunk_addrs.iter().take(workload_factor) {
                    if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
                    {
                        let proof = ChunkProof::new(&record.value, nonce);
                        debug!("Chunk proof for {key:?} is {proof:?}");
                        results.push((addr.clone(), Ok(proof)));
                    }
                }
            }

            info!(
                "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
                results.len(),
                start.elapsed()
            );
        }

        results
    }
1036
    /// Check among all chunk type records that we have,
    /// and randomly pick one as the verification candidate.
    /// This will challenge all closest peers at once.
    ///
    /// Flow: collect a full close group of neighbours and at least 50 local
    /// chunk candidates; pick a target from the half of the candidates closest
    /// to us; pre-compute the expected `ChunkProof` for the `difficulty`
    /// closest records; then fan the challenge out to every neighbour
    /// concurrently and score their replies. Bails out early (with a log)
    /// whenever a precondition is not met.
    async fn storage_challenge(network: Network) {
        let start = Instant::now();
        let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
            network.get_k_closest_local_peers_to_the_target(None).await
        {
            closest_peers
                .into_iter()
                .take(CLOSE_GROUP_SIZE)
                .collect_vec()
        } else {
            error!("Cannot get local neighbours");
            return;
        };
        // Require a full close group; with fewer neighbours the challenge is skipped.
        if closest_peers.len() < CLOSE_GROUP_SIZE {
            debug!(
                "Not enough neighbours ({}/{}) to carry out storage challenge.",
                closest_peers.len(),
                CLOSE_GROUP_SIZE
            );
            return;
        }

        // Only chunk-type records are eligible as verification candidates.
        let mut verify_candidates: Vec<NetworkAddress> =
            if let Ok(all_keys) = network.get_all_local_record_addresses().await {
                all_keys
                    .iter()
                    .filter_map(|(addr, record_type)| {
                        if ValidationType::Chunk == *record_type {
                            Some(addr.clone())
                        } else {
                            None
                        }
                    })
                    .collect()
            } else {
                error!("Failed to get local record addresses.");
                return;
            };
        let num_of_targets = verify_candidates.len();
        if num_of_targets < 50 {
            debug!("Not enough candidates({num_of_targets}/50) to be checked against neighbours.");
            return;
        }

        // To ensure the neighbours share the same knowledge as us,
        // the target is chosen to be not far from us (picked at random from the
        // closer half of our candidates).
        let self_addr = NetworkAddress::from(network.peer_id());
        verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
        let index: usize = OsRng.gen_range(0..num_of_targets / 2);
        let target = verify_candidates[index].clone();
        // TODO: workload shall be dynamically deduced from resource usage
        let difficulty = CLOSE_GROUP_SIZE;
        verify_candidates.sort_by_key(|addr| target.distance(addr));
        let expected_targets = verify_candidates.into_iter().take(difficulty);
        // Fresh random nonce per challenge, so proofs from earlier rounds
        // cannot be reused.
        let nonce: Nonce = thread_rng().r#gen::<u64>();
        // Pre-compute the proofs we expect back, keyed by record address.
        // Records that fail to load are logged and skipped, so this map can be
        // smaller than `difficulty` entries.
        let mut expected_proofs = HashMap::new();
        for addr in expected_targets {
            if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
                let expected_proof = ChunkProof::new(&record.value, nonce);
                let _ = expected_proofs.insert(addr, expected_proof);
            } else {
                error!("Local record {addr:?} cann't be loaded from disk.");
            }
        }
        let request = Request::Query(Query::GetChunkExistenceProof {
            key: target.clone(),
            nonce,
            difficulty,
        });

        // Challenge every neighbour (except ourselves) concurrently.
        let mut tasks = JoinSet::new();
        for (peer_id, addresses) in closest_peers {
            if peer_id == network.peer_id() {
                continue;
            }
            let network_clone = network.clone();
            let request_clone = request.clone();
            let expected_proofs_clone = expected_proofs.clone();
            let _ = tasks.spawn(async move {
                let res = scoring_peer(
                    network_clone,
                    (peer_id, addresses),
                    request_clone,
                    expected_proofs_clone,
                )
                .await;
                (peer_id, res)
            });
        }

        let mut peer_scores = vec![];
        while let Some(res) = tasks.join_next().await {
            match res {
                Ok((peer_id, score)) => {
                    // A peer is healthy only when its score strictly exceeds
                    // MIN_ACCEPTABLE_HEALTHY_SCORE (defined elsewhere in this file).
                    let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
                    if !is_healthy {
                        info!(
                            "Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."
                        );
                        // TODO: shall the challenge failure immediately triggers the node to be removed?
                        network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
                    }
                    peer_scores.push((peer_id, is_healthy));
                }
                Err(e) => {
                    // Task join error (e.g. the spawned task panicked) — the
                    // peer is simply not scored this round.
                    info!("StorageChallenge task completed with error {e:?}");
                }
            }
        }
        if !peer_scores.is_empty() {
            network.notify_peer_scores(peer_scores);
        }

        info!(
            "Completed node StorageChallenge against neighbours in {:?}!",
            start.elapsed()
        );
    }
1158
1159    /// Query peers' versions and update local knowledge.
1160    async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
1161        // To avoid choking, carry out the queries one by one
1162        for (peer_id, addrs) in peers {
1163            Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
1164        }
1165    }
1166
1167    /// Query peer's version and update local knowledge.
1168    async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
1169        let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
1170        // We can skip passing `addrs` here as the new peer should be part of the kad::RT and swarm can get the addr.
1171        let version = match network.send_request(request, peer, addrs).await {
1172            Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
1173                trace!("Fetched peer version {peer:?} as {version:?}");
1174                version
1175            }
1176            Ok(other) => {
1177                info!("Not a fetched peer version {peer:?}, {other:?}");
1178                "none".to_string()
1179            }
1180            Err(err) => {
1181                info!("Failed to fetch peer version {peer:?} with error {err:?}");
1182                // Failed version fetch (which contains dial then re-attempt by itself)
1183                // with error of `DialFailure` indicates the peer could be dead with high chance.
1184                // In that case, the peer shall be removed from the routing table.
1185                if let NetworkError::OutboundError(OutboundFailure::DialFailure) = err {
1186                    network.remove_peer(peer);
1187                    return;
1188                }
1189                "old".to_string()
1190            }
1191        };
1192        network.notify_node_version(peer, version);
1193    }
1194}
1195
1196async fn scoring_peer(
1197    network: Network,
1198    peer: (PeerId, Addresses),
1199    request: Request,
1200    expected_proofs: HashMap<NetworkAddress, ChunkProof>,
1201) -> usize {
1202    let peer_id = peer.0;
1203    let start = Instant::now();
1204    let responses = network
1205        .send_and_get_responses(&[peer], &request, true)
1206        .await;
1207
1208    if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
1209        responses.get(&peer_id)
1210    {
1211        if answers.is_empty() {
1212            info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
1213            return 0;
1214        }
1215        let elapsed = start.elapsed();
1216
1217        let mut received_proofs = vec![];
1218        for (addr, proof) in answers {
1219            if let Ok(proof) = proof {
1220                received_proofs.push((addr.clone(), proof.clone()));
1221            }
1222        }
1223
1224        let score = mark_peer(elapsed, received_proofs, &expected_proofs);
1225        info!(
1226            "Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.",
1227            answers.len()
1228        );
1229        score
1230    } else {
1231        info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error.");
1232        0
1233    }
1234}
1235
1236// Based on following metrics:
1237//   * the duration
1238//   * is there false answer
1239//   * percentage of correct answers among the expected closest
1240// The higher the score, the better confidence on the peer
1241fn mark_peer(
1242    duration: Duration,
1243    answers: Vec<(NetworkAddress, ChunkProof)>,
1244    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
1245) -> usize {
1246    let duration_score = duration_score_scheme(duration);
1247    let challenge_score = challenge_score_scheme(answers, expected_proofs);
1248
1249    duration_score * challenge_score
1250}
1251
1252// Less duration shall get higher score
1253fn duration_score_scheme(duration: Duration) -> usize {
1254    // So far just a simple stepped scheme, capped by HIGHEST_SCORE
1255    let in_ms = if let Some(value) = duration.as_millis().to_usize() {
1256        value
1257    } else {
1258        info!("Cannot get milli seconds from {duration:?}, using a default value of 1000ms.");
1259        1000
1260    };
1261
1262    let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
1263    HIGHEST_SCORE - step
1264}
1265
1266// Any false answer shall result in 0 score immediately
1267fn challenge_score_scheme(
1268    answers: Vec<(NetworkAddress, ChunkProof)>,
1269    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
1270) -> usize {
1271    let mut correct_answers = 0;
1272    for (addr, chunk_proof) in answers {
1273        if let Some(expected_proof) = expected_proofs.get(&addr) {
1274            if expected_proof.verify(&chunk_proof) {
1275                correct_answers += 1;
1276            } else {
1277                info!("Spot a false answer to the challenge regarding {addr:?}");
1278                // Any false answer shall result in 0 score immediately
1279                return 0;
1280            }
1281        }
1282    }
1283    // TODO: For those answers not among the expected_proofs,
1284    //       it could be due to having different knowledge of records to us.
1285    //       shall we:
1286    //         * set the target being close to us, so that neighbours sharing same knowledge in higher chance
1287    //         * fetch from local to testify
1288    //         * fetch from network to testify
1289    std::cmp::min(
1290        HIGHEST_SCORE,
1291        HIGHEST_SCORE * correct_answers / expected_proofs.len(),
1292    )
1293}
1294
1295#[cfg(test)]
1296mod tests {
1297    use super::*;
1298    use std::str::FromStr;
1299
1300    #[test]
1301    fn test_no_local_peers() {
1302        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
1303        let target = NetworkAddress::from(PeerId::random());
1304        let num_of_peers = Some(5);
1305        let range = None;
1306        let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);
1307
1308        assert_eq!(result, vec![]);
1309    }
1310
1311    #[test]
1312    fn test_fewer_local_peers_than_num_of_peers() {
1313        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
1314            (
1315                PeerId::random(),
1316                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
1317            ),
1318            (
1319                PeerId::random(),
1320                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
1321            ),
1322            (
1323                PeerId::random(),
1324                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
1325            ),
1326        ];
1327        let target = NetworkAddress::from(PeerId::random());
1328        let num_of_peers = Some(2);
1329        let range = None;
1330        let result = Node::calculate_get_closest_peers(
1331            local_peers.clone(),
1332            target.clone(),
1333            num_of_peers,
1334            range,
1335        );
1336
1337        // Result shall be sorted and truncated
1338        let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
1339            .iter()
1340            .map(|(peer_id, multi_addrs)| {
1341                let addr = NetworkAddress::from(*peer_id);
1342                (addr, multi_addrs.clone())
1343            })
1344            .collect();
1345        expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
1346        let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();
1347
1348        assert_eq!(expected_result, result);
1349    }
1350
1351    #[test]
1352    fn test_with_range_and_num_of_peers() {
1353        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
1354            (
1355                PeerId::random(),
1356                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
1357            ),
1358            (
1359                PeerId::random(),
1360                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
1361            ),
1362            (
1363                PeerId::random(),
1364                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
1365            ),
1366        ];
1367        let target = NetworkAddress::from(PeerId::random());
1368        let num_of_peers = Some(0);
1369        let range_value = [128; 32];
1370        let range = Some(range_value);
1371        let result = Node::calculate_get_closest_peers(
1372            local_peers.clone(),
1373            target.clone(),
1374            num_of_peers,
1375            range,
1376        );
1377
1378        // Range shall be preferred, i.e. the result peers shall all within the range
1379        let distance = U256::from_big_endian(&range_value);
1380        let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
1381            .into_iter()
1382            .filter_map(|(peer_id, multi_addrs)| {
1383                let addr = NetworkAddress::from(peer_id);
1384                if target.distance(&addr).0 <= distance {
1385                    Some((addr, multi_addrs.clone()))
1386                } else {
1387                    None
1388                }
1389            })
1390            .collect();
1391
1392        assert_eq!(expected_result, result);
1393    }
1394
1395    mod merkle_payment_tests {
1396        use super::*;
1397
        /// Test that timestamp validation accepts valid timestamps (within the acceptable window)
        #[test]
        fn test_timestamp_validation_accepts_valid_timestamp() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Valid timestamp: 1 hour ago
            let valid_timestamp = now - 3600;

            // Validate timestamp
            // (mirrors the age computation in `respond_merkle_candidate_quote`)
            let age = now.saturating_sub(valid_timestamp);

            assert!(
                valid_timestamp <= now,
                "Valid timestamp should not be in the future"
            );
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Valid timestamp should not be expired"
            );
        }
1421
        /// Test that timestamp validation rejects future timestamps
        ///
        /// NOTE(review): this only exercises the raw `future > now` predicate;
        /// the handler itself additionally tolerates 24h of clock skew before
        /// rejecting.
        #[test]
        fn test_timestamp_validation_rejects_future_timestamp() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Future timestamp: 1 hour in the future
            let future_timestamp = now + 3600;

            // Timestamp should be rejected
            assert!(
                future_timestamp > now,
                "Future timestamp should be rejected"
            );
        }
1439
        /// Test that timestamp validation rejects expired timestamps
        #[test]
        fn test_timestamp_validation_rejects_expired_timestamp() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Expired timestamp: 8 days ago (> 7 day expiration)
            // NOTE(review): the `+ 86400` offset assumes MERKLE_PAYMENT_EXPIRATION
            // is in seconds — confirm against ant_evm::merkle_payments.
            let expired_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 86400);

            // Calculate age
            let age = now.saturating_sub(expired_timestamp);

            // Timestamp should be rejected
            assert!(
                age > MERKLE_PAYMENT_EXPIRATION,
                "Expired timestamp should be rejected"
            );
        }
1460
        /// Test timestamp at the exact expiration boundary (should be accepted)
        ///
        /// The handler rejects only when `age > MERKLE_PAYMENT_EXPIRATION`
        /// (plus tolerance), so an age exactly equal to the expiration is
        /// still valid — the original "(should be rejected)" wording
        /// contradicted the assertions below.
        #[test]
        fn test_timestamp_validation_at_expiration_boundary() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Timestamp exactly at expiration boundary
            let boundary_timestamp = now - MERKLE_PAYMENT_EXPIRATION;

            let age = now.saturating_sub(boundary_timestamp);

            // At the boundary, age == MERKLE_PAYMENT_EXPIRATION
            assert_eq!(age, MERKLE_PAYMENT_EXPIRATION);
            // The validation uses >, so this should pass
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Timestamp exactly at boundary should not be rejected"
            );
        }
1482
1483        /// Test timestamp just beyond expiration boundary (should be rejected)
1484        #[test]
1485        fn test_timestamp_validation_beyond_expiration_boundary() {
1486            let now = std::time::SystemTime::now()
1487                .duration_since(std::time::UNIX_EPOCH)
1488                .unwrap()
1489                .as_secs();
1490
1491            // Timestamp just beyond expiration boundary (1 second past)
1492            let beyond_boundary_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 1);
1493
1494            let age = now.saturating_sub(beyond_boundary_timestamp);
1495
1496            assert!(
1497                age > MERKLE_PAYMENT_EXPIRATION,
1498                "Timestamp beyond boundary should be rejected"
1499            );
1500        }
1501
1502        /// Test timestamp at current time (should be accepted)
1503        #[test]
1504        fn test_timestamp_validation_at_current_time() {
1505            let now = std::time::SystemTime::now()
1506                .duration_since(std::time::UNIX_EPOCH)
1507                .unwrap()
1508                .as_secs();
1509
1510            // Timestamp at current time
1511            let current_timestamp = now;
1512
1513            let age = now.saturating_sub(current_timestamp);
1514
1515            assert!(
1516                current_timestamp <= now,
1517                "Current timestamp should not be in future"
1518            );
1519            assert!(
1520                age <= MERKLE_PAYMENT_EXPIRATION,
1521                "Current timestamp should not be expired"
1522            );
1523            assert_eq!(age, 0, "Age should be 0 for current timestamp");
1524        }
1525
1526        /// Test timestamp near future boundary (1 second in future)
1527        #[test]
1528        fn test_timestamp_validation_near_future_boundary() {
1529            let now = std::time::SystemTime::now()
1530                .duration_since(std::time::UNIX_EPOCH)
1531                .unwrap()
1532                .as_secs();
1533
1534            // Timestamp 1 second in the future
1535            let near_future_timestamp = now + 1;
1536
1537            assert!(
1538                near_future_timestamp > now,
1539                "Near-future timestamp should be rejected"
1540            );
1541        }
1542
1543        /// Test expiration constant is set correctly (7 days = 604800 seconds)
1544        #[test]
1545        fn test_merkle_payment_expiration_constant() {
1546            const SEVEN_DAYS_IN_SECONDS: u64 = 7 * 24 * 60 * 60;
1547            assert_eq!(
1548                MERKLE_PAYMENT_EXPIRATION, SEVEN_DAYS_IN_SECONDS,
1549                "MERKLE_PAYMENT_EXPIRATION should be 7 days"
1550            );
1551        }
1552    }
1553}