
ant_node/node.rs

// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use super::{
    Marker, NodeEvent, error::Result, event::NodeEventsChannel, quote::quotes_verification,
};
#[cfg(feature = "open-metrics")]
use crate::metrics::NodeMetricsRecorder;
#[cfg(feature = "open-metrics")]
use crate::networking::MetricsRegistries;
use crate::networking::{Addresses, Network, NetworkConfig, NetworkEvent, NodeIssue};
use crate::{PutValidationError, RunningNode};
use ant_bootstrap::bootstrap::Bootstrap;
use ant_evm::EvmNetwork;
use ant_evm::RewardsAddress;
use ant_evm::merkle_payments::MERKLE_PAYMENT_EXPIRATION;
use ant_protocol::{
    CLOSE_GROUP_SIZE, NetworkAddress, PrettyPrintRecordKey,
    error::Error as ProtocolError,
    messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
    storage::{Chunk, ValidationType, try_deserialize_record},
};
use bytes::Bytes;
use futures::stream::{self, StreamExt};
use itertools::Itertools;
use libp2p::{
    Multiaddr, PeerId,
    identity::Keypair,
    kad::{KBucketDistance as Distance, Record, U256},
};
use num_traits::cast::ToPrimitive;
use rand::{
    Rng, SeedableRng,
    rngs::{OsRng, StdRng},
    seq::SliceRandom,
    thread_rng,
};
use std::{
    collections::{BTreeSet, HashMap, HashSet},
    net::SocketAddr,
    path::PathBuf,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    time::{Duration, Instant},
};
use tokio::sync::{Mutex, watch};
use tokio::{
    sync::mpsc::Receiver,
    task::{JoinSet, spawn},
};

/// Interval to trigger replication of all records to all peers.
/// This is the maximum; the actual interval at any node is randomised between half this value and the full value.
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;

/// Interval to trigger storage challenges.
/// This is the maximum; the actual interval at any node is randomised between half this value and the full value.
const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;

/// Interval at which to update the node's uptime metric.
const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

/// Interval to clean up irrelevant records.
/// This is the maximum; the actual interval at any node is randomised between half this value and the full value.
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;

/// Highest score achievable from each metric sub-sector during a StorageChallenge.
const HIGHEST_SCORE: usize = 100;

/// Any node scoring below this is considered bad.
/// The maximum possible score is 100 * 100.
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;

/// Time step in milliseconds for duration scoring.
/// Production measurements show StorageChallenge typically completes in 2-3 seconds.
/// This step maps 0-5000ms evenly onto the score range (HIGHEST_SCORE down to 0).
/// With TIME_STEP = 50ms and HIGHEST_SCORE = 100:
///   - 0ms     → score 100 (instant response, best possible)
///   - 1000ms  → score 80  (1 second, excellent)
///   - 2500ms  → score 50  (2.5 seconds, average)
///   - 5000ms  → score 0   (5 seconds or more, slowest acceptable)
const TIME_STEP: usize = 50;
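
// Illustrative only: a minimal sketch of how TIME_STEP and HIGHEST_SCORE are
// intended to combine, matching the worked examples in the doc comment above.
// The node's actual duration scoring lives in `scoring_peer` (not shown in this
// excerpt), so treat the helper below as an assumption about the mapping rather
// than the real implementation.
#[cfg(test)]
mod time_step_sketch {
    use super::{HIGHEST_SCORE, TIME_STEP};
    use std::time::Duration;

    /// Hypothetical helper: map an elapsed duration onto a 0..=HIGHEST_SCORE score,
    /// losing one point for every TIME_STEP milliseconds taken.
    fn duration_score(elapsed: Duration) -> usize {
        HIGHEST_SCORE.saturating_sub(elapsed.as_millis() as usize / TIME_STEP)
    }

    #[test]
    fn matches_doc_comment_examples() {
        assert_eq!(duration_score(Duration::from_millis(0)), 100);
        assert_eq!(duration_score(Duration::from_millis(1000)), 80);
        assert_eq!(duration_score(Duration::from_millis(2500)), 50);
        assert_eq!(duration_score(Duration::from_millis(5000)), 0);
    }
}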

/// Delay before retrying a failed replicated record fetch.
const REPLICA_FETCH_RETRY_DELAY: Duration = Duration::from_secs(15 * 60);

/// Number of peers to query for a replicated record health check.
const REPLICA_FETCH_PEER_COUNT: usize = 5;

/// Minimum number of successful replicas required to consider the record healthy.
const MIN_HEALTHY_REPLICA_COUNT: usize = 3;

/// Index (0-based, including self) used to derive the distance threshold for nearby records.
const CLOSE_NEIGHBOUR_DISTANCE_INDEX: usize = 7;

/// Maximum number of closest peers to track for replication behaviour detection.
///
/// Set to 20 (roughly 4x CLOSE_GROUP_SIZE) to cover peers that are close enough
/// to affect data responsibility during churn. Tracking only the closest peers
/// keeps memory bounded while still detecting restart patterns in the critical
/// zone where replication matters most.
const CLOSE_GROUP_TRACKING_LIMIT: usize = 20;

/// Grace period before triggering replication after detecting a peer restart.
///
/// When a peer is detected as restarting (removed then quickly re-added),
/// replication is suppressed for this duration to allow the peer to stabilize.
/// Set to 90 seconds which is:
/// - Long enough to cover typical node restart times (30-60s)
/// - Short enough to ensure data availability isn't compromised
/// - Approximately half of PERIODIC_REPLICATION_INTERVAL_MAX_S, so the periodic
///   replication will catch any missed updates within a reasonable timeframe
const CLOSE_GROUP_RESTART_SUPPRESSION: Duration = Duration::from_secs(90);
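
// Illustrative only: a sketch of the suppression-window check that
// CLOSE_GROUP_RESTART_SUPPRESSION implies. The real tracking logic lives in
// `CloseGroupTracker` (not shown in this excerpt); the helper below is a
// hypothetical stand-in, not the node's actual implementation.
#[cfg(test)]
mod restart_suppression_sketch {
    use super::CLOSE_GROUP_RESTART_SUPPRESSION;
    use std::time::{Duration, Instant};

    /// Hypothetical helper: replication for a restarting peer is suppressed while
    /// we are still inside the grace period that started when the restart was seen.
    fn should_suppress(restart_seen_at: Instant, now: Instant) -> bool {
        now.duration_since(restart_seen_at) < CLOSE_GROUP_RESTART_SUPPRESSION
    }

    #[test]
    fn suppresses_inside_grace_period_only() {
        let seen = Instant::now();
        assert!(should_suppress(seen, seen + Duration::from_secs(30)));
        assert!(!should_suppress(seen, seen + Duration::from_secs(120)));
    }
}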

/// Helper to build and run a Node
pub struct NodeBuilder {
    addr: SocketAddr,
    bootstrap: Bootstrap,
    evm_address: RewardsAddress,
    evm_network: EvmNetwork,
    identity_keypair: Keypair,
    local: bool,
    #[cfg(feature = "open-metrics")]
    /// Set to Some to enable the metrics server
    metrics_server_port: Option<u16>,
    no_upnp: bool,
    relay_client: bool,
    root_dir: PathBuf,
}

impl NodeBuilder {
    /// Instantiate the builder. Initial peers are provided through the supplied `Bootstrap` flow,
    /// either directly or from a bootstrap cache.
    pub fn new(
        identity_keypair: Keypair,
        bootstrap_flow: Bootstrap,
        evm_address: RewardsAddress,
        evm_network: EvmNetwork,
        addr: SocketAddr,
        root_dir: PathBuf,
    ) -> Self {
        Self {
            addr,
            bootstrap: bootstrap_flow,
            evm_address,
            evm_network,
            identity_keypair,
            local: false,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: None,
            no_upnp: false,
            relay_client: false,
            root_dir,
        }
    }

    /// Set the flag to indicate if the node is running in local mode
    pub fn local(&mut self, local: bool) {
        self.local = local;
    }

    #[cfg(feature = "open-metrics")]
    /// Set the port for the OpenMetrics server. Defaults to a random port if not set
    pub fn metrics_server_port(&mut self, port: Option<u16>) {
        self.metrics_server_port = port;
    }

    /// Set the flag to make the node act as a relay client
    pub fn relay_client(&mut self, relay_client: bool) {
        self.relay_client = relay_client;
    }

    /// Set the flag to disable UPnP for the node
    pub fn no_upnp(&mut self, no_upnp: bool) {
        self.no_upnp = no_upnp;
    }

    /// Asynchronously runs a new node instance, setting up the swarm driver,
    /// creating the data store, and handling network events. Returns the
    /// created `RunningNode` which contains a `NodeEventsChannel` for listening
    /// to node-related events.
    ///
    /// # Returns
    ///
    /// A `RunningNode` instance.
    ///
    /// # Errors
    ///
    /// Returns an error if there is a problem initializing the Network.
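    ///
    /// # Example
    ///
    /// A minimal sketch of driving the builder (marked `ignore` because the keypair,
    /// bootstrap flow, rewards address, EVM network and root directory must come from
    /// your own configuration; the values shown are hypothetical placeholders):
    ///
    /// ```ignore
    /// let mut builder = NodeBuilder::new(
    ///     keypair,         // libp2p identity keypair
    ///     bootstrap,       // ant_bootstrap::bootstrap::Bootstrap flow
    ///     rewards_address, // RewardsAddress for storage payments
    ///     evm_network,
    ///     "0.0.0.0:0".parse()?, // listen on any free port
    ///     root_dir,
    /// );
    /// builder.local(false);
    /// builder.no_upnp(false);
    /// let running_node = builder.build_and_run()?;
    /// ```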
    pub fn build_and_run(self) -> Result<RunningNode> {
        // setup metrics
        #[cfg(feature = "open-metrics")]
        let (metrics_recorder, metrics_registries) = if self.metrics_server_port.is_some() {
            // metadata registry
            let mut metrics_registries = MetricsRegistries::default();
            let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);

            (Some(metrics_recorder), metrics_registries)
        } else {
            (None, MetricsRegistries::default())
        };

        // create a shutdown signal channel
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        // init network
        let network_config = NetworkConfig {
            keypair: self.identity_keypair,
            local: self.local,
            listen_addr: self.addr,
            root_dir: self.root_dir.clone(),
            shutdown_rx: shutdown_rx.clone(),
            bootstrap: self.bootstrap,
            no_upnp: self.no_upnp,
            relay_client: self.relay_client,
            custom_request_timeout: None,
            #[cfg(feature = "open-metrics")]
            metrics_registries,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: self.metrics_server_port,
        };
        let (network, network_event_receiver) = Network::init(network_config)?;

        // init node
        let node_events_channel = NodeEventsChannel::default();
        let node = NodeInner {
            network: network.clone(),
            events_channel: node_events_channel.clone(),
            close_group_tracker: Mutex::new(CloseGroupTracker::new(network.peer_id())),
            reward_address: self.evm_address,
            #[cfg(feature = "open-metrics")]
            metrics_recorder,
            evm_network: self.evm_network,
        };
        let node = Node {
            inner: Arc::new(node),
        };

        // Run the node
        node.run(network_event_receiver, shutdown_rx);
        let running_node = RunningNode {
            shutdown_sender: shutdown_tx,
            network,
            node_events_channel,
            root_dir_path: self.root_dir,
            rewards_address: self.evm_address,
        };

        Ok(running_node)
    }
}

/// `Node` represents a single node in the distributed network. It handles
/// network events, processes incoming requests, interacts with the data
/// storage, and broadcasts node-related events.
#[derive(Clone)]
pub(crate) struct Node {
    inner: Arc<NodeInner>,
}

/// The actual implementation of the Node. The other is just a wrapper around this, so that we don't expose
/// the Arc from the interface.
struct NodeInner {
    events_channel: NodeEventsChannel,
    network: Network,
    close_group_tracker: Mutex<CloseGroupTracker>,
    #[cfg(feature = "open-metrics")]
    metrics_recorder: Option<NodeMetricsRecorder>,
    reward_address: RewardsAddress,
    evm_network: EvmNetwork,
}

impl Node {
    /// Returns the NodeEventsChannel
    pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
        &self.inner.events_channel
    }

    /// Returns the instance of Network
    pub(crate) fn network(&self) -> &Network {
        &self.inner.network
    }

    fn close_group_tracker(&self) -> &Mutex<CloseGroupTracker> {
        &self.inner.close_group_tracker
    }

    #[cfg(feature = "open-metrics")]
    /// Returns a reference to the NodeMetricsRecorder if the `open-metrics` feature flag is enabled
    /// This is used to record various metrics for the node.
    pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
        self.inner.metrics_recorder.as_ref()
    }

    /// Returns the reward address of the node
    pub(crate) fn reward_address(&self) -> &RewardsAddress {
        &self.inner.reward_address
    }

    pub(crate) fn evm_network(&self) -> &EvmNetwork {
        &self.inner.evm_network
    }

    /// Spawns a detached task that processes `NetworkEvent`s and drives the periodic
    /// maintenance intervals (replication, cleanup, storage challenges, uptime metrics).
    fn run(
        self,
        mut network_event_receiver: Receiver<NetworkEvent>,
        mut shutdown_rx: watch::Receiver<bool>,
    ) {
        let mut rng = StdRng::from_entropy();

        let peers_connected = Arc::new(AtomicUsize::new(0));

        let _node_task = spawn(async move {
            // use a randomised interval so that nodes do not synchronise their replication,
            // which would cause bursts of message traffic.
            let replication_interval: u64 = rng.gen_range(
                PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
            );
            let replication_interval_time = Duration::from_secs(replication_interval);
            debug!("Replication interval set to {replication_interval_time:?}");

            let mut replication_interval = tokio::time::interval(replication_interval_time);
            let _ = replication_interval.tick().await; // first tick completes immediately

            let mut uptime_metrics_update_interval =
                tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
            let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately

            // use a randomised interval so that nodes do not synchronise this work,
            // which would cause an overall CPU spike.
            let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
                UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
                    ..UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
            );
            let irrelevant_records_cleanup_interval_time =
                Duration::from_secs(irrelevant_records_cleanup_interval);
            let mut irrelevant_records_cleanup_interval =
                tokio::time::interval(irrelevant_records_cleanup_interval_time);
            let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately

            // use a randomised storage challenge interval to ensure
            // neighbours do not carry out challenges at the same time
            let storage_challenge_interval: u64 =
                rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
            let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
            debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");

            let mut storage_challenge_interval =
                tokio::time::interval(storage_challenge_interval_time);
            let _ = storage_challenge_interval.tick().await; // first tick completes immediately

            loop {
                let peers_connected = &peers_connected;

                tokio::select! {
                    // Check for a shutdown command.
                    result = shutdown_rx.changed() => {
                        if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
                            info!("Shutdown signal received or sender dropped. Exiting network events loop.");
                            break;
                        }
                    },
                    net_event = network_event_receiver.recv() => {
                        match net_event {
                            Some(event) => {
                                let start = Instant::now();
                                let event_string = format!("{event:?}");

                                self.handle_network_event(event, peers_connected).await;
                                trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);

                            }
                            None => {
                                error!("The `NetworkEvent` channel is closed");
                                self.events_channel().broadcast(NodeEvent::ChannelClosed);
                                break;
                            }
                        }
                    }
                    // runs every replication_interval
                    _ = replication_interval.tick() => {
                        let now = Instant::now();

                        {
                            let mut tracker = self.close_group_tracker().lock().await;
                            if tracker.handle_timer_expiry(now) {
                                trace!("Replication timer expired for tracked peers; periodic run will cover it");
                            }
                        }

                        let start = now;
                        let network = self.network().clone();
                        self.record_metrics(Marker::IntervalReplicationTriggered);

                        let _handle = spawn(async move {
                            Self::try_interval_replication(network);
                            trace!("Periodic replication took {:?}", start.elapsed());
                        });
                    }
                    _ = uptime_metrics_update_interval.tick() => {
                        #[cfg(feature = "open-metrics")]
                        if let Some(metrics_recorder) = self.metrics_recorder() {
                            let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
                        }
                    }
                    _ = irrelevant_records_cleanup_interval.tick() => {
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::trigger_irrelevant_record_cleanup(network);
                        });
                    }
                    // runs every storage_challenge_interval
                    _ = storage_challenge_interval.tick() => {
                        let start = Instant::now();
                        debug!("Periodic storage challenge triggered");
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::storage_challenge(network).await;
                            trace!("Periodic storage challenge took {:?}", start.elapsed());
                        });
                    }
                }
            }
        });
    }

    /// Calls Marker::log() to insert the marker into the log files.
    /// Also calls NodeMetrics::record() to record the metric if the `open-metrics` feature flag is enabled.
    pub(crate) fn record_metrics(&self, marker: Marker) {
        marker.log();
        #[cfg(feature = "open-metrics")]
        if let Some(metrics_recorder) = self.metrics_recorder() {
            metrics_recorder.record(marker)
        }
    }

    // **** Private helpers *****

    /// Handle a network event.
    /// Spawns a task for any likely long-running work.
    async fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
        let start = Instant::now();
        let event_string = format!("{event:?}");
        let event_header;

        // Reducing non-mandatory logging
        if let NetworkEvent::QueryRequestReceived {
            query: Query::GetVersion { .. },
            ..
        } = event
        {
            trace!("Handling NetworkEvent {event_string}");
        } else {
            debug!("Handling NetworkEvent {event_string}");
        }

        match event {
            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
                event_header = "PeerAdded";
                // increment peers_connected and broadcast a ConnectedToNetwork event once we have connected to CLOSE_GROUP_SIZE peers
                let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
                if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
                    self.events_channel()
                        .broadcast(NodeEvent::ConnectedToNetwork);
                }

                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));

                // try query peer version
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::try_query_peer_version(network, peer_id, Default::default()).await;
                });

                let replication_decision = {
                    let mut tracker = self.close_group_tracker().lock().await;
                    tracker.record_peer_added(peer_id)
                };

                if replication_decision.should_trigger() {
                    let network = self.network().clone();
                    self.record_metrics(Marker::IntervalReplicationTriggered);
                    let _handle = spawn(async move {
                        Self::try_interval_replication(network);
                    });
                } else if matches!(replication_decision, ReplicationDirective::Skip) {
                    trace!(
                        "Replication skipped for {peer_id:?} addition due to restart behaviour tracking"
                    );
                }
            }
            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
                event_header = "PeerRemoved";
                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));

                let self_id = self.network().peer_id();
                let distance =
                    NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
                info!(
                    "Node {self_id:?} removed peer from routing table: {peer_id:?}. It has a {:?} distance to us.",
                    distance.ilog2()
                );

                let replication_decision = {
                    let mut tracker = self.close_group_tracker().lock().await;
                    tracker.record_peer_removed(peer_id)
                };

                if replication_decision.should_trigger() {
                    let network = self.network().clone();
                    self.record_metrics(Marker::IntervalReplicationTriggered);
                    let _handle = spawn(async move {
                        Self::try_interval_replication(network);
                    });
                } else if matches!(replication_decision, ReplicationDirective::Skip) {
                    trace!(
                        "Replication skipped for {peer_id:?} removal due to restart behaviour tracking"
                    );
                }
            }
            NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
                event_header = "PeerWithUnsupportedProtocol";
            }
            NetworkEvent::NewListenAddr(_) => {
                event_header = "NewListenAddr";
            }
            NetworkEvent::ResponseReceived { res } => {
                event_header = "ResponseReceived";
                if let Err(err) = self.handle_response(res) {
                    error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
                }
            }
            NetworkEvent::KeysToFetchForReplication(keys) => {
                event_header = "KeysToFetchForReplication";
                self.record_metrics(Marker::fetching_keys_for_replication(&keys));

                if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
                    error!("Error while trying to fetch replicated data {err:?}");
                }
            }
            NetworkEvent::QueryRequestReceived { query, channel } => {
                event_header = "QueryRequestReceived";
                let node = self.clone();
                let payment_address = *self.reward_address();

                let _handle = spawn(async move {
                    let network = node.network().clone();
                    let res = Self::handle_query(node, query, payment_address).await;

                    // Reducing non-mandatory logging
                    if let Response::Query(QueryResponse::GetVersion { .. }) = res {
                        trace!("Sending response {res:?}");
                    } else {
                        debug!("Sending response {res:?}");
                    }

                    network.send_response(res, channel);
                });
            }
            NetworkEvent::UnverifiedRecord(record) => {
                event_header = "UnverifiedRecord";
                // record validation and storage can be long running, so we spawn a task to handle it
                let self_clone = self.clone();
                let _handle = spawn(async move {
                    let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                    match self_clone.validate_and_store_record(record).await {
                        Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
                        Err(err) => {
                            self_clone.record_metrics(Marker::RecordRejected(&key, &err));
                        }
                    }
                });
            }
            NetworkEvent::TerminateNode { reason } => {
                event_header = "TerminateNode";
                error!("Received termination from swarm_driver due to {reason:?}");
                self.events_channel()
                    .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
            }
            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
                event_header = "FailedToFetchHolders";
                let network = self.network().clone();
                let pretty_log: Vec<_> = bad_nodes
                    .iter()
                    .map(|(peer_id, record_key)| {
                        let pretty_key = PrettyPrintRecordKey::from(record_key);
                        (peer_id, pretty_key)
                    })
                    .collect();
                // Note: CI checks that this log does not appear; any change to the
                //       keyword `failed to fetch` requires a corresponding CI script
                //       change as well.
                debug!(
                    "Received notification from replication_fetcher, notifying {pretty_log:?} failed to fetch replication copies from."
                );
                let _handle = spawn(async move {
                    for (peer_id, record_key) in bad_nodes {
                        // An obsolete fetch request (e.g. flooded out by fresh replicates) could result
                        // in the peer being claimed as bad, as the local copy blocks the entry from being cleared.
                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
                        {
                            error!(
                                "From peer {peer_id:?}, failed to fetch record {:?}",
                                PrettyPrintRecordKey::from(&record_key)
                            );
                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
                        }
                    }
                });
            }
            NetworkEvent::QuoteVerification { quotes } => {
                event_header = "QuoteVerification";
                let network = self.network().clone();

                let _handle = spawn(async move {
                    quotes_verification(&network, quotes).await;
                });
            }
            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
                event_header = "FreshReplicateToFetch";
                self.fresh_replicate_to_fetch(holder, keys);
            }
            NetworkEvent::PeersForVersionQuery(peers) => {
                event_header = "PeersForVersionQuery";
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::query_peers_version(network, peers).await;
                });
            }
            NetworkEvent::NetworkWideReplication { keys } => {
                event_header = "NetworkWideReplication";
                self.perform_network_wide_replication(keys);
            }
            NetworkEvent::PeerVersionChecked {
                peer_id,
                peer_type,
                result,
            } => {
                event_header = "PeerVersionChecked";
                // Version check completed - logging already done in identify handler
                // This event is primarily for external monitoring/metrics
                trace!(
                    target: "version_gate",
                    peer_id = %peer_id,
                    peer_type = %peer_type,
                    result = ?result,
                    "Peer version check completed"
                );
            }
            NetworkEvent::PeerVersionRejected {
                peer_id,
                peer_type,
                result,
                min_version,
            } => {
                event_header = "PeerVersionRejected";
                // Phase 2: Peer was rejected due to version requirements
                warn!(
                    target: "version_gate",
                    peer_id = %peer_id,
                    peer_type = %peer_type,
                    result = ?result,
                    min_version = %min_version,
                    "Peer rejected due to version requirements and disconnected"
                );
            }
        }

        trace!(
            "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
            start.elapsed()
        );
    }

    // Handle a response that was not awaited at the call site
    fn handle_response(&self, response: Response) -> Result<()> {
        match response {
            Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
                // This should actually have been short-circuited when received
                warn!("Mishandled replicate response, should be handled earlier");
            }
            Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
                error!(
                    "Response to replication shall be handled by the caller, not by the common handler, {resp:?}"
                );
            }
            Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
                // No need to handle
            }
            other => {
                warn!("handle_response not implemented for {other:?}");
            }
        };

        Ok(())
    }

    async fn handle_query(node: Self, query: Query, payment_address: RewardsAddress) -> Response {
        let network = node.network();
        let resp: QueryResponse = match query {
            Query::GetStoreQuote {
                key,
                data_type,
                data_size,
                nonce,
                difficulty,
            } => {
                let record_key = key.to_record_key();
                let self_id = network.peer_id();

                let maybe_quoting_metrics = network
                    .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
                    .await;

                let storage_proofs = if let Some(nonce) = nonce {
                    Self::respond_x_closest_record_proof(
                        network,
                        key.clone(),
                        nonce,
                        difficulty,
                        false,
                    )
                    .await
                } else {
                    vec![]
                };

                match maybe_quoting_metrics {
                    Ok((quoting_metrics, is_already_stored)) => {
                        if is_already_stored {
                            QueryResponse::GetStoreQuote {
                                quote: Err(ProtocolError::RecordExists(
                                    PrettyPrintRecordKey::from(&record_key).into_owned(),
                                )),
                                peer_address: NetworkAddress::from(self_id),
                                storage_proofs,
                            }
                        } else {
                            QueryResponse::GetStoreQuote {
                                quote: Self::create_quote_for_storecost(
                                    network,
                                    &key,
                                    &quoting_metrics,
                                    &payment_address,
                                ),
                                peer_address: NetworkAddress::from(self_id),
                                storage_proofs,
                            }
                        }
                    }
                    Err(err) => {
                        warn!("GetStoreQuote failed for {key:?}: {err}");
                        QueryResponse::GetStoreQuote {
                            quote: Err(ProtocolError::GetStoreQuoteFailed),
                            peer_address: NetworkAddress::from(self_id),
                            storage_proofs,
                        }
                    }
                }
            }
            Query::GetReplicatedRecord { requester: _, key } => {
                let our_address = NetworkAddress::from(network.peer_id());
                let record_key = key.to_record_key();

                let result = match network.get_local_record(&record_key).await {
                    Ok(Some(record)) => Ok((our_address, Bytes::from(record.value))),
                    Ok(None) => Err(ProtocolError::ReplicatedRecordNotFound {
                        holder: Box::new(our_address),
                        key: Box::new(key.clone()),
                    }),
                    // Use `PutRecordFailed` as a placeholder
                    Err(err) => Err(ProtocolError::PutRecordFailed(format!(
                        "Failed to fetch local record for GetReplicatedRecord {err:?}"
                    ))),
                };

                QueryResponse::GetReplicatedRecord(result)
            }
            Query::GetChunkExistenceProof {
                key,
                nonce,
                difficulty,
            } => QueryResponse::GetChunkExistenceProof(
                Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
            ),
            Query::CheckNodeInProblem(target_address) => {
                debug!("Got CheckNodeInProblem for peer {target_address:?}");

                let is_in_trouble =
                    if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
                        result
                    } else {
                        debug!("Could not get status of {target_address:?}.");
                        false
                    };

                QueryResponse::CheckNodeInProblem {
                    reporter_address: NetworkAddress::from(network.peer_id()),
                    target_address,
                    is_in_trouble,
                }
            }
            Query::GetClosestPeers {
                key,
                num_of_peers,
                range,
                sign_result,
            } => {
                debug!(
                    "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
                );
                Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
                    .await
            }
            Query::GetVersion(_) => QueryResponse::GetVersion {
                peer: NetworkAddress::from(network.peer_id()),
                version: ant_build_info::package_version(),
            },
            Query::PutRecord {
                holder,
                address,
                serialized_record,
            } => {
                let record = Record {
                    key: address.to_record_key(),
                    value: serialized_record,
                    publisher: None,
                    expires: None,
                };

                let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                let result = match node.validate_and_store_record(record).await {
                    Ok(()) => Ok(()),
                    Err(PutValidationError::OutdatedRecordCounter { counter, expected }) => {
                        node.record_metrics(Marker::RecordRejected(
                            &key,
                            &PutValidationError::OutdatedRecordCounter { counter, expected },
                        ));
                        Err(ProtocolError::OutdatedRecordCounter { counter, expected })
                    }
                    Err(PutValidationError::TopologyVerificationFailed {
                        target_address,
                        valid_count,
                        total_paid,
                        closest_count,
                        node_peers,
                        paid_peers,
                    }) => {
                        node.record_metrics(Marker::RecordRejected(
                            &key,
                            &PutValidationError::TopologyVerificationFailed {
                                target_address: target_address.clone(),
                                valid_count,
                                total_paid,
                                closest_count,
                                node_peers: node_peers.clone(),
                                paid_peers: paid_peers.clone(),
                            },
                        ));
                        Err(ProtocolError::TopologyVerificationFailed {
                            target_address: Box::new(target_address),
                            valid_count,
                            total_paid,
                            closest_count,
                            node_peers,
                            paid_peers,
                        })
                    }
                    Err(err) => {
                        node.record_metrics(Marker::RecordRejected(&key, &err));
                        Err(ProtocolError::PutRecordFailed(format!("{err:?}")))
                    }
                };
                QueryResponse::PutRecord {
                    result,
                    peer_address: holder,
                    record_addr: address,
                }
            }
            Query::GetMerkleCandidateQuote {
                key,
                data_type,
                data_size,
                merkle_payment_timestamp,
            } => {
                Self::respond_merkle_candidate_quote(
                    network,
                    key,
                    data_type,
                    data_size,
                    merkle_payment_timestamp,
                    payment_address,
                )
                .await
            }
            #[cfg(feature = "developer")]
            Query::DevGetClosestPeersFromNetwork { key, num_of_peers } => {
                debug!(
                    "Got DevGetClosestPeersFromNetwork targeting {key:?} with {num_of_peers:?} peers"
                );
                Self::respond_dev_get_closest_peers_from_network(network, key, num_of_peers).await
            }
        };
        Response::Query(resp)
    }

    async fn respond_get_closest_peers(
        network: &Network,
        target: NetworkAddress,
        num_of_peers: Option<usize>,
        range: Option<[u8; 32]>,
        sign_result: bool,
    ) -> QueryResponse {
        let local_peers = network.get_local_peers_with_multiaddr().await;
        let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
            Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
        } else {
            vec![]
        };

        let signature = if sign_result {
            let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
            bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
            network.sign(&bytes).ok()
        } else {
            None
        };

        QueryResponse::GetClosestPeers {
            target,
            peers,
            signature,
        }
    }

    /// Handle DevGetClosestPeersFromNetwork query
    /// Unlike respond_get_closest_peers which returns local routing table peers,
    /// this method actively queries the Kademlia network for closest peers.
    /// Only available when the `developer` feature is enabled.
    #[cfg(feature = "developer")]
    async fn respond_dev_get_closest_peers_from_network(
        network: &Network,
        target: NetworkAddress,
        num_of_peers: Option<usize>,
    ) -> QueryResponse {
        use ant_protocol::messages::QueryResponse;

        let queried_node = NetworkAddress::from(network.peer_id());
        debug!(
            "DevGetClosestPeersFromNetwork: node {queried_node:?} querying network for closest peers to {target:?}"
        );

        // Query the network for closest peers (this performs an actual Kademlia lookup)
        let result = network.get_closest_peers(&target).await;

        let peers = match result {
            Ok(peers) => {
                let mut converted: Vec<(NetworkAddress, Vec<Multiaddr>)> = peers
                    .into_iter()
                    .map(|(peer_id, addrs)| (NetworkAddress::from(peer_id), addrs.0))
                    .collect();

                // If num_of_peers is specified, limit the results
                if let Some(n) = num_of_peers {
                    converted.sort_by_key(|(addr, _)| target.distance(addr));
                    converted.truncate(n);
                }

                debug!(
                    "DevGetClosestPeersFromNetwork: found {} peers closest to {target:?}",
                    converted.len()
                );
                converted
            }
            Err(err) => {
                warn!(
                    "DevGetClosestPeersFromNetwork: failed to query network for {target:?}: {err:?}"
                );
                vec![]
            }
        };

        QueryResponse::DevGetClosestPeersFromNetwork {
            target,
            queried_node,
            peers,
        }
    }

    fn calculate_get_closest_peers(
        peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
        target: NetworkAddress,
        num_of_peers: Option<usize>,
        range: Option<[u8; 32]>,
    ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
        match (num_of_peers, range) {
            (_, Some(value)) => {
                let distance = U256::from_big_endian(&value);
                peer_addrs
                    .iter()
                    .filter_map(|(peer_id, multi_addrs)| {
                        let addr = NetworkAddress::from(*peer_id);
                        if target.distance(&addr).0 <= distance {
                            Some((addr, multi_addrs.clone()))
                        } else {
                            None
                        }
                    })
                    .collect()
            }
            (Some(num_of_peers), _) => {
                let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
                    .iter()
                    .map(|(peer_id, multi_addrs)| {
                        let addr = NetworkAddress::from(*peer_id);
                        (addr, multi_addrs.clone())
                    })
                    .collect();
                result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
                result.into_iter().take(num_of_peers).collect()
            }
            (None, None) => vec![],
        }
    }

    /// Handle GetMerkleCandidateQuote query
    /// Returns a signed MerklePaymentCandidateNode containing the node's current quoting metrics,
    /// reward address, and timestamp commitment
    async fn respond_merkle_candidate_quote(
        network: &Network,
        key: NetworkAddress,
        data_type: u32,
        data_size: usize,
        merkle_payment_timestamp: u64,
        payment_address: RewardsAddress,
    ) -> QueryResponse {
        debug!(
            "merkle payment: GetMerkleCandidateQuote for target {key:?}, timestamp: {merkle_payment_timestamp}, data_type: {data_type}, data_size: {data_size}"
        );

        // Validate timestamp before signing to prevent committing to invalid times.
        // Nodes will reject proofs with expired/future timestamps during payment verification,
        // so signing such timestamps would create useless quotes that can't be used.
        //
        // Allow ±24 hours tolerance to handle timezone differences and clock skew between
        // clients and nodes. This prevents valid payments from being rejected due to minor
        // time differences while still catching truly expired/invalid timestamps.
        const TIMESTAMP_TOLERANCE: u64 = 24 * 60 * 60; // 24 hours

        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        // Reject future timestamps (with 24h tolerance for clock skew)
        let future_threshold = now + TIMESTAMP_TOLERANCE;
        if merkle_payment_timestamp > future_threshold {
            let error_msg = format!(
                "Rejected future timestamp {merkle_payment_timestamp} (current time: {now}, threshold: {future_threshold})"
            );
            warn!("{error_msg} for {key:?}");
            return QueryResponse::GetMerkleCandidateQuote(Err(
                ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
            ));
        }

        // Reject expired timestamps (with 24h tolerance for clock skew)
        let expiration_threshold = MERKLE_PAYMENT_EXPIRATION + TIMESTAMP_TOLERANCE;
        let age = now.saturating_sub(merkle_payment_timestamp);
        if age > expiration_threshold {
            let error_msg = format!(
                "Rejected expired timestamp {merkle_payment_timestamp} (age: {age}s, max: {expiration_threshold}s)",
            );
            warn!("{error_msg} for {key:?}");
            return QueryResponse::GetMerkleCandidateQuote(Err(
                ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
            ));
        }

        // Get node's current quoting metrics
        let record_key = key.to_record_key();
        let (quoting_metrics, _is_already_stored) = match network
            .get_local_quoting_metrics(record_key, data_type, data_size)
            .await
        {
            Ok(metrics) => metrics,
            Err(err) => {
                let error_msg = format!("Failed to get quoting metrics for {key:?}: {err}");
                warn!("{error_msg}");
                return QueryResponse::GetMerkleCandidateQuote(Err(
                    ProtocolError::GetMerkleCandidateQuoteFailed(error_msg),
                ));
            }
        };

        // Create the MerklePaymentCandidateNode with node's signed commitment
        let pub_key = network.get_pub_key();
        let reward_address = payment_address;
        let bytes = ant_evm::merkle_payments::MerklePaymentCandidateNode::bytes_to_sign(
            &quoting_metrics,
            &reward_address,
            merkle_payment_timestamp,
        );
        let signature = match network.sign(&bytes) {
            Ok(sig) => sig,
            Err(e) => {
                let error_msg = format!("Failed to sign candidate node for {key:?}: {e}");
                error!("{error_msg}");
                return QueryResponse::GetMerkleCandidateQuote(Err(
                    ProtocolError::FailedToSignMerkleCandidate(error_msg),
                ));
            }
        };

        let candidate = ant_evm::merkle_payments::MerklePaymentCandidateNode {
            quoting_metrics,
            reward_address,
            merkle_payment_timestamp,
            pub_key,
            signature,
        };
        QueryResponse::GetMerkleCandidateQuote(Ok(candidate))
    }

    // Nodes only check ChunkProofs against each other, to avoid the `multi-version` issue.
    // Clients check proofs against all records, as they have to fetch from the network anyway.
    async fn respond_x_closest_record_proof(
        network: &Network,
        key: NetworkAddress,
        nonce: Nonce,
        difficulty: usize,
        chunk_only: bool,
    ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
        let start = Instant::now();
        let mut results = vec![];
        if difficulty == 1 {
            // Client checking existence of published chunk.
            let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
            if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
                let proof = ChunkProof::new(&record.value, nonce);
                debug!("Chunk proof for {key:?} is {proof:?}");
                result = Ok(proof)
            } else {
                debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
            }

            results.push((key.clone(), result));
        } else {
            let all_local_records = network.get_all_local_record_addresses().await;

            if let Ok(all_local_records) = all_local_records {
                let mut all_chunk_addrs: Vec<_> = if chunk_only {
                    all_local_records
                        .iter()
                        .filter_map(|(addr, record_type)| {
                            if *record_type == ValidationType::Chunk {
                                Some(addr.clone())
                            } else {
                                None
                            }
                        })
                        .collect()
                } else {
                    all_local_records.keys().cloned().collect()
                };

                // Sort by distance and only take the first X closest entries
                all_chunk_addrs.sort_by_key(|addr| key.distance(addr));

                // TODO: this shall be deduced from resource usage dynamically
                let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);

                for addr in all_chunk_addrs.iter().take(workload_factor) {
                    if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
                    {
                        let proof = ChunkProof::new(&record.value, nonce);
                        debug!("Chunk proof for {key:?} is {proof:?}");
                        results.push((addr.clone(), Ok(proof)));
                    }
                }
            }

            info!(
                "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
                results.len(),
                start.elapsed()
            );
        }

        results
    }
1205
1206    /// Check among all chunk type records that we have,
1207    /// and randomly pick one as the verification candidate.
1208    /// This will challenge all closest peers at once.
1209    async fn storage_challenge(network: Network) {
1210        let start = Instant::now();
1211        let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
1212            network.get_k_closest_local_peers_to_the_target(None).await
1213        {
1214            closest_peers
1215                .into_iter()
1216                .take(CLOSE_GROUP_SIZE)
1217                .collect_vec()
1218        } else {
1219            error!("Cannot get local neighbours");
1220            return;
1221        };
1222        if closest_peers.len() < CLOSE_GROUP_SIZE {
1223            debug!(
1224                "Not enough neighbours ({}/{}) to carry out storage challenge.",
1225                closest_peers.len(),
1226                CLOSE_GROUP_SIZE
1227            );
1228            return;
1229        }
1230
1231        let mut verify_candidates: Vec<NetworkAddress> =
1232            if let Ok(all_keys) = network.get_all_local_record_addresses().await {
1233                all_keys
1234                    .iter()
1235                    .filter_map(|(addr, record_type)| {
1236                        if ValidationType::Chunk == *record_type {
1237                            Some(addr.clone())
1238                        } else {
1239                            None
1240                        }
1241                    })
1242                    .collect()
1243            } else {
1244                error!("Failed to get local record addresses.");
1245                return;
1246            };
1247        let num_of_targets = verify_candidates.len();
1248        if num_of_targets < 50 {
1249            debug!("Not enough candidates({num_of_targets}/50) to be checked against neighbours.");
1250            return;
1251        }
1252
1253        // To ensure the neighbours sharing same knowledge as to us,
1254        // The target is choosen to be not far from us.
1255        let self_addr = NetworkAddress::from(network.peer_id());
1256        verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
1257        let index: usize = OsRng.gen_range(0..num_of_targets / 2);
1258        let target = verify_candidates[index].clone();
1259        // TODO: workload shall be dynamically deduced from resource usage
1260        let difficulty = CLOSE_GROUP_SIZE;
1261        verify_candidates.sort_by_key(|addr| target.distance(addr));
1262        let expected_targets = verify_candidates.into_iter().take(difficulty);
1263        let nonce: Nonce = thread_rng().r#gen::<u64>();
1264        let mut expected_proofs = HashMap::new();
1265        for addr in expected_targets {
1266            if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
1267                let expected_proof = ChunkProof::new(&record.value, nonce);
1268                let _ = expected_proofs.insert(addr, expected_proof);
1269            } else {
1270                error!("Local record {addr:?} cann't be loaded from disk.");
1271            }
1272        }
1273        let request = Request::Query(Query::GetChunkExistenceProof {
1274            key: target.clone(),
1275            nonce,
1276            difficulty,
1277        });
1278
1279        let mut tasks = JoinSet::new();
1280        for (peer_id, addresses) in closest_peers {
1281            if peer_id == network.peer_id() {
1282                continue;
1283            }
1284            let network_clone = network.clone();
1285            let request_clone = request.clone();
1286            let expected_proofs_clone = expected_proofs.clone();
1287            let _ = tasks.spawn(async move {
1288                let res = scoring_peer(
1289                    network_clone,
1290                    (peer_id, addresses),
1291                    request_clone,
1292                    expected_proofs_clone,
1293                )
1294                .await;
1295                (peer_id, res)
1296            });
1297        }
1298
1299        let mut peer_scores = vec![];
1300        while let Some(res) = tasks.join_next().await {
1301            match res {
1302                Ok((peer_id, score)) => {
1303                    let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
1304                    if !is_healthy {
1305                        info!(
1306                            "Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."
1307                        );
1308                        // TODO: should a challenge failure immediately trigger removal of the node?
1309                        network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
1310                    }
1311                    peer_scores.push((peer_id, is_healthy));
1312                }
1313                Err(e) => {
1314                    info!("StorageChallenge task completed with error {e:?}");
1315                }
1316            }
1317        }
1318        if !peer_scores.is_empty() {
1319            network.notify_peer_scores(peer_scores);
1320        }
1321
1322        Self::verify_local_replication_health(network.clone()).await;
1323
1324        info!(
1325            "Completed node StorageChallenge against neighbours in {:?}!",
1326            start.elapsed()
1327        );
1328    }
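
    // A minimal sketch (an assumption, not the actual wiring, which lives in the node's
    // run loop) of how this challenge is expected to be driven periodically, with each
    // node picking a random interval between half of and the full
    // STORE_CHALLENGE_INTERVAL_MAX_S:
    //
    //   let secs = rand::thread_rng()
    //       .gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
    //   let mut interval = tokio::time::interval(Duration::from_secs(secs));
    //   loop {
    //       let _ = interval.tick().await;
    //       Self::storage_challenge(network.clone()).await;
    //   }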
1329
1330    /// Perform a direct replicated record fetch from nearby peers to verify replication health.
1331    async fn verify_local_replication_health(network: Network) {
1332        let closest_peers = match network.get_k_closest_local_peers_to_the_target(None).await {
1333            Ok(peers) => peers,
1334            Err(err) => {
1335                warn!("Cannot fetch closest peers for replica verification: {err:?}");
1336                return;
1337            }
1338        };
1339
1340        if closest_peers.len() <= CLOSE_NEIGHBOUR_DISTANCE_INDEX {
1341            debug!(
1342                "Skipping replica verification as we only know {} neighbours (need > {}).",
1343                closest_peers.len(),
1344                CLOSE_NEIGHBOUR_DISTANCE_INDEX
1345            );
1346            return;
1347        }
1348
1349        let self_address = NetworkAddress::from(network.peer_id());
1350        let Some((threshold_peer, _)) = closest_peers.get(CLOSE_NEIGHBOUR_DISTANCE_INDEX) else {
1351            debug!("Unable to determine distance threshold for replica verification.");
1352            return;
1353        };
1354
1355        let threshold_distance = self_address.distance(&NetworkAddress::from(*threshold_peer));
1356        let Some(threshold_ilog2) = threshold_distance.ilog2() else {
1357            debug!("Threshold distance lacks ilog2; cannot proceed with replica verification.");
1358            return;
1359        };
1360
1361        let local_records = match network.get_all_local_record_addresses().await {
1362            Ok(records) => records,
1363            Err(err) => {
1364                warn!("Failed to list local records for replica verification: {err:?}");
1365                return;
1366            }
1367        };
1368
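        // Restrict candidates to chunks that sit at least as close to us (per ilog2
        // distance bucket) as our CLOSE_NEIGHBOUR_DISTANCE_INDEX-th neighbour; such
        // records should be held across the whole neighbourhood, which makes them
        // suitable probes for replication health.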
1369        let mut nearby_records: Vec<NetworkAddress> = local_records
1370            .into_iter()
1371            .filter_map(|(address, record_type)| {
1372                if record_type != ValidationType::Chunk {
1373                    return None;
1374                }
1375                let distance = self_address.distance(&address);
1376                distance.ilog2().and_then(|record_ilog2| {
1377                    if record_ilog2 <= threshold_ilog2 {
1378                        Some(address)
1379                    } else {
1380                        None
1381                    }
1382                })
1383            })
1384            .collect();
1385
1386        nearby_records.shuffle(&mut thread_rng());
1387        let target_record = if let Some(entry) = nearby_records.first().cloned() {
1388            entry
1389        } else {
1390            debug!("No nearby chunk records available for replica verification.");
1391            return;
1392        };
1393
1394        let pretty_key = PrettyPrintRecordKey::from(&target_record.to_record_key()).into_owned();
1395
1396        let candidate_peers = match network
1397            .get_k_closest_local_peers_to_the_target(Some(target_record.clone()))
1398            .await
1399        {
1400            Ok(peers) => peers
1401                .into_iter()
1402                .filter(|(peer_id, _)| peer_id != &network.peer_id())
1403                .take(REPLICA_FETCH_PEER_COUNT)
1404                .collect::<Vec<_>>(),
1405            Err(err) => {
1406                warn!(
1407                    "Cannot fetch record-specific closest peers for replica verification: {err:?}"
1408                );
1409                return;
1410            }
1411        };
1412
1413        if candidate_peers.len() < REPLICA_FETCH_PEER_COUNT {
1414            debug!(
1415                "Only {} peers available for replica verification (need at least {}).",
1416                candidate_peers.len(),
1417                REPLICA_FETCH_PEER_COUNT
1418            );
1419            return;
1420        }
1421
1422        debug!(
1423            "Verifying replicated record {pretty_key:?} against {} closest peers.",
1424            candidate_peers.len()
1425        );
1426
1427        let (successful_peers, failed_peers) = Self::fetch_record_from_peers_with_addresses(
1428            network.clone(),
1429            target_record.clone(),
1430            candidate_peers,
1431        )
1432        .await;
1433
1434        if failed_peers.is_empty() {
1435            debug!("All peers returned record {pretty_key:?} during replica verification.");
1436            return;
1437        }
1438
1439        if successful_peers.len() < MIN_HEALTHY_REPLICA_COUNT {
1440            warn!(
1441                "Replica verification fetched only {} copies of {pretty_key:?}. Record is unhealthy; skipping peer classification.",
1442                successful_peers.len()
1443            );
1444            return;
1445        }
1446
1447        if !failed_peers.is_empty() {
1448            info!(
1449                "Scheduling replica verification retry for {} peers on record {pretty_key:?}.",
1450                failed_peers.len()
1451            );
1452            Self::schedule_record_fetch_retry(network.clone(), target_record.clone(), failed_peers)
1453                .await;
1454        }
1455    }
1456
1457    /// Fetch a replicated record from the provided peers and return the success/failure split.
1458    async fn fetch_record_from_peers_with_addresses(
1459        network: Network,
1460        record_address: NetworkAddress,
1461        peers: Vec<(PeerId, Addresses)>,
1462    ) -> (Vec<PeerId>, Vec<PeerId>) {
1463        let request = Request::Query(Query::GetReplicatedRecord {
1464            requester: NetworkAddress::from(network.peer_id()),
1465            key: record_address.clone(),
1466        });
1467        let expected_key = record_address.to_record_key();
1468        let pretty_key = PrettyPrintRecordKey::from(&expected_key).into_owned();
1469
1470        let mut successes = Vec::new();
1471        let mut failures = Vec::new();
1472        let concurrency = peers.len();
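        // Issue all fetches concurrently (one in-flight request per peer) so that a
        // single slow peer cannot serialise the whole verification round.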
1473        let results = stream::iter(peers.into_iter().map(|(peer_id, addrs)| {
1474            let network_clone = network.clone();
1475            let request_clone = request.clone();
1476            async move {
1477                let result = network_clone
1478                    .send_request(request_clone, peer_id, addrs.clone())
1479                    .await;
1480                (peer_id, addrs, result)
1481            }
1482        }))
1483        .buffer_unordered(concurrency)
1484        .collect::<Vec<_>>()
1485        .await;
1486
1487        for res in results {
1488            match res {
1489                (
1490                    peer_id,
1491                    _addrs,
1492                    Ok((Response::Query(QueryResponse::GetReplicatedRecord(result)), _)),
1493                ) => {
1494                    match result {
1495                        Ok((_holder, value)) => {
1496                            // Try to deserialize the record and ensure the Chunk content matches
1497                            let record =
1498                                Record::new(record_address.to_record_key(), value.to_vec());
1499                            if let Ok(chunk) = try_deserialize_record::<Chunk>(&record)
1500                                && chunk.network_address().to_record_key() == expected_key
1501                            {
1502                                successes.push(peer_id);
1503                            } else {
1504                                warn!(
1505                                    "Peer {peer_id:?} responded with an incorrect chunk copy of {pretty_key:?}."
1506                                );
1507                                failures.push(peer_id);
1508                            }
1509                        }
1510                        Err(err) => {
1511                            info!(
1512                                "Peer {peer_id:?} responded with error {err:?} for replicated record {pretty_key:?}."
1513                            );
1514                            failures.push(peer_id);
1515                        }
1516                    }
1517                }
1518                (peer_id, _addrs, Ok((other_response, _))) => {
1519                    warn!(
1520                        "Peer {peer_id:?} responded with unexpected message {other_response:?} for {pretty_key:?}."
1521                    );
1522                    failures.push(peer_id);
1523                }
1524                (peer_id, _addrs, Err(err)) => {
1525                    info!(
1526                        "Failed to reach peer {peer_id:?} for replicated record {pretty_key:?}: {err:?}"
1527                    );
1528                    failures.push(peer_id);
1529                }
1530            }
1531        }
1532
1533        (successes, failures)
1534    }
1535
1536    /// Schedule a future retry for peers that failed to return the replicated record.
1537    async fn schedule_record_fetch_retry(
1538        network: Network,
1539        record_address: NetworkAddress,
1540        failed_peers: Vec<PeerId>,
1541    ) {
1542        let retry_peers: HashSet<_> = failed_peers.into_iter().collect();
1543        if retry_peers.is_empty() {
1544            return;
1545        }
1546
1547        let record_clone = record_address.clone();
1548        let network_clone = network.clone();
1549
1550        tokio::time::sleep(REPLICA_FETCH_RETRY_DELAY).await;
1551        let pretty_key = PrettyPrintRecordKey::from(&record_clone.to_record_key()).into_owned();
1552
1553        let refreshed_candidates =
1554            Self::refresh_retry_candidate_addresses(&network_clone, &record_clone, &retry_peers)
1555                .await;
1556
1557        if refreshed_candidates.is_empty() {
1558            info!(
1559                "Skipping replica retry for {pretty_key:?}; no tracked peers remain close to the record."
1560            );
1561            return;
1562        }
1563
1564        let (_, still_failed) = Self::fetch_record_from_peers_with_addresses(
1565            network_clone.clone(),
1566            record_clone.clone(),
1567            refreshed_candidates,
1568        )
1569        .await;
1570
1571        if still_failed.is_empty() {
1572            info!("All peers successfully returned {pretty_key:?} during replica retry.");
1573            return;
1574        }
1575
1576        warn!(
1577            "{} peers still failed to provide {pretty_key:?}; evicting and blacklisting.",
1578            still_failed.len()
1579        );
1580        for peer_id in still_failed {
1581            Self::evict_and_blacklist_peer(network_clone.clone(), peer_id);
1582        }
1583    }
1584
1585    /// Get the latest address information for tracked peers via a DHT query.
1586    async fn refresh_retry_candidate_addresses(
1587        network: &Network,
1588        record_address: &NetworkAddress,
1589        retry_peers: &HashSet<PeerId>,
1590    ) -> Vec<(PeerId, Addresses)> {
1591        let pretty_key = PrettyPrintRecordKey::from(&record_address.to_record_key()).into_owned();
1592        let Ok(closest_peers) = network.get_closest_peers(record_address).await else {
1593            warn!(
1594                "Failed to refresh peer addresses for {pretty_key:?}; unable to retry replica fetch."
1595            );
1596            return Vec::new();
1597        };
1598
1599        let closest_map: HashMap<PeerId, Addresses> = closest_peers.into_iter().collect();
1600
1601        retry_peers
1602            .iter()
1603            .filter_map(|peer_id| {
1604                closest_map
1605                    .get(peer_id)
1606                    .cloned()
1607                    .map(|addrs| (*peer_id, addrs))
1608            })
1609            .collect()
1610    }
1611
1612    /// Remove the peer from the routing table and add it to the blacklist.
1613    fn evict_and_blacklist_peer(network: Network, peer_id: PeerId) {
1614        network.remove_peer(peer_id);
1615        network.add_peer_to_blocklist(peer_id);
1616    }
1617
1618    /// Query peers' versions and update local knowledge.
1619    async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
1620        // To avoid choking the network with parallel requests, carry out the queries one by one
1621        for (peer_id, addrs) in peers {
1622            Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
1623        }
1624    }
1625
1626    /// Query peer's version and update local knowledge.
1627    async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
1628        let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
1629        // The peer should already be part of the kad RT, so the swarm can resolve its address; `addrs` is passed as an extra hint.
1630        let version = match network.send_request(request, peer, addrs).await {
1631            Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
1632                trace!("Fetched peer version {peer:?} as {version:?}");
1633                version
1634            }
1635            Ok(other) => {
1636                info!("Not a fetched peer version {peer:?}, {other:?}");
1637                "none".to_string()
1638            }
1639            Err(err) => {
1640                info!("Failed to fetch peer version {peer:?} with error {err:?}");
1641                // A failed version fetch (which already includes a dial and a retry)
1642                // strongly suggests the peer is dead.
1643                // In that case, remove the peer from the routing table.
1644                network.remove_peer(peer);
1645                return;
1646            }
1647        };
1648        network.notify_node_version(peer, version);
1649    }
1650}
1651
1652#[derive(Debug)]
1653struct CloseGroupTracker {
1654    self_address: NetworkAddress,
1655    /// Tracks the closest N peers by (distance, peer_id), sorted by distance to self.
1656    /// Using a tuple key handles the theoretical case where two peers have identical
1657    /// XOR distance to self.
1658    close_up_peers: BTreeSet<(Distance, PeerId)>,
1659    tracked_entries: HashMap<PeerId, BehaviourEntry>,
1660}
1661
1662impl CloseGroupTracker {
1663    fn new(self_peer_id: PeerId) -> Self {
1664        Self {
1665            self_address: NetworkAddress::from(self_peer_id),
1666            close_up_peers: BTreeSet::new(),
1667            tracked_entries: HashMap::new(),
1668        }
1669    }
1670
1671    fn record_peer_added(&mut self, peer_id: PeerId) -> ReplicationDirective {
1672        let distance = self.distance_to_peer(peer_id);
1673
1674        // Check if this peer was already present
1675        let was_present = self.close_up_peers.contains(&(distance, peer_id));
1676
1677        // Determine if the peer is close enough to track
1678        let is_close_enough = self.is_distance_within_close_range(distance);
1679
1680        if is_close_enough {
1681            // Add to close_up_peers
1682            let _ = self.close_up_peers.insert((distance, peer_id));
1683
1684            // If we exceed the limit, remove the farthest peer
1685            if self.close_up_peers.len() > CLOSE_GROUP_TRACKING_LIMIT
1686                && let Some(&(farthest_distance, farthest_peer)) =
1687                    self.close_up_peers.iter().next_back()
1688            {
1689                let _ = self
1690                    .close_up_peers
1691                    .remove(&(farthest_distance, farthest_peer));
1692            }
1693        } else {
1694            // Too far, don't track and clean up any existing entry
1695            let _ = self.tracked_entries.remove(&peer_id);
1696            return ReplicationDirective::Ignore;
1697        }
1698
1699        use std::collections::hash_map::Entry;
1700        match self.tracked_entries.entry(peer_id) {
1701            Entry::Vacant(vacant_entry) => {
1702                let _ = vacant_entry.insert(BehaviourEntry::default());
1703                if !was_present {
1704                    ReplicationDirective::Trigger
1705                } else {
1706                    ReplicationDirective::Ignore
1707                }
1708            }
1709            Entry::Occupied(mut occupied_entry) => {
1710                let entry = occupied_entry.get_mut();
1711                if entry.awaiting_rejoin || entry.timer_deadline.is_some() {
1712                    entry.restart_detected = true;
1713                    entry.awaiting_rejoin = false;
1714                    entry.timer_deadline = None;
1715                    ReplicationDirective::Skip
1716                } else if !was_present {
1717                    entry.restart_detected = false;
1718                    ReplicationDirective::Trigger
1719                } else {
1720                    ReplicationDirective::Ignore
1721                }
1722            }
1723        }
1724    }
1725
1726    fn record_peer_removed(&mut self, peer_id: PeerId) -> ReplicationDirective {
1727        let distance = self.distance_to_peer(peer_id);
1728
1729        // Check if this peer is in our close_up_peers
1730        let was_tracked = self.close_up_peers.contains(&(distance, peer_id));
1731
1732        if !was_tracked {
1733            // Not tracked, i.e. out of range, hence ignore.
1734            return ReplicationDirective::Ignore;
1735        }
1736
1737        // Remove from close_up_peers
1738        let _ = self.close_up_peers.remove(&(distance, peer_id));
1739
1740        let entry = self.tracked_entries.entry(peer_id).or_default();
1741
1742        if entry.restart_detected {
1743            entry.timer_deadline = Some(Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION);
1744            entry.awaiting_rejoin = false;
1745            ReplicationDirective::Skip
1746        } else {
1747            entry.awaiting_rejoin = true;
1748            entry.timer_deadline = None;
1749            ReplicationDirective::Trigger
1750        }
1751    }
1752
1753    fn handle_timer_expiry(&mut self, now: Instant) -> bool {
1754        let mut expired = false;
1755        for entry in self.tracked_entries.values_mut() {
1756            if let Some(deadline) = entry.timer_deadline
1757                && now >= deadline
1758            {
1759                entry.timer_deadline = None;
1760                entry.restart_detected = false;
1761                entry.awaiting_rejoin = false;
1762                expired = true;
1763            }
1764        }
1765
1766        // Keep only entries that are still relevant
1767        self.tracked_entries.retain(|peer_id, entry| {
1768            let is_currently_present = self
1769                .close_up_peers
1770                .iter()
1771                .any(|(_, present_peer)| present_peer == peer_id);
1772
1773            entry.awaiting_rejoin
1774                || entry.restart_detected
1775                || entry.timer_deadline.is_some()
1776                || is_currently_present
1777        });
1778
1779        expired
1780    }
1781
1782    fn is_distance_within_close_range(&self, distance: Distance) -> bool {
1783        // If we haven't reached the limit yet, any peer is close enough
1784        if self.close_up_peers.len() < CLOSE_GROUP_TRACKING_LIMIT {
1785            return true;
1786        }
1787
1788        // Otherwise, check if this distance is closer than the farthest tracked peer
1789        if let Some(&(farthest_distance, _)) = self.close_up_peers.iter().next_back() {
1790            distance < farthest_distance
1791        } else {
1792            true
1793        }
1794    }
1795
1796    fn distance_to_peer(&self, peer_id: PeerId) -> Distance {
1797        self.self_address.distance(&NetworkAddress::from(peer_id))
1798    }
1799}
1800
1801#[derive(Debug, Default)]
1802struct BehaviourEntry {
1803    awaiting_rejoin: bool,
1804    restart_detected: bool,
1805    timer_deadline: Option<Instant>,
1806}
1807
1808#[derive(Debug, PartialEq, Eq)]
1809enum ReplicationDirective {
1810    Trigger,
1811    Skip,
1812    Ignore,
1813}
1814
1815impl ReplicationDirective {
1816    fn should_trigger(&self) -> bool {
1817        matches!(self, Self::Trigger)
1818    }
1819}
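
// Illustrative sketch (an assumption about the surrounding event handling, which is not
// shown in this part of the file) of how a caller is expected to act on these directives:
//
//   if tracker.record_peer_added(peer_id).should_trigger() {
//       // kick off replication towards the newly observed close peer
//   }
//   if tracker.record_peer_removed(peer_id).should_trigger() {
//       // re-replicate the records the departed peer was responsible for
//   }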
1820
1821#[cfg(test)]
1822mod close_group_tracker_tests {
1823    use super::*;
1824    use std::time::Duration;
1825
1826    fn random_peer() -> PeerId {
1827        let keypair = libp2p::identity::Keypair::generate_ed25519();
1828        PeerId::from(keypair.public())
1829    }
1830
1831    #[test]
1832    fn new_peer_triggers_replication() {
1833        let mut tracker = CloseGroupTracker::new(random_peer());
1834        let peer = random_peer();
1835
1836        assert_eq!(
1837            tracker.record_peer_added(peer),
1838            ReplicationDirective::Trigger
1839        );
1840
1841        assert_eq!(
1842            tracker.record_peer_removed(peer),
1843            ReplicationDirective::Trigger
1844        );
1845    }
1846
1847    #[test]
1848    fn restart_detection_skips_replication() {
1849        let mut tracker = CloseGroupTracker::new(random_peer());
1850        let peer = random_peer();
1851
1852        assert_eq!(
1853            tracker.record_peer_added(peer),
1854            ReplicationDirective::Trigger
1855        );
1856
1857        assert_eq!(
1858            tracker.record_peer_removed(peer),
1859            ReplicationDirective::Trigger
1860        );
1861
1862        assert_eq!(tracker.record_peer_added(peer), ReplicationDirective::Skip);
1863
1864        assert_eq!(
1865            tracker.record_peer_removed(peer),
1866            ReplicationDirective::Skip
1867        );
1868
1869        assert!(tracker.handle_timer_expiry(
1870            Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1)
1871        ));
1872
1873        assert_eq!(
1874            tracker.record_peer_added(peer),
1875            ReplicationDirective::Trigger
1876        );
1877    }
1878
1879    #[test]
1880    fn peer_outside_close_group_is_ignored() {
1881        let mut tracker = CloseGroupTracker::new(random_peer());
1882
1883        // Fill the tracker to capacity with random peers
1884        let mut added_peers = Vec::new();
1885        for _ in 0..CLOSE_GROUP_TRACKING_LIMIT {
1886            let peer = random_peer();
1887            let result = tracker.record_peer_added(peer);
1888            // All should trigger since we're under capacity
1889            assert_eq!(result, ReplicationDirective::Trigger);
1890            added_peers.push(peer);
1891        }
1892
1893        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
1894
1895        // Now add more peers - some will be ignored if they're farther than existing ones
1896        let mut ignored_count = 0;
1897        let mut trigger_count = 0;
1898        for _ in 0..50 {
1899            let peer = random_peer();
1900            match tracker.record_peer_added(peer) {
1901                ReplicationDirective::Ignore => ignored_count += 1,
1902                ReplicationDirective::Trigger => trigger_count += 1,
1903                _ => {}
1904            }
1905        }
1906
1907        // At capacity, new peers are only accepted if closer than the farthest
1908        // Some should be ignored (too far), some should trigger (closer than farthest)
1909        assert!(
1910            ignored_count > 0 || trigger_count > 0,
1911            "Expected some peers to be processed"
1912        );
1913
1914        // Tracker should still only have CLOSE_GROUP_TRACKING_LIMIT peers
1915        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
1916    }
1917
1918    #[test]
1919    fn removal_of_untracked_peer_is_ignored() {
1920        let mut tracker = CloseGroupTracker::new(random_peer());
1921
1922        // Try to remove a peer that was never added
1923        let unknown_peer = random_peer();
1924        assert_eq!(
1925            tracker.record_peer_removed(unknown_peer),
1926            ReplicationDirective::Ignore
1927        );
1928    }
1929
1930    #[test]
1931    fn timer_expiry_resets_restart_state() {
1932        let mut tracker = CloseGroupTracker::new(random_peer());
1933        let peer = random_peer();
1934
1935        // Add and remove peer (normal behavior)
1936        assert_eq!(
1937            tracker.record_peer_added(peer),
1938            ReplicationDirective::Trigger
1939        );
1940        assert_eq!(
1941            tracker.record_peer_removed(peer),
1942            ReplicationDirective::Trigger
1943        );
1944
1945        // Peer rejoins quickly - detected as restart, skipped
1946        assert_eq!(tracker.record_peer_added(peer), ReplicationDirective::Skip);
1947
1948        // Remove again - still in restart mode, skipped with timer set
1949        assert_eq!(
1950            tracker.record_peer_removed(peer),
1951            ReplicationDirective::Skip
1952        );
1953
1954        // Timer hasn't expired yet - no change
1955        assert!(!tracker.handle_timer_expiry(Instant::now()));
1956
1957        // Timer expires
1958        let after_expiry =
1959            Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1);
1960        assert!(tracker.handle_timer_expiry(after_expiry));
1961
1962        // Now the peer should trigger replication again (state reset)
1963        assert_eq!(
1964            tracker.record_peer_added(peer),
1965            ReplicationDirective::Trigger
1966        );
1967    }
1968
1969    #[test]
1970    fn eviction_maintains_closest_peers() {
1971        let self_peer = random_peer();
1972        let mut tracker = CloseGroupTracker::new(self_peer);
1973
1974        // Add exactly CLOSE_GROUP_TRACKING_LIMIT peers
1975        let mut peers_with_distances: Vec<(PeerId, Distance)> = Vec::new();
1976        for _ in 0..CLOSE_GROUP_TRACKING_LIMIT {
1977            let peer = random_peer();
1978            let distance = tracker.distance_to_peer(peer);
1979            let _ = tracker.record_peer_added(peer);
1980            peers_with_distances.push((peer, distance));
1981        }
1982
1983        assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
1984
1985        // Sort by distance to know which is farthest
1986        peers_with_distances.sort_by_key(|(_, d)| *d);
1987        let farthest_distance = peers_with_distances.last().map(|(_, d)| *d).unwrap();
1988
1989        // Add a new peer that's closer than the farthest
1990        // Keep trying until we find one (random, so may take a few tries)
1991        let mut found_closer = false;
1992        for _ in 0..100 {
1993            let new_peer = random_peer();
1994            let new_distance = tracker.distance_to_peer(new_peer);
1995            if new_distance < farthest_distance {
1996                let result = tracker.record_peer_added(new_peer);
1997                assert_eq!(result, ReplicationDirective::Trigger);
1998                // Should still have exactly CLOSE_GROUP_TRACKING_LIMIT peers
1999                assert_eq!(tracker.close_up_peers.len(), CLOSE_GROUP_TRACKING_LIMIT);
2000                found_closer = true;
2001                break;
2002            }
2003        }
2004
2005        // Note: This assertion might occasionally fail due to randomness,
2006        // but with 100 attempts it's extremely unlikely
2007        assert!(
2008            found_closer,
2009            "Could not find a peer closer than the farthest in 100 attempts"
2010        );
2011    }
2012
2013    #[test]
2014    fn tracked_entries_cleaned_up_on_timer_expiry() {
2015        let mut tracker = CloseGroupTracker::new(random_peer());
2016        let peer = random_peer();
2017
2018        // Add, remove, re-add (restart detected), remove again (timer set)
2019        let _ = tracker.record_peer_added(peer);
2020        let _ = tracker.record_peer_removed(peer);
2021        let _ = tracker.record_peer_added(peer);
2022        let _ = tracker.record_peer_removed(peer);
2023
2024        // Peer should have a tracked entry with timer
2025        assert!(tracker.tracked_entries.contains_key(&peer));
2026
2027        // After timer expiry, entry should be cleaned up since peer is not present
2028        let after_expiry =
2029            Instant::now() + CLOSE_GROUP_RESTART_SUPPRESSION + Duration::from_secs(1);
2030        let _ = tracker.handle_timer_expiry(after_expiry);
2031
2032        // Entry should be removed since peer is not in close_up_peers
2033        // and no longer has active flags
2034        assert!(!tracker.tracked_entries.contains_key(&peer));
2035    }
2036}
2037
2038async fn scoring_peer(
2039    network: Network,
2040    peer: (PeerId, Addresses),
2041    request: Request,
2042    expected_proofs: HashMap<NetworkAddress, ChunkProof>,
2043) -> usize {
2044    let peer_id = peer.0;
2045    let start = Instant::now();
2046    let responses = network
2047        .send_and_get_responses(&[peer], &request, true)
2048        .await;
2049
2050    if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
2051        responses.get(&peer_id)
2052    {
2053        if answers.is_empty() {
2054            info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
2055            return 0;
2056        }
2057        let elapsed = start.elapsed();
2058
2059        let mut received_proofs = vec![];
2060        for (addr, proof) in answers {
2061            if let Ok(proof) = proof {
2062                received_proofs.push((addr.clone(), proof.clone()));
2063            }
2064        }
2065
2066        let score = mark_peer(elapsed, received_proofs, &expected_proofs);
2067        info!(
2068            "Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.",
2069            answers.len()
2070        );
2071        score
2072    } else {
2073        info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error.");
2074        0
2075    }
2076}
2077
2078// ========================================================================================
2079// TRUST SCORE CALCULATION AND USAGE DOCUMENTATION
2080// ========================================================================================
2081//
2082// ## Overview
2083//
2084// The trust scoring system evaluates peer reliability through StorageChallenge responses.
2085// Scores are used to determine whether to accept replication requests from specific peers,
2086// providing protection against malicious actors trying to inject invalid data.
2087//
2088// ## Score Calculation
2089//
2090// The final trust score is calculated as: `duration_score * challenge_score`
2091//
2092// ### Duration Score (0-100)
2093// - Measures how quickly a peer responds to StorageChallenge
2094// - Calculated using `duration_score_scheme()`:
2095//   * score = HIGHEST_SCORE - min(HIGHEST_SCORE, duration_ms / TIME_STEP)
2096//   * With TIME_STEP=50ms: 0ms→100, 2500ms→50, 5000ms+→0
2097// - Rationale: Faster responses indicate a healthy, responsive node
2098//
2099// ### Challenge Score (0-100)
2100// - Measures the correctness of StorageChallenge responses
2101// - Calculated using `challenge_score_scheme()`:
2102//   * Returns 0 immediately if ANY false answer is detected (critical failure)
2103//   * Otherwise: min(100, 100 * correct_answers / expected_proofs.len())
2104// - Rationale: Correct proofs indicate the peer actually stores the claimed data
2105//
2106// ### Combined Score (0-10000)
2107// - Final score = duration_score * challenge_score
2108// - Range: 0 (completely untrustworthy) to 10000 (perfect)
2109// - Threshold: MIN_ACCEPTABLE_HEALTHY_SCORE (3000) determines healthy vs unhealthy
2110//
2111// ## Score Usage in Replication
2112//
2113// The ReplicationFetcher uses trust scores with a dual-approach strategy:
2114//
2115// ### 1. Trust-Based Approach
2116// - Each peer maintains a history of the last 2 challenge scores (healthy/unhealthy)
2117// - `is_peer_trustworthy()` returns:
2118//   * Some(true)  - Peer has 2+ scores, at least 2 healthy → accept immediately
2119//   * Some(false) - Peer has 2+ scores, fewer than 2 healthy → reject
2120//   * None        - Insufficient history → cannot determine
2121//
2122// ### 2. Majority Approach
2123// - For unknown peers (or as parallel validation), accumulate replication requests
2124// - Accept a key when CLOSE_GROUP_SIZE/2 different peers report the same key
2125// - Provides Byzantine fault tolerance against malicious peers
2126//
2127// ### Parallel Operation
2128// Both approaches work simultaneously:
2129// - Trustworthy peers: Keys accepted immediately AND added to majority accumulator
2130// - Untrustworthy peers: Keys rejected entirely (not added to majority)
2131// - Unknown peers: Keys added to majority accumulator only (wait for consensus)
2132//
2133// This dual approach provides defense-in-depth:
2134// - Fast acceptance from proven trustworthy peers
2135// - Byzantine fault tolerance for unknown/new peers
2136// - Protection against malicious peers through scoring history
2137//
2138// ## Score Lifecycle
2139//
2140// 1. StorageChallenge is triggered periodically (STORE_CHALLENGE_INTERVAL_MAX_S)
2141// 2. Random chunks are selected and proofs requested from peers
2142// 3. Responses are scored using `mark_peer()`
2143// 4. Scores are recorded in ReplicationFetcher via `add_peer_scores()`
2144// 5. Scores influence subsequent replication acceptance decisions
2145//
2146// ## Security Considerations
2147//
2148// - Any false proof immediately results in score 0 (zero tolerance for data corruption)
2149// - At least 2 healthy scores required to be considered trustworthy
2150// - Majority consensus required for unknown peers (Byzantine fault tolerance)
2151// - Score history is limited to 2 entries (recent behavior matters most)
2152// - Peer score entries pruned to 20 most recent (bounded memory usage)
2153//
2154// ========================================================================================
2155
2156/// Calculates the overall trust score for a peer based on StorageChallenge response.
2157///
2158/// The score combines two metrics:
2159///   * Duration: How quickly the peer responded (faster = higher score)
2160///   * Correctness: Whether the proofs provided are valid (any false = 0)
2161///
2162/// Returns: A score from 0 to 10000 (HIGHEST_SCORE * HIGHEST_SCORE)
2163///   * 0: Untrustworthy (false proof or extremely slow)
2164///   * 3000+: Healthy (MIN_ACCEPTABLE_HEALTHY_SCORE threshold)
2165///   * 10000: Perfect (instant response, all proofs correct)
2166fn mark_peer(
2167    duration: Duration,
2168    answers: Vec<(NetworkAddress, ChunkProof)>,
2169    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
2170) -> usize {
2171    let duration_score = duration_score_scheme(duration);
2172    let challenge_score = challenge_score_scheme(answers, expected_proofs);
2173
2174    duration_score * challenge_score
2175}
2176
2177/// Calculates a score based on response duration - faster responses get higher scores.
2178///
2179/// The scoring uses a linear step function that maps duration to score:
2180///   score = HIGHEST_SCORE - min(HIGHEST_SCORE, duration_ms / TIME_STEP)
2181///
2182/// With current constants (TIME_STEP=50ms, HIGHEST_SCORE=100):
2183///   - 0ms     → 100 (instant, best)
2184///   - 500ms   → 90
2185///   - 1000ms  → 80
2186///   - 2000ms  → 60
2187///   - 2500ms  → 50 (midpoint)
2188///   - 4000ms  → 20
2189///   - 5000ms+ → 0  (slowest, capped)
2190///
2191/// This scheme is designed for production networks where StorageChallenge
2192/// typically completes in 2-3 seconds under normal conditions.
2193fn duration_score_scheme(duration: Duration) -> usize {
2194    let in_ms = if let Some(value) = duration.as_millis().to_usize() {
2195        value
2196    } else {
2197        info!("Cannot get milli seconds from {duration:?}, using a default value of 1000ms.");
2198        HIGHEST_SCORE * TIME_STEP 
2199    };
2200
2201    let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
2202    HIGHEST_SCORE - step
2203}
2204
2205/// Calculates a score based on the correctness of StorageChallenge responses.
2206///
2207/// Scoring rules:
2208/// - Any false proof (invalid ChunkProof) → returns 0 immediately (zero tolerance)
2209/// - All proofs valid → returns percentage of correct answers (0-100)
2210///
2211/// The zero-tolerance policy for false proofs is critical for security:
2212/// a malicious node trying to fake storage will be immediately detected
2213/// and marked as untrustworthy.
2214///
2215/// Returns: A score from 0 to HIGHEST_SCORE (100)
2216fn challenge_score_scheme(
2217    answers: Vec<(NetworkAddress, ChunkProof)>,
2218    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
2219) -> usize {
2220    let mut correct_answers = 0;
2221    for (addr, chunk_proof) in answers {
2222        if let Some(expected_proof) = expected_proofs.get(&addr) {
2223            if expected_proof.verify(&chunk_proof) {
2224                correct_answers += 1;
2225            } else {
2226                info!("Spot a false answer to the challenge regarding {addr:?}");
2227                // Any false answer shall result in 0 score immediately
2228                // This is zero-tolerance policy for data integrity
2229                return 0;
2230            }
2231        }
2232    }
2233    // For answers not among expected_proofs, they might be valid but unknown to us
2234    // (due to different network knowledge). We only score what we can verify.
2235    std::cmp::min(
2236        HIGHEST_SCORE,
2237        HIGHEST_SCORE * correct_answers / std::cmp::max(1, expected_proofs.len()),
2238    )
2239}
2240
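// A minimal, self-contained sketch of the scoring pipeline above. It assumes only that
// `ChunkProof::new` derives the proof from the supplied bytes and nonce (as used in
// `storage_challenge`), that `verify` compares two such proofs, and that the documented
// constants hold (TIME_STEP = 50ms, HIGHEST_SCORE = 100), so 1000ms maps to a duration
// score of 80.
#[cfg(test)]
mod scoring_example_tests {
    use super::*;

    #[test]
    fn worked_scoring_example() {
        let nonce: Nonce = 42;
        let addr_a = NetworkAddress::from(PeerId::random());
        let addr_b = NetworkAddress::from(PeerId::random());
        let data_a: Vec<u8> = b"chunk-a-bytes".to_vec();
        let data_b: Vec<u8> = b"chunk-b-bytes".to_vec();
        let proof_a = ChunkProof::new(&data_a, nonce);
        let proof_b = ChunkProof::new(&data_b, nonce);

        let mut expected = HashMap::new();
        let _ = expected.insert(addr_a.clone(), proof_a.clone());
        let _ = expected.insert(addr_b.clone(), proof_b.clone());

        // Two correct answers in 1000ms: duration_score = 80, challenge_score = 100,
        // so the combined score (8000) sits well above MIN_ACCEPTABLE_HEALTHY_SCORE.
        let answers = vec![(addr_a.clone(), proof_a), (addr_b, proof_b)];
        let score = mark_peer(Duration::from_millis(1000), answers, &expected);
        assert_eq!(score, 8000);
        assert!(score > MIN_ACCEPTABLE_HEALTHY_SCORE);

        // A single false proof zeroes the score, regardless of how fast the reply was.
        let wrong: Vec<u8> = b"wrong-bytes".to_vec();
        let false_answers = vec![(addr_a, ChunkProof::new(&wrong, nonce))];
        assert_eq!(
            mark_peer(Duration::from_millis(0), false_answers, &expected),
            0
        );
    }
}
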
2241#[cfg(test)]
2242mod tests {
2243    use super::*;
2244    use std::str::FromStr;
2245
2246    #[test]
2247    fn test_no_local_peers() {
2248        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
2249        let target = NetworkAddress::from(PeerId::random());
2250        let num_of_peers = Some(5);
2251        let range = None;
2252        let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);
2253
2254        assert_eq!(result, vec![]);
2255    }
2256
2257    #[test]
2258    fn test_fewer_local_peers_than_num_of_peers() {
2259        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
2260            (
2261                PeerId::random(),
2262                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
2263            ),
2264            (
2265                PeerId::random(),
2266                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
2267            ),
2268            (
2269                PeerId::random(),
2270                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
2271            ),
2272        ];
2273        let target = NetworkAddress::from(PeerId::random());
2274        let num_of_peers = Some(2);
2275        let range = None;
2276        let result = Node::calculate_get_closest_peers(
2277            local_peers.clone(),
2278            target.clone(),
2279            num_of_peers,
2280            range,
2281        );
2282
2283        // Result shall be sorted and truncated
2284        let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
2285            .iter()
2286            .map(|(peer_id, multi_addrs)| {
2287                let addr = NetworkAddress::from(*peer_id);
2288                (addr, multi_addrs.clone())
2289            })
2290            .collect();
2291        expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
2292        let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();
2293
2294        assert_eq!(expected_result, result);
2295    }
2296
2297    #[test]
2298    fn test_with_range_and_num_of_peers() {
2299        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
2300            (
2301                PeerId::random(),
2302                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
2303            ),
2304            (
2305                PeerId::random(),
2306                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
2307            ),
2308            (
2309                PeerId::random(),
2310                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
2311            ),
2312        ];
2313        let target = NetworkAddress::from(PeerId::random());
2314        let num_of_peers = Some(0);
2315        let range_value = [128; 32];
2316        let range = Some(range_value);
2317        let result = Node::calculate_get_closest_peers(
2318            local_peers.clone(),
2319            target.clone(),
2320            num_of_peers,
2321            range,
2322        );
2323
2324        // Range shall be preferred, i.e. the result peers shall all be within the range
2325        let distance = U256::from_big_endian(&range_value);
2326        let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
2327            .into_iter()
2328            .filter_map(|(peer_id, multi_addrs)| {
2329                let addr = NetworkAddress::from(peer_id);
2330                if target.distance(&addr).0 <= distance {
2331                    Some((addr, multi_addrs.clone()))
2332                } else {
2333                    None
2334                }
2335            })
2336            .collect();
2337
2338        assert_eq!(expected_result, result);
2339    }
2340
2341    mod merkle_payment_tests {
2342        use super::*;
2343
2344        /// Test that timestamp validation accepts valid timestamps (within the acceptable window)
2345        #[test]
2346        fn test_timestamp_validation_accepts_valid_timestamp() {
2347            let now = std::time::SystemTime::now()
2348                .duration_since(std::time::UNIX_EPOCH)
2349                .unwrap()
2350                .as_secs();
2351
2352            // Valid timestamp: 1 hour ago
2353            let valid_timestamp = now - 3600;
2354
2355            // Validate timestamp
2356            let age = now.saturating_sub(valid_timestamp);
2357
2358            assert!(
2359                valid_timestamp <= now,
2360                "Valid timestamp should not be in the future"
2361            );
2362            assert!(
2363                age <= MERKLE_PAYMENT_EXPIRATION,
2364                "Valid timestamp should not be expired"
2365            );
2366        }
2367
2368        /// Test that timestamp validation rejects future timestamps
2369        #[test]
2370        fn test_timestamp_validation_rejects_future_timestamp() {
2371            let now = std::time::SystemTime::now()
2372                .duration_since(std::time::UNIX_EPOCH)
2373                .unwrap()
2374                .as_secs();
2375
2376            // Future timestamp: 1 hour in the future
2377            let future_timestamp = now + 3600;
2378
2379            // Timestamp should be rejected
2380            assert!(
2381                future_timestamp > now,
2382                "Future timestamp should be rejected"
2383            );
2384        }
2385
2386        /// Test that timestamp validation rejects expired timestamps
2387        #[test]
2388        fn test_timestamp_validation_rejects_expired_timestamp() {
2389            let now = std::time::SystemTime::now()
2390                .duration_since(std::time::UNIX_EPOCH)
2391                .unwrap()
2392                .as_secs();
2393
2394            // Expired timestamp: 8 days ago (> 7 day expiration)
2395            let expired_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 86400);
2396
2397            // Calculate age
2398            let age = now.saturating_sub(expired_timestamp);
2399
2400            // Timestamp should be rejected
2401            assert!(
2402                age > MERKLE_PAYMENT_EXPIRATION,
2403                "Expired timestamp should be rejected"
2404            );
2405        }
2406
2407        /// Test timestamp at the exact expiration boundary (should still be accepted)
2408        #[test]
2409        fn test_timestamp_validation_at_expiration_boundary() {
2410            let now = std::time::SystemTime::now()
2411                .duration_since(std::time::UNIX_EPOCH)
2412                .unwrap()
2413                .as_secs();
2414
2415            // Timestamp exactly at expiration boundary
2416            let boundary_timestamp = now - MERKLE_PAYMENT_EXPIRATION;
2417
2418            let age = now.saturating_sub(boundary_timestamp);
2419
2420            // At the boundary, age == MERKLE_PAYMENT_EXPIRATION
2421            assert_eq!(age, MERKLE_PAYMENT_EXPIRATION);
2422            // The validation uses >, so this should pass
2423            assert!(
2424                age <= MERKLE_PAYMENT_EXPIRATION,
2425                "Timestamp exactly at boundary should not be rejected"
2426            );
2427        }
2428
2429        /// Test timestamp just beyond expiration boundary (should be rejected)
2430        #[test]
2431        fn test_timestamp_validation_beyond_expiration_boundary() {
2432            let now = std::time::SystemTime::now()
2433                .duration_since(std::time::UNIX_EPOCH)
2434                .unwrap()
2435                .as_secs();
2436
2437            // Timestamp just beyond expiration boundary (1 second past)
2438            let beyond_boundary_timestamp = now - (MERKLE_PAYMENT_EXPIRATION + 1);
2439
2440            let age = now.saturating_sub(beyond_boundary_timestamp);
2441
2442            assert!(
2443                age > MERKLE_PAYMENT_EXPIRATION,
2444                "Timestamp beyond boundary should be rejected"
2445            );
2446        }
2447
2448        /// Test timestamp at current time (should be accepted)
2449        #[test]
2450        fn test_timestamp_validation_at_current_time() {
2451            let now = std::time::SystemTime::now()
2452                .duration_since(std::time::UNIX_EPOCH)
2453                .unwrap()
2454                .as_secs();
2455
2456            // Timestamp at current time
2457            let current_timestamp = now;
2458
2459            let age = now.saturating_sub(current_timestamp);
2460
2461            assert!(
2462                current_timestamp <= now,
2463                "Current timestamp should not be in future"
2464            );
2465            assert!(
2466                age <= MERKLE_PAYMENT_EXPIRATION,
2467                "Current timestamp should not be expired"
2468            );
2469            assert_eq!(age, 0, "Age should be 0 for current timestamp");
2470        }
2471
2472        /// Test timestamp near future boundary (1 second in future)
2473        #[test]
2474        fn test_timestamp_validation_near_future_boundary() {
2475            let now = std::time::SystemTime::now()
2476                .duration_since(std::time::UNIX_EPOCH)
2477                .unwrap()
2478                .as_secs();
2479
2480            // Timestamp 1 second in the future
2481            let near_future_timestamp = now + 1;
2482
2483            assert!(
2484                near_future_timestamp > now,
2485                "Near-future timestamp should be rejected"
2486            );
2487        }
2488
2489        /// Test expiration constant is set correctly (7 days = 604800 seconds)
2490        #[test]
2491        fn test_merkle_payment_expiration_constant() {
2492            const SEVEN_DAYS_IN_SECONDS: u64 = 7 * 24 * 60 * 60;
2493            assert_eq!(
2494                MERKLE_PAYMENT_EXPIRATION, SEVEN_DAYS_IN_SECONDS,
2495                "MERKLE_PAYMENT_EXPIRATION should be 7 days"
2496            );
2497        }
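
        /// A compact helper mirroring the validation rules the tests above exercise
        /// (an illustrative assumption: the production check is performed during put
        /// validation and is not defined in this module): a payment timestamp is
        /// acceptable when it is not in the future and not older than
        /// MERKLE_PAYMENT_EXPIRATION.
        fn is_timestamp_acceptable(timestamp: u64, now: u64) -> bool {
            timestamp <= now && now.saturating_sub(timestamp) <= MERKLE_PAYMENT_EXPIRATION
        }

        #[test]
        fn test_timestamp_acceptable_helper_matches_boundaries() {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Accepted: current time and the exact expiration boundary.
            assert!(is_timestamp_acceptable(now, now));
            assert!(is_timestamp_acceptable(now - MERKLE_PAYMENT_EXPIRATION, now));

            // Rejected: one second in the future, and one second past expiration.
            assert!(!is_timestamp_acceptable(now + 1, now));
            assert!(!is_timestamp_acceptable(now - (MERKLE_PAYMENT_EXPIRATION + 1), now));
        }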
2498    }
2499}