// ant_node/node.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use super::{
10    Marker, NodeEvent, error::Result, event::NodeEventsChannel, quote::quotes_verification,
11};
12#[cfg(feature = "open-metrics")]
13use crate::metrics::NodeMetricsRecorder;
14#[cfg(feature = "open-metrics")]
15use crate::networking::MetricsRegistries;
16use crate::networking::{Addresses, Network, NetworkConfig, NetworkError, NetworkEvent, NodeIssue};
17use crate::{PutValidationError, RunningNode};
18use ant_bootstrap::bootstrap::Bootstrap;
19use ant_evm::EvmNetwork;
20use ant_evm::RewardsAddress;
21use ant_evm::merkle_payments::MERKLE_PAYMENT_EXPIRATION;
22use ant_protocol::{
23    CLOSE_GROUP_SIZE, NetworkAddress, PrettyPrintRecordKey,
24    error::Error as ProtocolError,
25    messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
26    storage::{try_deserialize_record, Chunk, ValidationType},
27};
28use bytes::Bytes;
29use futures::stream::{self, StreamExt};
30use itertools::Itertools;
31use libp2p::{
32    Multiaddr, PeerId,
33    identity::Keypair,
34    kad::{KBucketDistance as Distance, Record, U256},
35    request_response::OutboundFailure,
36};
37use num_traits::cast::ToPrimitive;
38use rand::{
39    Rng, SeedableRng,
40    rngs::{OsRng, StdRng},
41    seq::SliceRandom,
42    thread_rng,
43};
44use std::{
45    collections::{BTreeSet, HashMap, HashSet},
46    net::SocketAddr,
47    path::PathBuf,
48    sync::{
49        Arc,
50        atomic::{AtomicUsize, Ordering},
51    },
52    time::{Duration, Instant},
53};
54use tokio::sync::{Mutex, watch};
55use tokio::{
56    sync::mpsc::Receiver,
57    task::{JoinSet, spawn},
58};
59
/// Interval to trigger replication of all records to all peers.
/// This is the max time it should take. Minimum interval at any node will be half this.
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;

/// Interval to trigger a storage challenge.
/// This is the max time it should take. Minimum interval at any node will be half this.
const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;

/// Interval to update the node's uptime metric.
const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

/// Interval to clean up irrelevant records (records this node should no longer hold).
/// This is the max time it should take. Minimum interval at any node will be half this.
/// NOTE(review): the constant keeps the historical "UNRELEVANT" spelling because it
/// is referenced elsewhere in this file; renaming would be a wider change.
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;

/// Highest score achievable from each metric sub-sector during a StorageChallenge.
const HIGHEST_SCORE: usize = 100;

/// Any node bearing a score below this shall be considered as bad.
/// The maximum attainable score is 100 * 100.
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;

/// Scoring time step, in ms; the average StorageChallenge is expected to
/// complete in around 250ms.
const TIME_STEP: usize = 20;

/// Delay before retrying a failed replicated record fetch (15 minutes).
const REPLICA_FETCH_RETRY_DELAY: Duration = Duration::from_secs(15 * 60);

/// Number of peers to query for a replicated record health check.
const REPLICA_FETCH_PEER_COUNT: usize = 5;

/// Minimum number of successful replicas required to consider the record healthy.
const MIN_HEALTHY_REPLICA_COUNT: usize = 3;

/// Index (0-based, including self) used to derive the distance threshold for nearby records.
const CLOSE_NEIGHBOUR_DISTANCE_INDEX: usize = 7;

/// Maximum number of closest peers to track for replication behaviour detection.
///
/// Set to 20 (roughly 4x CLOSE_GROUP_SIZE) to cover peers that are close enough
/// to affect data responsibility during churn. Tracking only the closest peers
/// keeps memory bounded while still detecting restart patterns in the critical
/// zone where replication matters most.
const CLOSE_GROUP_TRACKING_LIMIT: usize = 20;

/// Grace period before triggering replication after detecting a peer restart.
///
/// When a peer is detected as restarting (removed then quickly re-added),
/// replication is suppressed for this duration to allow the peer to stabilize.
/// Set to 90 seconds which is:
/// - Long enough to cover typical node restart times (30-60s)
/// - Short enough to ensure data availability isn't compromised
/// - Approximately half of PERIODIC_REPLICATION_INTERVAL_MAX_S, so the periodic
///   replication will catch any missed updates within a reasonable timeframe
const CLOSE_GROUP_RESTART_SUPPRESSION: Duration = Duration::from_secs(90);
115
/// Helper to build and run a Node.
///
/// Collects all the configuration needed to start a node; consumed by
/// [`NodeBuilder::build_and_run`].
pub struct NodeBuilder {
    // Socket address the node will listen on.
    addr: SocketAddr,
    // Bootstrap flow providing the initial peers / cache.
    bootstrap: Bootstrap,
    // EVM address that storage-payment rewards are paid to.
    evm_address: RewardsAddress,
    // Which EVM network quotes/payments are settled on.
    evm_network: EvmNetwork,
    // The libp2p identity for this node.
    identity_keypair: Keypair,
    // Whether the node runs in local (testing) mode.
    local: bool,
    #[cfg(feature = "open-metrics")]
    /// Set to Some to enable the metrics server
    metrics_server_port: Option<u16>,
    // When true, UPnP port mapping is disabled.
    no_upnp: bool,
    // When true, the node acts as a relay client.
    relay_client: bool,
    // Directory where the node persists its data.
    root_dir: PathBuf,
}
131
impl NodeBuilder {
    /// Instantiate the builder. The initial peers can either be supplied via the `initial_peers` method
    /// or fetched from the bootstrap cache set using `bootstrap_cache` method.
    /// NOTE(review): those methods are not visible in this file — confirm they exist on the
    /// bootstrap flow / builder elsewhere.
    pub fn new(
        identity_keypair: Keypair,
        bootstrap_flow: Bootstrap,
        evm_address: RewardsAddress,
        evm_network: EvmNetwork,
        addr: SocketAddr,
        root_dir: PathBuf,
    ) -> Self {
        Self {
            addr,
            bootstrap: bootstrap_flow,
            evm_address,
            evm_network,
            identity_keypair,
            // All flags default to off; callers opt in via the setters below.
            local: false,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: None,
            no_upnp: false,
            relay_client: false,
            root_dir,
        }
    }

    /// Set the flag to indicate if the node is running in local mode
    pub fn local(&mut self, local: bool) {
        self.local = local;
    }

    #[cfg(feature = "open-metrics")]
    /// Set the port for the OpenMetrics server. Defaults to a random port if not set
    pub fn metrics_server_port(&mut self, port: Option<u16>) {
        self.metrics_server_port = port;
    }

    /// Set the flag to make the node act as a relay client
    pub fn relay_client(&mut self, relay_client: bool) {
        self.relay_client = relay_client;
    }

    /// Set the flag to disable UPnP for the node
    pub fn no_upnp(&mut self, no_upnp: bool) {
        self.no_upnp = no_upnp;
    }

    /// Asynchronously runs a new node instance, setting up the swarm driver,
    /// creating a data storage, and handling network events. Returns the
    /// created `RunningNode` which contains a `NodeEventsChannel` for listening
    /// to node-related events.
    ///
    /// # Returns
    ///
    /// A `RunningNode` instance.
    ///
    /// # Errors
    ///
    /// Returns an error if there is a problem initializing the Network.
    pub fn build_and_run(self) -> Result<RunningNode> {
        // setup metrics: the recorder is only created when a metrics server port
        // was requested; the registries are needed by Network::init either way.
        #[cfg(feature = "open-metrics")]
        let (metrics_recorder, metrics_registries) = if self.metrics_server_port.is_some() {
            // metadata registry
            let mut metrics_registries = MetricsRegistries::default();
            let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);

            (Some(metrics_recorder), metrics_registries)
        } else {
            (None, MetricsRegistries::default())
        };

        // create a shutdown signal channel; the sender ends up in RunningNode so
        // the owner can stop both the network layer and the event loop.
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        // init network
        let network_config = NetworkConfig {
            keypair: self.identity_keypair,
            local: self.local,
            listen_addr: self.addr,
            root_dir: self.root_dir.clone(),
            shutdown_rx: shutdown_rx.clone(),
            bootstrap: self.bootstrap,
            no_upnp: self.no_upnp,
            relay_client: self.relay_client,
            custom_request_timeout: None,
            #[cfg(feature = "open-metrics")]
            metrics_registries,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: self.metrics_server_port,
        };
        let (network, network_event_receiver) = Network::init(network_config)?;

        // init node
        let node_events_channel = NodeEventsChannel::default();
        let node = NodeInner {
            network: network.clone(),
            events_channel: node_events_channel.clone(),
            close_group_tracker: Mutex::new(CloseGroupTracker::new(network.peer_id())),
            reward_address: self.evm_address,
            #[cfg(feature = "open-metrics")]
            metrics_recorder,
            evm_network: self.evm_network,
        };
        let node = Node {
            inner: Arc::new(node),
        };

        // Run the node: spawns the detached event-processing task.
        node.run(network_event_receiver, shutdown_rx);
        let running_node = RunningNode {
            shutdown_sender: shutdown_tx,
            network,
            node_events_channel,
            root_dir_path: self.root_dir,
            rewards_address: self.evm_address,
        };

