ant_node/
node.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use super::{
10    Marker, NodeEvent, error::Result, event::NodeEventsChannel, quote::quotes_verification,
11};
12#[cfg(feature = "open-metrics")]
13use crate::metrics::NodeMetricsRecorder;
14#[cfg(feature = "open-metrics")]
15use crate::networking::MetricsRegistries;
16use crate::networking::{Addresses, Network, NetworkConfig, NetworkError, NetworkEvent, NodeIssue};
17use crate::{PutValidationError, RunningNode};
18use ant_bootstrap::BootstrapCacheStore;
19use ant_evm::EvmNetwork;
20use ant_evm::RewardsAddress;
21use ant_protocol::{
22    CLOSE_GROUP_SIZE, NetworkAddress, PrettyPrintRecordKey,
23    error::Error as ProtocolError,
24    messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
25    storage::ValidationType,
26};
27use bytes::Bytes;
28use itertools::Itertools;
29use libp2p::{
30    Multiaddr, PeerId,
31    identity::Keypair,
32    kad::{Record, U256},
33    request_response::OutboundFailure,
34};
35use num_traits::cast::ToPrimitive;
36use rand::{
37    Rng, SeedableRng,
38    rngs::{OsRng, StdRng},
39    thread_rng,
40};
41use std::{
42    collections::HashMap,
43    net::SocketAddr,
44    path::PathBuf,
45    sync::{
46        Arc,
47        atomic::{AtomicUsize, Ordering},
48    },
49    time::{Duration, Instant},
50};
51use tokio::sync::watch;
52use tokio::{
53    sync::mpsc::Receiver,
54    task::{JoinSet, spawn},
55};
56
57/// Interval to trigger replication of all records to all peers.
58/// This is the maximum interval; each node picks a random interval between half of this and this value.
59pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;
60
61/// Interval to trigger storage challenge.
62/// This is the maximum interval; each node picks a random interval between half of this and this value.
63const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;
64
65/// Interval to update the node's uptime metric
66const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);
67
68/// Interval to clean up irrelevant records
69/// This is the maximum interval; each node picks a random interval between half of this and this value.
70const UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;
71
72/// Highest score achievable from each metric sub-sector during a StorageChallenge.
73const HIGHEST_SCORE: usize = 100;
74
75/// Any node scoring below this shall be considered bad.
76/// The maximum possible score is 100 * 100.
77const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;
78
79/// Step size in ms; the average StorageChallenge completion time is expected to be around 250ms.
80const TIME_STEP: usize = 20;
81
82/// Helper to build and run a Node
83pub struct NodeBuilder {
84    addr: SocketAddr,
85    bootstrap_cache: Option<BootstrapCacheStore>,
86    evm_address: RewardsAddress,
87    evm_network: EvmNetwork,
88    initial_peers: Vec<Multiaddr>,
89    identity_keypair: Keypair,
90    local: bool,
91    #[cfg(feature = "open-metrics")]
92    /// Set to Some to enable the metrics server
93    metrics_server_port: Option<u16>,
94    no_upnp: bool,
95    relay_client: bool,
96    root_dir: PathBuf,
97}
98
99impl NodeBuilder {
100    /// Instantiate the builder. The initial peers can either be supplied via the `initial_peers` argument
101    /// or fetched from the bootstrap cache set using the `bootstrap_cache` method.
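    ///
    /// A minimal usage sketch (illustrative only, not compiled as a doctest; `initial_peers`,
    /// `rewards_address`, `evm_network` and `root_dir` are assumed to come from the node's
    /// configuration):
    ///
    /// ```ignore
    /// let mut builder = NodeBuilder::new(
    ///     Keypair::generate_ed25519(),
    ///     initial_peers,
    ///     rewards_address,
    ///     evm_network,
    ///     "0.0.0.0:0".parse()?,
    ///     root_dir,
    /// );
    /// builder.local(false);
    /// let running_node = builder.build_and_run()?;
    /// ```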
102    pub fn new(
103        identity_keypair: Keypair,
104        initial_peers: Vec<Multiaddr>,
105        evm_address: RewardsAddress,
106        evm_network: EvmNetwork,
107        addr: SocketAddr,
108        root_dir: PathBuf,
109    ) -> Self {
110        Self {
111            addr,
112            bootstrap_cache: None,
113            evm_address,
114            evm_network,
115            initial_peers,
116            identity_keypair,
117            local: false,
118            #[cfg(feature = "open-metrics")]
119            metrics_server_port: None,
120            no_upnp: false,
121            relay_client: false,
122            root_dir,
123        }
124    }
125
126    /// Set the flag to indicate if the node is running in local mode
127    pub fn local(&mut self, local: bool) {
128        self.local = local;
129    }
130
131    #[cfg(feature = "open-metrics")]
132    /// Set the port for the OpenMetrics server. Defaults to a random port if not set
133    pub fn metrics_server_port(&mut self, port: Option<u16>) {
134        self.metrics_server_port = port;
135    }
136
137    /// Set the initialized bootstrap cache.
138    pub fn bootstrap_cache(&mut self, cache: BootstrapCacheStore) {
139        self.bootstrap_cache = Some(cache);
140    }
141
142    /// Set the flag to make the node act as a relay client
143    pub fn relay_client(&mut self, relay_client: bool) {
144        self.relay_client = relay_client;
145    }
146
147    /// Set the flag to disable UPnP for the node
148    pub fn no_upnp(&mut self, no_upnp: bool) {
149        self.no_upnp = no_upnp;
150    }
151
152    /// Builds and runs a new node instance, setting up the swarm driver,
153    /// creating the data storage, and handling network events. Returns the
154    /// created `RunningNode`, which contains a `NodeEventsChannel` for listening
155    /// to node-related events.
156    ///
157    /// # Returns
158    ///
159    /// A `RunningNode` instance.
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if there is a problem initializing the Network.
164    pub fn build_and_run(self) -> Result<RunningNode> {
165        // setup metrics
166        #[cfg(feature = "open-metrics")]
167        let (metrics_recorder, metrics_registries) = if self.metrics_server_port.is_some() {
168            // metadata registry
169            let mut metrics_registries = MetricsRegistries::default();
170            let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);
171
172            (Some(metrics_recorder), metrics_registries)
173        } else {
174            (None, MetricsRegistries::default())
175        };
176
177        // create a shutdown signal channel
178        let (shutdown_tx, shutdown_rx) = watch::channel(false);
179
180        // init network
181        let network_config = NetworkConfig {
182            keypair: self.identity_keypair,
183            local: self.local,
184            initial_contacts: self.initial_peers,
185            listen_addr: self.addr,
186            root_dir: self.root_dir.clone(),
187            shutdown_rx: shutdown_rx.clone(),
188            bootstrap_cache: self.bootstrap_cache,
189            no_upnp: self.no_upnp,
190            relay_client: self.relay_client,
191            custom_request_timeout: None,
192            #[cfg(feature = "open-metrics")]
193            metrics_registries,
194            #[cfg(feature = "open-metrics")]
195            metrics_server_port: self.metrics_server_port,
196        };
197        let (network, network_event_receiver) = Network::init(network_config)?;
198
199        // init node
200        let node_events_channel = NodeEventsChannel::default();
201        let node = NodeInner {
202            network: network.clone(),
203            events_channel: node_events_channel.clone(),
204            reward_address: self.evm_address,
205            #[cfg(feature = "open-metrics")]
206            metrics_recorder,
207            evm_network: self.evm_network,
208        };
209        let node = Node {
210            inner: Arc::new(node),
211        };
212
213        // Run the node
214        node.run(network_event_receiver, shutdown_rx);
215        let running_node = RunningNode {
216            shutdown_sender: shutdown_tx,
217            network,
218            node_events_channel,
219            root_dir_path: self.root_dir,
220            rewards_address: self.evm_address,
221        };
222
223        Ok(running_node)
224    }
225}
226
227/// `Node` represents a single node in the distributed network. It handles
228/// network events, processes incoming requests, interacts with the data
229/// storage, and broadcasts node-related events.
230#[derive(Clone)]
231pub(crate) struct Node {
232    inner: Arc<NodeInner>,
233}
234
235/// The actual implementation of the Node. `Node` is just a thin wrapper around this, so that we don't expose
236/// the `Arc` in the public interface.
237struct NodeInner {
238    events_channel: NodeEventsChannel,
239    network: Network,
240    #[cfg(feature = "open-metrics")]
241    metrics_recorder: Option<NodeMetricsRecorder>,
242    reward_address: RewardsAddress,
243    evm_network: EvmNetwork,
244}
245
246impl Node {
247    /// Returns the NodeEventsChannel
248    pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
249        &self.inner.events_channel
250    }
251
252    /// Returns the instance of Network
253    pub(crate) fn network(&self) -> &Network {
254        &self.inner.network
255    }
256
257    #[cfg(feature = "open-metrics")]
258    /// Returns a reference to the NodeMetricsRecorder if the `open-metrics` feature flag is enabled
259    /// This is used to record various metrics for the node.
260    pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
261        self.inner.metrics_recorder.as_ref()
262    }
263
264    /// Returns the reward address of the node
265    pub(crate) fn reward_address(&self) -> &RewardsAddress {
266        &self.inner.reward_address
267    }
268
269    pub(crate) fn evm_network(&self) -> &EvmNetwork {
270        &self.inner.evm_network
271    }
272
273    /// Spawns a task that processes incoming `NetworkEvent`s and drives the periodic
274    /// maintenance intervals until a shutdown signal is received.
275    fn run(
276        self,
277        mut network_event_receiver: Receiver<NetworkEvent>,
278        mut shutdown_rx: watch::Receiver<bool>,
279    ) {
280        let mut rng = StdRng::from_entropy();
281
282        let peers_connected = Arc::new(AtomicUsize::new(0));
283
284        let _node_task = spawn(async move {
285            // Use a randomised replication interval so that nodes do not all trigger
286            // replication at the same time while messages are being transmitted.
287            let replication_interval: u64 = rng.gen_range(
288                PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
289            );
290            let replication_interval_time = Duration::from_secs(replication_interval);
291            debug!("Replication interval set to {replication_interval_time:?}");
292
293            let mut replication_interval = tokio::time::interval(replication_interval_time);
294            let _ = replication_interval.tick().await; // first tick completes immediately
295
296            let mut uptime_metrics_update_interval =
297                tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
298            let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately
299
300            // Use a randomised cleanup interval so that nodes do not all carry out the work
301            // at the same time, causing an overall CPU spike.
302            let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
303                UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
304                    ..UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
305            );
306            let irrelevant_records_cleanup_interval_time =
307                Duration::from_secs(irrelevant_records_cleanup_interval);
308            let mut irrelevant_records_cleanup_interval =
309                tokio::time::interval(irrelevant_records_cleanup_interval_time);
310            let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately
311
312            // Use a randomised storage challenge interval to ensure that
313            // neighbours do not carry out challenges at the same time.
314            let storage_challenge_interval: u64 =
315                rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
316            let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
317            debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");
318
319            let mut storage_challenge_interval =
320                tokio::time::interval(storage_challenge_interval_time);
321            let _ = storage_challenge_interval.tick().await; // first tick completes immediately
322
323            loop {
324                let peers_connected = &peers_connected;
325
326                tokio::select! {
327                    // Check for a shutdown command.
328                    result = shutdown_rx.changed() => {
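                        // Exit when the shutdown flag has been set to true, or when the sender
                        // side of the channel has been dropped (changed() returns an error).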
329                        if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
330                            info!("Shutdown signal received or sender dropped. Exiting network events loop.");
331                            break;
332                        }
333                    },
334                    net_event = network_event_receiver.recv() => {
335                        match net_event {
336                            Some(event) => {
337                                let start = Instant::now();
338                                let event_string = format!("{event:?}");
339
340                                self.handle_network_event(event, peers_connected);
341                                trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);
342
343                            }
344                            None => {
345                                error!("The `NetworkEvent` channel is closed");
346                                self.events_channel().broadcast(NodeEvent::ChannelClosed);
347                                break;
348                            }
349                        }
350                    }
351                    // runs every replication_interval time
352                    _ = replication_interval.tick() => {
353                        let start = Instant::now();
354                        let network = self.network().clone();
355                        self.record_metrics(Marker::IntervalReplicationTriggered);
356
357                        let _handle = spawn(async move {
358                            Self::try_interval_replication(network);
359                            trace!("Periodic replication took {:?}", start.elapsed());
360                        });
361                    }
362                    _ = uptime_metrics_update_interval.tick() => {
363                        #[cfg(feature = "open-metrics")]
364                        if let Some(metrics_recorder) = self.metrics_recorder() {
365                            let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
366                        }
367                    }
368                    _ = irrelevant_records_cleanup_interval.tick() => {
369                        let network = self.network().clone();
370
371                        let _handle = spawn(async move {
372                            Self::trigger_irrelevant_record_cleanup(network);
373                        });
374                    }
375                    // runs every storage_challenge_interval time
376                    _ = storage_challenge_interval.tick() => {
377                        let start = Instant::now();
378                        debug!("Periodic storage challenge triggered");
379                        let network = self.network().clone();
380
381                        let _handle = spawn(async move {
382                            Self::storage_challenge(network).await;
383                            trace!("Periodic storage challenge took {:?}", start.elapsed());
384                        });
385                    }
386                }
387            }
388        });
389    }
390
391    /// Calls Marker::log() to insert the marker into the log files.
392    /// Also calls NodeMetricsRecorder::record() to record the metric if the `open-metrics` feature flag is enabled.
393    pub(crate) fn record_metrics(&self, marker: Marker) {
394        marker.log();
395        #[cfg(feature = "open-metrics")]
396        if let Some(metrics_recorder) = self.metrics_recorder() {
397            metrics_recorder.record(marker)
398        }
399    }
400
401    // **** Private helpers *****
402
403    /// Handle a network event.
404    /// Spawns a task for any likely long-running work.
405    fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
406        let start = Instant::now();
407        let event_string = format!("{event:?}");
408        let event_header;
409
410        // Reducing non-mandatory logging
411        if let NetworkEvent::QueryRequestReceived {
412            query: Query::GetVersion { .. },
413            ..
414        } = event
415        {
416            trace!("Handling NetworkEvent {event_string}");
417        } else {
418            debug!("Handling NetworkEvent {event_string}");
419        }
420
421        match event {
422            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
423                event_header = "PeerAdded";
424                // Increment peers_connected and broadcast the ConnectedToNetwork event once we have connected to CLOSE_GROUP_SIZE peers.
425                let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
426                if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
427                    self.events_channel()
428                        .broadcast(NodeEvent::ConnectedToNetwork);
429                }
430
431                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
432                self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));
433
434                // try to query the peer's version
435                let network = self.network().clone();
436                let _handle = spawn(async move {
437                    Self::try_query_peer_version(network, peer_id, Default::default()).await;
438                });
439
440                // try replication here
441                let network = self.network().clone();
442                self.record_metrics(Marker::IntervalReplicationTriggered);
443                let _handle = spawn(async move {
444                    Self::try_interval_replication(network);
445                });
446            }
447            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
448                event_header = "PeerRemoved";
449                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
450                self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));
451
452                let self_id = self.network().peer_id();
453                let distance =
454                    NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
455                info!(
456                    "Node {self_id:?} removed peer from routing table: {peer_id:?}. It has a {:?} distance to us.",
457                    distance.ilog2()
458                );
459
460                let network = self.network().clone();
461                self.record_metrics(Marker::IntervalReplicationTriggered);
462                let _handle = spawn(async move {
463                    Self::try_interval_replication(network);
464                });
465            }
466            NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
467                event_header = "PeerWithUnsupportedProtocol";
468            }
469            NetworkEvent::NewListenAddr(_) => {
470                event_header = "NewListenAddr";
471            }
472            NetworkEvent::ResponseReceived { res } => {
473                event_header = "ResponseReceived";
474                if let Err(err) = self.handle_response(res) {
475                    error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
476                }
477            }
478            NetworkEvent::KeysToFetchForReplication(keys) => {
479                event_header = "KeysToFetchForReplication";
480                self.record_metrics(Marker::fetching_keys_for_replication(&keys));
481
482                if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
483                    error!("Error while trying to fetch replicated data {err:?}");
484                }
485            }
486            NetworkEvent::QueryRequestReceived { query, channel } => {
487                event_header = "QueryRequestReceived";
488                let node = self.clone();
489                let payment_address = *self.reward_address();
490
491                let _handle = spawn(async move {
492                    let network = node.network().clone();
493                    let res = Self::handle_query(node, query, payment_address).await;
494
495                    // Reducing non-mandatory logging
496                    if let Response::Query(QueryResponse::GetVersion { .. }) = res {
497                        trace!("Sending response {res:?}");
498                    } else {
499                        debug!("Sending response {res:?}");
500                    }
501
502                    network.send_response(res, channel);
503                });
504            }
505            NetworkEvent::UnverifiedRecord(record) => {
506                event_header = "UnverifiedRecord";
507                // record validation can be long running, so we spawn a task to handle it
508                let self_clone = self.clone();
509                let _handle = spawn(async move {
510                    let key = PrettyPrintRecordKey::from(&record.key).into_owned();
511                    match self_clone.validate_and_store_record(record).await {
512                        Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
513                        Err(err) => {
514                            self_clone.record_metrics(Marker::RecordRejected(&key, &err));
515                        }
516                    }
517                });
518            }
519            NetworkEvent::TerminateNode { reason } => {
520                event_header = "TerminateNode";
521                error!("Received termination from swarm_driver due to {reason:?}");
522                self.events_channel()
523                    .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
524            }
525            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
526                event_header = "FailedToFetchHolders";
527                let network = self.network().clone();
528                let pretty_log: Vec<_> = bad_nodes
529                    .iter()
530                    .map(|(peer_id, record_key)| {
531                        let pretty_key = PrettyPrintRecordKey::from(record_key);
532                        (peer_id, pretty_key)
533                    })
534                    .collect();
535                // Note: CI checks that this log does `not appear`;
536                //       any change to the keyword `failed to fetch` requires a
537                //       corresponding CI script change as well.
538                debug!(
                    "Received notification from replication_fetcher: failed to fetch replication copies from {pretty_log:?}."
540                );
541                let _handle = spawn(async move {
542                    for (peer_id, record_key) in bad_nodes {
                        // An obsolete fetch request (e.g. superseded by a flood of fresh replicates) could result
                        // in a peer being wrongly claimed as bad, as the local copy blocks the entry from being cleared.
545                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
546                        {
547                            error!(
548                                "From peer {peer_id:?}, failed to fetch record {:?}",
549                                PrettyPrintRecordKey::from(&record_key)
550                            );
551                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
552                        }
553                    }
554                });
555            }
556            NetworkEvent::QuoteVerification { quotes } => {
557                event_header = "QuoteVerification";
558                let network = self.network().clone();
559
560                let _handle = spawn(async move {
561                    quotes_verification(&network, quotes).await;
562                });
563            }
564            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
565                event_header = "FreshReplicateToFetch";
566                self.fresh_replicate_to_fetch(holder, keys);
567            }
568            NetworkEvent::PeersForVersionQuery(peers) => {
569                event_header = "PeersForVersionQuery";
570                let network = self.network().clone();
571                let _handle = spawn(async move {
572                    Self::query_peers_version(network, peers).await;
573                });
574            }
575        }
576
577        trace!(
578            "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
579            start.elapsed()
580        );
581    }
582
583    // Handle the response that was not awaited at the call site
584    fn handle_response(&self, response: Response) -> Result<()> {
585        match response {
586            Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
587                // This should actually have been short-circuited when received
588                warn!("Mishandled replicate response, should be handled earlier");
589            }
590            Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
591                error!(
                    "Response to replication shall be handled by the caller, not by the common handler, {resp:?}"
593                );
594            }
595            Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
596                // No need to handle
597            }
598            other => {
599                warn!("handle_response not implemented for {other:?}");
600            }
601        };
602
603        Ok(())
604    }
605
606    async fn handle_query(node: Self, query: Query, payment_address: RewardsAddress) -> Response {
607        let network = node.network();
608        let resp: QueryResponse = match query {
609            Query::GetStoreQuote {
610                key,
611                data_type,
612                data_size,
613                nonce,
614                difficulty,
615            } => {
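                // Fetch the local quoting metrics first; if the caller supplied a nonce, also
                // answer the accompanying storage-proof challenge alongside the quote.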
616                let record_key = key.to_record_key();
617                let self_id = network.peer_id();
618
619                let maybe_quoting_metrics = network
620                    .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
621                    .await;
622
623                let storage_proofs = if let Some(nonce) = nonce {
624                    Self::respond_x_closest_record_proof(
625                        network,
626                        key.clone(),
627                        nonce,
628                        difficulty,
629                        false,
630                    )
631                    .await
632                } else {
633                    vec![]
634                };
635
636                match maybe_quoting_metrics {
637                    Ok((quoting_metrics, is_already_stored)) => {
638                        if is_already_stored {
639                            QueryResponse::GetStoreQuote {
640                                quote: Err(ProtocolError::RecordExists(
641                                    PrettyPrintRecordKey::from(&record_key).into_owned(),
642                                )),
643                                peer_address: NetworkAddress::from(self_id),
644                                storage_proofs,
645                            }
646                        } else {
647                            QueryResponse::GetStoreQuote {
648                                quote: Self::create_quote_for_storecost(
649                                    network,
650                                    &key,
651                                    &quoting_metrics,
652                                    &payment_address,
653                                ),
654                                peer_address: NetworkAddress::from(self_id),
655                                storage_proofs,
656                            }
657                        }
658                    }
659                    Err(err) => {
660                        warn!("GetStoreQuote failed for {key:?}: {err}");
661                        QueryResponse::GetStoreQuote {
662                            quote: Err(ProtocolError::GetStoreQuoteFailed),
663                            peer_address: NetworkAddress::from(self_id),
664                            storage_proofs,
665                        }
666                    }
667                }
668            }
669            Query::GetReplicatedRecord { requester: _, key } => {
670                let our_address = NetworkAddress::from(network.peer_id());
671                let mut result = Err(ProtocolError::ReplicatedRecordNotFound {
672                    holder: Box::new(our_address.clone()),
673                    key: Box::new(key.clone()),
674                });
675                let record_key = key.as_record_key();
676
677                if let Some(record_key) = record_key
678                    && let Ok(Some(record)) = network.get_local_record(&record_key).await
679                {
680                    result = Ok((our_address, Bytes::from(record.value)));
681                }
682
683                QueryResponse::GetReplicatedRecord(result)
684            }
685            Query::GetChunkExistenceProof {
686                key,
687                nonce,
688                difficulty,
689            } => QueryResponse::GetChunkExistenceProof(
690                Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
691            ),
692            Query::CheckNodeInProblem(target_address) => {
693                debug!("Got CheckNodeInProblem for peer {target_address:?}");
694
695                let is_in_trouble =
696                    if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
697                        result
698                    } else {
699                        debug!("Could not get status of {target_address:?}.");
700                        false
701                    };
702
703                QueryResponse::CheckNodeInProblem {
704                    reporter_address: NetworkAddress::from(network.peer_id()),
705                    target_address,
706                    is_in_trouble,
707                }
708            }
709            Query::GetClosestPeers {
710                key,
711                num_of_peers,
712                range,
713                sign_result,
714            } => {
715                debug!(
716                    "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
717                );
718                Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
719                    .await
720            }
721            Query::GetVersion(_) => QueryResponse::GetVersion {
722                peer: NetworkAddress::from(network.peer_id()),
723                version: ant_build_info::package_version(),
724            },
725            Query::PutRecord {
726                holder,
727                address,
728                serialized_record,
729            } => {
730                let record = Record {
731                    key: address.to_record_key(),
732                    value: serialized_record,
733                    publisher: None,
734                    expires: None,
735                };
736
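                // Validate and store the record; validation failures are mapped onto
                // protocol-level errors so the uploader receives a meaningful response.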
737                let key = PrettyPrintRecordKey::from(&record.key).into_owned();
738                let result = match node.validate_and_store_record(record).await {
739                    Ok(()) => Ok(()),
740                    Err(PutValidationError::OutdatedRecordCounter { counter, expected }) => {
741                        node.record_metrics(Marker::RecordRejected(
742                            &key,
743                            &PutValidationError::OutdatedRecordCounter { counter, expected },
744                        ));
745                        Err(ProtocolError::OutdatedRecordCounter { counter, expected })
746                    }
747                    Err(err) => {
748                        node.record_metrics(Marker::RecordRejected(&key, &err));
749                        Err(ProtocolError::PutRecordFailed(format!("{err:?}")))
750                    }
751                };
752                QueryResponse::PutRecord {
753                    result,
754                    peer_address: holder,
755                    record_addr: address,
756                }
757            }
758        };
759        Response::Query(resp)
760    }
761
762    async fn respond_get_closest_peers(
763        network: &Network,
764        target: NetworkAddress,
765        num_of_peers: Option<usize>,
766        range: Option<[u8; 32]>,
767        sign_result: bool,
768    ) -> QueryResponse {
769        let local_peers = network.get_local_peers_with_multiaddr().await;
770        let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
771            Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
772        } else {
773            vec![]
774        };
775
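        // When requested, sign the serialised (target, peers) pair so that the requester
        // can verify which node produced this response.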
776        let signature = if sign_result {
777            let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
778            bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
779            network.sign(&bytes).ok()
780        } else {
781            None
782        };
783
784        QueryResponse::GetClosestPeers {
785            target,
786            peers,
787            signature,
788        }
789    }
790
791    fn calculate_get_closest_peers(
792        peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
793        target: NetworkAddress,
794        num_of_peers: Option<usize>,
795        range: Option<[u8; 32]>,
796    ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
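        // A range filter takes priority over a requested number of peers: with `range` set,
        // return every local peer within that distance of the target; otherwise return the
        // `num_of_peers` peers closest to it.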
797        match (num_of_peers, range) {
798            (_, Some(value)) => {
799                let distance = U256::from_big_endian(&value);
800                peer_addrs
801                    .iter()
802                    .filter_map(|(peer_id, multi_addrs)| {
803                        let addr = NetworkAddress::from(*peer_id);
804                        if target.distance(&addr).0 <= distance {
805                            Some((addr, multi_addrs.clone()))
806                        } else {
807                            None
808                        }
809                    })
810                    .collect()
811            }
812            (Some(num_of_peers), _) => {
813                let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
814                    .iter()
815                    .map(|(peer_id, multi_addrs)| {
816                        let addr = NetworkAddress::from(*peer_id);
817                        (addr, multi_addrs.clone())
818                    })
819                    .collect();
820                result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
821                result.into_iter().take(num_of_peers).collect()
822            }
823            (None, None) => vec![],
824        }
825    }
826
827    // Nodes only check ChunkProofs against each other, to avoid the `multi-version` issue.
828    // Clients check proofs against all records, as they have to fetch them from the network anyway.
829    async fn respond_x_closest_record_proof(
830        network: &Network,
831        key: NetworkAddress,
832        nonce: Nonce,
833        difficulty: usize,
834        chunk_only: bool,
835    ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
836        let start = Instant::now();
837        let mut results = vec![];
838        if difficulty == 1 {
839            // Client checking existence of published chunk.
840            let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
841            if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
842                let proof = ChunkProof::new(&record.value, nonce);
843                debug!("Chunk proof for {key:?} is {proof:?}");
844                result = Ok(proof)
845            } else {
846                debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
847            }
848
849            results.push((key.clone(), result));
850        } else {
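            // Node-issued storage challenge: prove possession of up to `difficulty`
            // (capped at CLOSE_GROUP_SIZE) locally held records closest to the target key.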
851            let all_local_records = network.get_all_local_record_addresses().await;
852
853            if let Ok(all_local_records) = all_local_records {
854                let mut all_chunk_addrs: Vec<_> = if chunk_only {
855                    all_local_records
856                        .iter()
857                        .filter_map(|(addr, record_type)| {
858                            if *record_type == ValidationType::Chunk {
859                                Some(addr.clone())
860                            } else {
861                                None
862                            }
863                        })
864                        .collect()
865                } else {
866                    all_local_records.keys().cloned().collect()
867                };
868
869                // Sort by distance and only take first X closest entries
870                all_chunk_addrs.sort_by_key(|addr| key.distance(addr));
871
872                // TODO: this shall be deduced from resource usage dynamically
873                let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);
874
875                for addr in all_chunk_addrs.iter().take(workload_factor) {
876                    if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
877                    {
878                        let proof = ChunkProof::new(&record.value, nonce);
879                        debug!("Chunk proof for {key:?} is {proof:?}");
880                        results.push((addr.clone(), Ok(proof)));
881                    }
882                }
883            }
884
885            info!(
886                "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
887                results.len(),
888                start.elapsed()
889            );
890        }
891
892        results
893    }
894
895    /// Check among all chunk-type records that we hold,
896    /// and randomly pick one as the verification candidate.
897    /// This challenges all of the closest peers at once.
898    async fn storage_challenge(network: Network) {
899        let start = Instant::now();
900        let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
901            network.get_k_closest_local_peers_to_the_target(None).await
902        {
903            closest_peers
904                .into_iter()
905                .take(CLOSE_GROUP_SIZE)
906                .collect_vec()
907        } else {
908            error!("Cannot get local neighbours");
909            return;
910        };
911        if closest_peers.len() < CLOSE_GROUP_SIZE {
912            debug!(
913                "Not enough neighbours ({}/{}) to carry out storage challenge.",
914                closest_peers.len(),
915                CLOSE_GROUP_SIZE
916            );
917            return;
918        }
919
920        let mut verify_candidates: Vec<NetworkAddress> =
921            if let Ok(all_keys) = network.get_all_local_record_addresses().await {
922                all_keys
923                    .iter()
924                    .filter_map(|(addr, record_type)| {
925                        if ValidationType::Chunk == *record_type {
926                            Some(addr.clone())
927                        } else {
928                            None
929                        }
930                    })
931                    .collect()
932            } else {
933                error!("Failed to get local record addresses.");
934                return;
935            };
936        let num_of_targets = verify_candidates.len();
937        if num_of_targets < 50 {
938            debug!("Not enough candidates({num_of_targets}/50) to be checked against neighbours.");
939            return;
940        }
941
942        // To ensure the neighbours share the same knowledge as us,
943        // the target is chosen to be not far from us.
944        let self_addr = NetworkAddress::from(network.peer_id());
945        verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
946        let index: usize = OsRng.gen_range(0..num_of_targets / 2);
947        let target = verify_candidates[index].clone();
948        // TODO: workload shall be dynamically deduced from resource usage
949        let difficulty = CLOSE_GROUP_SIZE;
950        verify_candidates.sort_by_key(|addr| target.distance(addr));
951        let expected_targets = verify_candidates.into_iter().take(difficulty);
952        let nonce: Nonce = thread_rng().r#gen::<u64>();
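        // Pre-compute the proofs we expect from our own local copies, so that peers' answers
        // can be verified without any further network fetches.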
953        let mut expected_proofs = HashMap::new();
954        for addr in expected_targets {
955            if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
956                let expected_proof = ChunkProof::new(&record.value, nonce);
957                let _ = expected_proofs.insert(addr, expected_proof);
958            } else {
                error!("Local record {addr:?} can't be loaded from disk.");
960            }
961        }
962        let request = Request::Query(Query::GetChunkExistenceProof {
963            key: target.clone(),
964            nonce,
965            difficulty,
966        });
967
968        let mut tasks = JoinSet::new();
969        for (peer_id, addresses) in closest_peers {
970            if peer_id == network.peer_id() {
971                continue;
972            }
973            let network_clone = network.clone();
974            let request_clone = request.clone();
975            let expected_proofs_clone = expected_proofs.clone();
976            let _ = tasks.spawn(async move {
977                let res = scoring_peer(
978                    network_clone,
979                    (peer_id, addresses),
980                    request_clone,
981                    expected_proofs_clone,
982                )
983                .await;
984                (peer_id, res)
985            });
986        }
987
988        let mut peer_scores = vec![];
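        // Collect the scores: peers scoring at or below MIN_ACCEPTABLE_HEALTHY_SCORE are flagged
        // as unhealthy via record_node_issues, and all results are passed on through notify_peer_scores.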
989        while let Some(res) = tasks.join_next().await {
990            match res {
991                Ok((peer_id, score)) => {
992                    let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
993                    if !is_healthy {
994                        info!(
995                            "Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."
996                        );
                        // TODO: shall a challenge failure immediately trigger the removal of the node?
998                        network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
999                    }
1000                    peer_scores.push((peer_id, is_healthy));
1001                }
1002                Err(e) => {
1003                    info!("StorageChallenge task completed with error {e:?}");
1004                }
1005            }
1006        }
1007        if !peer_scores.is_empty() {
1008            network.notify_peer_scores(peer_scores);
1009        }
1010
1011        info!(
1012            "Completed node StorageChallenge against neighbours in {:?}!",
1013            start.elapsed()
1014        );
1015    }
1016
1017    /// Query peers' versions and update local knowledge.
1018    async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
1019        // To avoid choking, carry out the queries one by one
1020        for (peer_id, addrs) in peers {
1021            Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
1022        }
1023    }
1024
1025    /// Query peer's version and update local knowledge.
1026    async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
1027        let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
1028        // We can skip passing `addrs` here as the new peer should be part of the kad::RT and swarm can get the addr.
1029        let version = match network.send_request(request, peer, addrs).await {
1030            Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
1031                trace!("Fetched peer version {peer:?} as {version:?}");
1032                version
1033            }
1034            Ok(other) => {
                info!("Unexpected response while fetching peer version {peer:?}: {other:?}");
1036                "none".to_string()
1037            }
1038            Err(err) => {
1039                info!("Failed to fetch peer version {peer:?} with error {err:?}");
                // A failed version fetch (which itself dials and then re-attempts)
                // that errors with `DialFailure` indicates the peer is very likely dead.
                // In that case, the peer shall be removed from the routing table.
1043                if let NetworkError::OutboundError(OutboundFailure::DialFailure) = err {
1044                    network.remove_peer(peer);
1045                    return;
1046                }
1047                "old".to_string()
1048            }
1049        };
1050        network.notify_node_version(peer, version);
1051    }
1052}
1053
1054async fn scoring_peer(
1055    network: Network,
1056    peer: (PeerId, Addresses),
1057    request: Request,
1058    expected_proofs: HashMap<NetworkAddress, ChunkProof>,
1059) -> usize {
1060    let peer_id = peer.0;
1061    let start = Instant::now();
1062    let responses = network
1063        .send_and_get_responses(&[peer], &request, true)
1064        .await;
1065
1066    if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
1067        responses.get(&peer_id)
1068    {
1069        if answers.is_empty() {
1070            info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
1071            return 0;
1072        }
1073        let elapsed = start.elapsed();
1074
1075        let mut received_proofs = vec![];
1076        for (addr, proof) in answers {
1077            if let Ok(proof) = proof {
1078                received_proofs.push((addr.clone(), proof.clone()));
1079            }
1080        }
1081
1082        let score = mark_peer(elapsed, received_proofs, &expected_proofs);
1083        info!(
1084            "Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.",
1085            answers.len()
1086        );
1087        score
1088    } else {
        info!("Peer {peer_id:?} didn't reply to the ChunkProofChallenge, or replied with an error.");
1090        0
1091    }
1092}
1093
1094// The score is based on the following metrics:
1095//   * the response duration
1096//   * whether there is any false answer
1097//   * the percentage of correct answers among the expected closest
1098// The higher the score, the higher the confidence in the peer.
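// For example, with HIGHEST_SCORE = 100 and TIME_STEP = 20, a peer answering in ~250ms gets a
// duration score of 100 - 250 / 20 = 88, so it needs a challenge score of at least 35 (i.e. at
// least 35% of the expected proofs answered correctly) to exceed MIN_ACCEPTABLE_HEALTHY_SCORE.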
1099fn mark_peer(
1100    duration: Duration,
1101    answers: Vec<(NetworkAddress, ChunkProof)>,
1102    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
1103) -> usize {
1104    let duration_score = duration_score_scheme(duration);
1105    let challenge_score = challenge_score_scheme(answers, expected_proofs);
1106
1107    duration_score * challenge_score
1108}
1109
1110// A shorter duration gets a higher score.
1111fn duration_score_scheme(duration: Duration) -> usize {
1112    // So far just a simple stepped scheme, capped by HIGHEST_SCORE
1113    let in_ms = if let Some(value) = duration.as_millis().to_usize() {
1114        value
1115    } else {
        info!("Cannot get milliseconds from {duration:?}, using a default value of 1000ms.");
1117        1000
1118    };
1119
1120    let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
1121    HIGHEST_SCORE - step
1122}
1123
1124// Any false answer shall result in 0 score immediately
1125fn challenge_score_scheme(
1126    answers: Vec<(NetworkAddress, ChunkProof)>,
1127    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
1128) -> usize {
1129    let mut correct_answers = 0;
1130    for (addr, chunk_proof) in answers {
1131        if let Some(expected_proof) = expected_proofs.get(&addr) {
1132            if expected_proof.verify(&chunk_proof) {
1133                correct_answers += 1;
1134            } else {
                info!("Spotted a false answer to the challenge regarding {addr:?}");
1136                // Any false answer shall result in 0 score immediately
1137                return 0;
1138            }
1139        }
1140    }
    // TODO: For those answers not among the expected_proofs,
    //       the mismatch could be due to the peer having different knowledge of records than us.
    //       Shall we:
    //         * set the target close to us, so that neighbours are more likely to share the same knowledge
    //         * fetch from local storage to verify
    //         * fetch from the network to verify
1147    std::cmp::min(
1148        HIGHEST_SCORE,
1149        HIGHEST_SCORE * correct_answers / expected_proofs.len(),
1150    )
1151}
1152
1153#[cfg(test)]
1154mod tests {
1155    use super::*;
1156    use std::str::FromStr;
1157
1158    #[test]
1159    fn test_no_local_peers() {
1160        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
1161        let target = NetworkAddress::from(PeerId::random());
1162        let num_of_peers = Some(5);
1163        let range = None;
1164        let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);
1165
1166        assert_eq!(result, vec![]);
1167    }
1168
1169    #[test]
1170    fn test_fewer_local_peers_than_num_of_peers() {
1171        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
1172            (
1173                PeerId::random(),
1174                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
1175            ),
1176            (
1177                PeerId::random(),
1178                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
1179            ),
1180            (
1181                PeerId::random(),
1182                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
1183            ),
1184        ];
1185        let target = NetworkAddress::from(PeerId::random());
1186        let num_of_peers = Some(2);
1187        let range = None;
1188        let result = Node::calculate_get_closest_peers(
1189            local_peers.clone(),
1190            target.clone(),
1191            num_of_peers,
1192            range,
1193        );
1194
1195        // Result shall be sorted and truncated
1196        let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
1197            .iter()
1198            .map(|(peer_id, multi_addrs)| {
1199                let addr = NetworkAddress::from(*peer_id);
1200                (addr, multi_addrs.clone())
1201            })
1202            .collect();
1203        expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
1204        let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();
1205
1206        assert_eq!(expected_result, result);
1207    }
1208
1209    #[test]
1210    fn test_with_range_and_num_of_peers() {
1211        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
1212            (
1213                PeerId::random(),
1214                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
1215            ),
1216            (
1217                PeerId::random(),
1218                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
1219            ),
1220            (
1221                PeerId::random(),
1222                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
1223            ),
1224        ];
1225        let target = NetworkAddress::from(PeerId::random());
1226        let num_of_peers = Some(0);
1227        let range_value = [128; 32];
1228        let range = Some(range_value);
1229        let result = Node::calculate_get_closest_peers(
1230            local_peers.clone(),
1231            target.clone(),
1232            num_of_peers,
1233            range,
1234        );
1235
        // Range shall be preferred, i.e. the result peers shall all be within the range.
1237        let distance = U256::from_big_endian(&range_value);
1238        let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
1239            .into_iter()
1240            .filter_map(|(peer_id, multi_addrs)| {
1241                let addr = NetworkAddress::from(peer_id);
1242                if target.distance(&addr).0 <= distance {
1243                    Some((addr, multi_addrs.clone()))
1244                } else {
1245                    None
1246                }
1247            })
1248            .collect();
1249
1250        assert_eq!(expected_result, result);
1251    }
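
    // Illustrative check of the stepped duration scoring scheme: the score starts at
    // HIGHEST_SCORE and loses one point per TIME_STEP milliseconds, bottoming out at zero.
    #[test]
    fn test_duration_score_scheme_steps() {
        assert_eq!(
            duration_score_scheme(Duration::from_millis(0)),
            HIGHEST_SCORE
        );
        // 250ms is the expected average challenge completion time: 250 / 20 = 12 steps off the top.
        assert_eq!(
            duration_score_scheme(Duration::from_millis(250)),
            HIGHEST_SCORE - 250 / TIME_STEP
        );
        // Anything at or beyond HIGHEST_SCORE * TIME_STEP milliseconds scores zero.
        assert_eq!(duration_score_scheme(Duration::from_secs(10)), 0);
    }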
1252}