ant_node/
node.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use super::{
10    Marker, NodeEvent, error::Result, event::NodeEventsChannel, quote::quotes_verification,
11};
12#[cfg(feature = "open-metrics")]
13use crate::metrics::NodeMetricsRecorder;
14#[cfg(feature = "open-metrics")]
15use crate::networking::MetricsRegistries;
16use crate::networking::{Addresses, Network, NetworkConfig, NetworkError, NetworkEvent, NodeIssue};
17use crate::{PutValidationError, RunningNode};
18use ant_bootstrap::bootstrap::Bootstrap;
19use ant_evm::EvmNetwork;
20use ant_evm::RewardsAddress;
21use ant_evm::merkle_payments::MERKLE_PAYMENT_EXPIRATION;
22use ant_protocol::{
23    CLOSE_GROUP_SIZE, NetworkAddress, PrettyPrintRecordKey,
24    error::Error as ProtocolError,
25    messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
26    storage::{Chunk, ValidationType, try_deserialize_record},
27};
28use bytes::Bytes;
29use futures::stream::{self, StreamExt};
30use itertools::Itertools;
31use libp2p::{
32    Multiaddr, PeerId,
33    identity::Keypair,
34    kad::{KBucketDistance as Distance, Record, U256},
35    request_response::OutboundFailure,
36};
37use num_traits::cast::ToPrimitive;
38use rand::{
39    Rng, SeedableRng,
40    rngs::{OsRng, StdRng},
41    seq::SliceRandom,
42    thread_rng,
43};
44use std::{
45    collections::{BTreeSet, HashMap, HashSet},
46    net::SocketAddr,
47    path::PathBuf,
48    sync::{
49        Arc,
50        atomic::{AtomicUsize, Ordering},
51    },
52    time::{Duration, Instant},
53};
54use tokio::sync::{Mutex, watch};
55use tokio::{
56    sync::mpsc::Receiver,
57    task::{JoinSet, spawn},
58};
59
60/// Interval to trigger replication of all records to all peers.
61/// This is the max time it should take. Minimum interval at any node will be half this
62pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;
63
64/// Interval to trigger storage challenge.
65/// This is the max time it should take. Minimum interval at any node will be half this
66const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;
67
/// Interval to update the node's uptime metric
69const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);
70
/// Interval to clean up irrelevant records
72/// This is the max time it should take. Minimum interval at any node will be half this
73const UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;
74
75/// Highest score to achieve from each metric sub-sector during StorageChallenge.
76const HIGHEST_SCORE: usize = 100;
77
/// Any node scoring below this is considered bad.
/// The maximum possible score is 100 * 100.
80const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;
81
/// In ms; the average StorageChallenge is expected to complete in around 250ms.
83const TIME_STEP: usize = 20;
84
85/// Delay before retrying a failed replicated record fetch.
86const REPLICA_FETCH_RETRY_DELAY: Duration = Duration::from_secs(15 * 60);
87
88/// Number of peers to query for a replicated record health check.
89const REPLICA_FETCH_PEER_COUNT: usize = 5;
90
91/// Minimum number of successful replicas required to consider the record healthy.
92const MIN_HEALTHY_REPLICA_COUNT: usize = 3;
93
94/// Index (0-based, including self) used to derive the distance threshold for nearby records.
95const CLOSE_NEIGHBOUR_DISTANCE_INDEX: usize = 7;
96
97/// Maximum number of closest peers to track for replication behaviour detection.
98///
99/// Set to 20 (roughly 4x CLOSE_GROUP_SIZE) to cover peers that are close enough
100/// to affect data responsibility during churn. Tracking only the closest peers
101/// keeps memory bounded while still detecting restart patterns in the critical
102/// zone where replication matters most.
103const CLOSE_GROUP_TRACKING_LIMIT: usize = 20;
104
105/// Grace period before triggering replication after detecting a peer restart.
106///
107/// When a peer is detected as restarting (removed then quickly re-added),
108/// replication is suppressed for this duration to allow the peer to stabilize.
109/// Set to 90 seconds which is:
110/// - Long enough to cover typical node restart times (30-60s)
111/// - Short enough to ensure data availability isn't compromised
112/// - Approximately half of PERIODIC_REPLICATION_INTERVAL_MAX_S, so the periodic
113///   replication will catch any missed updates within a reasonable timeframe
114const CLOSE_GROUP_RESTART_SUPPRESSION: Duration = Duration::from_secs(90);
115
116/// Helper to build and run a Node
117pub struct NodeBuilder {
118    addr: SocketAddr,
119    bootstrap: Bootstrap,
120    evm_address: RewardsAddress,
121    evm_network: EvmNetwork,
122    identity_keypair: Keypair,
123    local: bool,
124    #[cfg(feature = "open-metrics")]
125    /// Set to Some to enable the metrics server
126    metrics_server_port: Option<u16>,
127    no_upnp: bool,
128    relay_client: bool,
129    root_dir: PathBuf,
130}
131
132impl NodeBuilder {
    /// Instantiate the builder. The initial peers can either be supplied via the `initial_peers` method
    /// or fetched from the bootstrap cache set using the `bootstrap_cache` method.
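    ///
    /// A minimal usage sketch (illustrative only; how the `Bootstrap`, `RewardsAddress` and
    /// `EvmNetwork` values are obtained is assumed here, not prescribed):
    ///
    /// ```ignore
    /// let mut builder = NodeBuilder::new(
    ///     Keypair::generate_ed25519(),
    ///     bootstrap,        // ant_bootstrap::bootstrap::Bootstrap
    ///     rewards_address,  // ant_evm::RewardsAddress
    ///     evm_network,      // ant_evm::EvmNetwork
    ///     "0.0.0.0:0".parse()?,
    ///     root_dir,         // std::path::PathBuf
    /// );
    /// builder.local(true);
    /// let _running_node = builder.build_and_run()?;
    /// ```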
135    pub fn new(
136        identity_keypair: Keypair,
137        bootstrap_flow: Bootstrap,
138        evm_address: RewardsAddress,
139        evm_network: EvmNetwork,
140        addr: SocketAddr,
141        root_dir: PathBuf,
142    ) -> Self {
143        Self {
144            addr,
145            bootstrap: bootstrap_flow,
146            evm_address,
147            evm_network,
148            identity_keypair,
149            local: false,
150            #[cfg(feature = "open-metrics")]
151            metrics_server_port: None,
152            no_upnp: false,
153            relay_client: false,
154            root_dir,
155        }
156    }
157
158    /// Set the flag to indicate if the node is running in local mode
159    pub fn local(&mut self, local: bool) {
160        self.local = local;
161    }
162
163    #[cfg(feature = "open-metrics")]
164    /// Set the port for the OpenMetrics server. Defaults to a random port if not set
165    pub fn metrics_server_port(&mut self, port: Option<u16>) {
166        self.metrics_server_port = port;
167    }
168
169    /// Set the flag to make the node act as a relay client
170    pub fn relay_client(&mut self, relay_client: bool) {
171        self.relay_client = relay_client;
172    }
173
174    /// Set the flag to disable UPnP for the node
175    pub fn no_upnp(&mut self, no_upnp: bool) {
176        self.no_upnp = no_upnp;
177    }
178
    /// Runs a new node instance: sets up the swarm driver, creates the data storage,
    /// and spawns a task to handle network events. Returns the created `RunningNode`,
    /// which contains a `NodeEventsChannel` for listening to node-related events.
183    ///
184    /// # Returns
185    ///
186    /// A `RunningNode` instance.
187    ///
188    /// # Errors
189    ///
190    /// Returns an error if there is a problem initializing the Network.
191    pub fn build_and_run(self) -> Result<RunningNode> {
192        // setup metrics
193        #[cfg(feature = "open-metrics")]
194        let (metrics_recorder, metrics_registries) = if self.metrics_server_port.is_some() {
195            // metadata registry
196            let mut metrics_registries = MetricsRegistries::default();
197            let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);
198
199            (Some(metrics_recorder), metrics_registries)
200        } else {
201            (None, MetricsRegistries::default())
202        };
203
204        // create a shutdown signal channel
205        let (shutdown_tx, shutdown_rx) = watch::channel(false);
206
207        // init network
208        let network_config = NetworkConfig {
209            keypair: self.identity_keypair,
210            local: self.local,
211            listen_addr: self.addr,
212            root_dir: self.root_dir.clone(),
213            shutdown_rx: shutdown_rx.clone(),
214            bootstrap: self.bootstrap,
215            no_upnp: self.no_upnp,
216            relay_client: self.relay_client,
217            custom_request_timeout: None,
218            #[cfg(feature = "open-metrics")]
219            metrics_registries,
220            #[cfg(feature = "open-metrics")]
221            metrics_server_port: self.metrics_server_port,
222        };
223        let (network, network_event_receiver) = Network::init(network_config)?;
224
225        // init node
226        let node_events_channel = NodeEventsChannel::default();
227        let node = NodeInner {
228            network: network.clone(),
229            events_channel: node_events_channel.clone(),
230            close_group_tracker: Mutex::new(CloseGroupTracker::new(network.peer_id())),
231            reward_address: self.evm_address,
232            #[cfg(feature = "open-metrics")]
233            metrics_recorder,
234            evm_network: self.evm_network,
235        };
236        let node = Node {
237            inner: Arc::new(node),
238        };
239
240        // Run the node
241        node.run(network_event_receiver, shutdown_rx);
242        let running_node = RunningNode {
243            shutdown_sender: shutdown_tx,
244            network,
245            node_events_channel,
246            root_dir_path: self.root_dir,
247            rewards_address: self.evm_address,
248        };
249
250        Ok(running_node)
251    }
252}
253
254/// `Node` represents a single node in the distributed network. It handles
255/// network events, processes incoming requests, interacts with the data
256/// storage, and broadcasts node-related events.
257#[derive(Clone)]
258pub(crate) struct Node {
259    inner: Arc<NodeInner>,
260}
261
262/// The actual implementation of the Node. The other is just a wrapper around this, so that we don't expose
263/// the Arc from the interface.
264struct NodeInner {
265    events_channel: NodeEventsChannel,
266    network: Network,
267    close_group_tracker: Mutex<CloseGroupTracker>,
268    #[cfg(feature = "open-metrics")]
269    metrics_recorder: Option<NodeMetricsRecorder>,
270    reward_address: RewardsAddress,
271    evm_network: EvmNetwork,
272}
273
274impl Node {
275    /// Returns the NodeEventsChannel
276    pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
277        &self.inner.events_channel
278    }
279
280    /// Returns the instance of Network
281    pub(crate) fn network(&self) -> &Network {
282        &self.inner.network
283    }
284
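    /// Returns the tracker used to monitor close-group churn and peer-restart behaviour for replication decisions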
285    fn close_group_tracker(&self) -> &Mutex<CloseGroupTracker> {
286        &self.inner.close_group_tracker
287    }
288
289    #[cfg(feature = "open-metrics")]
290    /// Returns a reference to the NodeMetricsRecorder if the `open-metrics` feature flag is enabled
291    /// This is used to record various metrics for the node.
292    pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
293        self.inner.metrics_recorder.as_ref()
294    }
295
296    /// Returns the reward address of the node
297    pub(crate) fn reward_address(&self) -> &RewardsAddress {
298        &self.inner.reward_address
299    }
300
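    /// Returns the EVM network this node operates against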
301    pub(crate) fn evm_network(&self) -> &EvmNetwork {
302        &self.inner.evm_network
303    }
304
    /// Spawns a task that processes incoming `NetworkEvent`s and runs the node's periodic
    /// maintenance (replication, cleanup, storage challenges) until shutdown is signalled.
307    fn run(
308        self,
309        mut network_event_receiver: Receiver<NetworkEvent>,
310        mut shutdown_rx: watch::Receiver<bool>,
311    ) {
312        let mut rng = StdRng::from_entropy();
313
314        let peers_connected = Arc::new(AtomicUsize::new(0));
315
316        let _node_task = spawn(async move {
            // use a randomised replication interval so that nodes do not all trigger
            // replication at the same time.
319            let replication_interval: u64 = rng.gen_range(
320                PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
321            );
322            let replication_interval_time = Duration::from_secs(replication_interval);
323            debug!("Replication interval set to {replication_interval_time:?}");
324
325            let mut replication_interval = tokio::time::interval(replication_interval_time);
326            let _ = replication_interval.tick().await; // first tick completes immediately
327
328            let mut uptime_metrics_update_interval =
329                tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
330            let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately
331
            // use a randomised cleanup interval so that nodes do not all run the cleanup
            // at the same time, causing an overall CPU spike.
334            let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
335                UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
336                    ..UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
337            );
338            let irrelevant_records_cleanup_interval_time =
339                Duration::from_secs(irrelevant_records_cleanup_interval);
340            let mut irrelevant_records_cleanup_interval =
341                tokio::time::interval(irrelevant_records_cleanup_interval_time);
342            let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately
343
            // use a randomised storage challenge interval to ensure
            // neighbours do not carry out challenges at the same time
346            let storage_challenge_interval: u64 =
347                rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
348            let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
349            debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");
350
351            let mut storage_challenge_interval =
352                tokio::time::interval(storage_challenge_interval_time);
353            let _ = storage_challenge_interval.tick().await; // first tick completes immediately
354
355            loop {
356                let peers_connected = &peers_connected;
357
358                tokio::select! {
359                    // Check for a shutdown command.
360                    result = shutdown_rx.changed() => {
                        if (result.is_ok() && *shutdown_rx.borrow()) || result.is_err() {
362                            info!("Shutdown signal received or sender dropped. Exiting network events loop.");
363                            break;
364                        }
365                    },
366                    net_event = network_event_receiver.recv() => {
367                        match net_event {
368                            Some(event) => {
369                                let start = Instant::now();
370                                let event_string = format!("{event:?}");
371
372                                self.handle_network_event(event, peers_connected).await;
373                                trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);
374
375                            }
376                            None => {
377                                error!("The `NetworkEvent` channel is closed");
378                                self.events_channel().broadcast(NodeEvent::ChannelClosed);
379                                break;
380                            }
381                        }
382                    }
383                    // runs every replication_interval time
384                    _ = replication_interval.tick() => {
385                        let now = Instant::now();
386
387                        {
388                            let mut tracker = self.close_group_tracker().lock().await;
389                            if tracker.handle_timer_expiry(now) {
390                                trace!("Replication timer expired for tracked peers; periodic run will cover it");
391                            }
392                        }
393
394                        let start = now;
395                        let network = self.network().clone();
396                        self.record_metrics(Marker::IntervalReplicationTriggered);
397
398                        let _handle = spawn(async move {
399                            Self::try_interval_replication(network);
400                            trace!("Periodic replication took {:?}", start.elapsed());
401                        });
402                    }
403                    _ = uptime_metrics_update_interval.tick() => {
404                        #[cfg(feature = "open-metrics")]
405                        if let Some(metrics_recorder) = self.metrics_recorder() {
406                            let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
407                        }
408                    }
409                    _ = irrelevant_records_cleanup_interval.tick() => {
410                        let network = self.network().clone();
411
412                        let _handle = spawn(async move {
413                            Self::trigger_irrelevant_record_cleanup(network);
414                        });
415                    }
416                    // runs every storage_challenge_interval time
417                    _ = storage_challenge_interval.tick() => {
418                        let start = Instant::now();
419                        debug!("Periodic storage challenge triggered");
420                        let network = self.network().clone();
421
422                        let _handle = spawn(async move {
423                            Self::storage_challenge(network).await;
424                            trace!("Periodic storage challenge took {:?}", start.elapsed());
425                        });
426                    }
427                }
428            }
429        });
430    }
431
432    /// Calls Marker::log() to insert the marker into the log files.
433    /// Also calls NodeMetrics::record() to record the metric if the `open-metrics` feature flag is enabled.
434    pub(crate) fn record_metrics(&self, marker: Marker) {
435        marker.log();
436        #[cfg(feature = "open-metrics")]
437        if let Some(metrics_recorder) = self.metrics_recorder() {
438            metrics_recorder.record(marker)
439        }
440    }
441
442    // **** Private helpers *****
443
444    /// Handle a network event.
    /// Spawns a task for any likely long-running work
446    async fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
447        let start = Instant::now();
448        let event_string = format!("{event:?}");
449        let event_header;
450
451        // Reducing non-mandatory logging
452        if let NetworkEvent::QueryRequestReceived {
453            query: Query::GetVersion { .. },
454            ..
455        } = event
456        {
457            trace!("Handling NetworkEvent {event_string}");
458        } else {
459            debug!("Handling NetworkEvent {event_string}");
460        }
461
462        match event {
463            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
464                event_header = "PeerAdded";
                // increment peers_connected and send the ConnectedToNetwork event once we have connected to CLOSE_GROUP_SIZE peers
466                let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
467                if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
468                    self.events_channel()
469                        .broadcast(NodeEvent::ConnectedToNetwork);
470                }
471
472                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
473                self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));
474
475                // try query peer version
476                let network = self.network().clone();
477                let _handle = spawn(async move {
478                    Self::try_query_peer_version(network, peer_id, Default::default()).await;
479                });
480
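                // Record this addition in the close-group tracker and decide whether replication
                // should be triggered now or suppressed (e.g. for a peer that appears to be restarting).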
481                let replication_decision = {
482                    let mut tracker = self.close_group_tracker().lock().await;
483                    tracker.record_peer_added(peer_id)
484                };
485
486                if replication_decision.should_trigger() {
487                    let network = self.network().clone();
488                    self.record_metrics(Marker::IntervalReplicationTriggered);
489                    let _handle = spawn(async move {
490                        Self::try_interval_replication(network);
491                    });
492                } else if matches!(replication_decision, ReplicationDirective::Skip) {
493                    trace!(
494                        "Replication skipped for {peer_id:?} addition due to restart behaviour tracking"
495                    );
496                }
497            }
498            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
499                event_header = "PeerRemoved";
500                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
501                self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));
502
503                let self_id = self.network().peer_id();
504                let distance =
505                    NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
506                info!(
507                    "Node {self_id:?} removed peer from routing table: {peer_id:?}. It has a {:?} distance to us.",
508                    distance.ilog2()
509                );
510
511                let replication_decision = {
512                    let mut tracker = self.close_group_tracker().lock().await;
513                    tracker.record_peer_removed(peer_id)
514                };
515
516                if replication_decision.should_trigger() {
517                    let network = self.network().clone();
518                    self.record_metrics(Marker::IntervalReplicationTriggered);
519                    let _handle = spawn(async move {
520                        Self::try_interval_replication(network);
521                    });
522                } else if matches!(replication_decision, ReplicationDirective::Skip) {
523                    trace!(
524                        "Replication skipped for {peer_id:?} removal due to restart behaviour tracking"
525                    );
526                }
527            }
528            NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
529                event_header = "PeerWithUnsupportedProtocol";
530            }
531            NetworkEvent::NewListenAddr(_) => {
532                event_header = "NewListenAddr";
533            }
534            NetworkEvent::ResponseReceived { res } => {
535                event_header = "ResponseReceived";
536                if let Err(err) = self.handle_response(res) {
537                    error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
538                }
539            }
540            NetworkEvent::KeysToFetchForReplication(keys) => {
541                event_header = "KeysToFetchForReplication";
542                self.record_metrics(Marker::fetching_keys_for_replication(&keys));
543
544                if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
545                    error!("Error while trying to fetch replicated data {err:?}");
546                }
547            }
548            NetworkEvent::QueryRequestReceived { query, channel } => {
549                event_header = "QueryRequestReceived";
550                let node = self.clone();
551                let payment_address = *self.reward_address();
552
553                let _handle = spawn(async move {
554                    let network = node.network().clone();
555                    let res = Self::handle_query(node, query, payment_address).await;
556
557                    // Reducing non-mandatory logging
558                    if let Response::Query(QueryResponse::GetVersion { .. }) = res {
559                        trace!("Sending response {res:?}");
560                    } else {
561                        debug!("Sending response {res:?}");
562                    }
563
564                    network.send_response(res, channel);
565                });
566            }
567            NetworkEvent::UnverifiedRecord(record) => {
568                event_header = "UnverifiedRecord";
                // record validation can be long running, so we spawn a task to handle it
570                let self_clone = self.clone();
571                let _handle = spawn(async move {
572                    let key = PrettyPrintRecordKey::from(&record.key).into_owned();
573                    match self_clone.validate_and_store_record(record).await {
574                        Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
575                        Err(err) => {
576                            self_clone.record_metrics(Marker::RecordRejected(&key, &err));
577                        }
578                    }
579                });
580            }
581            NetworkEvent::TerminateNode { reason } => {
582                event_header = "TerminateNode";
583                error!("Received termination from swarm_driver due to {reason:?}");
584                self.events_channel()
585                    .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
586            }
587            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
588                event_header = "FailedToFetchHolders";
589                let network = self.network().clone();
590                let pretty_log: Vec<_> = bad_nodes
591                    .iter()
592                    .map(|(peer_id, record_key)| {
593                        let pretty_key = PrettyPrintRecordKey::from(record_key);
594                        (peer_id, pretty_key)
595                    })
596                    .collect();
                // Note: this log is checked in CI and is expected to `not appear`.
                //       Any change to the keyword `failed to fetch` will require a
                //       corresponding CI script change as well.
600                debug!(
601                    "Received notification from replication_fetcher, notifying {pretty_log:?} failed to fetch replication copies from."
602                );
603                let _handle = spawn(async move {
604                    for (peer_id, record_key) in bad_nodes {
                        // An obsolete fetch request (e.g. one superseded by a flood of fresh replicates)
                        // could result in the peer being claimed as bad, as the local copy blocks the
                        // entry from being cleared.
607                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
608                        {
609                            error!(
610                                "From peer {peer_id:?}, failed to fetch record {:?}",
611                                PrettyPrintRecordKey::from(&record_key)
612                            );
613                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
614                        }
615                    }
616                });
617            }
618            NetworkEvent::QuoteVerification { quotes } => {
619                event_header = "QuoteVerification";
620                let network = self.network().clone();
621
622                let _handle = spawn(async move {
623                    quotes_verification(&network, quotes).await;
624                });
625            }
626            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
627                event_header = "FreshReplicateToFetch";
628                self.fresh_replicate_to_fetch(holder, keys);
629            }
630            NetworkEvent::PeersForVersionQuery(peers) => {
631                event_header = "PeersForVersionQuery";
632                let network = self.network().clone();
633                let _handle = spawn(async move {
634                    Self::query_peers_version(network, peers).await;
635                });
636            }
637            NetworkEvent::NetworkWideReplication { keys } => {
638                event_header = "NetworkWideReplication";
639                self.perform_network_wide_replication(keys);
640            }
641        }
642
643        trace!(
644            "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
645            start.elapsed()
646        );
647    }
648
649    // Handle the response that was not awaited at the call site
650    fn handle_response(&self, response: Response) -> Result<()> {
651        match response {
652            Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
                // This should actually have been short-circuited when received
654                warn!("Mishandled replicate response, should be handled earlier");
655            }
656            Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
657                error!(
                    "Response to replication shall be handled by the caller, not by the common handler, {resp:?}"
659                );
660            }
661            Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
662                // No need to handle
663            }
664            other => {
665                warn!("handle_response not implemented for {other:?}");
666            }
667        };
668
669        Ok(())
670    }
671
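    /// Builds the `QueryResponse` for an incoming `Query` and wraps it in a `Response`.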
672    async fn handle_query(node: Self, query: Query, payment_address: RewardsAddress) -> Response {
673        let network = node.network();
674        let resp: QueryResponse = match query {
675            Query::GetStoreQuote {
676                key,
677                data_type,
678                data_size,
679                nonce,
680                difficulty,
681            } => {
682                let record_key = key.to_record_key();
683                let self_id = network.peer_id();
684
685                let maybe_quoting_metrics = network
686                    .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
687                    .await;
688
689                let storage_proofs = if let Some(nonce) = nonce {
690                    Self::respond_x_closest_record_proof(
691                        network,
692                        key.clone(),
693                        nonce,
694                        difficulty,
695                        false,
696                    )
697                    .await
698                } else {
699                    vec![]
700                };
701
702                match maybe_quoting_metrics {
703                    Ok((quoting_metrics, is_already_stored)) => {
704                        if is_already_stored {
705                            QueryResponse::GetStoreQuote {
706                                quote: Err(ProtocolError::RecordExists(
707                                    PrettyPrintRecordKey::from(&record_key).into_owned(),
708                                )),
709                                peer_address: NetworkAddress::from(self_id),
710                                storage_proofs,
711                            }
712                        } else {
713                            QueryResponse::GetStoreQuote {
714                                quote: Self::create_quote_for_storecost(
715                                    network,
716                                    &key,
717                                    &quoting_metrics,
718                                    &payment_address,
719                                ),
720                                peer_address: NetworkAddress::from(self_id),
721                                storage_proofs,
722                            }
723                        }
724                    }
725                    Err(err) => {
726                        warn!("GetStoreQuote failed for {key:?}: {err}");
727                        QueryResponse::GetStoreQuote {
728                            quote: Err(ProtocolError::GetStoreQuoteFailed),
729                            peer_address: NetworkAddress::from(self_id),
730                            storage_proofs,
731                        }
732                    }
733                }
734            }
735            Query::GetReplicatedRecord { requester: _, key } => {
736                let our_address = NetworkAddress::from(network.peer_id());
737                let record_key = key.to_record_key();
738
739                let result = match network.get_local_record(&record_key).await {
740                    Ok(Some(record)) => Ok((our_address, Bytes::from(record.value))),
741                    Ok(None) => Err(ProtocolError::ReplicatedRecordNotFound {
742                        holder: Box::new(our_address),
743                        key: Box::new(key.clone()),
744                    }),
                    // Use `PutRecordFailed` as a placeholder
746                    Err(err) => Err(ProtocolError::PutRecordFailed(format!(
747                        "Error to fetch local record for GetReplicatedRecord {err:?}"
748                    ))),
749                };
750
751                QueryResponse::GetReplicatedRecord(result)
752            }
753            Query::GetChunkExistenceProof {
754                key,
755                nonce,
756                difficulty,
757            } => QueryResponse::GetChunkExistenceProof(
758                Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
759            ),
760            Query::CheckNodeInProblem(target_address) => {
761                debug!("Got CheckNodeInProblem for peer {target_address:?}");
762
763                let is_in_trouble =
764                    if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
765                        result
766                    } else {
767                        debug!("Could not get status of {target_address:?}.");
768                        false
769                    };
770
771                QueryResponse::CheckNodeInProblem {
772                    reporter_address: NetworkAddress::from(network.peer_id()),
773                    target_address,
774                    is_in_trouble,
775                }
776            }
777            Query::GetClosestPeers {
778                key,
779                num_of_peers,
780                range,
781                sign_result,
782            } => {
783                debug!(
784                    "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
785                );
786                Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
787                    .await
788            }
789            Query::GetVersion(_) => QueryResponse::GetVersion {
790                peer: NetworkAddress::from(network.peer_id()),
791                version: ant_build_info::package_version(),
792            },
793            Query::PutRecord {
794                holder,
795                address,
796                serialized_record,
797            } => {
798                let record = Record {
799                    key: address.to_record_key(),
800                    value: serialized_record,
801                    publisher: None,
802                    expires: None,
803                };
804
805                let key = PrettyPrintRecordKey::from(&record.key).into_owned();
806                let result = match node.validate_and_store_record(record).await {
807                    Ok(()) => Ok(()),
808                    Err(PutValidationError::OutdatedRecordCounter { counter, expected }) => {
809                        node.record_metrics(Marker::RecordRejected(
810                            &key,
811                            &PutValidationError::OutdatedRecordCounter { counter, expected },
812                        ));
813                        Err(ProtocolError::OutdatedRecordCounter { counter, expected })
814                    }
815                    Err(PutValidationError::TopologyVerificationFailed {
816                        target_address,
817                        valid_count,
818                        total_paid,
819                        closest_count,
820                        node_peers,
821                        paid_peers,
822                    }) => {
823                        node.record_metrics(Marker::RecordRejected(
824                            &key,
825                            &PutValidationError::TopologyVerificationFailed {
826                                target_address: target_address.clone(),
827                                valid_count,
828                                total_paid,
829                                closest_count,
830                                node_peers: node_peers.clone(),
831                                paid_peers: paid_peers.clone(),
832                            },
833                        ));
834                        Err(ProtocolError::TopologyVerificationFailed {
835                            target_address: Box::new(target_address),
836                            valid_count,
837                            total_paid,
838                            closest_count,
839                            node_peers,
840                            paid_peers,
841                        })
842                    }
843                    Err(err) => {
844                        node.record_metrics(Marker::RecordRejected(&key, &err));
845                        Err(ProtocolError::PutRecordFailed(format!("{err:?}")))
846                    }
847                };
848                QueryResponse::PutRecord {
849                    result,
850                    peer_address: holder,
851                    record_addr: address,
852                }
853            }
854            Query::GetMerkleCandidateQuote {
855                key,
856                data_type,
857                data_size,
858                merkle_payment_timestamp,
859            } => {
860                Self::respond_merkle_candidate_quote(
861                    network,
862                    key,
863                    data_type,
864                    data_size,
865                    merkle_payment_timestamp,
866                    payment_address,
867                )
868                .await
869            }
870        };
871        Response::Query(resp)
872    }
873
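    /// Answers a `GetClosestPeers` query from the local routing table, optionally signing the (target, peers) payload.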
874    async fn respond_get_closest_peers(
875        network: &Network,
876        target: NetworkAddress,
877        num_of_peers: Option<usize>,
878        range: Option<[u8; 32]>,
879        sign_result: bool,
880    ) -> QueryResponse {
881        let local_peers = network.get_local_peers_with_multiaddr().await;
882        let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
883            Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
884        } else {
885            vec![]
886        };
887
888        let signature = if sign_result {
889            let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
890            bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
891            network.sign(&bytes).ok()
892        } else {
893            None
894        };
895
896        QueryResponse::GetClosestPeers {
897            target,
898            peers,
899            signature,
900        }
901    }
902
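    /// Selects peers for a `GetClosestPeers` response: when a `range` is supplied, all known
    /// peers within that distance of the target are returned (range takes precedence); otherwise
    /// the `num_of_peers` closest peers are returned, or an empty list when neither is given.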
903    fn calculate_get_closest_peers(
904        peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
905        target: NetworkAddress,
906        num_of_peers: Option<usize>,
907        range: Option<[u8; 32]>,
908    ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
909        match (num_of_peers, range) {
910            (_, Some(value)) => {
911                let distance = U256::from_big_endian(&value);
912                peer_addrs
913                    .iter()
914                    .filter_map(|(peer_id, multi_addrs)| {
915                        let addr = NetworkAddress::from(*peer_id);
916                        if target.distance(&addr).0 <= distance {
917                            Some((addr, multi_addrs.clone()))
918                        } else {
919                            None
920                        }
921                    })
922                    .collect()
923            }
924            (Some(num_of_peers), _) => {
925                let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
926                    .iter()
927                    .map(|(peer_id, multi_addrs)| {
928                        let addr = NetworkAddress::from(*peer_id);
929                        (addr, multi_addrs.clone())
930                    })
931                    .collect();
932                result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
933                result.into_iter().take(num_of_peers).collect()
934            }
935            (None, None) => vec![],
936        }
937    }
938
939    /// Handle GetMerkleCandidateQuote query
940    /// Returns a signed MerklePaymentCandidateNode containing the node's current quoting metrics,
941    /// reward address, and timestamp commitment
942    async fn respond_merkle_candidate_quote(
943        network: &Network,
944        key: NetworkAddress,
945        data_type: u32,
946        data_size: usize,
947        merkle_payment_timestamp: u64,
948        payment_address: RewardsAddress,
949    ) -> QueryResponse {
950        debug!(
951            "merkle payment: GetMerkleCandidateQuote for target {key:?}, timestamp: {merkle_payment_timestamp}, data_type: {data_type}, data_size: {data_size}"
952        );
953
954        // Validate timestamp before signing to prevent committing to invalid times.
955        // Nodes will reject proofs with expired/future timestamps during payment verification,
956        // so signing such timestamps would create useless quotes that can't be used.
957        //
958        // Allow ±24 hours tolerance to handle timezone differences and clock skew between
959        // clients and nodes. This prevents valid payments from being rejected due to minor
960        // time differences while still catching truly expired/invalid timestamps.
961        const TIMESTAMP_TOLERANCE: u64 = 24 * 60 * 60; // 24 hours
962
963        let now = std::time::SystemTime::now()
964            .duration_since(std::time::UNIX_EPOCH)
965            .unwrap_or_default()
966            .as_secs();
967
968        // Reject future timestamps (with 24h tolerance for clock skew)
969        let future_threshold = now + TIMESTAMP_TOLERANCE;
970        if merkle_payment_timestamp > future_threshold {
971            let error_msg = format!(
972                "Rejected future timestamp {merkle_payment_timestamp} (current time: {now}, threshold: {future_threshold})"
973            );
974            warn!("{error_msg} for {key:?}");
975            return QueryResponse::GetMerkleCandidateQuote(Err(
976                ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
977            ));
978        }
979
980        // Reject expired timestamps (with 24h tolerance for clock skew)
981        let expiration_threshold = MERKLE_PAYMENT_EXPIRATION + TIMESTAMP_TOLERANCE;
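        // i.e. a timestamp is accepted while its age is at most MERKLE_PAYMENT_EXPIRATION plus the tolerance.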
982        let age = now.saturating_sub(merkle_payment_timestamp);
983        if age > expiration_threshold {
984            let error_msg = format!(
985                "Rejected expired timestamp {merkle_payment_timestamp} (age: {age}s, max: {expiration_threshold}s)",
986            );
987            warn!("{error_msg} for {key:?}");
988            return QueryResponse::GetMerkleCandidateQuote(Err(
989                ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
990            ));
991        }
992
993        // Get node's current quoting metrics
994        let record_key = key.to_record_key();
995        let (quoting_metrics, _is_already_stored) = match network
996            .get_local_quoting_metrics(record_key, data_type, data_size)
997            .await
998        {
999            Ok(metrics) => metrics,
1000            Err(err) => {
1001                let error_msg = format!("Failed to get quoting metrics for {key:?}: {err}");
1002                warn!("{error_msg}");
1003                return QueryResponse::GetMerkleCandidateQuote(Err(
1004                    ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
1005                ));
1006            }
1007        };
1008
1009        // Create the MerklePaymentCandidateNode with node's signed commitment
1010        let pub_key = network.get_pub_key();
1011        let reward_address = payment_address;
1012        let bytes = ant_evm::merkle_payments::MerklePaymentCandidateNode::bytes_to_sign(
1013            &quoting_metrics,
1014            &reward_address,
1015            merkle_payment_timestamp,
1016        );
1017        let signature = match network.sign(&bytes) {
1018            Ok(sig) => sig,
1019            Err(e) => {
1020                let error_msg = format!("Failed to sign candidate node for {key:?}: {e}");
1021                error!("{error_msg}");
1022                return QueryResponse::GetMerkleCandidateQuote(Err(
1023                    ProtocolError::FailedToSignMerkleCandidate(error_msg),
1024                ));
1025            }
1026        };
1027
1028        let candidate = ant_evm::merkle_payments::MerklePaymentCandidateNode {
1029            quoting_metrics,
1030            reward_address,
1031            merkle_payment_timestamp,
1032            pub_key,
1033            signature,
1034        };
1035        QueryResponse::GetMerkleCandidateQuote(Ok(candidate))
1036    }
1037
    // Nodes only check ChunkProofs against each other, to avoid the `multi-version` issue.
    // Clients check proofs against all records, as they have to fetch from the network anyway.
1040    async fn respond_x_closest_record_proof(
1041        network: &Network,
1042        key: NetworkAddress,
1043        nonce: Nonce,
1044        difficulty: usize,
1045        chunk_only: bool,
1046    ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
1047        let start = Instant::now();
1048        let mut results = vec![];
1049        if difficulty == 1 {
1050            // Client checking existence of published chunk.
1051            let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
1052            if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
1053                let proof = ChunkProof::new(&record.value, nonce);
1054                debug!("Chunk proof for {key:?} is {proof:?}");
1055                result = Ok(proof)
1056            } else {
1057                debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
1058            }
1059
1060            results.push((key.clone(), result));
1061        } else {
1062            let all_local_records = network.get_all_local_record_addresses().await;
1063
1064            if let Ok(all_local_records) = all_local_records {
1065                let mut all_chunk_addrs: Vec<_> = if chunk_only {
1066                    all_local_records
1067                        .iter()
1068                        .filter_map(|(addr, record_type)| {
1069                            if *record_type == ValidationType::Chunk {
1070                                Some(addr.clone())
1071                            } else {
1072                                None
1073                            }
1074                        })
1075                        .collect()
1076                } else {
1077                    all_local_records.keys().cloned().collect()
1078                };
1079
1080                // Sort by distance and only take first X closest entries
1081                all_chunk_addrs.sort_by_key(|addr| key.distance(addr));
1082
1083                // TODO: this shall be deduced from resource usage dynamically
1084                let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);
1085
1086                for addr in all_chunk_addrs.iter().take(workload_factor) {
1087                    if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
1088                    {
1089                        let proof = ChunkProof::new(&record.value, nonce);
1090                        debug!("Chunk proof for {key:?} is {proof:?}");
1091                        results.push((addr.clone(), Ok(proof)));
1092                    }
1093                }
1094            }
1095
1096            info!(
1097                "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
1098                results.len(),
1099                start.elapsed()
1100            );
1101        }
1102
1103        results
1104    }
1105
1106    /// Check among all chunk type records that we have,
1107    /// and randomly pick one as the verification candidate.
1108    /// This will challenge all closest peers at once.
1109    async fn storage_challenge(network: Network) {
1110        let start = Instant::now();
1111        let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
1112            network.get_k_closest_local_peers_to_the_target(None).await
1113        {
1114            closest_peers
1115                .into_iter()
1116                .take(CLOSE_GROUP_SIZE)
1117                .collect_vec()
1118        } else {
1119            error!("Cannot get local neighbours");
1120            return;
1121        };
1122        if closest_peers.len() < CLOSE_GROUP_SIZE {
1123            debug!(
1124                "Not enough neighbours ({}/{}) to carry out storage challenge.",
1125                closest_peers.len(),
1126                CLOSE_GROUP_SIZE
1127            );
1128            return;
1129        }
1130
1131        let mut verify_candidates: Vec<NetworkAddress> =
1132            if let Ok(all_keys) = network.get_all_local_record_addresses().await {
1133                all_keys
1134                    .iter()
1135                    .filter_map(|(addr, record_type)| {
1136                        if ValidationType::Chunk == *record_type {
1137                            Some(addr.clone())
1138                        } else {
1139                            None
1140                        }
1141                    })
1142                    .collect()
1143            } else {
1144                error!("Failed to get local record addresses.");
1145                return;
1146            };
1147        let num_of_targets = verify_candidates.len();
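        // Require a reasonable pool of local chunk records before challenging neighbours.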
1148        if num_of_targets < 50 {
1149            debug!("Not enough candidates({num_of_targets}/50) to be checked against neighbours.");
1150            return;
1151        }
1152
        // To ensure the neighbours share the same knowledge as us,
        // the target is chosen to be not far from us.
1155        let self_addr = NetworkAddress::from(network.peer_id());
1156        verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
1157        let index: usize = OsRng.gen_range(0..num_of_targets / 2);
1158        let target = verify_candidates[index].clone();
1159        // TODO: workload shall be dynamically deduced from resource usage
1160        let difficulty = CLOSE_GROUP_SIZE;
1161        verify_candidates.sort_by_key(|addr| target.distance(addr));
1162        let expected_targets = verify_candidates.into_iter().take(difficulty);
1163        let nonce: Nonce = thread_rng().r#gen::<u64>();
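        // Pre-compute the proof we expect for each challenged address from our local copies.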
1164        let mut expected_proofs = HashMap::new();
1165        for addr in expected_targets {
1166            if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
1167                let expected_proof = ChunkProof::new(&record.value, nonce);
1168                let _ = expected_proofs.insert(addr, expected_proof);
1169            } else {
                error!("Local record {addr:?} can't be loaded from disk.");
1171            }
1172        }
1173        let request = Request::Query(Query::GetChunkExistenceProof {
1174            key: target.clone(),
1175            nonce,
1176            difficulty,
1177        });
1178
1179        let mut tasks = JoinSet::new();
1180        for (peer_id, addresses) in closest_peers {
1181            if peer_id == network.peer_id() {
1182                continue;
1183            }
1184            let network_clone = network.clone();
1185            let request_clone = request.clone();
1186            let expected_proofs_clone = expected_proofs.clone();
1187            let _ = tasks.spawn(async move {
1188                let res = scoring_peer(
1189                    network_clone,
1190                    (peer_id, addresses),
1191                    request_clone,
1192                    expected_proofs_clone,
1193                )
1194                .await;
1195                (peer_id, res)
1196            });
1197        }
1198
1199        let mut peer_scores = vec![];
1200        while let Some(res) = tasks.join_next().await {
1201            match res {
1202                Ok((peer_id, score)) => {
1203                    let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
1204                    if !is_healthy {
1205                        info!(
1206                            "Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."
1207                        );
                        // TODO: should a challenge failure immediately trigger the node's removal?
1209                        network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
1210                    }
1211                    peer_scores.push((peer_id, is_healthy));
1212                }
1213                Err(e) => {
1214                    info!("StorageChallenge task completed with error {e:?}");
1215                }
1216            }
1217        }
1218        if !peer_scores.is_empty() {
1219            network.notify_peer_scores(peer_scores);
1220        }
1221
1222        Self::verify_local_replication_health(network.clone()).await;
1223
1224        info!(
1225            "Completed node StorageChallenge against neighbours in {:?}!",
1226            start.elapsed()
1227        );
1228    }
1229
1230    /// Perform a direct replicated record fetch from nearby peers to verify replication health.
1231    async fn verify_local_replication_health(network: Network) {
1232        let closest_peers = match network.get_k_closest_local_peers_to_the_target(None).await {
1233            Ok(peers) => peers,
1234            Err(err) => {
1235                warn!("Cannot fetch closest peers for replica verification: {err:?}");
1236                return;
1237            }
1238        };
1239
1240        if closest_peers.len() <= CLOSE_NEIGHBOUR_DISTANCE_INDEX {
1241            debug!(
1242                "Skipping replica verification as we only know {} neighbours (need > {}).",
1243                closest_peers.len(),
1244                CLOSE_NEIGHBOUR_DISTANCE_INDEX
1245            );
1246            return;
1247        }
1248
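        // Use the distance to the CLOSE_NEIGHBOUR_DISTANCE_INDEX-th closest peer as the
        // threshold: only records whose distance bucket (ilog2) does not exceed that peer's
        // are treated as nearby.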
1249        let self_address = NetworkAddress::from(network.peer_id());
1250        let Some((threshold_peer, _)) = closest_peers.get(CLOSE_NEIGHBOUR_DISTANCE_INDEX) else {
1251            debug!("Unable to determine distance threshold for replica verification.");
1252            return;
1253        };
1254
1255        let threshold_distance = self_address.distance(&NetworkAddress::from(*threshold_peer));
1256        let Some(threshold_ilog2) = threshold_distance.ilog2() else {
1257            debug!("Threshold distance lacks ilog2; cannot proceed with replica verification.");
1258            return;
1259        };
1260
1261        let local_records = match network.get_all_local_record_addresses().await {
1262            Ok(records) => records,
1263            Err(err) => {
1264                warn!("Failed to list local records for replica verification: {err:?}");
1265                return;
1266            }
1267        };
1268
1269        let mut nearby_records: Vec<NetworkAddress> = local_records
1270            .into_iter()
1271            .filter_map(|(address, record_type)| {
1272                if record_type != ValidationType::Chunk {
1273                    return None;
1274                }
1275                let distance = self_address.distance(&address);
1276                distance.ilog2().and_then(|record_ilog2| {
1277                    if record_ilog2 <= threshold_ilog2 {
1278                        Some(address)
1279                    } else {
1280                        None
1281                    }
1282                })
1283            })
1284            .collect();
1285
1286        nearby_records.shuffle(&mut thread_rng());
1287        let target_record = if let Some(entry) = nearby_records.first().cloned() {
1288            entry
1289        } else {
1290            debug!("No nearby chunk records available for replica verification.");
1291            return;
1292        };
1293
1294        let pretty_key = PrettyPrintRecordKey::from(&target_record.to_record_key()).into_owned();
1295
1296        let candidate_peers = match network
1297            .get_k_closest_local_peers_to_the_target(Some(target_record.clone()))
1298            .await
1299        {
1300            Ok(peers) => peers
1301                .into_iter()
1302                .filter(|(peer_id, _)| peer_id != &network.peer_id())
1303                .take(REPLICA_FETCH_PEER_COUNT)
1304                .collect::<Vec<_>>(),
1305            Err(err) => {
1306                warn!(
1307                    "Cannot fetch record-specific closest peers for replica verification: {err:?}"
1308                );
1309                return;
1310            }
1311        };
1312
1313        if candidate_peers.len() < REPLICA_FETCH_PEER_COUNT {
1314            debug!(
1315                "Only {} peers available for replica verification (need at least {}).",
1316                candidate_peers.len(),
1317                REPLICA_FETCH_PEER_COUNT
1318            );
1319            return;
1320        }
1321
1322        debug!(
1323            "Verifying replicated record {pretty_key:?} against {} closest peers.",
1324            candidate_peers.len()
1325        );
1326
1327        let (successful_peers, failed_peers) = Self::fetch_record_from_peers_with_addresses(
1328            network.clone(),
1329            target_record.clone(),
1330            candidate_peers,
1331        )
1332        .await;
1333
1334        if failed_peers.is_empty() {
1335            debug!("All peers returned record {pretty_key:?} during replica verification.");
1336            return;
1337        }
1338
1339        if successful_peers.len() < MIN_HEALTHY_REPLICA_COUNT {
1340            warn!(
1341                "Replica verification fetched only {} copies of {pretty_key:?}. Record is unhealthy; skipping peer classification.",
1342                successful_peers.len()
1343            );
1344            return;
1345        }
1346
1347        // `failed_peers` is guaranteed non-empty at this point (see the early return
1348        // above), so go straight to scheduling the delayed retry.
1349        info!(
1350            "Scheduling replica verification retry for {} peers on record {pretty_key:?}.",
1351            failed_peers.len()
1352        );
1353        Self::schedule_record_fetch_retry(network.clone(), target_record.clone(), failed_peers)
1354            .await;
1355    }
1356
1357    /// Fetch a replicated record from the provided peers and return the success/failure split.
1358    async fn fetch_record_from_peers_with_addresses(
1359        network: Network,
1360        record_address: NetworkAddress,
1361        peers: Vec<(PeerId, Addresses)>,
1362    ) -> (Vec<PeerId>, Vec<PeerId>) {
1363        let request = Request::Query(Query::GetReplicatedRecord {
1364            requester: NetworkAddress::from(network.peer_id()),
1365            key: record_address.clone(),
1366        });
1367        let expected_key = record_address.to_record_key();
1368        let pretty_key = PrettyPrintRecordKey::from(&expected_key).into_owned();
1369
1370        let mut successes = Vec::new();
1371        let mut failures = Vec::new();
1372        let concurrency = peers.len();
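        // With `concurrency == peers.len()`, all requests are issued at once via
        // `buffer_unordered`; each future resolves to (peer_id, addrs, result).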
1373        let results = stream::iter(peers.into_iter().map(|(peer_id, addrs)| {
1374            let network_clone = network.clone();
1375            let request_clone = request.clone();
1376            async move {
1377                let result = network_clone
1378                    .send_request(request_clone, peer_id, addrs.clone())
1379                    .await;
1380                (peer_id, addrs, result)
1381            }
1382        }))
1383        .buffer_unordered(concurrency)
1384        .collect::<Vec<_>>()
1385        .await;
1386
1387        for res in results {
1388            match res {
1389                (
1390                    peer_id,
1391                    _addrs,
1392                    Ok((Response::Query(QueryResponse::GetReplicatedRecord(result)), _)),
1393                ) => {
1394                    match result {
1395                        Ok((_holder, value)) => {
1396                            // Try to deserialize the record and ensure the Chunk content matches
1397                            let record =
1398                                Record::new(record_address.to_record_key(), value.to_vec());
1399                            if let Ok(chunk) = try_deserialize_record::<Chunk>(&record)
1400                                && chunk.network_address().to_record_key() == expected_key
1401                            {
1402                                successes.push(peer_id);
1403                            } else {
1404                                warn!(
1405                                    "Peer {peer_id:?} responded with an incorrect chunk copy of {pretty_key:?}."
1406                                );
1407                                failures.push(peer_id);
1408                            }
1409                        }
1410                        Err(err) => {
1411                            info!(
1412                                "Peer {peer_id:?} responded with error {err:?} for replicated record {pretty_key:?}."
1413                            );
1414                            failures.push(peer_id);
1415                        }
1416                    }
1417                }
1418                (peer_id, _addrs, Ok((other_response, _))) => {
1419                    warn!(
1420                        "Peer {peer_id:?} responded with unexpected message {other_response:?} for {pretty_key:?}."
1421                    );
1422                    failures.push(peer_id);
1423                }
1424                (peer_id, _addrs, Err(err)) => {
1425                    info!(
1426                        "Failed to reach peer {peer_id:?} for replicated record {pretty_key:?}: {err:?}"
1427                    );
1428                    failures.push(peer_id);
1429                }
1430            }
1431        }
1432
1433        (successes, failures)
1434    }
1435
1436    /// After `REPLICA_FETCH_RETRY_DELAY`, retry the fetch against the peers that failed to return the replicated record.
1437    async fn schedule_record_fetch_retry(
1438        network: Network,
1439        record_address: NetworkAddress,
1440        failed_peers: Vec<PeerId>,
1441    ) {
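        // After the delay, refresh the failed peers' current addresses via a DHT lookup,
        // re-fetch the record from them, and evict / blacklist any peer that still fails.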
1442        let retry_peers: HashSet<_> = failed_peers.into_iter().collect();
1443        if retry_peers.is_empty() {
1444            return;
1445        }
1446
1447        let record_clone = record_address.clone();
1448        let network_clone = network.clone();
1449
1450        tokio::time::sleep(REPLICA_FETCH_RETRY_DELAY).await;
1451        let pretty_key = PrettyPrintRecordKey::from(&record_clone.to_record_key()).into_owned();
1452
1453        let refreshed_candidates =
1454            Self::refresh_retry_candidate_addresses(&network_clone, &record_clone, &retry_peers)
1455                .await;
1456
1457        if refreshed_candidates.is_empty() {
1458            info!(
1459                "Skipping replica retry for {pretty_key:?}; no tracked peers remain close to the record."
1460            );
1461            return;
1462        }
1463
1464        let (_, still_failed) = Self::fetch_record_from_peers_with_addresses(
1465            network_clone.clone(),
1466            record_clone.clone(),
1467            refreshed_candidates,
1468        )
1469        .await;
1470
1471        if still_failed.is_empty() {
1472            info!("All peers successfully returned {pretty_key:?} during replica retry.");
1473            return;
1474        }
1475
1476        warn!(
1477            "{} peers still failed to provide {pretty_key:?}; evicting and blacklisting.",
1478            still_failed.len()
1479        );
1480        for peer_id in still_failed {
1481            Self::evict_and_blacklist_peer(network_clone.clone(), peer_id);
1482        }
1483    }
1484
1485    /// Get the latest address information for tracked peers via a DHT query.
1486    async fn refresh_retry_candidate_addresses(
1487        network: &Network,
1488        record_address: &NetworkAddress,
1489        retry_peers: &HashSet<PeerId>,
1490    ) -> Vec<(PeerId, Addresses)> {
1491        let pretty_key = PrettyPrintRecordKey::from(&record_address.to_record_key()).into_owned();
1492        let Ok(closest_peers) = network.get_closest_peers(record_address).await else {
1493            warn!(
1494                "Failed to refresh peer addresses for {pretty_key:?}; unable to retry replica fetch."
1495            );
1496            return Vec::new();
1497        };
1498
1499        let closest_map: HashMap<PeerId, Addresses> = closest_peers.into_iter().collect();
1500
1501        retry_peers
1502            .iter()
1503            .filter_map(|peer_id| {
1504                closest_map
1505                    .get(peer_id)
1506                    .cloned()
1507                    .map(|addrs| (*peer_id, addrs))
1508            })
1509            .collect()
1510    }
1511
1512    /// Remove the peer from the routing table and add it to the blacklist.
1513    fn evict_and_blacklist_peer(network: Network, peer_id: PeerId) {
1514        network.remove_peer(peer_id);
1515        network.add_peer_to_blocklist(peer_id);
1516    }
1517
1518    /// Query peers' versions and update local knowledge.
1519    async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
1520        // To avoid choking, carry out the queries one by one
1521        for (peer_id, addrs) in peers {
1522            Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
1523        }
1524    }
1525
1526    /// Query peer's version and update local knowledge.
1527    async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
1528        let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
1529        // We could skip passing `addrs` here, as the peer should already be in the kad routing table and the swarm can resolve the address itself; the known addrs are passed anyway.
1530        let version = match network.send_request(request, peer, addrs).await {
1531            Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
1532                trace!("Fetched peer version {peer:?} as {version:?}");
1533                version
1534            }
1535            Ok(other) => {
1536                info!("Unexpected response while fetching version from peer {peer:?}: {other:?}");
1537                "none".to_string()
1538            }
1539            Err(err) => {
1540                info!("Failed to fetch version from peer {peer:?} with error {err:?}");
1541                // A failed version fetch (the request itself dials and then re-attempts)
1542                // ending in `DialFailure` indicates the peer is most likely dead.
1543                // In that case, the peer shall be removed from the routing table.
1544                if let NetworkError::OutboundError(OutboundFailure::DialFailure) = err {
1545                    network.remove_peer(peer);
1546                    return;
1547                }
1548                "old".to_string()
1549            }
1550        };
1551        network.notify_node_version(peer, version);
1552    }
1553}
1554
1555#[derive(Debug)]
1556struct CloseGroupTracker {
1557    self_address: NetworkAddress,
1558    /// Tracks the closest N peers by (distance, peer_id), sorted by distance to self.
1559    /// Using a tuple key handles the theoretical case where two peers have identical
1560    /// XOR distance to self.
1561    close_up_peers: BTreeSet<(Distance, PeerId)>,
1562    tracked_entries: HashMap<PeerId, BehaviourEntry>,
1563}
1564
1565impl CloseGroupTracker {
1566    fn new(self_peer_id: PeerId) -> Self {
1567        Self {
1568            self_address: NetworkAddress::from(self_peer_id),
1569            close_up_peers: BTreeSet::new(),
1570            tracked_entries: HashMap::new(),
1571        }
1572    }
1573
1574    fn record_peer_added(&mut self, peer_id: PeerId) -> ReplicationDirective {
1575        let distance = self.distance_to_peer(peer_id);
1576
1577        // Check if this peer was already present
1578        let was_present = self.close_up_peers.contains(&(distance, peer_id));
1579
1580        // Determine if the peer is close enough to track
1581        let is_close_enough = self.is_distance_within_close_range(distance);
1582
1583        if is_close_enough {
1584            // Add to close_up_peers
1585            let _ = self.close_up_peers.insert((distance, peer_id));
1586
1587            // If we exceed the limit, remove the farthest peer
1588            if self.close_up_peers.len() > CLOSE_GROUP_TRACKING_LIMIT
1589                && let Some(&(farthest_distance, farthest_peer)) =
1590                    self.close_up_peers.iter().next_back()
1591            {
1592                let _ = self
1593                    .close_up_peers
1594                    .remove(&(farthest_distance, farthest_peer));
1595            }
1596        } else {
1597            // Too far to track; clean up any existing entry and ignore
1598            let _ = self.tracked_entries.remove(&peer_id);
1599            return ReplicationDirective::Ignore;
1600        }
1601
1602        use std::collections::hash_map::Entry;
1603        match self.tracked_entries.entry(peer_id) {
1604            Entry::Vacant(vacant_entry) => {
1605                let _ = vacant_entry.insert(BehaviourEntry::default());
1606                if !was_present {
1607                    ReplicationDirective::Trigger
1608                } else {
1609                    ReplicationDirective::Ignore
1610                }
1611            }
1612            Entry::Occupied(mut occupied_entry) => {
1613                let entry = occupied_entry.get_mut();
1614                if entry.awaiting_rejoin || entry.timer_deadline.is_some() {
1615                    entry.restart_detected = true;
1616                    entry.awaiting_rejoin = false;
1617                    entry.timer_deadline = None;
1618                    ReplicationDirective::Skip
1619                } else if !was_present {
1620                    entry.restart_detected = false;
1621                    ReplicationDirective::Trigger
1622                } else {
1623                    ReplicationDirective::Ignore
1624                }
1625            }
1626        }
1627    }
1628
1629    fn record_peer_removed(&mut self, peer_id: PeerId) -> ReplicationDirective {
1630        let distance = self.distance_to_peer(peer_id);
1631
1632        // Check if this peer is in our close_up_peers
1633        let was_tracked = self.close_up_peers.contains(&(distance, peer_id));
1634
1635        if !was_tracked {
1636            // Not tracked, i.e. out of the close range, hence Ignore
1637            return ReplicationDirective::Ignore;
1638        }
1639
1640        // Remove from close_up_peers
1641        let _ = self.close_up_peers.remove(&(distance, peer_id));
1642
1643        let entry = self.tracked_entries.entry(peer_id).or_default();
1644
1645        if entry.restart_detected {
1646            entry.timer_deadline = Some(Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION);
1647            entry.awaiting_rejoin = false;
1648            ReplicationDirective::Skip
1649        } else {
1650            entry.awaiting_rejoin = true;
1651            entry.timer_deadline = None;
1652            ReplicationDirective::Trigger
1653        }
1654    }
1655
1656    fn handle_timer_expiry(&mut self, now: Instant) -> bool {
1657        let mut expired = false;
1658        for entry in self.tracked_entries.values_mut() {
1659            if let Some(deadline) = entry.timer_deadline
1660                && now >= deadline
1661            {
1662                entry.timer_deadline = None;
1663                entry.restart_detected = false;
1664                entry.awaiting_rejoin = false;
1665                expired = true;
1666            }
1667        }
1668
1669        // Keep only entries that are still relevant
1670        self.tracked_entries.retain(|peer_id, entry| {
1671            let is_currently_present = self
1672                .close_up_peers
1673                .iter()
1674                .any(|(_, present_peer)| present_peer == peer_id);
1675
1676            entry.awaiting_rejoin
1677                || entry.restart_detected
1678                || entry.timer_deadline.is_some()
1679                || is_currently_present
1680        });
1681
1682        expired
1683    }
1684
1685    fn is_distance_within_close_range(&self, distance: Distance) -> bool {
1686        // If we haven't reached the limit yet, any peer is close enough
1687        if self.close_up_peers.len() < CLOSE_GROUP_TRACKING_LIMIT {
1688            return true;
1689        }
1690
1691        // Otherwise, check if this distance is closer than the farthest tracked peer
1692        if let Some(&(farthest_distance, _)) = self.close_up_peers.iter().next_back() {
1693            distance < farthest_distance
1694        } else {
1695            true
1696        }
1697    }
1698
1699    fn distance_to_peer(&self, peer_id: PeerId) -> Distance {
1700        self.self_address.distance(&NetworkAddress::from(peer_id))
1701    }
1702}
1703
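/// Per-peer bookkeeping used by `CloseGroupTracker` to tell genuine churn apart
/// from a quick restart: `awaiting_rejoin` is set when a tracked peer drops out,
/// `restart_detected` when that peer promptly rejoins, and `timer_deadline` holds
/// the end of the suppression window started by a restart-related removal.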
1704#[derive(Debug, Default)]
1705struct BehaviourEntry {
1706    awaiting_rejoin: bool,
1707    restart_detected: bool,
1708    timer_deadline: Option<Instant>,
1709}
1710
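/// Outcome of a close-group membership change as judged by `CloseGroupTracker`:
/// `Trigger` means the change warrants a replication round, `Skip` means it looks
/// like a peer restart and replication is suppressed, and `Ignore` means no action
/// is needed (e.g. the peer is outside the tracked close range or already known).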
1711#[derive(Debug, PartialEq, Eq)]
1712enum ReplicationDirective {
1713    Trigger,
1714    Skip,
1715    Ignore,
1716}
1717
1718impl ReplicationDirective {
1719    fn should_trigger(&self) -> bool {
1720        matches!(self, Self::Trigger)
1721    }
1722}
1723
1724#[cfg(test)]
1725mod close_group_tracker_tests {
1726    use super::*;
1727    use std::time::Duration;
1728
1729    fn random_peer() -> PeerId {
1730        let keypair = libp2p::identity::Keypair::generate_ed25519();
1731        PeerId::from(keypair.public())
1732    }
1733
1734    #[test]
1735    fn new_peer_triggers_replication() {
1736        let mut tracker = CloseGroupTracker::new(random_peer());
1737        let peer = random_peer();
1738
1739        assert_eq!(
1740            tracker.record_peer_added(peer),
1741            ReplicationDirective::Trigger
1742        );
1743
1744        assert_eq!(
1745            tracker.record_peer_removed(peer),
1746            ReplicationDirective::Trigger
1747        );
1748    }
1749
1750    #[test]
1751    fn restart_detection_skips_replication() {
1752        let mut tracker = CloseGroupTracker::new(random_peer());
1753        let peer = random_peer();
1754
1755        assert_eq!(
1756            tracker.record_peer_added(peer),
1757            ReplicationDirective::Trigger
1758        );
1759
1760        assert_eq!(
1761            tracker.record_peer_removed(peer),
1762            ReplicationDirective::Trigger
1763        );
1764
1765        assert_eq!(tracker.record_peer_added(peer), ReplicationDirective::Skip);
1766
1767        assert_eq!(
1768            tracker.record_peer_removed(peer),
1769            ReplicationDirective::Skip
1770        );
1771
1772        assert!(tracker.handle_timer_expiry(
1773            Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1)
1774        ));
1775
1776        assert_eq!(
1777            tracker.record_peer_added(peer),
1778            ReplicationDirective::Trigger
1779        );
1780    }
1781
1782    #[test]
1783    fn peer_outside_close_group_is_ignored() {
1784        let mut tracker = CloseGroupTracker::new(random_peer());
1785
1786        // Fill the tracker to capacity with random peers
1787        let mut added_peers = Vec::new();
1788        for _ in 0..CLOSE_GROUP_TRACKING_LIMIT {
1789            let peer = random_peer();
1790            let result = tracker.record_peer_added(peer);
1791            // All should trigger since we're under capacity
1792            assert_eq!(result, ReplicationDirective::Trigger);
1793            added_peers.push(peer);
1794        }
1795
1796        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
1797
1798        // Now add more peers - some will be ignored if they're farther than existing ones
1799        let mut ignored_count = 0;
1800        let mut trigger_count = 0;
1801        for _ in 0..50 {
1802            let peer = random_peer();
1803            match tracker.record_peer_added(peer) {
1804                ReplicationDirective::Ignore => ignored_count += 1,
1805                ReplicationDirective::Trigger => trigger_count += 1,
1806                _ => {}
1807            }
1808        }
1809
1810        // At capacity, new peers are only accepted if closer than the farthest
1811        // Some should be ignored (too far), some should trigger (closer than farthest)
1812        assert!(
1813            ignored_count > 0 || trigger_count > 0,
1814            "Expected some peers to be processed"
1815        );
1816
1817        // Tracker should still only have CLOSE_GROUP_TRACKING_LIMIT peers
1818        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
1819    }
1820
1821    #[test]
1822    fn removal_of_untracked_peer_is_ignored() {
1823        let mut tracker = CloseGroupTracker::new(random_peer());
1824
1825        // Try to remove a peer that was never added
1826        let unknown_peer = random_peer();
1827        assert_eq!(
1828            tracker.record_peer_removed(unknown_peer),
1829            ReplicationDirective::Ignore
1830        );
1831    }
1832
1833    #[test]
1834    fn timer_expiry_resets_restart_state() {
1835        let mut tracker = CloseGroupTracker::new(random_peer());
1836        let peer = random_peer();
1837
1838        // Add and remove peer (normal behavior)
1839        assert_eq!(
1840            tracker.record_peer_added(peer),
1841            ReplicationDirective::Trigger
1842        );
1843        assert_eq!(
1844            tracker.record_peer_removed(peer),
1845            ReplicationDirective::Trigger
1846        );
1847
1848        // Peer rejoins quickly - detected as restart, skipped
1849        assert_eq!(tracker.record_peer_added(peer), ReplicationDirective::Skip);
1850
1851        // Remove again - still in restart mode, skipped with timer set
1852        assert_eq!(
1853            tracker.record_peer_removed(peer),
1854            ReplicationDirective::Skip
1855        );
1856
1857        // Timer hasn't expired yet - no change
1858        assert!(!tracker.handle_timer_expiry(Instant::now()));
1859
1860        // Timer expires
1861        let after_expiry =
1862            Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1);
1863        assert!(tracker.handle_timer_expiry(after_expiry));
1864
1865        // Now the peer should trigger replication again (state reset)
1866        assert_eq!(
1867            tracker.record_peer_added(peer),
1868            ReplicationDirective::Trigger
1869        );
1870    }
1871
1872    #[test]
1873    fn eviction_maintains_closest_peers() {
1874        let self_peer = random_peer();
1875        let mut tracker = CloseGroupTracker::new(self_peer);
1876
1877        // Add exactly CLOSE_GROUP_TRACKING_LIMIT peers
1878        let mut peers_with_distances: Vec<(PeerId, Distance)> = Vec::new();
1879        for _ in 0..CLOSE_GROUP_TRACKING_LIMIT {
1880            let peer = random_peer();
1881            let distance = tracker.distance_to_peer(peer);
1882            let _ = tracker.record_peer_added(peer);
1883            peers_with_distances.push((peer, distance));
1884        }
1885
1886        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
1887
1888        // Sort by distance to know which is farthest
1889        peers_with_distances.sort_by_key(|(_, d)| *d);
1890        let farthest_distance = peers_with_distances.last().map(|(_, d)| *d).unwrap();
1891
1892        // Add a new peer that's closer than the farthest
1893        // Keep trying until we find one (random, so may take a few tries)
1894        let mut found_closer = false;
1895        for _ in 0..100 {
1896            let new_peer = random_peer();
1897            let new_distance = tracker.distance_to_peer(new_peer);
1898            if new_distance < farthest_distance {
1899                let result = tracker.record_peer_added(new_peer);
1900                assert_eq!(result, ReplicationDirective::Trigger);
1901                // Should still have exactly CLOSE_GROUP_TRACKING_LIMIT peers
1902                assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
1903                found_closer = true;
1904                break;
1905            }
1906        }
1907
1908        // Note: This assertion might occasionally fail due to randomness,
1909        // but with 100 attempts it's extremely unlikely
1910        assert!(
1911            found_closer,
1912            "Could not find a peer closer than the farthest in 100 attempts"
1913        );
1914    }
1915
1916    #[test]
1917    fn tracked_entries_cleaned_up_on_timer_expiry() {
1918        let mut tracker = CloseGroupTracker::new(random_peer());
1919        let peer = random_peer();
1920
1921        // Add, remove, re-add (restart detected), remove again (timer set)
1922        let _ = tracker.record_peer_added(peer);
1923        let _ = tracker.record_peer_removed(peer);
1924        let _ = tracker.record_peer_added(peer);
1925        let _ = tracker.record_peer_removed(peer);
1926
1927        // Peer should have a tracked entry with timer
1928        assert!(tracker.tracked_entries.contains_key(&peer));
1929
1930        // After timer expiry, entry should be cleaned up since peer is not present
1931        let after_expiry =
1932            Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1);
1933        let _ = tracker.handle_timer_expiry(after_expiry);
1934
1935        // Entry should be removed since peer is not in close_up_peers
1936        // and no longer has active flags
1937        assert!(!tracker.tracked_entries.contains_key(&peer));
1938    }
1939}
1940
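/// Send the chunk-proof challenge to a single peer and score its response;
/// a peer that does not answer, or answers with an error, scores 0.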
1941async fn scoring_peer(
1942    network: Network,
1943    peer: (PeerId, Addresses),
1944    request: Request,
1945    expected_proofs: HashMap<NetworkAddress, ChunkProof>,
1946) -> usize {
1947    let peer_id = peer.0;
1948    let start = Instant::now();
1949    let responses = network
1950        .send_and_get_responses(&[peer], &request, true)
1951        .await;
1952
1953    if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
1954        responses.get(&peer_id)
1955    {
1956        if answers.is_empty() {
1957            info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
1958            return 0;
1959        }
1960        let elapsed = start.elapsed();
1961
1962        let mut received_proofs = vec![];
1963        for (addr, proof) in answers {
1964            if let Ok(proof) = proof {
1965                received_proofs.push((addr.clone(), proof.clone()));
1966            }
1967        }
1968
1969        let score = mark_peer(elapsed, received_proofs, &expected_proofs);
1970        info!(
1971            "Received {} answers from peer {peer_id:?} after {elapsed:?}, scoring it as {score}.",
1972            answers.len()
1973        );
1974        score
1975    } else {
1976        info!("Peer {peer_id:?} did not reply to the ChunkProofChallenge, or replied with an error.");
1977        0
1978    }
1979}
1980
1981// The score is based on the following metrics:
1982//   * the response duration
1983//   * whether any false answer was given
1984//   * the percentage of correct answers among the expected closest
1985// The higher the score, the higher the confidence in the peer.
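// Worked example (illustrative, derived from the constants above): a peer that
// answers in 250ms with every expected proof correct scores
// (100 - 250/20) * 100 = 8800, comfortably above MIN_ACCEPTABLE_HEALTHY_SCORE
// (3000), while the same answers arriving after 1.5s score
// (100 - 1500/20) * 100 = 2500 and the peer is treated as unhealthy.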
1986fn mark_peer(
1987    duration: Duration,
1988    answers: Vec<(NetworkAddress, ChunkProof)>,
1989    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
1990) -> usize {
1991    let duration_score = duration_score_scheme(duration);
1992    let challenge_score = challenge_score_scheme(answers, expected_proofs);
1993
1994    duration_score * challenge_score
1995}
1996
1997// Shorter durations shall earn higher scores
1998fn duration_score_scheme(duration: Duration) -> usize {
1999    // So far just a simple stepped scheme, capped by HIGHEST_SCORE
2000    let in_ms = if let Some(value) = duration.as_millis().to_usize() {
2001        value
2002    } else {
2003        info!("Cannot get milliseconds from {duration:?}, using a default value of 1000ms.");
2004        1000
2005    };
2006
2007    let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
2008    HIGHEST_SCORE - step
2009}
2010
2011// Any false answer shall result in 0 score immediately
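// Otherwise the score is proportional to coverage of the expected proofs,
// e.g. 4 correct answers out of 5 expected gives 100 * 4 / 5 = 80.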
2012fn challenge_score_scheme(
2013    answers: Vec<(NetworkAddress, ChunkProof)>,
2014    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
2015) -> usize {
2016    let mut correct_answers = 0;
2017    for (addr, chunk_proof) in answers {
2018        if let Some(expected_proof) = expected_proofs.get(&addr) {
2019            if expected_proof.verify(&chunk_proof) {
2020                correct_answers += 1;
2021            } else {
2022                info!("Spot a false answer to the challenge regarding {addr:?}");
2023                // Any false answer shall result in 0 score immediately
2024                return 0;
2025            }
2026        }
2027    }
2028    // TODO: Answers that are not among the expected_proofs may simply reflect a
2029    //       peer holding a different set of records than we do. Shall we:
2030    //         * pick a target close to us, so neighbours are more likely to share our knowledge
2031    //         * verify such answers against local storage
2032    //         * verify such answers by fetching from the network
2033    //       to decide how they should be counted?
2034    std::cmp::min(
2035        HIGHEST_SCORE,
2036        HIGHEST_SCORE * correct_answers / expected_proofs.len().max(1), // `.max(1)` avoids divide-by-zero
2037    )
2038}
2039
2040#[cfg(test)]
2041mod tests {
2042    use super::*;
2043    use std::str::FromStr;
2044
2045    #[test]
2046    fn test_no_local_peers() {
2047        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
2048        let target = NetworkAddress::from(PeerId::random());
2049        let num_of_peers = Some(5);
2050        let range = None;
2051        let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);
2052
2053        assert_eq!(result, vec![]);
2054    }
2055
2056    #[test]
2057    fn test_fewer_local_peers_than_num_of_peers() {
2058        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
2059            (
2060                PeerId::random(),
2061                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
2062            ),
2063            (
2064                PeerId::random(),
2065                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
2066            ),
2067            (
2068                PeerId::random(),
2069                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
2070            ),
2071        ];
2072        let target = NetworkAddress::from(PeerId::random());
2073        let num_of_peers = Some(2);
2074        let range = None;
2075        let result = Node::calculate_get_closest_peers(
2076            local_peers.clone(),
2077            target.clone(),
2078            num_of_peers,
2079            range,
2080        );
2081
2082        // Result shall be sorted and truncated
2083        let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
2084            .iter()
2085            .map(|(peer_id, multi_addrs)| {
2086                let addr = NetworkAddress::from(*peer_id);
2087                (addr, multi_addrs.clone())
2088            })
2089            .collect();
2090        expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
2091        let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();
2092
2093        assert_eq!(expected_result, result);
2094    }
2095
2096    #[test]
2097    fn test_with_range_and_num_of_peers() {
2098        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
2099            (
2100                PeerId::random(),
2101                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
2102            ),
2103            (
2104                PeerId::random(),
2105                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
2106            ),
2107            (
2108                PeerId::random(),
2109                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
2110            ),
2111        ];
2112        let target = NetworkAddress::from(PeerId::random());
2113        let num_of_peers = Some(0);
2114        let range_value = [128; 32];
2115        let range = Some(range_value);
2116        let result = Node::calculate_get_closest_peers(
2117            local_peers.clone(),
2118            target.clone(),
2119            num_of_peers,
2120            range,
2121        );
2122
2123        // Range shall be preferred, i.e. the resulting peers shall all be within the range
2124        let distance = U256::from_big_endian(&range_value);
2125        let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
2126            .into_iter()
2127            .filter_map(|(peer_id, multi_addrs)| {
2128                let addr = NetworkAddress::from(peer_id);
2129                if target.distance(&addr).0 <= distance {
2130                    Some((addr, multi_addrs.clone()))
2131                } else {
2132                    None
2133                }
2134            })
2135            .collect();
2136
2137        assert_eq!(expected_result, result);
2138    }
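
    /// Illustrative check of the stepped duration scoring (added as an example;
    /// the expected values follow directly from HIGHEST_SCORE and TIME_STEP above).
    #[test]
    fn test_duration_score_scheme_steps_and_caps() {
        // An instant answer earns the full score.
        assert_eq!(
            duration_score_scheme(std::time::Duration::from_millis(0)),
            HIGHEST_SCORE
        );
        // The expected average of 250ms loses 250 / TIME_STEP = 12 points.
        assert_eq!(
            duration_score_scheme(std::time::Duration::from_millis(250)),
            HIGHEST_SCORE - 12
        );
        // Anything taking HIGHEST_SCORE * TIME_STEP ms (2s) or longer scores 0.
        assert_eq!(duration_score_scheme(std::time::Duration::from_secs(10)), 0);
    }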
2139
2140    mod merkle_payment_tests {
2141        use super::*;
2142
2143        /// Test that timestamp validation accepts valid timestamps (within the acceptable window)
2144        #[test]
2145        fn test_timestamp_validation_accepts_valid_timestamp() {
2146            let now = std::time::SystemTime::now()
2147                .duration_since(std::time::UNIX_EPOCH)
2148                .unwrap()
2149                .as_secs();
2150
2151            // Valid timestamp: 1 hour ago
2152            let valid_timestamp = now - 3600;
2153
2154            // Validate timestamp
2155            let age = now.saturating_sub(valid_timestamp);
2156
2157            assert!(
2158                valid_timestamp <= now,
2159                "Valid timestamp should not be in the future"
2160            );
2161            assert!(
2162                age <= MERKLE_PAYMENT_EXPIRATION,
2163                "Valid timestamp should not be expired"
2164            );
2165        }
2166
2167        /// Test that timestamp validation rejects future timestamps
2168        #[test]
2169        fn test_timestamp_validation_rejects_future_timestamp() {
2170            let now = std::time::SystemTime::now()
2171                .duration_since(std::time::UNIX_EPOCH)
2172                .unwrap()
2173                .as_secs();
2174
2175            // Future timestamp: 1 hour in the future
2176            let future_timestamp = now + 3600;
2177
2178            // Timestamp should be rejected
2179            assert!(
2180                future_timestamp > now,
2181                "Future timestamp should be rejected"
2182            );
2183        }
2184
2185        /// Test that timestamp validation rejects expired timestamps
2186        #[test]
2187        fn test_timestamp_validation_rejects_expired_timestamp() {
2188            let now = std::time::SystemTime::now()
2189                .duration_since(std::time::UNIX_EPOCH)
2190                .unwrap()
2191                .as_secs();
2192
2193            // Expired timestamp: 8 days ago (> 7 day expiration)
2194            let expired_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 86400);
2195
2196            // Calculate age
2197            let age = now.saturating_sub(expired_timestamp);
2198
2199            // Timestamp should be rejected
2200            assert!(
2201                age > MERKLE_PAYMENT_EXPIRATION,
2202                "Expired timestamp should be rejected"
2203            );
2204        }
2205
2206        /// Test timestamp at the exact expiration boundary (should still be accepted)
2207        #[test]
2208        fn test_timestamp_validation_at_expiration_boundary() {
2209            let now = std::time::SystemTime::now()
2210                .duration_since(std::time::UNIX_EPOCH)
2211                .unwrap()
2212                .as_secs();
2213
2214            // Timestamp exactly at expiration boundary
2215            let boundary_timestamp = now - MERKLE_PAYMENT_EXPIRATION;
2216
2217            let age = now.saturating_sub(boundary_timestamp);
2218
2219            // At the boundary, age == MERKLE_PAYMENT_EXPIRATION
2220            assert_eq!(age, MERKLE_PAYMENT_EXPIRATION);
2221            // The validation uses >, so this should pass
2222            assert!(
2223                age <= MERKLE_PAYMENT_EXPIRATION,
2224                "Timestamp exactly at boundary should not be rejected"
2225            );
2226        }
2227
2228        /// Test timestamp just beyond expiration boundary (should be rejected)
2229        #[test]
2230        fn test_timestamp_validation_beyond_expiration_boundary() {
2231            let now = std::time::SystemTime::now()
2232                .duration_since(std::time::UNIX_EPOCH)
2233                .unwrap()
2234                .as_secs();
2235
2236            // Timestamp just beyond expiration boundary (1 second past)
2237            let beyond_boundary_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 1);
2238
2239            let age = now.saturating_sub(beyond_boundary_timestamp);
2240
2241            assert!(
2242                age > MERKLE_PAYMENT_EXPIRATION,
2243                "Timestamp beyond boundary should be rejected"
2244            );
2245        }
2246
2247        /// Test timestamp at current time (should be accepted)
2248        #[test]
2249        fn test_timestamp_validation_at_current_time() {
2250            let now = std::time::SystemTime::now()
2251                .duration_since(std::time::UNIX_EPOCH)
2252                .unwrap()
2253                .as_secs();
2254
2255            // Timestamp at current time
2256            let current_timestamp = now;
2257
2258            let age = now.saturating_sub(current_timestamp);
2259
2260            assert!(
2261                current_timestamp <= now,
2262                "Current timestamp should not be in future"
2263            );
2264            assert!(
2265                age <= MERKLE_PAYMENT_EXPIRATION,
2266                "Current timestamp should not be expired"
2267            );
2268            assert_eq!(age, 0, "Age should be 0 for current timestamp");
2269        }
2270
2271        /// Test timestamp near future boundary (1 second in future)
2272        #[test]
2273        fn test_timestamp_validation_near_future_boundary() {
2274            let now = std::time::SystemTime::now()
2275                .duration_since(std::time::UNIX_EPOCH)
2276                .unwrap()
2277                .as_secs();
2278
2279            // Timestamp 1 second in the future
2280            let near_future_timestamp = now + 1;
2281
2282            assert!(
2283                near_future_timestamp > now,
2284                "Near-future timestamp should be rejected"
2285            );
2286        }
2287
2288        /// Test expiration constant is set correctly (7 days = 604800 seconds)
2289        #[test]
2290        fn test_merkle_payment_expiration_constant() {
2291            const SEVEN_DAYS_IN_SECONDS: u64 = 7 * 24 * 60 * 60;
2292            assert_eq!(
2293                MERKLE_PAYMENT_EXPIRATION, SEVEN_DAYS_IN_SECONDS,
2294                "MERKLE_PAYMENT_EXPIRATION should be 7 days"
2295            );
2296        }
2297    }
2298}