        Ok(running_node)
    }
}
253
/// `Node` represents a single node in the distributed network. It handles
/// network events, processes incoming requests, interacts with the data
/// storage, and broadcasts node-related events.
///
/// Cloning is cheap: it only bumps the refcount of the inner `Arc`.
#[derive(Clone)]
pub(crate) struct Node {
    inner: Arc<NodeInner>,
}
261
/// The actual implementation of the Node. The other is just a wrapper around this, so that we don't expose
/// the Arc from the interface.
struct NodeInner {
    // Channel used to broadcast `NodeEvent`s to subscribers.
    events_channel: NodeEventsChannel,
    // Handle to the networking layer.
    network: Network,
    // Tracks our closest peers to detect restart patterns and decide when
    // replication should be triggered or suppressed.
    close_group_tracker: Mutex<CloseGroupTracker>,
    #[cfg(feature = "open-metrics")]
    metrics_recorder: Option<NodeMetricsRecorder>,
    // EVM address rewards are paid to.
    reward_address: RewardsAddress,
    // EVM network used for quoting/payment verification.
    evm_network: EvmNetwork,
}
273
274impl Node {
275    /// Returns the NodeEventsChannel
276    pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
277        &self.inner.events_channel
278    }
279
280    /// Returns the instance of Network
281    pub(crate) fn network(&self) -> &Network {
282        &self.inner.network
283    }
284
285    fn close_group_tracker(&self) -> &Mutex<CloseGroupTracker> {
286        &self.inner.close_group_tracker
287    }
288
289    #[cfg(feature = "open-metrics")]
290    /// Returns a reference to the NodeMetricsRecorder if the `open-metrics` feature flag is enabled
291    /// This is used to record various metrics for the node.
292    pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
293        self.inner.metrics_recorder.as_ref()
294    }
295
296    /// Returns the reward address of the node
297    pub(crate) fn reward_address(&self) -> &RewardsAddress {
298        &self.inner.reward_address
299    }
300
301    pub(crate) fn evm_network(&self) -> &EvmNetwork {
302        &self.inner.evm_network
303    }
304
    /// Spawns the detached task that processes `NetworkEvent`s and drives the
    /// node's periodic work (replication, uptime metric updates, irrelevant
    /// record cleanup and storage challenges).
    ///
    /// The task exits when a shutdown signal is observed on `shutdown_rx`, when
    /// the shutdown sender is dropped, or when the `NetworkEvent` channel
    /// closes. The spawned `JoinHandle` is intentionally discarded.
    fn run(
        self,
        mut network_event_receiver: Receiver<NetworkEvent>,
        mut shutdown_rx: watch::Receiver<bool>,
    ) {
        let mut rng = StdRng::from_entropy();

        // Counts peers added to the routing table; used below to broadcast
        // ConnectedToNetwork once enough peers have been seen.
        let peers_connected = Arc::new(AtomicUsize::new(0));

        let _node_task = spawn(async move {
            // use a random activity timeout to ensure that the nodes do not sync when messages
            // are being transmitted.
            let replication_interval: u64 = rng.gen_range(
                PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
            );
            let replication_interval_time = Duration::from_secs(replication_interval);
            debug!("Replication interval set to {replication_interval_time:?}");

            let mut replication_interval = tokio::time::interval(replication_interval_time);
            let _ = replication_interval.tick().await; // first tick completes immediately

            let mut uptime_metrics_update_interval =
                tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
            let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately

            // use a random activity timeout to ensure that the nodes do not sync on work,
            // causing an overall CPU spike.
            let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
                UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
                    ..UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
            );
            let irrelevant_records_cleanup_interval_time =
                Duration::from_secs(irrelevant_records_cleanup_interval);
            let mut irrelevant_records_cleanup_interval =
                tokio::time::interval(irrelevant_records_cleanup_interval_time);
            let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately

            // use a random neighbour storage challenge ticker to ensure
            // neighbours do not carry out challenges at the same time
            let storage_challenge_interval: u64 =
                rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
            let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
            debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");

            let mut storage_challenge_interval =
                tokio::time::interval(storage_challenge_interval_time);
            let _ = storage_challenge_interval.tick().await; // first tick completes immediately

            loop {
                let peers_connected = &peers_connected;

                tokio::select! {
                    // Check for a shutdown command.
                    result = shutdown_rx.changed() => {
                        // `&&` binds tighter than `||`: exit when a `true` value was
                        // received, or when the sender side has been dropped.
                        if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
                            info!("Shutdown signal received or sender dropped. Exiting network events loop.");
                            break;
                        }
                    },
                    net_event = network_event_receiver.recv() => {
                        match net_event {
                            Some(event) => {
                                let start = Instant::now();
                                let event_string = format!("{event:?}");

                                self.handle_network_event(event, peers_connected).await;
                                trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);

                            }
                            None => {
                                error!("The `NetworkEvent` channel is closed");
                                self.events_channel().broadcast(NodeEvent::ChannelClosed);
                                break;
                            }
                        }
                    }
                    // runs every replication_interval time
                    _ = replication_interval.tick() => {
                        let now = Instant::now();

                        // Clear any pending restart-suppression timer; the periodic
                        // replication below covers whatever it was deferring.
                        {
                            let mut tracker = self.close_group_tracker().lock().await;
                            if tracker.handle_timer_expiry(now) {
                                trace!("Replication timer expired for tracked peers; periodic run will cover it");
                            }
                        }

                        let start = now;
                        let network = self.network().clone();
                        self.record_metrics(Marker::IntervalReplicationTriggered);

                        let _handle = spawn(async move {
                            Self::try_interval_replication(network);
                            trace!("Periodic replication took {:?}", start.elapsed());
                        });
                    }
                    _ = uptime_metrics_update_interval.tick() => {
                        #[cfg(feature = "open-metrics")]
                        if let Some(metrics_recorder) = self.metrics_recorder() {
                            let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
                        }
                    }
                    _ = irrelevant_records_cleanup_interval.tick() => {
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::trigger_irrelevant_record_cleanup(network);
                        });
                    }
                    // runs every storage_challenge_interval time
                    _ = storage_challenge_interval.tick() => {
                        let start = Instant::now();
                        debug!("Periodic storage challenge triggered");
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::storage_challenge(network).await;
                            trace!("Periodic storage challenge took {:?}", start.elapsed());
                        });
                    }
                }
            }
        });
    }
431
432    /// Calls Marker::log() to insert the marker into the log files.
433    /// Also calls NodeMetrics::record() to record the metric if the `open-metrics` feature flag is enabled.
434    pub(crate) fn record_metrics(&self, marker: Marker) {
435        marker.log();
436        #[cfg(feature = "open-metrics")]
437        if let Some(metrics_recorder) = self.metrics_recorder() {
438            metrics_recorder.record(marker)
439        }
440    }
441
442    // **** Private helpers *****
443
    /// Handle a network event.
    /// Spawns a task for any likely long-running work so this handler stays
    /// non-blocking; the event loop in `run` awaits this method directly.
    async fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
        let start = Instant::now();
        let event_string = format!("{event:?}");
        // Set by every match arm below; used only for the timing trace at the end.
        let event_header;

        // Reducing non-mandatory logging
        if let NetworkEvent::QueryRequestReceived {
            query: Query::GetVersion { .. },
            ..
        } = event
        {
            trace!("Handling NetworkEvent {event_string}");
        } else {
            debug!("Handling NetworkEvent {event_string}");
        }

        match event {
            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
                event_header = "PeerAdded";
                // increment peers_connected and send ConnectedToNetwork event once
                // exactly CLOSE_GROUP_SIZE peers have been connected to
                let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
                if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
                    self.events_channel()
                        .broadcast(NodeEvent::ConnectedToNetwork);
                }

                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));

                // try query peer version
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::try_query_peer_version(network, peer_id, Default::default()).await;
                });

                // Ask the close-group tracker whether this addition warrants a
                // replication run (held in a short scope so the lock is released
                // before any replication work is spawned).
                let replication_decision = {
                    let mut tracker = self.close_group_tracker().lock().await;
                    tracker.record_peer_added(peer_id)
                };

                if replication_decision.should_trigger() {
                    let network = self.network().clone();
                    self.record_metrics(Marker::IntervalReplicationTriggered);
                    let _handle = spawn(async move {
                        Self::try_interval_replication(network);
                    });
                } else if matches!(replication_decision, ReplicationDirective::Skip) {
                    trace!(
                        "Replication skipped for {peer_id:?} addition due to restart behaviour tracking"
                    );
                }
            }
            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
                event_header = "PeerRemoved";
                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));

                let self_id = self.network().peer_id();
                let distance =
                    NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
                info!(
                    "Node {self_id:?} removed peer from routing table: {peer_id:?}. It has a {:?} distance to us.",
                    distance.ilog2()
                );

                // Same pattern as PeerAdded: consult the tracker inside a short
                // lock scope, then act on the decision.
                let replication_decision = {
                    let mut tracker = self.close_group_tracker().lock().await;
                    tracker.record_peer_removed(peer_id)
                };

                if replication_decision.should_trigger() {
                    let network = self.network().clone();
                    self.record_metrics(Marker::IntervalReplicationTriggered);
                    let _handle = spawn(async move {
                        Self::try_interval_replication(network);
                    });
                } else if matches!(replication_decision, ReplicationDirective::Skip) {
                    trace!(
                        "Replication skipped for {peer_id:?} removal due to restart behaviour tracking"
                    );
                }
            }
            NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
                event_header = "PeerWithUnsupportedProtocol";
            }
            NetworkEvent::NewListenAddr(_) => {
                event_header = "NewListenAddr";
            }
            NetworkEvent::ResponseReceived { res } => {
                event_header = "ResponseReceived";
                if let Err(err) = self.handle_response(res) {
                    error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
                }
            }
            NetworkEvent::KeysToFetchForReplication(keys) => {
                event_header = "KeysToFetchForReplication";
                self.record_metrics(Marker::fetching_keys_for_replication(&keys));

                if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
                    error!("Error while trying to fetch replicated data {err:?}");
                }
            }
            NetworkEvent::QueryRequestReceived { query, channel } => {
                event_header = "QueryRequestReceived";
                let node = self.clone();
                let payment_address = *self.reward_address();

                let _handle = spawn(async move {
                    let network = node.network().clone();
                    let res = Self::handle_query(node, query, payment_address).await;

                    // Reducing non-mandatory logging
                    if let Response::Query(QueryResponse::GetVersion { .. }) = res {
                        trace!("Sending response {res:?}");
                    } else {
                        debug!("Sending response {res:?}");
                    }

                    network.send_response(res, channel);
                });
            }
            NetworkEvent::UnverifiedRecord(record) => {
                event_header = "UnverifiedRecord";
                // queries can be long running and require validation, so we spawn a task to handle them
                let self_clone = self.clone();
                let _handle = spawn(async move {
                    let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                    match self_clone.validate_and_store_record(record).await {
                        Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
                        Err(err) => {
                            self_clone.record_metrics(Marker::RecordRejected(&key, &err));
                        }
                    }
                });
            }
            NetworkEvent::TerminateNode { reason } => {
                event_header = "TerminateNode";
                error!("Received termination from swarm_driver due to {reason:?}");
                self.events_channel()
                    .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
            }
            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
                event_header = "FailedToFetchHolders";
                let network = self.network().clone();
                let pretty_log: Vec<_> = bad_nodes
                    .iter()
                    .map(|(peer_id, record_key)| {
                        let pretty_key = PrettyPrintRecordKey::from(record_key);
                        (peer_id, pretty_key)
                    })
                    .collect();
                // Note: this log will be checked in CI, and expecting `not appear`.
                //       any change to the keyword `failed to fetch` shall incur
                //       correspondent CI script change as well.
                debug!(
                    "Received notification from replication_fetcher, notifying {pretty_log:?} failed to fetch replication copies from."
                );
                let _handle = spawn(async move {
                    for (peer_id, record_key) in bad_nodes {
                        // Obsoleted fetch request (due to flooded in fresh replicates) could result
                        // in peer to be claimed as bad, as local copy blocks the entry to be cleared.
                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
                        {
                            error!(
                                "From peer {peer_id:?}, failed to fetch record {:?}",
                                PrettyPrintRecordKey::from(&record_key)
                            );
                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
                        }
                    }
                });
            }
            NetworkEvent::QuoteVerification { quotes } => {
                event_header = "QuoteVerification";
                let network = self.network().clone();

                let _handle = spawn(async move {
                    quotes_verification(&network, quotes).await;
                });
            }
            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
                event_header = "FreshReplicateToFetch";
                self.fresh_replicate_to_fetch(holder, keys);
            }
            NetworkEvent::PeersForVersionQuery(peers) => {
                event_header = "PeersForVersionQuery";
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::query_peers_version(network, peers).await;
                });
            }
            NetworkEvent::NetworkWideReplication { keys } => {
                event_header = "NetworkWideReplication";
                self.perform_network_wide_replication(keys);
            }
        }

        trace!(
            "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
            start.elapsed()
        );
    }
