
ant_node/node.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use super::{
10    Marker, NodeEvent, error::Result, event::NodeEventsChannel, quote::quotes_verification,
11};
12#[cfg(feature = "open-metrics")]
13use crate::metrics::NodeMetricsRecorder;
14#[cfg(feature = "open-metrics")]
15use crate::networking::MetricsRegistries;
16use crate::networking::{Addresses, Network, NetworkConfig, NetworkEvent, NodeIssue};
17use crate::{PutValidationError, RunningNode};
18use ant_bootstrap::bootstrap::Bootstrap;
19use ant_evm::EvmNetwork;
20use ant_evm::RewardsAddress;
21use ant_evm::merkle_payments::MERKLE_PAYMENT_EXPIRATION;
22use ant_protocol::{
23    CLOSE_GROUP_SIZE, NetworkAddress, PrettyPrintRecordKey,
24    error::Error as ProtocolError,
25    messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
26    storage::{Chunk, ValidationType, try_deserialize_record},
27};
28use bytes::Bytes;
29use futures::stream::{self, StreamExt};
30use itertools::Itertools;
31use libp2p::{
32    Multiaddr, PeerId,
33    identity::Keypair,
34    kad::{KBucketDistance as Distance, Record, U256},
35};
36use num_traits::cast::ToPrimitive;
37use rand::{
38    Rng, SeedableRng,
39    rngs::{OsRng, StdRng},
40    seq::SliceRandom,
41    thread_rng,
42};
43use std::{
44    collections::{BTreeSet, HashMap, HashSet},
45    net::SocketAddr,
46    path::PathBuf,
47    sync::{
48        Arc,
49        atomic::{AtomicUsize, Ordering},
50    },
51    time::{Duration, Instant},
52};
53use tokio::sync::{Mutex, watch};
54use tokio::{
55    sync::mpsc::Receiver,
56    task::{JoinSet, spawn},
57};
58
59/// Interval to trigger replication of all records to all peers.
60/// This is the maximum interval; the minimum interval at any node will be half of this.
61pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;
62
63/// Interval to trigger storage challenge.
64/// This is the maximum interval; the minimum interval at any node will be half of this.
65const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;
66
67/// Interval to update the node's uptime metric
68const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);
69
70/// Interval to clean up irrelevant records
71/// This is the maximum interval; the minimum interval at any node will be half of this.
72const UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;
73
74/// Highest score achievable from each metric sub-sector during a StorageChallenge.
75const HIGHEST_SCORE: usize = 100;
76
77/// Any node with a score below this is considered bad.
78/// The maximum possible score is 100 * 100.
79const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;
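// Note: with HIGHEST_SCORE = 100 per sub-sector and a maximum combined score of
// 100 * 100 = 10_000, this threshold corresponds to roughly 30% of the maximum.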
80
81/// In ms; the average StorageChallenge completion time is expected to be around 250ms.
82const TIME_STEP: usize = 20;
83
84/// Delay before retrying a failed replicated record fetch.
85const REPLICA_FETCH_RETRY_DELAY: Duration = Duration::from_secs(15 * 60);
86
87/// Number of peers to query for a replicated record health check.
88const REPLICA_FETCH_PEER_COUNT: usize = 5;
89
90/// Minimum number of successful replicas required to consider the record healthy.
91const MIN_HEALTHY_REPLICA_COUNT: usize = 3;
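// Taken together: a replica health check queries REPLICA_FETCH_PEER_COUNT nearby peers for a
// record and treats it as healthy when at least MIN_HEALTHY_REPLICA_COUNT of them return it;
// otherwise the fetch is retried after REPLICA_FETCH_RETRY_DELAY.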
92
93/// Index (0-based, including self) used to derive the distance threshold for nearby records.
94const CLOSE_NEIGHBOUR_DISTANCE_INDEX: usize = 7;
95
96/// Maximum number of closest peers to track for replication behaviour detection.
97///
98/// Set to 20 (roughly 4x CLOSE_GROUP_SIZE) to cover peers that are close enough
99/// to affect data responsibility during churn. Tracking only the closest peers
100/// keeps memory bounded while still detecting restart patterns in the critical
101/// zone where replication matters most.
102const CLOSE_GROUP_TRACKING_LIMIT: usize = 20;
103
104/// Grace period before triggering replication after detecting a peer restart.
105///
106/// When a peer is detected as restarting (removed then quickly re-added),
107/// replication is suppressed for this duration to allow the peer to stabilize.
108/// Set to 90 seconds, which is:
109/// - Long enough to cover typical node restart times (30-60s)
110/// - Short enough to ensure data availability isn't compromised
111/// - Approximately half of PERIODIC_REPLICATION_INTERVAL_MAX_S, so the periodic
112///   replication will catch any missed updates within a reasonable timeframe
113const CLOSE_GROUP_RESTART_SUPPRESSION: Duration = Duration::from_secs(90);
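// Illustrative sketch only (not the actual `CloseGroupTracker`/`ReplicationDirective`
// implementation further down in this module). It shows the intended decision: a close peer
// that is re-added shortly after being removed is treated as a restart, and replication for
// that event is skipped. The `recent_removals` map and the `Trigger` variant name are
// assumptions made for illustration:
//
//     fn record_peer_added(&mut self, peer: PeerId) -> ReplicationDirective {
//         match self.recent_removals.get(&peer) {
//             Some(removed_at) if removed_at.elapsed() < CLOSE_GROUP_RESTART_SUPPRESSION => {
//                 // Likely a restart; give the peer time to stabilise.
//                 ReplicationDirective::Skip
//             }
//             _ => ReplicationDirective::Trigger,
//         }
//     }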
114
115/// Helper to build and run a Node
116pub struct NodeBuilder {
117    addr: SocketAddr,
118    bootstrap: Bootstrap,
119    evm_address: RewardsAddress,
120    evm_network: EvmNetwork,
121    identity_keypair: Keypair,
122    local: bool,
123    #[cfg(feature = "open-metrics")]
124    /// Set to Some to enable the metrics server
125    metrics_server_port: Option<u16>,
126    no_upnp: bool,
127    relay_client: bool,
128    root_dir: PathBuf,
129}
130
131impl NodeBuilder {
132    /// Instantiate the builder. The initial peers can either be supplied via the `initial_peers` method
133    /// or fetched from the bootstrap cache set via the `bootstrap_cache` method.
134    pub fn new(
135        identity_keypair: Keypair,
136        bootstrap_flow: Bootstrap,
137        evm_address: RewardsAddress,
138        evm_network: EvmNetwork,
139        addr: SocketAddr,
140        root_dir: PathBuf,
141    ) -> Self {
142        Self {
143            addr,
144            bootstrap: bootstrap_flow,
145            evm_address,
146            evm_network,
147            identity_keypair,
148            local: false,
149            #[cfg(feature = "open-metrics")]
150            metrics_server_port: None,
151            no_upnp: false,
152            relay_client: false,
153            root_dir,
154        }
155    }
156
157    /// Set the flag to indicate if the node is running in local mode
158    pub fn local(&mut self, local: bool) {
159        self.local = local;
160    }
161
162    #[cfg(feature = "open-metrics")]
163    /// Set the port for the OpenMetrics server. Defaults to a random port if not set
164    pub fn metrics_server_port(&mut self, port: Option<u16>) {
165        self.metrics_server_port = port;
166    }
167
168    /// Set the flag to make the node act as a relay client
169    pub fn relay_client(&mut self, relay_client: bool) {
170        self.relay_client = relay_client;
171    }
172
173    /// Set the flag to disable UPnP for the node
174    pub fn no_upnp(&mut self, no_upnp: bool) {
175        self.no_upnp = no_upnp;
176    }
177
178    /// Asynchronously runs a new node instance, setting up the swarm driver,
179    /// creating the data store, and handling network events. Returns the
180    /// created `RunningNode`, which contains a `NodeEventsChannel` for listening
181    /// to node-related events.
182    ///
183    /// # Returns
184    ///
185    /// A `RunningNode` instance.
186    ///
187    /// # Errors
188    ///
189    /// Returns an error if there is a problem initializing the Network.
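    ///
    /// # Example
    ///
    /// A minimal sketch of wiring up and running a node; `keypair`, `bootstrap`,
    /// `rewards_address`, `evm_network`, `listen_addr` and `root_dir` are assumed to have
    /// been constructed elsewhere.
    ///
    /// ```ignore
    /// let mut builder = NodeBuilder::new(
    ///     keypair,
    ///     bootstrap,
    ///     rewards_address,
    ///     evm_network,
    ///     listen_addr,
    ///     root_dir,
    /// );
    /// builder.local(false);
    /// builder.no_upnp(false);
    /// let running_node = builder.build_and_run()?;
    /// // The returned `RunningNode` exposes the node events channel for `NodeEvent`s.
    /// ```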
190    pub fn build_and_run(self) -> Result<RunningNode> {
191        // setup metrics
192        #[cfg(feature = "open-metrics")]
193        let (metrics_recorder, metrics_registries) = if self.metrics_server_port.is_some() {
194            // metadata registry
195            let mut metrics_registries = MetricsRegistries::default();
196            let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);
197
198            (Some(metrics_recorder), metrics_registries)
199        } else {
200            (None, MetricsRegistries::default())
201        };
202
203        // create a shutdown signal channel
204        let (shutdown_tx, shutdown_rx) = watch::channel(false);
205
206        // init network
207        let network_config = NetworkConfig {
208            keypair: self.identity_keypair,
209            local: self.local,
210            listen_addr: self.addr,
211            root_dir: self.root_dir.clone(),
212            shutdown_rx: shutdown_rx.clone(),
213            bootstrap: self.bootstrap,
214            no_upnp: self.no_upnp,
215            relay_client: self.relay_client,
216            custom_request_timeout: None,
217            #[cfg(feature = "open-metrics")]
218            metrics_registries,
219            #[cfg(feature = "open-metrics")]
220            metrics_server_port: self.metrics_server_port,
221        };
222        let (network, network_event_receiver) = Network::init(network_config)?;
223
224        // init node
225        let node_events_channel = NodeEventsChannel::default();
226        let node = NodeInner {
227            network: network.clone(),
228            events_channel: node_events_channel.clone(),
229            close_group_tracker: Mutex::new(CloseGroupTracker::new(network.peer_id())),
230            reward_address: self.evm_address,
231            #[cfg(feature = "open-metrics")]
232            metrics_recorder,
233            evm_network: self.evm_network,
234        };
235        let node = Node {
236            inner: Arc::new(node),
237        };
238
239        // Run the node
240        node.run(network_event_receiver, shutdown_rx);
241        let running_node = RunningNode {
242            shutdown_sender: shutdown_tx,
243            network,
244            node_events_channel,
245            root_dir_path: self.root_dir,
246            rewards_address: self.evm_address,
247        };
248
249        Ok(running_node)
250    }
251}
252
253/// `Node` represents a single node in the distributed network. It handles
254/// network events, processes incoming requests, interacts with the data
255/// storage, and broadcasts node-related events.
256#[derive(Clone)]
257pub(crate) struct Node {
258    inner: Arc<NodeInner>,
259}
260
261/// The actual implementation of the Node. The other is just a wrapper around this, so that we don't expose
262/// the Arc from the interface.
263struct NodeInner {
264    events_channel: NodeEventsChannel,
265    network: Network,
266    close_group_tracker: Mutex<CloseGroupTracker>,
267    #[cfg(feature = "open-metrics")]
268    metrics_recorder: Option<NodeMetricsRecorder>,
269    reward_address: RewardsAddress,
270    evm_network: EvmNetwork,
271}
272
273impl Node {
274    /// Returns the NodeEventsChannel
275    pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
276        &self.inner.events_channel
277    }
278
279    /// Returns the instance of Network
280    pub(crate) fn network(&self) -> &Network {
281        &self.inner.network
282    }
283
284    fn close_group_tracker(&self) -> &Mutex<CloseGroupTracker> {
285        &self.inner.close_group_tracker
286    }
287
288    #[cfg(feature = "open-metrics")]
289    /// Returns a reference to the NodeMetricsRecorder if the `open-metrics` feature flag is enabled
290    /// This is used to record various metrics for the node.
291    pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
292        self.inner.metrics_recorder.as_ref()
293    }
294
295    /// Returns the reward address of the node
296    pub(crate) fn reward_address(&self) -> &RewardsAddress {
297        &self.inner.reward_address
298    }
299
300    pub(crate) fn evm_network(&self) -> &EvmNetwork {
301        &self.inner.evm_network
302    }
303
304    /// Spawns the main task that processes incoming `NetworkEvent`s and drives the
305    /// periodic replication, cleanup, metrics and storage-challenge timers.
306    fn run(
307        self,
308        mut network_event_receiver: Receiver<NetworkEvent>,
309        mut shutdown_rx: watch::Receiver<bool>,
310    ) {
311        let mut rng = StdRng::from_entropy();
312
313        let peers_connected = Arc::new(AtomicUsize::new(0));
314
315        let _node_task = spawn(async move {
316            // use a random replication interval to ensure that nodes do not all replicate
317            // in sync and transmit messages at the same time.
318            let replication_interval: u64 = rng.gen_range(
319                PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
320            );
321            let replication_interval_time = Duration::from_secs(replication_interval);
322            debug!("Replication interval set to {replication_interval_time:?}");
323
324            let mut replication_interval = tokio::time::interval(replication_interval_time);
325            let _ = replication_interval.tick().await; // first tick completes immediately
326
327            let mut uptime_metrics_update_interval =
328                tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
329            let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately
330
331            // use a random cleanup interval to ensure that nodes do not all run the cleanup
332            // in sync, causing an overall CPU spike.
333            let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
334                UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
335                    ..UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
336            );
337            let irrelevant_records_cleanup_interval_time =
338                Duration::from_secs(irrelevant_records_cleanup_interval);
339            let mut irrelevant_records_cleanup_interval =
340                tokio::time::interval(irrelevant_records_cleanup_interval_time);
341            let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately
342
343            // use a random neighbour storage challenge interval to ensure
344            // neighbours do not carry out challenges at the same time
345            let storage_challenge_interval: u64 =
346                rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
347            let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
348            debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");
349
350            let mut storage_challenge_interval =
351                tokio::time::interval(storage_challenge_interval_time);
352            let _ = storage_challenge_interval.tick().await; // first tick completes immediately
353
354            loop {
355                let peers_connected = &peers_connected;
356
357                tokio::select! {
358                    // Check for a shutdown command.
359                    result = shutdown_rx.changed() => {
360                        if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
361                            info!("Shutdown signal received or sender dropped. Exiting network events loop.");
362                            break;
363                        }
364                    },
365                    net_event = network_event_receiver.recv() => {
366                        match net_event {
367                            Some(event) => {
368                                let start = Instant::now();
369                                let event_string = format!("{event:?}");
370
371                                self.handle_network_event(event, peers_connected).await;
372                                trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);
373
374                            }
375                            None => {
376                                error!("The `NetworkEvent` channel is closed");
377                                self.events_channel().broadcast(NodeEvent::ChannelClosed);
378                                break;
379                            }
380                        }
381                    }
382                    // runs every replication_interval time
383                    _ = replication_interval.tick() => {
384                        let now = Instant::now();
385
386                        {
387                            let mut tracker = self.close_group_tracker().lock().await;
388                            if tracker.handle_timer_expiry(now) {
389                                trace!("Replication timer expired for tracked peers; periodic run will cover it");
390                            }
391                        }
392
393                        let start = now;
394                        let network = self.network().clone();
395                        self.record_metrics(Marker::IntervalReplicationTriggered);
396
397                        let _handle = spawn(async move {
398                            Self::try_interval_replication(network);
399                            trace!("Periodic replication took {:?}", start.elapsed());
400                        });
401                    }
402                    _ = uptime_metrics_update_interval.tick() => {
403                        #[cfg(feature = "open-metrics")]
404                        if let Some(metrics_recorder) = self.metrics_recorder() {
405                            let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
406                        }
407                    }
408                    _ = irrelevant_records_cleanup_interval.tick() => {
409                        let network = self.network().clone();
410
411                        let _handle = spawn(async move {
412                            Self::trigger_irrelevant_record_cleanup(network);
413                        });
414                    }
415                    // runs every storage_challenge_interval time
416                    _ = storage_challenge_interval.tick() => {
417                        let start = Instant::now();
418                        debug!("Periodic storage challenge triggered");
419                        let network = self.network().clone();
420
421                        let _handle = spawn(async move {
422                            Self::storage_challenge(network).await;
423                            trace!("Periodic storage challenge took {:?}", start.elapsed());
424                        });
425                    }
426                }
427            }
428        });
429    }
430
431    /// Calls Marker::log() to insert the marker into the log files.
432    /// Also calls NodeMetrics::record() to record the metric if the `open-metrics` feature flag is enabled.
433    pub(crate) fn record_metrics(&self, marker: Marker) {
434        marker.log();
435        #[cfg(feature = "open-metrics")]
436        if let Some(metrics_recorder) = self.metrics_recorder() {
437            metrics_recorder.record(marker)
438        }
439    }
440
441    // **** Private helpers *****
442
443    /// Handle a network event.
444    /// Spawns a task for any likely long-running work.
445    async fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
446        let start = Instant::now();
447        let event_string = format!("{event:?}");
448        let event_header;
449
450        // Reducing non-mandatory logging
451        if let NetworkEvent::QueryRequestReceived {
452            query: Query::GetVersion { .. },
453            ..
454        } = event
455        {
456            trace!("Handling NetworkEvent {event_string}");
457        } else {
458            debug!("Handling NetworkEvent {event_string}");
459        }
460
461        match event {
462            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
463                event_header = "PeerAdded";
464                // increment peers_connected and send the ConnectedToNetwork event once we have connected to CLOSE_GROUP_SIZE peers
465                let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
466                if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
467                    self.events_channel()
468                        .broadcast(NodeEvent::ConnectedToNetwork);
469                }
470
471                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
472                self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));
473
474                // try query peer version
475                let network = self.network().clone();
476                let _handle = spawn(async move {
477                    Self::try_query_peer_version(network, peer_id, Default::default()).await;
478                });
479
480                let replication_decision = {
481                    let mut tracker = self.close_group_tracker().lock().await;
482                    tracker.record_peer_added(peer_id)
483                };
484
485                if replication_decision.should_trigger() {
486                    let network = self.network().clone();
487                    self.record_metrics(Marker::IntervalReplicationTriggered);
488                    let _handle = spawn(async move {
489                        Self::try_interval_replication(network);
490                    });
491                } else if matches!(replication_decision, ReplicationDirective::Skip) {
492                    trace!(
493                        "Replication skipped for {peer_id:?} addition due to restart behaviour tracking"
494                    );
495                }
496            }
497            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
498                event_header = "PeerRemoved";
499                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
500                self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));
501
502                let self_id = self.network().peer_id();
503                let distance =
504                    NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
505                info!(
506                    "Node {self_id:?} removed peer from routing table: {peer_id:?}. It has a {:?} distance to us.",
507                    distance.ilog2()
508                );
509
510                let replication_decision = {
511                    let mut tracker = self.close_group_tracker().lock().await;
512                    tracker.record_peer_removed(peer_id)
513                };
514
515                if replication_decision.should_trigger() {
516                    let network = self.network().clone();
517                    self.record_metrics(Marker::IntervalReplicationTriggered);
518                    let _handle = spawn(async move {
519                        Self::try_interval_replication(network);
520                    });
521                } else if matches!(replication_decision, ReplicationDirective::Skip) {
522                    trace!(
523                        "Replication skipped for {peer_id:?} removal due to restart behaviour tracking"
524                    );
525                }
526            }
527            NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
528                event_header = "PeerWithUnsupportedProtocol";
529            }
530            NetworkEvent::NewListenAddr(_) => {
531                event_header = "NewListenAddr";
532            }
533            NetworkEvent::ResponseReceived { res } => {
534                event_header = "ResponseReceived";
535                if let Err(err) = self.handle_response(res) {
536                    error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
537                }
538            }
539            NetworkEvent::KeysToFetchForReplication(keys) => {
540                event_header = "KeysToFetchForReplication";
541                self.record_metrics(Marker::fetching_keys_for_replication(&keys));
542
543                if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
544                    error!("Error while trying to fetch replicated data {err:?}");
545                }
546            }
547            NetworkEvent::QueryRequestReceived { query, channel } => {
548                event_header = "QueryRequestReceived";
549                let node = self.clone();
550                let payment_address = *self.reward_address();
551
552                let _handle = spawn(async move {
553                    let network = node.network().clone();
554                    let res = Self::handle_query(node, query, payment_address).await;
555
556                    // Reducing non-mandatory logging
557                    if let Response::Query(QueryResponse::GetVersion { .. }) = res {
558                        trace!("Sending response {res:?}");
559                    } else {
560                        debug!("Sending response {res:?}");
561                    }
562
563                    network.send_response(res, channel);
564                });
565            }
566            NetworkEvent::UnverifiedRecord(record) => {
567                event_header = "UnverifiedRecord";
568                // record validation can be long-running, so we spawn a task to handle it
569                let self_clone = self.clone();
570                let _handle = spawn(async move {
571                    let key = PrettyPrintRecordKey::from(&record.key).into_owned();
572                    match self_clone.validate_and_store_record(record).await {
573                        Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
574                        Err(err) => {
575                            self_clone.record_metrics(Marker::RecordRejected(&key, &err));
576                        }
577                    }
578                });
579            }
580            NetworkEvent::TerminateNode { reason } => {
581                event_header = "TerminateNode";
582                error!("Received termination from swarm_driver due to {reason:?}");
583                self.events_channel()
584                    .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
585            }
586            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
587                event_header = "FailedToFetchHolders";
588                let network = self.network().clone();
589                let pretty_log: Vec<_> = bad_nodes
590                    .iter()
591                    .map(|(peer_id, record_key)| {
592                        let pretty_key = PrettyPrintRecordKey::from(record_key);
593                        (peer_id, pretty_key)
594                    })
595                    .collect();
596                // Note: this log is checked in CI and is expected NOT to appear.
597                //       Any change to the keyword `failed to fetch` requires a
598                //       corresponding CI script change as well.
599                debug!(
600                    "Received notification from replication_fetcher, notifying {pretty_log:?} failed to fetch replication copies from."
601                );
602                let _handle = spawn(async move {
603                    for (peer_id, record_key) in bad_nodes {
604                        // An obsolete fetch request (e.g. superseded by a flood of fresh replicates)
605                        // could cause a peer to be wrongly claimed as bad, since the local copy blocks the entry from being cleared.
606                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
607                        {
608                            error!(
609                                "From peer {peer_id:?}, failed to fetch record {:?}",
610                                PrettyPrintRecordKey::from(&record_key)
611                            );
612                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
613                        }
614                    }
615                });
616            }
617            NetworkEvent::QuoteVerification { quotes } => {
618                event_header = "QuoteVerification";
619                let network = self.network().clone();
620
621                let _handle = spawn(async move {
622                    quotes_verification(&network, quotes).await;
623                });
624            }
625            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
626                event_header = "FreshReplicateToFetch";
627                self.fresh_replicate_to_fetch(holder, keys);
628            }
629            NetworkEvent::PeersForVersionQuery(peers) => {
630                event_header = "PeersForVersionQuery";
631                let network = self.network().clone();
632                let _handle = spawn(async move {
633                    Self::query_peers_version(network, peers).await;
634                });
635            }
636            NetworkEvent::NetworkWideReplication { keys } => {
637                event_header = "NetworkWideReplication";
638                self.perform_network_wide_replication(keys);
639            }
640        }
641
642        trace!(
643            "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
644            start.elapsed()
645        );
646    }
647
648    // Handle the response that was not awaited at the call site
649    fn handle_response(&self, response: Response) -> Result<()> {
650        match response {
651            Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
652                // This should actually have been short-circuited when received
653                warn!("Mishandled replicate response, should be handled earlier");
654            }
655            Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
656                error!(
657                    "Response to replication should be handled by the caller, not by this common handler: {resp:?}"
658                );
659            }
660            Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
661                // No need to handle
662            }
663            other => {
664                warn!("handle_response not implemented for {other:?}");
665            }
666        };
667
668        Ok(())
669    }
670
671    async fn handle_query(node: Self, query: Query, payment_address: RewardsAddress) -> Response {
672        let network = node.network();
673        let resp: QueryResponse = match query {
674            Query::GetStoreQuote {
675                key,
676                data_type,
677                data_size,
678                nonce,
679                difficulty,
680            } => {
681                let record_key = key.to_record_key();
682                let self_id = network.peer_id();
683
684                let maybe_quoting_metrics = network
685                    .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
686                    .await;
687
688                let storage_proofs = if let Some(nonce) = nonce {
689                    Self::respond_x_closest_record_proof(
690                        network,
691                        key.clone(),
692                        nonce,
693                        difficulty,
694                        false,
695                    )
696                    .await
697                } else {
698                    vec![]
699                };
700
701                match maybe_quoting_metrics {
702                    Ok((quoting_metrics, is_already_stored)) => {
703                        if is_already_stored {
704                            QueryResponse::GetStoreQuote {
705                                quote: Err(ProtocolError::RecordExists(
706                                    PrettyPrintRecordKey::from(&record_key).into_owned(),
707                                )),
708                                peer_address: NetworkAddress::from(self_id),
709                                storage_proofs,
710                            }
711                        } else {
712                            QueryResponse::GetStoreQuote {
713                                quote: Self::create_quote_for_storecost(
714                                    network,
715                                    &key,
716                                    &quoting_metrics,
717                                    &payment_address,
718                                ),
719                                peer_address: NetworkAddress::from(self_id),
720                                storage_proofs,
721                            }
722                        }
723                    }
724                    Err(err) => {
725                        warn!("GetStoreQuote failed for {key:?}: {err}");
726                        QueryResponse::GetStoreQuote {
727                            quote: Err(ProtocolError::GetStoreQuoteFailed),
728                            peer_address: NetworkAddress::from(self_id),
729                            storage_proofs,
730                        }
731                    }
732                }
733            }
734            Query::GetReplicatedRecord { requester: _, key } => {
735                let our_address = NetworkAddress::from(network.peer_id());
736                let record_key = key.to_record_key();
737
738                let result = match network.get_local_record(&record_key).await {
739                    Ok(Some(record)) => Ok((our_address, Bytes::from(record.value))),
740                    Ok(None) => Err(ProtocolError::ReplicatedRecordNotFound {
741                        holder: Box::new(our_address),
742                        key: Box::new(key.clone()),
743                    }),
744                    // Use `PutRecordFailed` as a placeholder
745                    Err(err) => Err(ProtocolError::PutRecordFailed(format!(
746                        "Error to fetch local record for GetReplicatedRecord {err:?}"
747                    ))),
748                };
749
750                QueryResponse::GetReplicatedRecord(result)
751            }
752            Query::GetChunkExistenceProof {
753                key,
754                nonce,
755                difficulty,
756            } => QueryResponse::GetChunkExistenceProof(
757                Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
758            ),
759            Query::CheckNodeInProblem(target_address) => {
760                debug!("Got CheckNodeInProblem for peer {target_address:?}");
761
762                let is_in_trouble =
763                    if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
764                        result
765                    } else {
766                        debug!("Could not get status of {target_address:?}.");
767                        false
768                    };
769
770                QueryResponse::CheckNodeInProblem {
771                    reporter_address: NetworkAddress::from(network.peer_id()),
772                    target_address,
773                    is_in_trouble,
774                }
775            }
776            Query::GetClosestPeers {
777                key,
778                num_of_peers,
779                range,
780                sign_result,
781            } => {
782                debug!(
783                    "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
784                );
785                Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
786                    .await
787            }
788            Query::GetVersion(_) => QueryResponse::GetVersion {
789                peer: NetworkAddress::from(network.peer_id()),
790                version: ant_build_info::package_version(),
791            },
792            Query::PutRecord {
793                holder,
794                address,
795                serialized_record,
796            } => {
797                let record = Record {
798                    key: address.to_record_key(),
799                    value: serialized_record,
800                    publisher: None,
801                    expires: None,
802                };
803
804                let key = PrettyPrintRecordKey::from(&record.key).into_owned();
805                let result = match node.validate_and_store_record(record).await {
806                    Ok(()) => Ok(()),
807                    Err(PutValidationError::OutdatedRecordCounter { counter, expected }) => {
808                        node.record_metrics(Marker::RecordRejected(
809                            &key,
810                            &PutValidationError::OutdatedRecordCounter { counter, expected },
811                        ));
812                        Err(ProtocolError::OutdatedRecordCounter { counter, expected })
813                    }
814                    Err(PutValidationError::TopologyVerificationFailed {
815                        target_address,
816                        valid_count,
817                        total_paid,
818                        closest_count,
819                        node_peers,
820                        paid_peers,
821                    }) => {
822                        node.record_metrics(Marker::RecordRejected(
823                            &key,
824                            &PutValidationError::TopologyVerificationFailed {
825                                target_address: target_address.clone(),
826                                valid_count,
827                                total_paid,
828                                closest_count,
829                                node_peers: node_peers.clone(),
830                                paid_peers: paid_peers.clone(),
831                            },
832                        ));
833                        Err(ProtocolError::TopologyVerificationFailed {
834                            target_address: Box::new(target_address),
835                            valid_count,
836                            total_paid,
837                            closest_count,
838                            node_peers,
839                            paid_peers,
840                        })
841                    }
842                    Err(err) => {
843                        node.record_metrics(Marker::RecordRejected(&key, &err));
844                        Err(ProtocolError::PutRecordFailed(format!("{err:?}")))
845                    }
846                };
847                QueryResponse::PutRecord {
848                    result,
849                    peer_address: holder,
850                    record_addr: address,
851                }
852            }
853            Query::GetMerkleCandidateQuote {
854                key,
855                data_type,
856                data_size,
857                merkle_payment_timestamp,
858            } => {
859                Self::respond_merkle_candidate_quote(
860                    network,
861                    key,
862                    data_type,
863                    data_size,
864                    merkle_payment_timestamp,
865                    payment_address,
866                )
867                .await
868            }
869            #[cfg(feature = "developer")]
870            Query::DevGetClosestPeersFromNetwork { key, num_of_peers } => {
871                debug!(
872                    "Got DevGetClosestPeersFromNetwork targeting {key:?} with {num_of_peers:?} peers"
873                );
874                Self::respond_dev_get_closest_peers_from_network(network, key, num_of_peers).await
875            }
876        };
877        Response::Query(resp)
878    }
879
880    async fn respond_get_closest_peers(
881        network: &Network,
882        target: NetworkAddress,
883        num_of_peers: Option<usize>,
884        range: Option<[u8; 32]>,
885        sign_result: bool,
886    ) -> QueryResponse {
887        let local_peers = network.get_local_peers_with_multiaddr().await;
888        let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
889            Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
890        } else {
891            vec![]
892        };
893
894        let signature = if sign_result {
895            let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
896            bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
897            network.sign(&bytes).ok()
898        } else {
899            None
900        };
901
902        QueryResponse::GetClosestPeers {
903            target,
904            peers,
905            signature,
906        }
907    }
908
909    /// Handle DevGetClosestPeersFromNetwork query
910    /// Unlike respond_get_closest_peers which returns local routing table peers,
911    /// this method actively queries the Kademlia network for closest peers.
912    /// Only available when the `developer` feature is enabled.
913    #[cfg(feature = "developer")]
914    async fn respond_dev_get_closest_peers_from_network(
915        network: &Network,
916        target: NetworkAddress,
917        num_of_peers: Option<usize>,
918    ) -> QueryResponse {
919        use ant_protocol::messages::QueryResponse;
920
921        let queried_node = NetworkAddress::from(network.peer_id());
922        debug!(
923            "DevGetClosestPeersFromNetwork: node {queried_node:?} querying network for closest peers to {target:?}"
924        );
925
926        // Query the network for closest peers (this performs an actual Kademlia lookup)
927        let result = network.get_closest_peers(&target).await;
928
929        let peers = match result {
930            Ok(peers) => {
931                let mut converted: Vec<(NetworkAddress, Vec<Multiaddr>)> = peers
932                    .into_iter()
933                    .map(|(peer_id, addrs)| (NetworkAddress::from(peer_id), addrs.0))
934                    .collect();
935
936                // If num_of_peers is specified, limit the results
937                if let Some(n) = num_of_peers {
938                    converted.sort_by_key(|(addr, _)| target.distance(addr));
939                    converted.truncate(n);
940                }
941
942                debug!(
943                    "DevGetClosestPeersFromNetwork: found {} peers closest to {target:?}",
944                    converted.len()
945                );
946                converted
947            }
948            Err(err) => {
949                warn!(
950                    "DevGetClosestPeersFromNetwork: failed to query network for {target:?}: {err:?}"
951                );
952                vec![]
953            }
954        };
955
956        QueryResponse::DevGetClosestPeersFromNetwork {
957            target,
958            queried_node,
959            peers,
960        }
961    }
962
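    /// Select the closest peers out of the supplied local routing-table entries.
    ///
    /// If `range` is provided (a big-endian 32-byte XOR distance), every peer within that
    /// distance of `target` is returned. Otherwise, if `num_of_peers` is provided, the
    /// `num_of_peers` peers closest to `target` are returned. With neither, the result is empty.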
963    fn calculate_get_closest_peers(
964        peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
965        target: NetworkAddress,
966        num_of_peers: Option<usize>,
967        range: Option<[u8; 32]>,
968    ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
969        match (num_of_peers, range) {
970            (_, Some(value)) => {
971                let distance = U256::from_big_endian(&value);
972                peer_addrs
973                    .iter()
974                    .filter_map(|(peer_id, multi_addrs)| {
975                        let addr = NetworkAddress::from(*peer_id);
976                        if target.distance(&addr).0 <= distance {
977                            Some((addr, multi_addrs.clone()))
978                        } else {
979                            None
980                        }
981                    })
982                    .collect()
983            }
984            (Some(num_of_peers), _) => {
985                let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
986                    .iter()
987                    .map(|(peer_id, multi_addrs)| {
988                        let addr = NetworkAddress::from(*peer_id);
989                        (addr, multi_addrs.clone())
990                    })
991                    .collect();
992                result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
993                result.into_iter().take(num_of_peers).collect()
994            }
995            (None, None) => vec![],
996        }
997    }
998
999    /// Handle GetMerkleCandidateQuote query
1000    /// Returns a signed MerklePaymentCandidateNode containing the node's current quoting metrics,
1001    /// reward address, and timestamp commitment
1002    async fn respond_merkle_candidate_quote(
1003        network: &Network,
1004        key: NetworkAddress,
1005        data_type: u32,
1006        data_size: usize,
1007        merkle_payment_timestamp: u64,
1008        payment_address: RewardsAddress,
1009    ) -> QueryResponse {
1010        debug!(
1011            "merkle payment: GetMerkleCandidateQuote for target {key:?}, timestamp: {merkle_payment_timestamp}, data_type: {data_type}, data_size: {data_size}"
1012        );
1013
1014        // Validate timestamp before signing to prevent committing to invalid times.
1015        // Nodes will reject proofs with expired/future timestamps during payment verification,
1016        // so signing such timestamps would create useless quotes that can't be used.
1017        //
1018        // Allow ±24 hours tolerance to handle timezone differences and clock skew between
1019        // clients and nodes. This prevents valid payments from being rejected due to minor
1020        // time differences while still catching truly expired/invalid timestamps.
1021        const TIMESTAMP_TOLERANCE: u64 = 24 * 60 * 60; // 24 hours
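        // Worked example with illustrative numbers: if `now` is 1_000_000, a timestamp of
        // 1_090_000 exceeds `now + 86_400` (1_086_400) and is rejected as too far in the
        // future, while a timestamp of 950_000 (age 50_000s) passes both checks below as
        // long as `MERKLE_PAYMENT_EXPIRATION + 86_400` is at least 50_000s.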
1022
1023        let now = std::time::SystemTime::now()
1024            .duration_since(std::time::UNIX_EPOCH)
1025            .unwrap_or_default()
1026            .as_secs();
1027
1028        // Reject future timestamps (with 24h tolerance for clock skew)
1029        let future_threshold = now + TIMESTAMP_TOLERANCE;
1030        if merkle_payment_timestamp > future_threshold {
1031            let error_msg = format!(
1032                "Rejected future timestamp {merkle_payment_timestamp} (current time: {now}, threshold: {future_threshold})"
1033            );
1034            warn!("{error_msg} for {key:?}");
1035            return QueryResponse::GetMerkleCandidateQuote(Err(
1036                ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
1037            ));
1038        }
1039
1040        // Reject expired timestamps (with 24h tolerance for clock skew)
1041        let expiration_threshold = MERKLE_PAYMENT_EXPIRATION + TIMESTAMP_TOLERANCE;
1042        let age = now.saturating_sub(merkle_payment_timestamp);
1043        if age > expiration_threshold {
1044            let error_msg = format!(
1045                "Rejected expired timestamp {merkle_payment_timestamp} (age: {age}s, max: {expiration_threshold}s)",
1046            );
1047            warn!("{error_msg} for {key:?}");
1048            return QueryResponse::GetMerkleCandidateQuote(Err(
1049                ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
1050            ));
1051        }
1052
1053        // Get node's current quoting metrics
1054        let record_key = key.to_record_key();
1055        let (quoting_metrics, _is_already_stored) = match network
1056            .get_local_quoting_metrics(record_key, data_type, data_size)
1057            .await
1058        {
1059            Ok(metrics) => metrics,
1060            Err(err) => {
1061                let error_msg = format!("Failed to get quoting metrics for {key:?}: {err}");
1062                warn!("{error_msg}");
1063                return QueryResponse::GetMerkleCandidateQuote(Err(
1064                    ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
1065                ));
1066            }
1067        };
1068
1069        // Create the MerklePaymentCandidateNode with node's signed commitment
1070        let pub_key = network.get_pub_key();
1071        let reward_address = payment_address;
1072        let bytes = ant_evm::merkle_payments::MerklePaymentCandidateNode::bytes_to_sign(
1073            &quoting_metrics,
1074            &reward_address,
1075            merkle_payment_timestamp,
1076        );
1077        let signature = match network.sign(&bytes) {
1078            Ok(sig) => sig,
1079            Err(e) => {
1080                let error_msg = format!("Failed to sign candidate node for {key:?}: {e}");
1081                error!("{error_msg}");
1082                return QueryResponse::GetMerkleCandidateQuote(Err(
1083                    ProtocolError::FailedToSignMerkleCandidate(error_msg),
1084                ));
1085            }
1086        };
1087
1088        let candidate = ant_evm::merkle_payments::MerklePaymentCandidateNode {
1089            quoting_metrics,
1090            reward_address,
1091            merkle_payment_timestamp,
1092            pub_key,
1093            signature,
1094        };
1095        QueryResponse::GetMerkleCandidateQuote(Ok(candidate))
1096    }
1097
1098    // Nodes only check ChunkProofs against each other, to avoid the `multi-version` issue.
1099    // Clients check proofs against all record types, as they have to fetch from the network anyway.
1100    async fn respond_x_closest_record_proof(
1101        network: &Network,
1102        key: NetworkAddress,
1103        nonce: Nonce,
1104        difficulty: usize,
1105        chunk_only: bool,
1106    ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
1107        let start = Instant::now();
1108        let mut results = vec![];
1109        if difficulty == 1 {
1110            // Client checking existence of published chunk.
1111            let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
1112            if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
1113                let proof = ChunkProof::new(&record.value, nonce);
1114                debug!("Chunk proof for {key:?} is {proof:?}");
1115                result = Ok(proof)
1116            } else {
1117                debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
1118            }
1119
1120            results.push((key.clone(), result));
1121        } else {
1122            let all_local_records = network.get_all_local_record_addresses().await;
1123
1124            if let Ok(all_local_records) = all_local_records {
1125                let mut all_chunk_addrs: Vec<_> = if chunk_only {
1126                    all_local_records
1127                        .iter()
1128                        .filter_map(|(addr, record_type)| {
1129                            if *record_type == ValidationType::Chunk {
1130                                Some(addr.clone())
1131                            } else {
1132                                None
1133                            }
1134                        })
1135                        .collect()
1136                } else {
1137                    all_local_records.keys().cloned().collect()
1138                };
1139
1140                // Sort by distance and only take first X closest entries
1141                all_chunk_addrs.sort_by_key(|addr| key.distance(addr));
1142
1143                // TODO: this shall be deduced from resource usage dynamically
1144                let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);
1145
1146                for addr in all_chunk_addrs.iter().take(workload_factor) {
1147                    if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
1148                    {
1149                        let proof = ChunkProof::new(&record.value, nonce);
1150                        debug!("Chunk proof for {key:?} is {proof:?}");
1151                        results.push((addr.clone(), Ok(proof)));
1152                    }
1153                }
1154            }
1155
1156            info!(
1157                "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
1158                results.len(),
1159                start.elapsed()
1160            );
1161        }
1162
1163        results
1164    }
1165
1166    /// Check among all chunk type records that we have,
1167    /// and randomly pick one as the verification candidate.
1168    /// This will challenge all closest peers at once.
1169    async fn storage_challenge(network: Network) {
1170        let start = Instant::now();
1171        let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
1172            network.get_k_closest_local_peers_to_the_target(None).await
1173        {
1174            closest_peers
1175                .into_iter()
1176                .take(CLOSE_GROUP_SIZE)
1177                .collect_vec()
1178        } else {
1179            error!("Cannot get local neighbours");
1180            return;
1181        };
1182        if closest_peers.len() < CLOSE_GROUP_SIZE {
1183            debug!(
1184                "Not enough neighbours ({}/{}) to carry out storage challenge.",
1185                closest_peers.len(),
1186                CLOSE_GROUP_SIZE
1187            );
1188            return;
1189        }
1190
1191        let mut verify_candidates: Vec<NetworkAddress> =
1192            if let Ok(all_keys) = network.get_all_local_record_addresses().await {
1193                all_keys
1194                    .iter()
1195                    .filter_map(|(addr, record_type)| {
1196                        if ValidationType::Chunk == *record_type {
1197                            Some(addr.clone())
1198                        } else {
1199                            None
1200                        }
1201                    })
1202                    .collect()
1203            } else {
1204                error!("Failed to get local record addresses.");
1205                return;
1206            };
1207        let num_of_targets = verify_candidates.len();
1208        if num_of_targets < 50 {
1209            debug!("Not enough candidates ({num_of_targets}/50) to be checked against neighbours.");
1210            return;
1211        }
1212
1213        // To ensure the neighbours share the same knowledge as us,
1214        // the target is chosen to be not far from us.
1215        let self_addr = NetworkAddress::from(network.peer_id());
1216        verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
1217        let index: usize = OsRng.gen_range(0..num_of_targets / 2);
1218        let target = verify_candidates[index].clone();
1219        // TODO: workload shall be dynamically deduced from resource usage
1220        let difficulty = CLOSE_GROUP_SIZE;
1221        verify_candidates.sort_by_key(|addr| target.distance(addr));
1222        let expected_targets = verify_candidates.into_iter().take(difficulty);
1223        let nonce: Nonce = thread_rng().r#gen::<u64>();
1224        let mut expected_proofs = HashMap::new();
1225        for addr in expected_targets {
1226            if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
1227                let expected_proof = ChunkProof::new(&record.value, nonce);
1228                let _ = expected_proofs.insert(addr, expected_proof);
1229            } else {
1230                error!("Local record {addr:?} can't be loaded from disk.");
1231            }
1232        }
1233        let request = Request::Query(Query::GetChunkExistenceProof {
1234            key: target.clone(),
1235            nonce,
1236            difficulty,
1237        });
1238
1239        let mut tasks = JoinSet::new();
1240        for (peer_id, addresses) in closest_peers {
1241            if peer_id == network.peer_id() {
1242                continue;
1243            }
1244            let network_clone = network.clone();
1245            let request_clone = request.clone();
1246            let expected_proofs_clone = expected_proofs.clone();
1247            let _ = tasks.spawn(async move {
1248                let res = scoring_peer(
1249                    network_clone,
1250                    (peer_id, addresses),
1251                    request_clone,
1252                    expected_proofs_clone,
1253                )
1254                .await;
1255                (peer_id, res)
1256            });
1257        }
1258
1259        let mut peer_scores = vec![];
1260        while let Some(res) = tasks.join_next().await {
1261            match res {
1262                Ok((peer_id, score)) => {
1263                    let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
1264                    if !is_healthy {
1265                        info!(
1266                            "Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."
1267                        );
1268                        // TODO: shall a challenge failure immediately trigger removal of the node?
1269                        network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
1270                    }
1271                    peer_scores.push((peer_id, is_healthy));
1272                }
1273                Err(e) => {
1274                    info!("StorageChallenge task completed with error {e:?}");
1275                }
1276            }
1277        }
1278        if !peer_scores.is_empty() {
1279            network.notify_peer_scores(peer_scores);
1280        }
1281
1282        Self::verify_local_replication_health(network.clone()).await;
1283
1284        info!(
1285            "Completed node StorageChallenge against neighbours in {:?}!",
1286            start.elapsed()
1287        );
1288    }
1289
1290    /// Perform a direct replicated record fetch from nearby peers to verify replication health.
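    ///
    /// Picks a random locally-held chunk that lies within our close neighbourhood (bounded by the
    /// ilog2 distance to the `CLOSE_NEIGHBOUR_DISTANCE_INDEX`-th closest peer), asks the
    /// `REPLICA_FETCH_PEER_COUNT` closest peers for it, and if fewer than
    /// `MIN_HEALTHY_REPLICA_COUNT` valid copies come back treats the record as unhealthy;
    /// otherwise schedules a delayed retry for the peers that failed to return it.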
1291    async fn verify_local_replication_health(network: Network) {
1292        let closest_peers = match network.get_k_closest_local_peers_to_the_target(None).await {
1293            Ok(peers) => peers,
1294            Err(err) => {
1295                warn!("Cannot fetch closest peers for replica verification: {err:?}");
1296                return;
1297            }
1298        };
1299
1300        if closest_peers.len() <= CLOSE_NEIGHBOUR_DISTANCE_INDEX {
1301            debug!(
1302                "Skipping replica verification as we only know {} neighbours (need > {}).",
1303                closest_peers.len(),
1304                CLOSE_NEIGHBOUR_DISTANCE_INDEX
1305            );
1306            return;
1307        }
1308
1309        let self_address = NetworkAddress::from(network.peer_id());
1310        let Some((threshold_peer, _)) = closest_peers.get(CLOSE_NEIGHBOUR_DISTANCE_INDEX) else {
1311            debug!("Unable to determine distance threshold for replica verification.");
1312            return;
1313        };
1314
1315        let threshold_distance = self_address.distance(&NetworkAddress::from(*threshold_peer));
1316        let Some(threshold_ilog2) = threshold_distance.ilog2() else {
1317            debug!("Threshold distance lacks ilog2; cannot proceed with replica verification.");
1318            return;
1319        };
1320
1321        let local_records = match network.get_all_local_record_addresses().await {
1322            Ok(records) => records,
1323            Err(err) => {
1324                warn!("Failed to list local records for replica verification: {err:?}");
1325                return;
1326            }
1327        };
1328
1329        let mut nearby_records: Vec<NetworkAddress> = local_records
1330            .into_iter()
1331            .filter_map(|(address, record_type)| {
1332                if record_type != ValidationType::Chunk {
1333                    return None;
1334                }
1335                let distance = self_address.distance(&address);
1336                distance.ilog2().and_then(|record_ilog2| {
1337                    if record_ilog2 <= threshold_ilog2 {
1338                        Some(address)
1339                    } else {
1340                        None
1341                    }
1342                })
1343            })
1344            .collect();
1345
1346        nearby_records.shuffle(&mut thread_rng());
1347        let target_record = if let Some(entry) = nearby_records.first().cloned() {
1348            entry
1349        } else {
1350            debug!("No nearby chunk records available for replica verification.");
1351            return;
1352        };
1353
1354        let pretty_key = PrettyPrintRecordKey::from(&target_record.to_record_key()).into_owned();
1355
1356        let candidate_peers = match network
1357            .get_k_closest_local_peers_to_the_target(Some(target_record.clone()))
1358            .await
1359        {
1360            Ok(peers) => peers
1361                .into_iter()
1362                .filter(|(peer_id, _)| peer_id != &network.peer_id())
1363                .take(REPLICA_FETCH_PEER_COUNT)
1364                .collect::<Vec<_>>(),
1365            Err(err) => {
1366                warn!(
1367                    "Cannot fetch record-specific closest peers for replica verification: {err:?}"
1368                );
1369                return;
1370            }
1371        };
1372
1373        if candidate_peers.len() < REPLICA_FETCH_PEER_COUNT {
1374            debug!(
1375                "Only {} peers available for replica verification (need at least {}).",
1376                candidate_peers.len(),
1377                REPLICA_FETCH_PEER_COUNT
1378            );
1379            return;
1380        }
1381
1382        debug!(
1383            "Verifying replicated record {pretty_key:?} against {} closest peers.",
1384            candidate_peers.len()
1385        );
1386
1387        let (successful_peers, failed_peers) = Self::fetch_record_from_peers_with_addresses(
1388            network.clone(),
1389            target_record.clone(),
1390            candidate_peers,
1391        )
1392        .await;
1393
1394        if failed_peers.is_empty() {
1395            debug!("All peers returned record {pretty_key:?} during replica verification.");
1396            return;
1397        }
1398
1399        if successful_peers.len() < MIN_HEALTHY_REPLICA_COUNT {
1400            warn!(
1401                "Replica verification fetched only {} copies of {pretty_key:?}. Record is unhealthy; skipping peer classification.",
1402                successful_peers.len()
1403            );
1404            return;
1405        }
1406
1407        // `failed_peers` is known to be non-empty here (we returned early above if it was).
1408        info!(
1409            "Scheduling replica verification retry for {} peers on record {pretty_key:?}.",
1410            failed_peers.len()
1411        );
1412        Self::schedule_record_fetch_retry(network.clone(), target_record.clone(), failed_peers)
1413            .await;
1415    }
1416
1417    /// Fetch a replicated record from the provided peers and return the success/failure split.
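    ///
    /// Each peer is sent a `GetReplicatedRecord` query concurrently; a peer counts as a success
    /// only if the returned bytes deserialize into a `Chunk` whose address matches the requested
    /// record key, otherwise it is recorded as a failure.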
1418    async fn fetch_record_from_peers_with_addresses(
1419        network: Network,
1420        record_address: NetworkAddress,
1421        peers: Vec<(PeerId, Addresses)>,
1422    ) -> (Vec<PeerId>, Vec<PeerId>) {
1423        let request = Request::Query(Query::GetReplicatedRecord {
1424            requester: NetworkAddress::from(network.peer_id()),
1425            key: record_address.clone(),
1426        });
1427        let expected_key = record_address.to_record_key();
1428        let pretty_key = PrettyPrintRecordKey::from(&expected_key).into_owned();
1429
1430        let mut successes = Vec::new();
1431        let mut failures = Vec::new();
1432        let concurrency = peers.len();
1433        let results = stream::iter(peers.into_iter().map(|(peer_id, addrs)| {
1434            let network_clone = network.clone();
1435            let request_clone = request.clone();
1436            async move {
1437                let result = network_clone
1438                    .send_request(request_clone, peer_id, addrs.clone())
1439                    .await;
1440                (peer_id, addrs, result)
1441            }
1442        }))
1443        .buffer_unordered(concurrency)
1444        .collect::<Vec<_>>()
1445        .await;
1446
1447        for res in results {
1448            match res {
1449                (
1450                    peer_id,
1451                    _addrs,
1452                    Ok((Response::Query(QueryResponse::GetReplicatedRecord(result)), _)),
1453                ) => {
1454                    match result {
1455                        Ok((_holder, value)) => {
1456                            // Try to deserialize the record and ensure the Chunk content matches
1457                            let record =
1458                                Record::new(record_address.to_record_key(), value.to_vec());
1459                            if let Ok(chunk) = try_deserialize_record::<Chunk>(&record)
1460                                && chunk.network_address().to_record_key() == expected_key
1461                            {
1462                                successes.push(peer_id);
1463                            } else {
1464                                warn!(
1465                                    "Peer {peer_id:?} responded with an incorrect chunk copy of {pretty_key:?}."
1466                                );
1467                                failures.push(peer_id);
1468                            }
1469                        }
1470                        Err(err) => {
1471                            info!(
1472                                "Peer {peer_id:?} responded with error {err:?} for replicated record {pretty_key:?}."
1473                            );
1474                            failures.push(peer_id);
1475                        }
1476                    }
1477                }
1478                (peer_id, _addrs, Ok((other_response, _))) => {
1479                    warn!(
1480                        "Peer {peer_id:?} responded with unexpected message {other_response:?} for {pretty_key:?}."
1481                    );
1482                    failures.push(peer_id);
1483                }
1484                (peer_id, _addrs, Err(err)) => {
1485                    info!(
1486                        "Failed to reach peer {peer_id:?} for replicated record {pretty_key:?}: {err:?}"
1487                    );
1488                    failures.push(peer_id);
1489                }
1490            }
1491        }
1492
1493        (successes, failures)
1494    }
1495
1496    /// Schedule a future retry for peers that failed to return the replicated record.
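    ///
    /// Waits for `REPLICA_FETCH_RETRY_DELAY`, refreshes the failed peers' addresses via a DHT
    /// lookup around the record, retries the fetch, and evicts and blacklists any peer that still
    /// cannot provide the record.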
1497    async fn schedule_record_fetch_retry(
1498        network: Network,
1499        record_address: NetworkAddress,
1500        failed_peers: Vec<PeerId>,
1501    ) {
1502        let retry_peers: HashSet<_> = failed_peers.into_iter().collect();
1503        if retry_peers.is_empty() {
1504            return;
1505        }
1506
1507        let record_clone = record_address.clone();
1508        let network_clone = network.clone();
1509
1510        tokio::time::sleep(REPLICA_FETCH_RETRY_DELAY).await;
1511        let pretty_key = PrettyPrintRecordKey::from(&record_clone.to_record_key()).into_owned();
1512
1513        let refreshed_candidates =
1514            Self::refresh_retry_candidate_addresses(&network_clone, &record_clone, &retry_peers)
1515                .await;
1516
1517        if refreshed_candidates.is_empty() {
1518            info!(
1519                "Skipping replica retry for {pretty_key:?}; no tracked peers remain close to the record."
1520            );
1521            return;
1522        }
1523
1524        let (_, still_failed) = Self::fetch_record_from_peers_with_addresses(
1525            network_clone.clone(),
1526            record_clone.clone(),
1527            refreshed_candidates,
1528        )
1529        .await;
1530
1531        if still_failed.is_empty() {
1532            info!("All peers successfully returned {pretty_key:?} during replica retry.");
1533            return;
1534        }
1535
1536        warn!(
1537            "{} peers still failed to provide {pretty_key:?}; evicting and blacklisting.",
1538            still_failed.len()
1539        );
1540        for peer_id in still_failed {
1541            Self::evict_and_blacklist_peer(network_clone.clone(), peer_id);
1542        }
1543    }
1544
1545    /// Get the latest address information for tracked peers via a DHT query.
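    ///
    /// Only peers from `retry_peers` that still appear among the closest peers to the record are
    /// returned, paired with their freshly resolved addresses.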
1546    async fn refresh_retry_candidate_addresses(
1547        network: &Network,
1548        record_address: &NetworkAddress,
1549        retry_peers: &HashSet<PeerId>,
1550    ) -> Vec<(PeerId, Addresses)> {
1551        let pretty_key = PrettyPrintRecordKey::from(&record_address.to_record_key()).into_owned();
1552        let Ok(closest_peers) = network.get_closest_peers(record_address).await else {
1553            warn!(
1554                "Failed to refresh peer addresses for {pretty_key:?}; unable to retry replica fetch."
1555            );
1556            return Vec::new();
1557        };
1558
1559        let closest_map: HashMap<PeerId, Addresses> = closest_peers.into_iter().collect();
1560
1561        retry_peers
1562            .iter()
1563            .filter_map(|peer_id| {
1564                closest_map
1565                    .get(peer_id)
1566                    .cloned()
1567                    .map(|addrs| (*peer_id, addrs))
1568            })
1569            .collect()
1570    }
1571
1572    /// Remove the peer from the routing table and add it to the blacklist.
1573    fn evict_and_blacklist_peer(network: Network, peer_id: PeerId) {
1574        network.remove_peer(peer_id);
1575        network.add_peer_to_blocklist(peer_id);
1576    }
1577
1578    /// Query peers' versions and update local knowledge.
1579    async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
1580        // To avoid choking, carry out the queries one by one
1581        for (peer_id, addrs) in peers {
1582            Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
1583        }
1584    }
1585
1586    /// Query peer's version and update local knowledge.
1587    async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
1588        let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
1589        // `addrs` could arguably be skipped here, as the peer should already be in the kad::RT and the swarm can resolve the address.
1590        let version = match network.send_request(request, peer, addrs).await {
1591            Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
1592                trace!("Fetched peer version {peer:?} as {version:?}");
1593                version
1594            }
1595            Ok(other) => {
1596                info!("Unexpected response while fetching version from peer {peer:?}: {other:?}");
1597                "none".to_string()
1598            }
1599            Err(err) => {
1600                info!("Failed to fetch peer version {peer:?} with error {err:?}");
1601                // A failed version fetch (which already includes a dial and a re-attempt)
1602                // indicates with high probability that the peer is dead.
1603                // In that case, the peer shall be removed from the routing table.
1604                network.remove_peer(peer);
1605                return;
1606            }
1607        };
1608        network.notify_node_version(peer, version);
1609    }
1610}
1611
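/// Tracks the node's close neighbourhood and, on membership churn, decides whether a
/// replication round should be triggered, skipped (the change looks like a peer restart),
/// or ignored (the peer is outside the tracked close group).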
1612#[derive(Debug)]
1613struct CloseGroupTracker {
1614    self_address: NetworkAddress,
1615    /// Tracks the closest N peers by (distance, peer_id), sorted by distance to self.
1616    /// Using a tuple key handles the theoretical case where two peers have identical
1617    /// XOR distance to self.
1618    close_up_peers: BTreeSet<(Distance, PeerId)>,
1619    tracked_entries: HashMap<PeerId, BehaviourEntry>,
1620}
1621
1622impl CloseGroupTracker {
1623    fn new(self_peer_id: PeerId) -> Self {
1624        Self {
1625            self_address: NetworkAddress::from(self_peer_id),
1626            close_up_peers: BTreeSet::new(),
1627            tracked_entries: HashMap::new(),
1628        }
1629    }
1630
1631    fn record_peer_added(&mut self, peer_id: PeerId) -> ReplicationDirective {
1632        let distance = self.distance_to_peer(peer_id);
1633
1634        // Check if this peer was already present
1635        let was_present = self.close_up_peers.contains(&(distance, peer_id));
1636
1637        // Determine if the peer is close enough to track
1638        let is_close_enough = self.is_distance_within_close_range(distance);
1639
1640        if is_close_enough {
1641            // Add to close_up_peers
1642            let _ = self.close_up_peers.insert((distance, peer_id));
1643
1644            // If we exceed the limit, remove the farthest peer
1645            if self.close_up_peers.len() > CLOSE_GROUP_TRACKING_LIMIT
1646                && let Some(&(farthest_distance, farthest_peer)) =
1647                    self.close_up_peers.iter().next_back()
1648            {
1649                let _ = self
1650                    .close_up_peers
1651                    .remove(&(farthest_distance, farthest_peer));
1652            }
1653        } else {
1654            // Too far, don't track and clean up any existing entry
1655            let _ = self.tracked_entries.remove(&peer_id);
1656            return ReplicationDirective::Ignore;
1657        }
1658
1659        use std::collections::hash_map::Entry;
1660        match self.tracked_entries.entry(peer_id) {
1661            Entry::Vacant(vacant_entry) => {
1662                let _ = vacant_entry.insert(BehaviourEntry::default());
1663                if !was_present {
1664                    ReplicationDirective::Trigger
1665                } else {
1666                    ReplicationDirective::Ignore
1667                }
1668            }
1669            Entry::Occupied(mut occupied_entry) => {
1670                let entry = occupied_entry.get_mut();
1671                if entry.awaiting_rejoin || entry.timer_deadline.is_some() {
1672                    entry.restart_detected = true;
1673                    entry.awaiting_rejoin = false;
1674                    entry.timer_deadline = None;
1675                    ReplicationDirective::Skip
1676                } else if !was_present {
1677                    entry.restart_detected = false;
1678                    ReplicationDirective::Trigger
1679                } else {
1680                    ReplicationDirective::Ignore
1681                }
1682            }
1683        }
1684    }
1685
1686    fn record_peer_removed(&mut self, peer_id: PeerId) -> ReplicationDirective {
1687        let distance = self.distance_to_peer(peer_id);
1688
1689        // Check if this peer is in our close_up_peers
1690        let was_tracked = self.close_up_peers.contains(&(distance, peer_id));
1691
1692        if !was_tracked {
1693            // Not tracked, i.e. out of range, hence Ignore
1694            return ReplicationDirective::Ignore;
1695        }
1696
1697        // Remove from close_up_peers
1698        let _ = self.close_up_peers.remove(&(distance, peer_id));
1699
1700        let entry = self.tracked_entries.entry(peer_id).or_default();
1701
1702        if entry.restart_detected {
1703            entry.timer_deadline = Some(Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION);
1704            entry.awaiting_rejoin = false;
1705            ReplicationDirective::Skip
1706        } else {
1707            entry.awaiting_rejoin = true;
1708            entry.timer_deadline = None;
1709            ReplicationDirective::Trigger
1710        }
1711    }
1712
1713    fn handle_timer_expiry(&mut self, now: Instant) -> bool {
1714        let mut expired = false;
1715        for entry in self.tracked_entries.values_mut() {
1716            if let Some(deadline) = entry.timer_deadline
1717                && now >= deadline
1718            {
1719                entry.timer_deadline = None;
1720                entry.restart_detected = false;
1721                entry.awaiting_rejoin = false;
1722                expired = true;
1723            }
1724        }
1725
1726        // Keep only entries that are still relevant
1727        self.tracked_entries.retain(|peer_id, entry| {
1728            let is_currently_present = self
1729                .close_up_peers
1730                .iter()
1731                .any(|(_, present_peer)| present_peer == peer_id);
1732
1733            entry.awaiting_rejoin
1734                || entry.restart_detected
1735                || entry.timer_deadline.is_some()
1736                || is_currently_present
1737        });
1738
1739        expired
1740    }
1741
1742    fn is_distance_within_close_range(&self, distance: Distance) -> bool {
1743        // If we haven't reached the limit yet, any peer is close enough
1744        if self.close_up_peers.len() < CLOSE_GROUP_TRACKING_LIMIT {
1745            return true;
1746        }
1747
1748        // Otherwise, check if this distance is closer than the farthest tracked peer
1749        if let Some(&(farthest_distance, _)) = self.close_up_peers.iter().next_back() {
1750            distance < farthest_distance
1751        } else {
1752            true
1753        }
1754    }
1755
1756    fn distance_to_peer(&self, peer_id: PeerId) -> Distance {
1757        self.self_address.distance(&NetworkAddress::from(peer_id))
1758    }
1759}
1760
1761#[derive(Debug, Default)]
1762struct BehaviourEntry {
1763    awaiting_rejoin: bool,
1764    restart_detected: bool,
1765    timer_deadline: Option<Instant>,
1766}
1767
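/// Outcome of a close-group membership change as assessed by `CloseGroupTracker`:
/// `Trigger` means replication should run, `Skip` means the change is suppressed as a likely
/// restart, and `Ignore` means the peer falls outside the tracked close group.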
1768#[derive(Debug, PartialEq, Eq)]
1769enum ReplicationDirective {
1770    Trigger,
1771    Skip,
1772    Ignore,
1773}
1774
1775impl ReplicationDirective {
1776    fn should_trigger(&self) -> bool {
1777        matches!(self, Self::Trigger)
1778    }
1779}
1780
1781#[cfg(test)]
1782mod close_group_tracker_tests {
1783    use super::*;
1784    use std::time::Duration;
1785
1786    fn random_peer() -> PeerId {
1787        let keypair = libp2p::identity::Keypair::generate_ed25519();
1788        PeerId::from(keypair.public())
1789    }
1790
1791    #[test]
1792    fn new_peer_triggers_replication() {
1793        let mut tracker = CloseGroupTracker::new(random_peer());
1794        let peer = random_peer();
1795
1796        assert_eq!(
1797            tracker.record_peer_added(peer),
1798            ReplicationDirective::Trigger
1799        );
1800
1801        assert_eq!(
1802            tracker.record_peer_removed(peer),
1803            ReplicationDirective::Trigger
1804        );
1805    }
1806
1807    #[test]
1808    fn restart_detection_skips_replication() {
1809        let mut tracker = CloseGroupTracker::new(random_peer());
1810        let peer = random_peer();
1811
1812        assert_eq!(
1813            tracker.record_peer_added(peer),
1814            ReplicationDirective::Trigger
1815        );
1816
1817        assert_eq!(
1818            tracker.record_peer_removed(peer),
1819            ReplicationDirective::Trigger
1820        );
1821
1822        assert_eq!(tracker.record_peer_added(peer), ReplicationDirective::Skip);
1823
1824        assert_eq!(
1825            tracker.record_peer_removed(peer),
1826            ReplicationDirective::Skip
1827        );
1828
1829        assert!(tracker.handle_timer_expiry(
1830            Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1)
1831        ));
1832
1833        assert_eq!(
1834            tracker.record_peer_added(peer),
1835            ReplicationDirective::Trigger
1836        );
1837    }
1838
1839    #[test]
1840    fn peer_outside_close_group_is_ignored() {
1841        let mut tracker = CloseGroupTracker::new(random_peer());
1842
1843        // Fill the tracker to capacity with random peers
1844        let mut added_peers = Vec::new();
1845        for _ in 0..CLOSE_GROUP_TRACKING_LIMIT {
1846            let peer = random_peer();
1847            let result = tracker.record_peer_added(peer);
1848            // All should trigger since we're under capacity
1849            assert_eq!(result, ReplicationDirective::Trigger);
1850            added_peers.push(peer);
1851        }
1852
1853        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
1854
1855        // Now add more peers - some will be ignored if they're farther than existing ones
1856        let mut ignored_count = 0;
1857        let mut trigger_count = 0;
1858        for _ in 0..50 {
1859            let peer = random_peer();
1860            match tracker.record_peer_added(peer) {
1861                ReplicationDirective::Ignore => ignored_count += 1,
1862                ReplicationDirective::Trigger => trigger_count += 1,
1863                _ => {}
1864            }
1865        }
1866
1867        // At capacity, new peers are only accepted if closer than the farthest
1868        // Some should be ignored (too far), some should trigger (closer than farthest)
1869        assert!(
1870            ignored_count > 0 || trigger_count > 0,
1871            "Expected some peers to be processed"
1872        );
1873
1874        // Tracker should still only have CLOSE_GROUP_TRACKING_LIMIT peers
1875        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
1876    }
1877
1878    #[test]
1879    fn removal_of_untracked_peer_is_ignored() {
1880        let mut tracker = CloseGroupTracker::new(random_peer());
1881
1882        // Try to remove a peer that was never added
1883        let unknown_peer = random_peer();
1884        assert_eq!(
1885            tracker.record_peer_removed(unknown_peer),
1886            ReplicationDirective::Ignore
1887        );
1888    }
1889
1890    #[test]
1891    fn timer_expiry_resets_restart_state() {
1892        let mut tracker = CloseGroupTracker::new(random_peer());
1893        let peer = random_peer();
1894
1895        // Add and remove peer (normal behavior)
1896        assert_eq!(
1897            tracker.record_peer_added(peer),
1898            ReplicationDirective::Trigger
1899        );
1900        assert_eq!(
1901            tracker.record_peer_removed(peer),
1902            ReplicationDirective::Trigger
1903        );
1904
1905        // Peer rejoins quickly - detected as restart, skipped
1906        assert_eq!(tracker.record_peer_added(peer), ReplicationDirective::Skip);
1907
1908        // Remove again - still in restart mode, skipped with timer set
1909        assert_eq!(
1910            tracker.record_peer_removed(peer),
1911            ReplicationDirective::Skip
1912        );
1913
1914        // Timer hasn't expired yet - no change
1915        assert!(!tracker.handle_timer_expiry(Instant::now()));
1916
1917        // Timer expires
1918        let after_expiry =
1919            Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1);
1920        assert!(tracker.handle_timer_expiry(after_expiry));
1921
1922        // Now the peer should trigger replication again (state reset)
1923        assert_eq!(
1924            tracker.record_peer_added(peer),
1925            ReplicationDirective::Trigger
1926        );
1927    }
1928
1929    #[test]
1930    fn eviction_maintains_closest_peers() {
1931        let self_peer = random_peer();
1932        let mut tracker = CloseGroupTracker::new(self_peer);
1933
1934        // Add exactly CLOSE_GROUP_TRACKING_LIMIT peers
1935        let mut peers_with_distances: Vec<(PeerId, Distance)> = Vec::new();
1936        for _ in 0..CLOSE_GROUP_TRACKING_LIMIT {
1937            let peer = random_peer();
1938            let distance = tracker.distance_to_peer(peer);
1939            let _ = tracker.record_peer_added(peer);
1940            peers_with_distances.push((peer, distance));
1941        }
1942
1943        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
1944
1945        // Sort by distance to know which is farthest
1946        peers_with_distances.sort_by_key(|(_, d)| *d);
1947        let farthest_distance = peers_with_distances.last().map(|(_, d)| *d).unwrap();
1948
1949        // Add a new peer that's closer than the farthest
1950        // Keep trying until we find one (random, so may take a few tries)
1951        let mut found_closer = false;
1952        for _ in 0..100 {
1953            let new_peer = random_peer();
1954            let new_distance = tracker.distance_to_peer(new_peer);
1955            if new_distance < farthest_distance {
1956                let result = tracker.record_peer_added(new_peer);
1957                assert_eq!(result, ReplicationDirective::Trigger);
1958                // Should still have exactly CLOSE_GROUP_TRACKING_LIMIT peers
1959                assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
1960                found_closer = true;
1961                break;
1962            }
1963        }
1964
1965        // Note: This assertion might occasionally fail due to randomness,
1966        // but with 100 attempts it's extremely unlikely
1967        assert!(
1968            found_closer,
1969            "Could not find a peer closer than the farthest in 100 attempts"
1970        );
1971    }
1972
1973    #[test]
1974    fn tracked_entries_cleaned_up_on_timer_expiry() {
1975        let mut tracker = CloseGroupTracker::new(random_peer());
1976        let peer = random_peer();
1977
1978        // Add, remove, re-add (restart detected), remove again (timer set)
1979        let _ = tracker.record_peer_added(peer);
1980        let _ = tracker.record_peer_removed(peer);
1981        let _ = tracker.record_peer_added(peer);
1982        let _ = tracker.record_peer_removed(peer);
1983
1984        // Peer should have a tracked entry with timer
1985        assert!(tracker.tracked_entries.contains_key(&peer));
1986
1987        // After timer expiry, entry should be cleaned up since peer is not present
1988        let after_expiry =
1989            Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1);
1990        let _ = tracker.handle_timer_expiry(after_expiry);
1991
1992        // Entry should be removed since peer is not in close_up_peers
1993        // and no longer has active flags
1994        assert!(!tracker.tracked_entries.contains_key(&peer));
1995    }
1996}
1997
1998async fn scoring_peer(
1999    network: Network,
2000    peer: (PeerId, Addresses),
2001    request: Request,
2002    expected_proofs: HashMap<NetworkAddress, ChunkProof>,
2003) -> usize {
2004    let peer_id = peer.0;
2005    let start = Instant::now();
2006    let responses = network
2007        .send_and_get_responses(&[peer], &request, true)
2008        .await;
2009
2010    if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
2011        responses.get(&peer_id)
2012    {
2013        if answers.is_empty() {
2014            info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
2015            return 0;
2016        }
2017        let elapsed = start.elapsed();
2018
2019        let mut received_proofs = vec![];
2020        for (addr, proof) in answers {
2021            if let Ok(proof) = proof {
2022                received_proofs.push((addr.clone(), proof.clone()));
2023            }
2024        }
2025
2026        let score = mark_peer(elapsed, received_proofs, &expected_proofs);
2027        info!(
2028            "Received {} answers from peer {peer_id:?} after {elapsed:?}, scoring it as {score}.",
2029            answers.len()
2030        );
2031        score
2032    } else {
2033        info!("Peer {peer_id:?} didn't reply to the ChunkProofChallenge, or replied with an error.");
2034        0
2035    }
2036}
2037
2038// Based on the following metrics:
2039//   * the response duration
2040//   * whether any answer is false
2041//   * the percentage of correct answers among the expected closest
2042// The higher the score, the higher the confidence in the peer.
2043fn mark_peer(
2044    duration: Duration,
2045    answers: Vec<(NetworkAddress, ChunkProof)>,
2046    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
2047) -> usize {
2048    let duration_score = duration_score_scheme(duration);
2049    let challenge_score = challenge_score_scheme(answers, expected_proofs);
2050
2051    duration_score * challenge_score
2052}
2053
2054// A shorter duration shall get a higher score.
2055fn duration_score_scheme(duration: Duration) -> usize {
2056    // So far just a simple stepped scheme, capped by HIGHEST_SCORE
2057    let in_ms = if let Some(value) = duration.as_millis().to_usize() {
2058        value
2059    } else {
2060        info!("Cannot get milliseconds from {duration:?}, using a default value of 1000ms.");
2061        1000
2062    };
2063
2064    let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
2065    HIGHEST_SCORE - step
2066}
2067
2068// Any false answer shall result in 0 score immediately
2069fn challenge_score_scheme(
2070    answers: Vec<(NetworkAddress, ChunkProof)>,
2071    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
2072) -> usize {
2073    let mut correct_answers = 0;
2074    for (addr, chunk_proof) in answers {
2075        if let Some(expected_proof) = expected_proofs.get(&addr) {
2076            if expected_proof.verify(&chunk_proof) {
2077                correct_answers += 1;
2078            } else {
2079                info!("Spot a false answer to the challenge regarding {addr:?}");
2080                // Any false answer shall result in 0 score immediately
2081                return 0;
2082            }
2083        }
2084    }
2085    // TODO: For answers not among the expected_proofs,
2086    //       the peer may simply hold different knowledge of records than us.
2087    //       Shall we:
2088    //         * pick a target close to us, so that neighbours are more likely to share the same knowledge
2089    //         * fetch from local storage to verify
2090    //         * fetch from the network to verify
2091    std::cmp::min(
2092        HIGHEST_SCORE,
2093        HIGHEST_SCORE * correct_answers / expected_proofs.len().max(1), // guard against an empty map (avoid division by zero)
2094    )
2095}
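
// Illustrative tests (an added sketch, not part of the original scoring logic) exercising the
// helpers above with concrete numbers. They assume `ChunkProof::new` is deterministic for the
// same content and nonce, and that `verify` compares proofs built that way.
#[cfg(test)]
mod scoring_scheme_examples {
    use super::*;

    #[test]
    fn duration_score_decreases_in_time_steps() {
        // Immediate reply earns the full duration score.
        assert_eq!(duration_score_scheme(Duration::from_millis(0)), HIGHEST_SCORE);
        // An average ~250ms reply loses 250 / TIME_STEP steps.
        assert_eq!(
            duration_score_scheme(Duration::from_millis(250)),
            HIGHEST_SCORE - 250 / TIME_STEP
        );
        // Very slow replies are capped at a zero duration score.
        assert_eq!(duration_score_scheme(Duration::from_secs(10)), 0);
    }

    #[test]
    fn all_correct_answers_score_above_healthy_threshold() {
        let nonce: Nonce = 42;
        let value: Vec<u8> = vec![7u8; 64];
        let addr = NetworkAddress::from(PeerId::random());

        let mut expected_proofs = HashMap::new();
        let _ = expected_proofs.insert(addr.clone(), ChunkProof::new(&value, nonce));
        let answers = vec![(addr, ChunkProof::new(&value, nonce))];

        // A fast, fully correct response scores HIGHEST_SCORE * HIGHEST_SCORE,
        // comfortably above MIN_ACCEPTABLE_HEALTHY_SCORE.
        let score = mark_peer(Duration::from_millis(0), answers, &expected_proofs);
        assert_eq!(score, HIGHEST_SCORE * HIGHEST_SCORE);
        assert!(score > MIN_ACCEPTABLE_HEALTHY_SCORE);
    }

    #[test]
    fn a_single_false_answer_zeroes_the_challenge_score() {
        let nonce: Nonce = 42;
        let addr = NetworkAddress::from(PeerId::random());

        let mut expected_proofs = HashMap::new();
        let _ = expected_proofs.insert(addr.clone(), ChunkProof::new(&vec![1u8; 64], nonce));
        // The answer is built from different content, so it cannot verify.
        let answers = vec![(addr, ChunkProof::new(&vec![2u8; 64], nonce))];

        assert_eq!(challenge_score_scheme(answers, &expected_proofs), 0);
    }
}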
2096
2097#[cfg(test)]
2098mod tests {
2099    use super::*;
2100    use std::str::FromStr;
2101
2102    #[test]
2103    fn test_no_local_peers() {
2104        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
2105        let target = NetworkAddress::from(PeerId::random());
2106        let num_of_peers = Some(5);
2107        let range = None;
2108        let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);
2109
2110        assert_eq!(result, vec![]);
2111    }
2112
2113    #[test]
2114    fn test_fewer_local_peers_than_num_of_peers() {
2115        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
2116            (
2117                PeerId::random(),
2118                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
2119            ),
2120            (
2121                PeerId::random(),
2122                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
2123            ),
2124            (
2125                PeerId::random(),
2126                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
2127            ),
2128        ];
2129        let target = NetworkAddress::from(PeerId::random());
2130        let num_of_peers = Some(2);
2131        let range = None;
2132        let result = Node::calculate_get_closest_peers(
2133            local_peers.clone(),
2134            target.clone(),
2135            num_of_peers,
2136            range,
2137        );
2138
2139        // Result shall be sorted and truncated
2140        let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
2141            .iter()
2142            .map(|(peer_id, multi_addrs)| {
2143                let addr = NetworkAddress::from(*peer_id);
2144                (addr, multi_addrs.clone())
2145            })
2146            .collect();
2147        expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
2148        let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();
2149
2150        assert_eq!(expected_result, result);
2151    }
2152
2153    #[test]
2154    fn test_with_range_and_num_of_peers() {
2155        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
2156            (
2157                PeerId::random(),
2158                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
2159            ),
2160            (
2161                PeerId::random(),
2162                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
2163            ),
2164            (
2165                PeerId::random(),
2166                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
2167            ),
2168        ];
2169        let target = NetworkAddress::from(PeerId::random());
2170        let num_of_peers = Some(0);
2171        let range_value = [128; 32];
2172        let range = Some(range_value);
2173        let result = Node::calculate_get_closest_peers(
2174            local_peers.clone(),
2175            target.clone(),
2176            num_of_peers,
2177            range,
2178        );
2179
2180        // Range shall be preferred, i.e. the resulting peers shall all be within the range
2181        let distance = U256::from_big_endian(&range_value);
2182        let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
2183            .into_iter()
2184            .filter_map(|(peer_id, multi_addrs)| {
2185                let addr = NetworkAddress::from(peer_id);
2186                if target.distance(&addr).0 <= distance {
2187                    Some((addr, multi_addrs.clone()))
2188                } else {
2189                    None
2190                }
2191            })
2192            .collect();
2193
2194        assert_eq!(expected_result, result);
2195    }
2196
2197    mod merkle_payment_tests {
2198        use super::*;
2199
2200        /// Test that timestamp validation accepts valid timestamps (within the acceptable window)
2201        #[test]
2202        fn test_timestamp_validation_accepts_valid_timestamp() {
2203            let now = std::time::SystemTime::now()
2204                .duration_since(std::time::UNIX_EPOCH)
2205                .unwrap()
2206                .as_secs();
2207
2208            // Valid timestamp: 1 hour ago
2209            let valid_timestamp = now - 3600;
2210
2211            // Validate timestamp
2212            let age = now.saturating_sub(valid_timestamp);
2213
2214            assert!(
2215                valid_timestamp <= now,
2216                "Valid timestamp should not be in the future"
2217            );
2218            assert!(
2219                age <= MERKLE_PAYMENT_EXPIRATION,
2220                "Valid timestamp should not be expired"
2221            );
2222        }
2223
2224        /// Test that timestamp validation rejects future timestamps
2225        #[test]
2226        fn test_timestamp_validation_rejects_future_timestamp() {
2227            let now = std::time::SystemTime::now()
2228                .duration_since(std::time::UNIX_EPOCH)
2229                .unwrap()
2230                .as_secs();
2231
2232            // Future timestamp: 1 hour in the future
2233            let future_timestamp = now + 3600;
2234
2235            // Timestamp should be rejected
2236            assert!(
2237                future_timestamp > now,
2238                "Future timestamp should be rejected"
2239            );
2240        }
2241
2242        /// Test that timestamp validation rejects expired timestamps
2243        #[test]
2244        fn test_timestamp_validation_rejects_expired_timestamp() {
2245            let now = std::time::SystemTime::now()
2246                .duration_since(std::time::UNIX_EPOCH)
2247                .unwrap()
2248                .as_secs();
2249
2250            // Expired timestamp: 8 days ago (> 7 day expiration)
2251            let expired_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 86400);
2252
2253            // Calculate age
2254            let age = now.saturating_sub(expired_timestamp);
2255
2256            // Timestamp should be rejected
2257            assert!(
2258                age > MERKLE_PAYMENT_EXPIRATION,
2259                "Expired timestamp should be rejected"
2260            );
2261        }
2262
2263        /// Test timestamp at the exact expiration boundary (should still be accepted, since the validation uses strict `>`)
2264        #[test]
2265        fn test_timestamp_validation_at_expiration_boundary() {
2266            let now = std::time::SystemTime::now()
2267                .duration_since(std::time::UNIX_EPOCH)
2268                .unwrap()
2269                .as_secs();
2270
2271            // Timestamp exactly at expiration boundary
2272            let boundary_timestamp = now - MERKLE_PAYMENT_EXPIRATION;
2273
2274            let age = now.saturating_sub(boundary_timestamp);
2275
2276            // At the boundary, age == MERKLE_PAYMENT_EXPIRATION
2277            assert_eq!(age, MERKLE_PAYMENT_EXPIRATION);
2278            // The validation uses >, so this should pass
2279            assert!(
2280                age <= MERKLE_PAYMENT_EXPIRATION,
2281                "Timestamp exactly at boundary should not be rejected"
2282            );
2283        }
2284
2285        /// Test timestamp just beyond expiration boundary (should be rejected)
2286        #[test]
2287        fn test_timestamp_validation_beyond_expiration_boundary() {
2288            let now = std::time::SystemTime::now()
2289                .duration_since(std::time::UNIX_EPOCH)
2290                .unwrap()
2291                .as_secs();
2292
2293            // Timestamp just beyond expiration boundary (1 second past)
2294            let beyond_boundary_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 1);
2295
2296            let age = now.saturating_sub(beyond_boundary_timestamp);
2297
2298            assert!(
2299                age > MERKLE_PAYMENT_EXPIRATION,
2300                "Timestamp beyond boundary should be rejected"
2301            );
2302        }
2303
2304        /// Test timestamp at current time (should be accepted)
2305        #[test]
2306        fn test_timestamp_validation_at_current_time() {
2307            let now = std::time::SystemTime::now()
2308                .duration_since(std::time::UNIX_EPOCH)
2309                .unwrap()
2310                .as_secs();
2311
2312            // Timestamp at current time
2313            let current_timestamp = now;
2314
2315            let age = now.saturating_sub(current_timestamp);
2316
2317            assert!(
2318                current_timestamp <= now,
2319                "Current timestamp should not be in future"
2320            );
2321            assert!(
2322                age <= MERKLE_PAYMENT_EXPIRATION,
2323                "Current timestamp should not be expired"
2324            );
2325            assert_eq!(age, 0, "Age should be 0 for current timestamp");
2326        }
2327
2328        /// Test timestamp near future boundary (1 second in future)
2329        #[test]
2330        fn test_timestamp_validation_near_future_boundary() {
2331            let now = std::time::SystemTime::now()
2332                .duration_since(std::time::UNIX_EPOCH)
2333                .unwrap()
2334                .as_secs();
2335
2336            // Timestamp 1 second in the future
2337            let near_future_timestamp = now + 1;
2338
2339            assert!(
2340                near_future_timestamp > now,
2341                "Near-future timestamp should be rejected"
2342            );
2343        }
2344
2345        /// Test expiration constant is set correctly (7 days = 604800 seconds)
2346        #[test]
2347        fn test_merkle_payment_expiration_constant() {
2348            const SEVEN_DAYS_IN_SECONDS: u64 = 7 * 24 * 60 * 60;
2349            assert_eq!(
2350                MERKLE_PAYMENT_EXPIRATION, SEVEN_DAYS_IN_SECONDS,
2351                "MERKLE_PAYMENT_EXPIRATION should be 7 days"
2352            );
2353        }
2354    }
2355}