648
649    // Handle the response that was not awaited at the call site
650    fn handle_response(&self, response: Response) -> Result<()> {
651        match response {
652            Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
653                // This should actually have been short-circuted when received
654                warn!("Mishandled replicate response, should be handled earlier");
655            }
656            Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
657                error!(
658                    "Response to replication shall be handled by called not by common handler, {resp:?}"
659                );
660            }
661            Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
662                // No need to handle
663            }
664            other => {
665                warn!("handle_response not implemented for {other:?}");
666            }
667        };
668
669        Ok(())
670    }
671
    /// Top-level dispatcher for incoming `Query` requests: answers each variant
    /// from local state and wraps the result in a `Response::Query`.
    ///
    /// `payment_address` is this node's rewards address, embedded into store
    /// quotes and merkle candidate quotes so clients know where to pay.
    async fn handle_query(node: Self, query: Query, payment_address: RewardsAddress) -> Response {
        let network = node.network();
        let resp: QueryResponse = match query {
            // Price a prospective upload, optionally bundling storage proofs
            // when the client supplied a challenge nonce.
            Query::GetStoreQuote {
                key,
                data_type,
                data_size,
                nonce,
                difficulty,
            } => {
                let record_key = key.to_record_key();
                let self_id = network.peer_id();

                let maybe_quoting_metrics = network
                    .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
                    .await;

                // Storage proofs are only produced when a nonce was provided.
                let storage_proofs = if let Some(nonce) = nonce {
                    Self::respond_x_closest_record_proof(
                        network,
                        key.clone(),
                        nonce,
                        difficulty,
                        false,
                    )
                    .await
                } else {
                    vec![]
                };

                match maybe_quoting_metrics {
                    Ok((quoting_metrics, is_already_stored)) => {
                        if is_already_stored {
                            // No quote for data we already hold; the client
                            // should not pay for it again.
                            QueryResponse::GetStoreQuote {
                                quote: Err(ProtocolError::RecordExists(
                                    PrettyPrintRecordKey::from(&record_key).into_owned(),
                                )),
                                peer_address: NetworkAddress::from(self_id),
                                storage_proofs,
                            }
                        } else {
                            QueryResponse::GetStoreQuote {
                                quote: Self::create_quote_for_storecost(
                                    network,
                                    &key,
                                    &quoting_metrics,
                                    &payment_address,
                                ),
                                peer_address: NetworkAddress::from(self_id),
                                storage_proofs,
                            }
                        }
                    }
                    Err(err) => {
                        warn!("GetStoreQuote failed for {key:?}: {err}");
                        QueryResponse::GetStoreQuote {
                            quote: Err(ProtocolError::GetStoreQuoteFailed),
                            peer_address: NetworkAddress::from(self_id),
                            storage_proofs,
                        }
                    }
                }
            }
            // Serve a locally-held record to a peer performing replication.
            Query::GetReplicatedRecord { requester: _, key } => {
                let our_address = NetworkAddress::from(network.peer_id());
                let record_key = key.to_record_key();

                let result = match network.get_local_record(&record_key).await {
                    Ok(Some(record)) => Ok((our_address, Bytes::from(record.value))),
                    Ok(None) => Err(ProtocolError::ReplicatedRecordNotFound {
                        holder: Box::new(our_address),
                        key: Box::new(key.clone()),
                    }),
                    // Use `PutRecordFailed` as place holder
                    Err(err) => Err(ProtocolError::PutRecordFailed(format!(
                        "Error to fetch local record for GetReplicatedRecord {err:?}"
                    ))),
                };

                QueryResponse::GetReplicatedRecord(result)
            }
            // Storage-challenge probe: prove we hold the chunks closest to `key`.
            Query::GetChunkExistenceProof {
                key,
                nonce,
                difficulty,
            } => QueryResponse::GetChunkExistenceProof(
                Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
            ),
            // Report whether we consider the target peer shunned/problematic.
            Query::CheckNodeInProblem(target_address) => {
                debug!("Got CheckNodeInProblem for peer {target_address:?}");

                // On lookup failure, default to "not in trouble".
                let is_in_trouble =
                    if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
                        result
                    } else {
                        debug!("Could not get status of {target_address:?}.");
                        false
                    };

                QueryResponse::CheckNodeInProblem {
                    reporter_address: NetworkAddress::from(network.peer_id()),
                    target_address,
                    is_in_trouble,
                }
            }
            // Return our view of the peers closest to `key`, optionally signed.
            Query::GetClosestPeers {
                key,
                num_of_peers,
                range,
                sign_result,
            } => {
                debug!(
                    "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
                );
                Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
                    .await
            }
            // Report this node's software version.
            Query::GetVersion(_) => QueryResponse::GetVersion {
                peer: NetworkAddress::from(network.peer_id()),
                version: ant_build_info::package_version(),
            },
            // Validate and store a record pushed directly by a peer, mapping
            // internal validation errors onto their protocol-level equivalents.
            Query::PutRecord {
                holder,
                address,
                serialized_record,
            } => {
                let record = Record {
                    key: address.to_record_key(),
                    value: serialized_record,
                    publisher: None,
                    expires: None,
                };

                let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                let result = match node.validate_and_store_record(record).await {
                    Ok(()) => Ok(()),
                    Err(PutValidationError::OutdatedRecordCounter { counter, expected }) => {
                        // Record the rejection in metrics before converting the
                        // error into its protocol representation.
                        node.record_metrics(Marker::RecordRejected(
                            &key,
                            &PutValidationError::OutdatedRecordCounter { counter, expected },
                        ));
                        Err(ProtocolError::OutdatedRecordCounter { counter, expected })
                    }
                    Err(PutValidationError::TopologyVerificationFailed {
                        target_address,
                        valid_count,
                        total_paid,
                        closest_count,
                        node_peers,
                        paid_peers,
                    }) => {
                        node.record_metrics(Marker::RecordRejected(
                            &key,
                            &PutValidationError::TopologyVerificationFailed {
                                target_address: target_address.clone(),
                                valid_count,
                                total_paid,
                                closest_count,
                                node_peers: node_peers.clone(),
                                paid_peers: paid_peers.clone(),
                            },
                        ));
                        Err(ProtocolError::TopologyVerificationFailed {
                            target_address: Box::new(target_address),
                            valid_count,
                            total_paid,
                            closest_count,
                            node_peers,
                            paid_peers,
                        })
                    }
                    Err(err) => {
                        node.record_metrics(Marker::RecordRejected(&key, &err));
                        Err(ProtocolError::PutRecordFailed(format!("{err:?}")))
                    }
                };
                QueryResponse::PutRecord {
                    result,
                    peer_address: holder,
                    record_addr: address,
                }
            }
            // Produce a signed merkle-payment candidate quote for the target.
            Query::GetMerkleCandidateQuote {
                key,
                data_type,
                data_size,
                merkle_payment_timestamp,
            } => {
                Self::respond_merkle_candidate_quote(
                    network,
                    key,
                    data_type,
                    data_size,
                    merkle_payment_timestamp,
                    payment_address,
                )
                .await
            }
        };
        Response::Query(resp)
    }
873
874    async fn respond_get_closest_peers(
875        network: &Network,
876        target: NetworkAddress,
877        num_of_peers: Option<usize>,
878        range: Option<[u8; 32]>,
879        sign_result: bool,
880    ) -> QueryResponse {
881        let local_peers = network.get_local_peers_with_multiaddr().await;
882        let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
883            Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
884        } else {
885            vec![]
886        };
887
888        let signature = if sign_result {
889            let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
890            bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
891            network.sign(&bytes).ok()
892        } else {
893            None
894        };
895
896        QueryResponse::GetClosestPeers {
897            target,
898            peers,
899            signature,
900        }
901    }
902
903    fn calculate_get_closest_peers(
904        peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
905        target: NetworkAddress,
906        num_of_peers: Option<usize>,
907        range: Option<[u8; 32]>,
908    ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
909        match (num_of_peers, range) {
910            (_, Some(value)) => {
911                let distance = U256::from_big_endian(&value);
912                peer_addrs
913                    .iter()
914                    .filter_map(|(peer_id, multi_addrs)| {
915                        let addr = NetworkAddress::from(*peer_id);
916                        if target.distance(&addr).0 <= distance {
917                            Some((addr, multi_addrs.clone()))
918                        } else {
919                            None
920                        }
921                    })
922                    .collect()
923            }
924            (Some(num_of_peers), _) => {
925                let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
926                    .iter()
927                    .map(|(peer_id, multi_addrs)| {
928                        let addr = NetworkAddress::from(*peer_id);
929                        (addr, multi_addrs.clone())
930                    })
931                    .collect();
932                result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
933                result.into_iter().take(num_of_peers).collect()
934            }
935            (None, None) => vec![],
936        }
937    }
938
939    /// Handle GetMerkleCandidateQuote query
940    /// Returns a signed MerklePaymentCandidateNode containing the node's current quoting metrics,
941    /// reward address, and timestamp commitment
942    async fn respond_merkle_candidate_quote(
943        network: &Network,
944        key: NetworkAddress,
945        data_type: u32,
946        data_size: usize,
947        merkle_payment_timestamp: u64,
948        payment_address: RewardsAddress,
949    ) -> QueryResponse {
950        debug!(
951            "merkle payment: GetMerkleCandidateQuote for target {key:?}, timestamp: {merkle_payment_timestamp}, data_type: {data_type}, data_size: {data_size}"
952        );
953
954        // Validate timestamp before signing to prevent committing to invalid times.
955        // Nodes will reject proofs with expired/future timestamps during payment verification,
956        // so signing such timestamps would create useless quotes that can't be used.
957        //
958        // Allow ±24 hours tolerance to handle timezone differences and clock skew between
959        // clients and nodes. This prevents valid payments from being rejected due to minor
960        // time differences while still catching truly expired/invalid timestamps.
961        const TIMESTAMP_TOLERANCE: u64 = 24 * 60 * 60; // 24 hours
962
963        let now = std::time::SystemTime::now()
964            .duration_since(std::time::UNIX_EPOCH)
965            .unwrap_or_default()
966            .as_secs();
967
968        // Reject future timestamps (with 24h tolerance for clock skew)
969        let future_threshold = now + TIMESTAMP_TOLERANCE;
970        if merkle_payment_timestamp > future_threshold {
971            let error_msg = format!(
972                "Rejected future timestamp {merkle_payment_timestamp} (current time: {now}, threshold: {future_threshold})"
973            );
974            warn!("{error_msg} for {key:?}");
975            return QueryResponse::GetMerkleCandidateQuote(Err(
976                ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
977            ));
978        }
979
980        // Reject expired timestamps (with 24h tolerance for clock skew)
981        let expiration_threshold = MERKLE_PAYMENT_EXPIRATION + TIMESTAMP_TOLERANCE;
982        let age = now.saturating_sub(merkle_payment_timestamp);
983        if age > expiration_threshold {
984            let error_msg = format!(
985                "Rejected expired timestamp {merkle_payment_timestamp} (age: {age}s, max: {expiration_threshold}s)",
986            );
987            warn!("{error_msg} for {key:?}");
988            return QueryResponse::GetMerkleCandidateQuote(Err(
989                ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
990            ));
991        }
992
993        // Get node's current quoting metrics
994        let record_key = key.to_record_key();
995        let (quoting_metrics, _is_already_stored) = match network
996            .get_local_quoting_metrics(record_key, data_type, data_size)
997            .await
998        {
999            Ok(metrics) => metrics,
1000            Err(err) => {
1001                let error_msg = format!("Failed to get quoting metrics for {key:?}: {err}");
1002                warn!("{error_msg}");
1003                return QueryResponse::GetMerkleCandidateQuote(Err(
1004                    ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
1005                ));
1006            }
1007        };
1008
1009        // Create the MerklePaymentCandidateNode with node's signed commitment
1010        let pub_key = network.get_pub_key();
1011        let reward_address = payment_address;
1012        let bytes = ant_evm::merkle_payments::MerklePaymentCandidateNode::bytes_to_sign(
1013            &quoting_metrics,
1014            &reward_address,
1015            merkle_payment_timestamp,
1016        );
1017        let signature = match network.sign(&bytes) {
1018            Ok(sig) => sig,
1019            Err(e) => {
1020                let error_msg = format!("Failed to sign candidate node for {key:?}: {e}");
1021                error!("{error_msg}");
1022                return QueryResponse::GetMerkleCandidateQuote(Err(
1023                    ProtocolError::FailedToSignMerkleCandidate(error_msg),
1024                ));
1025            }
1026        };
1027
1028        let candidate = ant_evm::merkle_payments::MerklePaymentCandidateNode {
1029            quoting_metrics,
1030            reward_address,
1031            merkle_payment_timestamp,
1032            pub_key,
1033            signature,
1034        };
1035        QueryResponse::GetMerkleCandidateQuote(Ok(candidate))
1036    }
1037
1038    // Nodes only check ChunkProof each other, to avoid `multi-version` issue
1039    // Client check proof against all records, as have to fetch from network anyway.
1040    async fn respond_x_closest_record_proof(
1041        network: &Network,
1042        key: NetworkAddress,
1043        nonce: Nonce,
1044        difficulty: usize,
1045        chunk_only: bool,
1046    ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
1047        let start = Instant::now();
1048        let mut results = vec![];
1049        if difficulty == 1 {
1050            // Client checking existence of published chunk.
1051            let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
1052            if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
1053                let proof = ChunkProof::new(&record.value, nonce);
1054                debug!("Chunk proof for {key:?} is {proof:?}");
1055                result = Ok(proof)
1056            } else {
1057                debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
1058            }
1059
1060            results.push((key.clone(), result));
1061        } else {
1062            let all_local_records = network.get_all_local_record_addresses().await;
1063
1064            if let Ok(all_local_records) = all_local_records {
1065                let mut all_chunk_addrs: Vec<_> = if chunk_only {
1066                    all_local_records
1067                        .iter()
1068                        .filter_map(|(addr, record_type)| {
1069                            if *record_type == ValidationType::Chunk {
1070                                Some(addr.clone())
1071                            } else {
1072                                None
1073                            }
1074                        })
1075                        .collect()
1076                } else {
1077                    all_local_records.keys().cloned().collect()
1078                };
1079
1080                // Sort by distance and only take first X closest entries
1081                all_chunk_addrs.sort_by_key(|addr| key.distance(addr));
1082
1083                // TODO: this shall be deduced from resource usage dynamically
1084                let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);
1085
1086                for addr in all_chunk_addrs.iter().take(workload_factor) {
1087                    if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
1088                    {
1089                        let proof = ChunkProof::new(&record.value, nonce);
1090                        debug!("Chunk proof for {key:?} is {proof:?}");
1091                        results.push((addr.clone(), Ok(proof)));
1092                    }
1093                }
1094            }
1095
1096            info!(
1097                "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
1098                results.len(),
1099                start.elapsed()
1100            );
1101        }
1102
1103        results
1104    }
1105
    /// Check among all chunk type records that we have,
    /// and randomly pick one as the verification candidate.
    /// This will challenge all closest peers at once.
    async fn storage_challenge(network: Network) {
        let start = Instant::now();
        // Challenge only our CLOSE_GROUP_SIZE nearest neighbours.
        let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
            network.get_k_closest_local_peers_to_the_target(None).await
        {
            closest_peers
                .into_iter()
                .take(CLOSE_GROUP_SIZE)
                .collect_vec()
        } else {
            error!("Cannot get local neighbours");
            return;
        };
        if closest_peers.len() < CLOSE_GROUP_SIZE {
            debug!(
                "Not enough neighbours ({}/{}) to carry out storage challenge.",
                closest_peers.len(),
                CLOSE_GROUP_SIZE
            );
            return;
        }

        // Candidate targets are all locally-held chunk-type records.
        let mut verify_candidates: Vec<NetworkAddress> =
            if let Ok(all_keys) = network.get_all_local_record_addresses().await {
                all_keys
                    .iter()
                    .filter_map(|(addr, record_type)| {
                        if ValidationType::Chunk == *record_type {
                            Some(addr.clone())
                        } else {
                            None
                        }
                    })
                    .collect()
            } else {
                error!("Failed to get local record addresses.");
                return;
            };
        let num_of_targets = verify_candidates.len();
        // Require a minimum candidate pool so the random pick is meaningful.
        if num_of_targets < 50 {
            debug!("Not enough candidates({num_of_targets}/50) to be checked against neighbours.");
            return;
        }

        // To ensure the neighbours sharing same knowledge as to us,
        // the target is chosen to be not far from us (random pick from the
        // closer half of candidates sorted by distance to ourselves).
        let self_addr = NetworkAddress::from(network.peer_id());
        verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
        let index: usize = OsRng.gen_range(0..num_of_targets / 2);
        let target = verify_candidates[index].clone();
        // TODO: workload shall be dynamically deduced from resource usage
        let difficulty = CLOSE_GROUP_SIZE;
        // Pre-compute the proofs we expect honest peers to return for the
        // `difficulty` records closest to the chosen target.
        verify_candidates.sort_by_key(|addr| target.distance(addr));
        let expected_targets = verify_candidates.into_iter().take(difficulty);
        let nonce: Nonce = thread_rng().r#gen::<u64>();
        let mut expected_proofs = HashMap::new();
        for addr in expected_targets {
            if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
                let expected_proof = ChunkProof::new(&record.value, nonce);
                let _ = expected_proofs.insert(addr, expected_proof);
            } else {
                error!("Local record {addr:?} cann't be loaded from disk.");
            }
        }
        let request = Request::Query(Query::GetChunkExistenceProof {
            key: target.clone(),
            nonce,
            difficulty,
        });

        // Fan the challenge out to every neighbour (excluding ourselves) in parallel.
        let mut tasks = JoinSet::new();
        for (peer_id, addresses) in closest_peers {
            if peer_id == network.peer_id() {
                continue;
            }
            let network_clone = network.clone();
            let request_clone = request.clone();
            let expected_proofs_clone = expected_proofs.clone();
            let _ = tasks.spawn(async move {
                let res = scoring_peer(
                    network_clone,
                    (peer_id, addresses),
                    request_clone,
                    expected_proofs_clone,
                )
                .await;
                (peer_id, res)
            });
        }

        // Collect per-peer scores; low scorers are flagged as node issues.
        let mut peer_scores = vec![];
        while let Some(res) = tasks.join_next().await {
            match res {
                Ok((peer_id, score)) => {
                    let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
                    if !is_healthy {
                        info!(
                            "Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."
                        );
                        // TODO: shall the challenge failure immediately triggers the node to be removed?
                        network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
                    }
                    peer_scores.push((peer_id, is_healthy));
                }
                Err(e) => {
                    info!("StorageChallenge task completed with error {e:?}");
                }
            }
        }
        if !peer_scores.is_empty() {
            network.notify_peer_scores(peer_scores);
        }

        // Piggyback a replication-health probe on the same scheduling slot.
        Self::verify_local_replication_health(network.clone()).await;

        info!(
            "Completed node StorageChallenge against neighbours in {:?}!",
            start.elapsed()
        );
    }
1229
1230    /// Perform a direct replicated record fetch from nearby peers to verify replication health.
1231    async fn verify_local_replication_health(network: Network) {
1232        let closest_peers = match network
1233            .get_k_closest_local_peers_to_the_target(None)
1234            .await
1235        {
1236            Ok(peers) => peers,
1237            Err(err) => {
1238                warn!("Cannot fetch closest peers for replica verification: {err:?}");
1239                return;
1240            }
1241        };
1242
1243        if closest_peers.len() <= CLOSE_NEIGHBOUR_DISTANCE_INDEX {
1244            debug!(
1245                "Skipping replica verification as we only know {} neighbours (need > {}).",
1246                closest_peers.len(),
1247                CLOSE_NEIGHBOUR_DISTANCE_INDEX
1248            );
1249            return;
1250        }
1251
1252        let self_address = NetworkAddress::from(network.peer_id());
1253        let Some((threshold_peer, _)) = closest_peers.get(CLOSE_NEIGHBOUR_DISTANCE_INDEX) else {
1254            debug!("Unable to determine distance threshold for replica verification.");
1255            return;
1256        };
1257
1258        let threshold_distance =
1259            self_address.distance(&NetworkAddress::from(*threshold_peer));
1260        let Some(threshold_ilog2) = threshold_distance.ilog2() else {
1261            debug!("Threshold distance lacks ilog2; cannot proceed with replica verification.");
1262            return;
1263        };
1264
1265        let local_records = match network.get_all_local_record_addresses().await {
1266            Ok(records) => records,
1267            Err(err) => {
1268                warn!("Failed to list local records for replica verification: {err:?}");
1269                return;
1270            }
1271        };
1272
1273        let mut nearby_records: Vec<NetworkAddress> = local_records
1274            .into_iter()
1275            .filter_map(|(address, record_type)| {
1276                if record_type != ValidationType::Chunk {
1277                    return None;
1278                }
1279                let distance = self_address.distance(&address);
1280                distance
1281                    .ilog2()
1282                    .and_then(|record_ilog2| {
1283                        if record_ilog2 <= threshold_ilog2 {
1284                            Some(address)
1285                        } else {
1286                            None
1287                        }
1288                    })
1289            })
1290            .collect();
1291
1292        nearby_records.shuffle(&mut thread_rng());
1293        let target_record = if let Some(entry) = nearby_records.first().cloned() {
1294            entry
1295        } else {
1296            debug!("No nearby chunk records available for replica verification.");
1297            return;
1298        };
1299
1300        let pretty_key =
1301            PrettyPrintRecordKey::from(&target_record.to_record_key()).into_owned();
1302
1303        let candidate_peers = match network
1304            .get_k_closest_local_peers_to_the_target(Some(target_record.clone()))
1305            .await
1306        {
1307            Ok(peers) => peers
1308                .into_iter()
1309                .filter(|(peer_id, _)| peer_id != &network.peer_id())
1310                .take(REPLICA_FETCH_PEER_COUNT)
1311                .collect::<Vec<_>>(),
1312            Err(err) => {
1313                warn!(
1314                    "Cannot fetch record-specific closest peers for replica verification: {err:?}"
1315                );
1316                return;
1317            }
1318        };
1319
1320        if candidate_peers.len() < REPLICA_FETCH_PEER_COUNT {
1321            debug!(
1322                "Only {} peers available for replica verification (need at least {}).",
1323                candidate_peers.len(),
1324                REPLICA_FETCH_PEER_COUNT
1325            );
1326            return;
1327        }
1328
1329        debug!(
1330            "Verifying replicated record {pretty_key:?} against {} closest peers.",
1331            candidate_peers.len()
1332        );
1333
1334        let (successful_peers, failed_peers) = Self::fetch_record_from_peers_with_addresses(
1335            network.clone(),
1336            target_record.clone(),
1337            candidate_peers,
1338        )
1339        .await;
1340
1341        if failed_peers.is_empty() {
1342            debug!(
1343                "All peers returned record {pretty_key:?} during replica verification."
1344            );
1345            return;
1346        }
1347
1348        if successful_peers.len() < MIN_HEALTHY_REPLICA_COUNT {
1349            warn!(
1350                "Replica verification fetched only {} copies of {pretty_key:?}. Record is unhealthy; skipping peer classification.",
1351                successful_peers.len()
1352            );
1353            return;
1354        }
1355
1356        if !failed_peers.is_empty() {
1357            info!(
1358                "Scheduling replica verification retry for {} peers on record {pretty_key:?}.",
1359                failed_peers.len()
1360            );
1361            Self::schedule_record_fetch_retry(network.clone(), target_record.clone(), failed_peers).await;
1362        }
1363    }
1364
1365    /// Fetch a replicated record from the provided peers and return the success/failure split.
1366    async fn fetch_record_from_peers_with_addresses(
1367        network: Network,
1368        record_address: NetworkAddress,
1369        peers: Vec<(PeerId, Addresses)>,
1370    ) -> (Vec<PeerId>, Vec<PeerId>) {
1371        let request = Request::Query(Query::GetReplicatedRecord {
1372            requester: NetworkAddress::from(network.peer_id()),
1373            key: record_address.clone(),
1374        });
1375        let expected_key = record_address.to_record_key();
1376        let pretty_key = PrettyPrintRecordKey::from(&expected_key).into_owned();
1377
1378        let mut successes = Vec::new();
1379        let mut failures = Vec::new();
1380        let concurrency = peers.len();
1381        let results = stream::iter(
1382            peers
1383                .into_iter()
1384                .map(|(peer_id, addrs)| {
1385                    let network_clone = network.clone();
1386                    let request_clone = request.clone();
1387                    async move {
1388                        let result = network_clone
1389                            .send_request(request_clone, peer_id, addrs.clone())
1390                            .await;
1391                        (peer_id, addrs, result)
1392                    }
1393                }),
1394        )
1395        .buffer_unordered(concurrency)
1396        .collect::<Vec<_>>()
1397        .await;
1398
1399        for res in results {
1400            match res {
1401                (peer_id, _addrs, Ok((Response::Query(QueryResponse::GetReplicatedRecord(result)), _))) => {
1402                    match result {
1403                        Ok((_holder, value)) => {
1404                            // Try to deserialize the record and ensure the Chunk content matches
1405                            let record = Record::new(record_address.to_record_key(), value.to_vec());
1406                            if let Ok(chunk) = try_deserialize_record::<Chunk>(&record) 
1407                            && chunk.network_address().to_record_key() == expected_key {
1408                                successes.push(peer_id);
1409                            } else {
1410                                warn!(
1411                                    "Peer {peer_id:?} responded with an incorrect chunk copy of {pretty_key:?}."
1412                                );
1413                                failures.push(peer_id);
1414                            }
1415                        }
1416                        Err(err) => {
1417                            info!(
1418                                "Peer {peer_id:?} responded with error {err:?} for replicated record {pretty_key:?}."
1419                            );
1420                            failures.push(peer_id);
1421                        }
1422                    }
1423                }
1424                (peer_id, _addrs, Ok((other_response, _))) => {
1425                    warn!(
1426                        "Peer {peer_id:?} responded with unexpected message {other_response:?} for {pretty_key:?}."
1427                    );
1428                    failures.push(peer_id);
1429                }
1430                (peer_id, _addrs, Err(err)) => {
1431                    info!(
1432                        "Failed to reach peer {peer_id:?} for replicated record {pretty_key:?}: {err:?}"
1433                    );
1434                    failures.push(peer_id);
1435                }
1436            }
1437        }
1438
1439        (successes, failures)
1440    }
1441
1442    /// Schedule a future retry for peers that failed to return the replicated record.
1443    async fn schedule_record_fetch_retry(
1444        network: Network,
1445        record_address: NetworkAddress,
1446        failed_peers: Vec<PeerId>,
1447    ) {
1448        let retry_peers: HashSet<_> = failed_peers.into_iter().collect();
1449        if retry_peers.is_empty() {
1450            return;
1451        }
1452
1453        let record_clone = record_address.clone();
1454        let network_clone = network.clone();
1455
1456        tokio::time::sleep(REPLICA_FETCH_RETRY_DELAY).await;
1457        let pretty_key =
1458            PrettyPrintRecordKey::from(&record_clone.to_record_key()).into_owned();
1459
1460        let refreshed_candidates = Self::refresh_retry_candidate_addresses(
1461            &network_clone,
1462            &record_clone,
1463            &retry_peers,
1464        )
1465        .await;
1466
1467        if refreshed_candidates.is_empty() {
1468            info!(
1469                "Skipping replica retry for {pretty_key:?}; no tracked peers remain close to the record."
1470            );
1471            return;
1472        }
1473
1474        let (_, still_failed) = Self::fetch_record_from_peers_with_addresses(
1475            network_clone.clone(),
1476            record_clone.clone(),
1477            refreshed_candidates,
1478        )
1479        .await;
1480
1481        if still_failed.is_empty() {
1482            info!(
1483                "All peers successfully returned {pretty_key:?} during replica retry."
1484            );
1485            return;
1486        }
1487
1488        warn!(
1489            "{} peers still failed to provide {pretty_key:?}; evicting and blacklisting.",
1490            still_failed.len()
1491        );
1492        for peer_id in still_failed {
1493            Self::evict_and_blacklist_peer(network_clone.clone(), peer_id);
1494        }
1495    }
1496
1497    /// Get the latest address information for tracked peers via a DHT query.
1498    async fn refresh_retry_candidate_addresses(
1499        network: &Network,
1500        record_address: &NetworkAddress,
1501        retry_peers: &HashSet<PeerId>,
1502    ) -> Vec<(PeerId, Addresses)> {
1503        let pretty_key =
1504            PrettyPrintRecordKey::from(&record_address.to_record_key()).into_owned();
1505        let Ok(closest_peers) = network.get_closest_peers(record_address).await else {
1506            warn!(
1507                "Failed to refresh peer addresses for {pretty_key:?}; unable to retry replica fetch."
1508            );
1509            return Vec::new();
1510        };
1511
1512        let closest_map: HashMap<PeerId, Addresses> = closest_peers
1513            .into_iter()
1514            .collect();
1515
1516        retry_peers
1517            .iter()
1518            .filter_map(|peer_id| closest_map.get(peer_id).cloned().map(|addrs| (*peer_id, addrs)))
1519            .collect()
1520    }
1521
    /// Remove the peer from the routing table and add it to the blacklist.
    ///
    /// Invoked (see `schedule_record_fetch_retry`) for peers that repeatedly
    /// failed to provide a record they were expected to hold.
    fn evict_and_blacklist_peer(network: Network, peer_id: PeerId) {
        // Drop the peer from the routing table, then block it.
        network.remove_peer(peer_id);
        network.add_peer_to_blocklist(peer_id);
    }
1527
1528    /// Query peers' versions and update local knowledge.
1529    async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
1530        // To avoid choking, carry out the queries one by one
1531        for (peer_id, addrs) in peers {
1532            Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
1533        }
1534    }
1535
1536    /// Query peer's version and update local knowledge.
1537    async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
1538        let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
1539        // We can skip passing `addrs` here as the new peer should be part of the kad::RT and swarm can get the addr.
1540        let version = match network.send_request(request, peer, addrs).await {
1541            Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
1542                trace!("Fetched peer version {peer:?} as {version:?}");
1543                version
1544            }
1545            Ok(other) => {
1546                info!("Not a fetched peer version {peer:?}, {other:?}");
1547                "none".to_string()
1548            }
1549            Err(err) => {
1550                info!("Failed to fetch peer version {peer:?} with error {err:?}");
1551                // Failed version fetch (which contains dial then re-attempt by itself)
1552                // with error of `DialFailure` indicates the peer could be dead with high chance.
1553                // In that case, the peer shall be removed from the routing table.
1554                if let NetworkError::OutboundError(OutboundFailure::DialFailure) = err {
1555                    network.remove_peer(peer);
1556                    return;
1557                }
1558                "old".to_string()
1559            }
1560        };
1561        network.notify_node_version(peer, version);
1562    }
1563}
1564
#[derive(Debug)]
/// Tracks our close-group neighbourhood and decides, per membership change,
/// whether replication should be triggered, skipped, or ignored.
struct CloseGroupTracker {
    /// Our own network address; all distances are measured from here.
    self_address: NetworkAddress,
    /// Tracks the closest N peers by (distance, peer_id), sorted by distance to self.
    /// Using a tuple key handles the theoretical case where two peers have identical
    /// XOR distance to self.
    close_up_peers: BTreeSet<(Distance, PeerId)>,
    /// Per-peer behaviour flags used to detect restart churn (see `BehaviourEntry`).
    tracked_entries: HashMap<PeerId, BehaviourEntry>,
}
1574
impl CloseGroupTracker {
    /// Create a tracker centred on `self_peer_id`, with no peers tracked yet.
    fn new(self_peer_id: PeerId) -> Self {
        Self {
            self_address: NetworkAddress::from(self_peer_id),
            close_up_peers: BTreeSet::new(),
            tracked_entries: HashMap::new(),
        }
    }

    /// Record that `peer_id` was added to the routing table and decide whether
    /// this membership change should trigger replication.
    ///
    /// Returns `Trigger` for a genuinely new close-group member, `Skip` when
    /// the add looks like a restart rejoin (the peer had an `awaiting_rejoin`
    /// flag or a pending suppression timer), and `Ignore` when the peer is out
    /// of range or nothing effectively changed.
    fn record_peer_added(&mut self, peer_id: PeerId) -> ReplicationDirective {
        let distance = self.distance_to_peer(peer_id);

        // Check if this peer was already present
        let was_present = self.close_up_peers.contains(&(distance, peer_id));

        // Determine if the peer is close enough to track
        let is_close_enough = self.is_distance_within_close_range(distance);

        if is_close_enough {
            // Add to close_up_peers
            let _ = self.close_up_peers.insert((distance, peer_id));

            // If we exceed the limit, remove the farthest peer
            if self.close_up_peers.len() > CLOSE_GROUP_TRACKING_LIMIT
                && let Some(&(farthest_distance, farthest_peer)) =
                    self.close_up_peers.iter().next_back()
            {
                let _ = self.close_up_peers.remove(&(farthest_distance, farthest_peer));
            }
        } else {
            // Too far, don't track and clean up any existing entry
            let _ = self.tracked_entries.remove(&peer_id);
            return ReplicationDirective::Ignore;
        }

        use std::collections::hash_map::Entry;
        match self.tracked_entries.entry(peer_id) {
            Entry::Vacant(vacant_entry) => {
                // First time we see this peer: start tracking its behaviour.
                let _ = vacant_entry.insert(BehaviourEntry::default());
                if !was_present {
                    ReplicationDirective::Trigger
                } else {
                    ReplicationDirective::Ignore
                }
            }
            Entry::Occupied(mut occupied_entry) => {
                let entry = occupied_entry.get_mut();
                if entry.awaiting_rejoin || entry.timer_deadline.is_some() {
                    // The peer came back while we were still expecting a rejoin
                    // (or suppressing churn): treat this add as a restart and
                    // skip replication.
                    entry.restart_detected = true;
                    entry.awaiting_rejoin = false;
                    entry.timer_deadline = None;
                    ReplicationDirective::Skip
                } else if !was_present {
                    entry.restart_detected = false;
                    ReplicationDirective::Trigger
                } else {
                    ReplicationDirective::Ignore
                }
            }
        }
    }

    /// Record that `peer_id` was removed from the routing table.
    ///
    /// Returns `Trigger` for a genuine departure of a tracked close-group
    /// member, `Skip` when the removal follows a detected restart (the
    /// suppression timer is armed instead), and `Ignore` for untracked peers.
    fn record_peer_removed(&mut self, peer_id: PeerId) -> ReplicationDirective {
        let distance = self.distance_to_peer(peer_id);

        // Check if this peer is in our close_up_peers
        let was_tracked = self.close_up_peers.contains(&(distance, peer_id));

        if !was_tracked {
            // Not being tracked, i.e. being out-of-range, hence Ignore
            return ReplicationDirective::Ignore;
        }

        // Remove from close_up_peers
        let _ = self.close_up_peers.remove(&(distance, peer_id));

        let entry = self.tracked_entries.entry(peer_id).or_default();

        if entry.restart_detected {
            // A restarting peer going down again: arm the suppression timer
            // and keep replication quiet until it expires.
            entry.timer_deadline = Some(Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION);
            entry.awaiting_rejoin = false;
            ReplicationDirective::Skip
        } else {
            // Normal departure: note that a quick rejoin would indicate a
            // restart, and trigger replication now.
            entry.awaiting_rejoin = true;
            entry.timer_deadline = None;
            ReplicationDirective::Trigger
        }
    }

    /// Clear suppression timers whose deadline has passed `now`, resetting the
    /// associated restart flags. Returns true if at least one timer expired.
    /// Also prunes tracked entries that carry no active flags and whose peer is
    /// no longer in the close group.
    fn handle_timer_expiry(&mut self, now: Instant) -> bool {
        let mut expired = false;
        for entry in self.tracked_entries.values_mut() {
            if let Some(deadline) = entry.timer_deadline
                && now >= deadline
            {
                // Suppression window is over: reset the restart bookkeeping.
                entry.timer_deadline = None;
                entry.restart_detected = false;
                entry.awaiting_rejoin = false;
                expired = true;
            }
        }

        // Keep only entries that are still relevant
        self.tracked_entries.retain(|peer_id, entry| {
            let is_currently_present = self
                .close_up_peers
                .iter()
                .any(|(_, present_peer)| present_peer == peer_id);

            entry.awaiting_rejoin
                || entry.restart_detected
                || entry.timer_deadline.is_some()
                || is_currently_present
        });

        expired
    }

    /// Whether a peer at `distance` qualifies for close-group tracking.
    fn is_distance_within_close_range(&self, distance: Distance) -> bool {
        // If we haven't reached the limit yet, any peer is close enough
        if self.close_up_peers.len() < CLOSE_GROUP_TRACKING_LIMIT {
            return true;
        }

        // Otherwise, check if this distance is closer than the farthest tracked peer
        if let Some(&(farthest_distance, _)) = self.close_up_peers.iter().next_back() {
            distance < farthest_distance
        } else {
            true
        }
    }

    /// XOR distance from our own address to `peer_id`.
    fn distance_to_peer(&self, peer_id: PeerId) -> Distance {
        self.self_address.distance(&NetworkAddress::from(peer_id))
    }
}
1711
#[derive(Debug, Default)]
/// Per-peer churn-detection flags maintained by `CloseGroupTracker`.
struct BehaviourEntry {
    // Set on a normal departure; a subsequent re-add while this is set is
    // treated as a restart.
    awaiting_rejoin: bool,
    // Set when a re-add happened while a rejoin was awaited or a timer was
    // pending, i.e. the peer looks like it restarted.
    restart_detected: bool,
    // While set, replication for this peer is suppressed until the deadline
    // (see `handle_timer_expiry`).
    timer_deadline: Option<Instant>,
}
1718
#[derive(Debug, PartialEq, Eq)]
/// Outcome of a close-group membership change, telling the caller whether a
/// replication round is warranted.
enum ReplicationDirective {
    /// Membership genuinely changed; replication should run.
    Trigger,
    /// Change looks like a peer restart; suppress replication for now.
    Skip,
    /// Change is irrelevant (peer out of range, or no effective change).
    Ignore,
}
1725
1726impl ReplicationDirective {
1727    fn should_trigger(&self) -> bool {
1728        matches!(self, Self::Trigger)
1729    }
1730}
1731
#[cfg(test)]
mod close_group_tracker_tests {
    //! Unit tests for the `CloseGroupTracker` replication state machine:
    //! trigger/skip/ignore directives, restart suppression, capacity eviction,
    //! and timer-based cleanup.
    use super::*;
    use std::time::Duration;

    // Fresh random peer id per call, so each test gets independent peers.
    fn random_peer() -> PeerId {
        let keypair = libp2p::identity::Keypair::generate_ed25519();
        PeerId::from(keypair.public())
    }

    #[test]
    fn new_peer_triggers_replication() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let peer = random_peer();

        // A brand-new close peer triggers replication on join...
        assert_eq!(
            tracker.record_peer_added(peer),
            ReplicationDirective::Trigger
        );

        // ...and again on a genuine departure.
        assert_eq!(
            tracker.record_peer_removed(peer),
            ReplicationDirective::Trigger
        );
    }

    #[test]
    fn restart_detection_skips_replication() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let peer = random_peer();

        assert_eq!(
            tracker.record_peer_added(peer),
            ReplicationDirective::Trigger
        );

        assert_eq!(
            tracker.record_peer_removed(peer),
            ReplicationDirective::Trigger
        );

        // Quick rejoin after a departure is classified as a restart.
        assert_eq!(tracker.record_peer_added(peer), ReplicationDirective::Skip);

        assert_eq!(
            tracker.record_peer_removed(peer),
            ReplicationDirective::Skip
        );

        // Once the suppression window has elapsed, the restart state resets.
        assert!(tracker.handle_timer_expiry(
            Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1)
        ));

        assert_eq!(
            tracker.record_peer_added(peer),
            ReplicationDirective::Trigger
        );
    }

    #[test]
    fn peer_outside_close_group_is_ignored() {
        let mut tracker = CloseGroupTracker::new(random_peer());

        // Fill the tracker to capacity with random peers
        let mut added_peers = Vec::new();
        for _ in 0..CLOSE_GROUP_TRACKING_LIMIT {
            let peer = random_peer();
            let result = tracker.record_peer_added(peer);
            // All should trigger since we're under capacity
            assert_eq!(result, ReplicationDirective::Trigger);
            added_peers.push(peer);
        }

        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);

        // Now add more peers - some will be ignored if they're farther than existing ones
        let mut ignored_count = 0;
        let mut trigger_count = 0;
        for _ in 0..50 {
            let peer = random_peer();
            match tracker.record_peer_added(peer) {
                ReplicationDirective::Ignore => ignored_count += 1,
                ReplicationDirective::Trigger => trigger_count += 1,
                _ => {}
            }
        }

        // At capacity, new peers are only accepted if closer than the farthest
        // Some should be ignored (too far), some should trigger (closer than farthest)
        assert!(
            ignored_count > 0 || trigger_count > 0,
            "Expected some peers to be processed"
        );

        // Tracker should still only have CLOSE_GROUP_TRACKING_LIMIT peers
        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
    }

    #[test]
    fn removal_of_untracked_peer_is_ignored() {
        let mut tracker = CloseGroupTracker::new(random_peer());

        // Try to remove a peer that was never added
        let unknown_peer = random_peer();
        assert_eq!(
            tracker.record_peer_removed(unknown_peer),
            ReplicationDirective::Ignore
        );
    }

    #[test]
    fn timer_expiry_resets_restart_state() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let peer = random_peer();

        // Add and remove peer (normal behavior)
        assert_eq!(
            tracker.record_peer_added(peer),
            ReplicationDirective::Trigger
        );
        assert_eq!(
            tracker.record_peer_removed(peer),
            ReplicationDirective::Trigger
        );

        // Peer rejoins quickly - detected as restart, skipped
        assert_eq!(tracker.record_peer_added(peer), ReplicationDirective::Skip);

        // Remove again - still in restart mode, skipped with timer set
        assert_eq!(
            tracker.record_peer_removed(peer),
            ReplicationDirective::Skip
        );

        // Timer hasn't expired yet - no change
        assert!(!tracker.handle_timer_expiry(Instant::now()));

        // Timer expires
        let after_expiry = Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1);
        assert!(tracker.handle_timer_expiry(after_expiry));

        // Now the peer should trigger replication again (state reset)
        assert_eq!(
            tracker.record_peer_added(peer),
            ReplicationDirective::Trigger
        );
    }

    #[test]
    fn eviction_maintains_closest_peers() {
        let self_peer = random_peer();
        let mut tracker = CloseGroupTracker::new(self_peer);

        // Add exactly CLOSE_GROUP_TRACKING_LIMIT peers
        let mut peers_with_distances: Vec<(PeerId, Distance)> = Vec::new();
        for _ in 0..CLOSE_GROUP_TRACKING_LIMIT {
            let peer = random_peer();
            let distance = tracker.distance_to_peer(peer);
            let _ = tracker.record_peer_added(peer);
            peers_with_distances.push((peer, distance));
        }

        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);

        // Sort by distance to know which is farthest
        peers_with_distances.sort_by_key(|(_, d)| *d);
        let farthest_distance = peers_with_distances.last().map(|(_, d)| *d).unwrap();

        // Add a new peer that's closer than the farthest
        // Keep trying until we find one (random, so may take a few tries)
        let mut found_closer = false;
        for _ in 0..100 {
            let new_peer = random_peer();
            let new_distance = tracker.distance_to_peer(new_peer);
            if new_distance < farthest_distance {
                let result = tracker.record_peer_added(new_peer);
                assert_eq!(result, ReplicationDirective::Trigger);
                // Should still have exactly CLOSE_GROUP_TRACKING_LIMIT peers
                assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
                found_closer = true;
                break;
            }
        }

        // Note: This assertion might occasionally fail due to randomness,
        // but with 100 attempts it's extremely unlikely
        assert!(
            found_closer,
            "Could not find a peer closer than the farthest in 100 attempts"
        );
    }

    #[test]
    fn tracked_entries_cleaned_up_on_timer_expiry() {
        let mut tracker = CloseGroupTracker::new(random_peer());
        let peer = random_peer();

        // Add, remove, re-add (restart detected), remove again (timer set)
        let _ = tracker.record_peer_added(peer);
        let _ = tracker.record_peer_removed(peer);
        let _ = tracker.record_peer_added(peer);
        let _ = tracker.record_peer_removed(peer);

        // Peer should have a tracked entry with timer
        assert!(tracker.tracked_entries.contains_key(&peer));

        // After timer expiry, entry should be cleaned up since peer is not present
        let after_expiry = Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1);
        let _ = tracker.handle_timer_expiry(after_expiry);

        // Entry should be removed since peer is not in close_up_peers
        // and no longer has active flags
        assert!(!tracker.tracked_entries.contains_key(&peer));
    }
}
1946
1947async fn scoring_peer(
1948    network: Network,
1949    peer: (PeerId, Addresses),
1950    request: Request,
1951    expected_proofs: HashMap<NetworkAddress, ChunkProof>,
1952) -> usize {
1953    let peer_id = peer.0;
1954    let start = Instant::now();
1955    let responses = network
1956        .send_and_get_responses(&[peer], &request, true)
1957        .await;
1958
1959    if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
1960        responses.get(&peer_id)
1961    {
1962        if answers.is_empty() {
1963            info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
1964            return 0;
1965        }
1966        let elapsed = start.elapsed();
1967
1968        let mut received_proofs = vec![];
1969        for (addr, proof) in answers {
1970            if let Ok(proof) = proof {
1971                received_proofs.push((addr.clone(), proof.clone()));
1972            }
1973        }
1974
1975        let score = mark_peer(elapsed, received_proofs, &expected_proofs);
1976        info!(
1977            "Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.",
1978            answers.len()
1979        );
1980        score
1981    } else {
1982        info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error.");
1983        0
1984    }
1985}
1986
1987// Based on following metrics:
1988//   * the duration
1989//   * is there false answer
1990//   * percentage of correct answers among the expected closest
1991// The higher the score, the better confidence on the peer
1992fn mark_peer(
1993    duration: Duration,
1994    answers: Vec<(NetworkAddress, ChunkProof)>,
1995    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
1996) -> usize {
1997    let duration_score = duration_score_scheme(duration);
1998    let challenge_score = challenge_score_scheme(answers, expected_proofs);
1999
2000    duration_score * challenge_score
2001}
2002
2003// Less duration shall get higher score
2004fn duration_score_scheme(duration: Duration) -> usize {
2005    // So far just a simple stepped scheme, capped by HIGHEST_SCORE
2006    let in_ms = if let Some(value) = duration.as_millis().to_usize() {
2007        value
2008    } else {
2009        info!("Cannot get milli seconds from {duration:?}, using a default value of 1000ms.");
2010        1000
2011    };
2012
2013    let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
2014    HIGHEST_SCORE - step
2015}
2016
2017// Any false answer shall result in 0 score immediately
2018fn challenge_score_scheme(
2019    answers: Vec<(NetworkAddress, ChunkProof)>,
2020    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
2021) -> usize {
2022    let mut correct_answers = 0;
2023    for (addr, chunk_proof) in answers {
2024        if let Some(expected_proof) = expected_proofs.get(&addr) {
2025            if expected_proof.verify(&chunk_proof) {
2026                correct_answers += 1;
2027            } else {
2028                info!("Spot a false answer to the challenge regarding {addr:?}");
2029                // Any false answer shall result in 0 score immediately
2030                return 0;
2031            }
2032        }
2033    }
2034    // TODO: For those answers not among the expected_proofs,
2035    //       it could be due to having different knowledge of records to us.
2036    //       shall we:
2037    //         * set the target being close to us, so that neighbours sharing same knowledge in higher chance
2038    //         * fetch from local to testify
2039    //         * fetch from network to testify
2040    std::cmp::min(
2041        HIGHEST_SCORE,
2042        HIGHEST_SCORE * correct_answers / expected_proofs.len(),
2043    )
2044}
2045
#[cfg(test)]
mod tests {
    //! Tests for `Node::calculate_get_closest_peers` and for the merkle
    //! payment timestamp validation rules.
    use super::*;
    use std::str::FromStr;

    #[test]
    fn test_no_local_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(5);
        let range = None;
        let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);

        // No candidates means an empty result, regardless of num_of_peers.
        assert_eq!(result, vec![]);
    }

    #[test]
    fn test_fewer_local_peers_than_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(2);
        let range = None;
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Result shall be sorted and truncated
        let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .iter()
            .map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(*peer_id);
                (addr, multi_addrs.clone())
            })
            .collect();
        expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
        let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();

        assert_eq!(expected_result, result);
    }

    #[test]
    fn test_with_range_and_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        // num_of_peers deliberately 0 so only the range filter shapes the result.
        let num_of_peers = Some(0);
        let range_value = [128; 32];
        let range = Some(range_value);
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Range shall be preferred, i.e. the result peers shall all within the range
        let distance = U256::from_big_endian(&range_value);
        let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .into_iter()
            .filter_map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(peer_id);
                if target.distance(&addr).0 <= distance {
                    Some((addr, multi_addrs.clone()))
                } else {
                    None
                }
            })
            .collect();

        assert_eq!(expected_result, result);
    }

    mod merkle_payment_tests {
        use super::*;

        /// Test that timestamp validation accepts valid timestamps (within the acceptable window)
        #[test]
        fn test_timestamp_validation_accepts_valid_timestamp() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Valid timestamp: 1 hour ago
            let valid_timestamp = now - 3600;

            // Validate timestamp
            let age = now.saturating_sub(valid_timestamp);

            assert!(
                valid_timestamp <= now,
                "Valid timestamp should not be in the future"
            );
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Valid timestamp should not be expired"
            );
        }

        /// Test that timestamp validation rejects future timestamps
        #[test]
        fn test_timestamp_validation_rejects_future_timestamp() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Future timestamp: 1 hour in the future
            let future_timestamp = now + 3600;

            // Timestamp should be rejected
            assert!(
                future_timestamp > now,
                "Future timestamp should be rejected"
            );
        }

        /// Test that timestamp validation rejects expired timestamps
        #[test]
        fn test_timestamp_validation_rejects_expired_timestamp() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Expired timestamp: 8 days ago (> 7 day expiration)
            let expired_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 86400);

            // Calculate age
            let age = now.saturating_sub(expired_timestamp);

            // Timestamp should be rejected
            assert!(
                age > MERKLE_PAYMENT_EXPIRATION,
                "Expired timestamp should be rejected"
            );
        }

        /// Test timestamp at the exact expiration boundary (should be accepted:
        /// the validation rejects only ages strictly greater than the limit)
        #[test]
        fn test_timestamp_validation_at_expiration_boundary() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Timestamp exactly at expiration boundary
            let boundary_timestamp = now - MERKLE_PAYMENT_EXPIRATION;

            let age = now.saturating_sub(boundary_timestamp);

            // At the boundary, age == MERKLE_PAYMENT_EXPIRATION
            assert_eq!(age, MERKLE_PAYMENT_EXPIRATION);
            // The validation uses >, so this should pass
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Timestamp exactly at boundary should not be rejected"
            );
        }

        /// Test timestamp just beyond expiration boundary (should be rejected)
        #[test]
        fn test_timestamp_validation_beyond_expiration_boundary() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Timestamp just beyond expiration boundary (1 second past)
            let beyond_boundary_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 1);

            let age = now.saturating_sub(beyond_boundary_timestamp);

            assert!(
                age > MERKLE_PAYMENT_EXPIRATION,
                "Timestamp beyond boundary should be rejected"
            );
        }

        /// Test timestamp at current time (should be accepted)
        #[test]
        fn test_timestamp_validation_at_current_time() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Timestamp at current time
            let current_timestamp = now;

            let age = now.saturating_sub(current_timestamp);

            assert!(
                current_timestamp <= now,
                "Current timestamp should not be in future"
            );
            assert!(
                age <= MERKLE_PAYMENT_EXPIRATION,
                "Current timestamp should not be expired"
            );
            assert_eq!(age, 0, "Age should be 0 for current timestamp");
        }

        /// Test timestamp near future boundary (1 second in future)
        #[test]
        fn test_timestamp_validation_near_future_boundary() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Timestamp 1 second in the future
            let near_future_timestamp = now + 1;

            assert!(
                near_future_timestamp > now,
                "Near-future timestamp should be rejected"
            );
        }

        /// Test expiration constant is set correctly (7 days = 604800 seconds)
        #[test]
        fn test_merkle_payment_expiration_constant() {
            const SEVEN_DAYS_IN_SECONDS: u64 = 7 * 24 * 60 * 60;
            assert_eq!(
                MERKLE_PAYMENT_EXPIRATION, SEVEN_DAYS_IN_SECONDS,
                "MERKLE_PAYMENT_EXPIRATION should be 7 days"
            );
        }
    }
}