ant_node/
node.rs

// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use super::{
    Marker, NodeEvent, error::Result, event::NodeEventsChannel, quote::quotes_verification,
};
#[cfg(feature = "open-metrics")]
use crate::metrics::NodeMetricsRecorder;
#[cfg(feature = "open-metrics")]
use crate::networking::MetricsRegistries;
use crate::networking::{Addresses, Network, NetworkConfig, NetworkError, NetworkEvent, NodeIssue};
use crate::{PutValidationError, RunningNode};
use ant_bootstrap::bootstrap::Bootstrap;
use ant_evm::EvmNetwork;
use ant_evm::RewardsAddress;
use ant_protocol::{
    CLOSE_GROUP_SIZE, NetworkAddress, PrettyPrintRecordKey,
    error::Error as ProtocolError,
    messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
    storage::ValidationType,
};
use bytes::Bytes;
use itertools::Itertools;
use libp2p::{
    Multiaddr, PeerId,
    identity::Keypair,
    kad::{Record, U256},
    request_response::OutboundFailure,
};
use num_traits::cast::ToPrimitive;
use rand::{
    Rng, SeedableRng,
    rngs::{OsRng, StdRng},
    thread_rng,
};
use std::{
    collections::HashMap,
    net::SocketAddr,
    path::PathBuf,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    time::{Duration, Instant},
};
use tokio::sync::watch;
use tokio::{
    sync::mpsc::Receiver,
    task::{JoinSet, spawn},
};

/// Interval to trigger replication of all records to all peers.
/// This is the maximum interval; each node picks a random interval between half of this and this value.
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;

/// Interval to trigger a storage challenge.
/// This is the maximum interval; each node picks a random interval between half of this and this value.
const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;

/// Interval to update the node's uptime metric
const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

/// Interval to clean up irrelevant records
/// This is the maximum interval; each node picks a random interval between half of this and this value.
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;

/// Highest score achievable from each metric sub-sector during a StorageChallenge.
const HIGHEST_SCORE: usize = 100;

/// Any node scoring below this shall be considered bad.
/// The maximum achievable score is 100 * 100.
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;

/// In ms; the average StorageChallenge completion time is expected to be around 250ms.
const TIME_STEP: usize = 20;
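
// A rough worked example of how these constants combine (illustrative only; see
// `mark_peer`, `duration_score_scheme` and `challenge_score_scheme` below): a peer
// answering a storage challenge in ~250ms gets a duration score of
// HIGHEST_SCORE - 250 / TIME_STEP = 100 - 12 = 88; if all of its answers are correct the
// challenge score is 100, giving 88 * 100 = 8800, comfortably above
// MIN_ACCEPTABLE_HEALTHY_SCORE (3000). A peer taking ~1.5s with only half of the expected
// answers correct scores (100 - 75) * 50 = 1250 and is flagged as bad.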

/// Helper to build and run a Node
pub struct NodeBuilder {
    addr: SocketAddr,
    bootstrap: Bootstrap,
    evm_address: RewardsAddress,
    evm_network: EvmNetwork,
    identity_keypair: Keypair,
    local: bool,
    #[cfg(feature = "open-metrics")]
    /// Set to Some to enable the metrics server
    metrics_server_port: Option<u16>,
    no_upnp: bool,
    relay_client: bool,
    root_dir: PathBuf,
}

impl NodeBuilder {
    /// Instantiate the builder. Initial peers are supplied via the `Bootstrap` flow passed in
    /// as `bootstrap_flow`.
    pub fn new(
        identity_keypair: Keypair,
        bootstrap_flow: Bootstrap,
        evm_address: RewardsAddress,
        evm_network: EvmNetwork,
        addr: SocketAddr,
        root_dir: PathBuf,
    ) -> Self {
        Self {
            addr,
            bootstrap: bootstrap_flow,
            evm_address,
            evm_network,
            identity_keypair,
            local: false,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: None,
            no_upnp: false,
            relay_client: false,
            root_dir,
        }
    }

    /// Set the flag to indicate if the node is running in local mode
    pub fn local(&mut self, local: bool) {
        self.local = local;
    }

    #[cfg(feature = "open-metrics")]
    /// Set the port for the OpenMetrics server. Defaults to a random port if not set
    pub fn metrics_server_port(&mut self, port: Option<u16>) {
        self.metrics_server_port = port;
    }

    /// Set the flag to make the node act as a relay client
    pub fn relay_client(&mut self, relay_client: bool) {
        self.relay_client = relay_client;
    }

    /// Set the flag to disable UPnP for the node
    pub fn no_upnp(&mut self, no_upnp: bool) {
        self.no_upnp = no_upnp;
    }

    /// Asynchronously runs a new node instance, setting up the swarm driver,
    /// creating the data storage, and handling network events. Returns the
    /// created `RunningNode` which contains a `NodeEventsChannel` for listening
    /// to node-related events.
    ///
    /// # Returns
    ///
    /// A `RunningNode` instance.
    ///
    /// # Errors
    ///
    /// Returns an error if there is a problem initializing the Network.
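    ///
    /// # Example
    ///
    /// A minimal sketch of how the builder is intended to be used (not compiled as a doc-test;
    /// the `keypair`, `bootstrap`, `rewards_address`, `evm_network`, `addr` and `root_dir`
    /// values are placeholders assumed to be prepared by the caller):
    ///
    /// ```ignore
    /// let mut builder = NodeBuilder::new(
    ///     keypair,          // identity Keypair for this node
    ///     bootstrap,        // Bootstrap flow providing the initial peers
    ///     rewards_address,  // RewardsAddress that receives storage payments
    ///     evm_network,      // EvmNetwork the node operates against
    ///     addr,             // SocketAddr to listen on
    ///     root_dir,         // PathBuf for the node's data
    /// );
    /// builder.local(false);
    /// builder.no_upnp(false);
    /// let running_node = builder.build_and_run()?;
    /// // `running_node` exposes the NodeEventsChannel for node-related events.
    /// ```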
    pub fn build_and_run(self) -> Result<RunningNode> {
        // setup metrics
        #[cfg(feature = "open-metrics")]
        let (metrics_recorder, metrics_registries) = if self.metrics_server_port.is_some() {
            // metadata registry
            let mut metrics_registries = MetricsRegistries::default();
            let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);

            (Some(metrics_recorder), metrics_registries)
        } else {
            (None, MetricsRegistries::default())
        };

        // create a shutdown signal channel
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        // init network
        let network_config = NetworkConfig {
            keypair: self.identity_keypair,
            local: self.local,
            listen_addr: self.addr,
            root_dir: self.root_dir.clone(),
            shutdown_rx: shutdown_rx.clone(),
            bootstrap: self.bootstrap,
            no_upnp: self.no_upnp,
            relay_client: self.relay_client,
            custom_request_timeout: None,
            #[cfg(feature = "open-metrics")]
            metrics_registries,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: self.metrics_server_port,
        };
        let (network, network_event_receiver) = Network::init(network_config)?;

        // init node
        let node_events_channel = NodeEventsChannel::default();
        let node = NodeInner {
            network: network.clone(),
            events_channel: node_events_channel.clone(),
            reward_address: self.evm_address,
            #[cfg(feature = "open-metrics")]
            metrics_recorder,
            evm_network: self.evm_network,
        };
        let node = Node {
            inner: Arc::new(node),
        };

        // Run the node
        node.run(network_event_receiver, shutdown_rx);
        let running_node = RunningNode {
            shutdown_sender: shutdown_tx,
            network,
            node_events_channel,
            root_dir_path: self.root_dir,
            rewards_address: self.evm_address,
        };

        Ok(running_node)
    }
}

/// `Node` represents a single node in the distributed network. It handles
/// network events, processes incoming requests, interacts with the data
/// storage, and broadcasts node-related events.
#[derive(Clone)]
pub(crate) struct Node {
    inner: Arc<NodeInner>,
}

/// The actual implementation of the Node. The other is just a wrapper around this, so that we don't expose
/// the Arc from the interface.
struct NodeInner {
    events_channel: NodeEventsChannel,
    network: Network,
    #[cfg(feature = "open-metrics")]
    metrics_recorder: Option<NodeMetricsRecorder>,
    reward_address: RewardsAddress,
    evm_network: EvmNetwork,
}

impl Node {
    /// Returns the NodeEventsChannel
    pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
        &self.inner.events_channel
    }

    /// Returns the instance of Network
    pub(crate) fn network(&self) -> &Network {
        &self.inner.network
    }

    #[cfg(feature = "open-metrics")]
    /// Returns a reference to the NodeMetricsRecorder if the `open-metrics` feature flag is enabled
    /// This is used to record various metrics for the node.
    pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
        self.inner.metrics_recorder.as_ref()
    }

    /// Returns the reward address of the node
    pub(crate) fn reward_address(&self) -> &RewardsAddress {
        &self.inner.reward_address
    }

    pub(crate) fn evm_network(&self) -> &EvmNetwork {
        &self.inner.evm_network
    }

    /// Spawns the main task that processes incoming `NetworkEvent`s and drives the node's
    /// periodic work (replication, cleanup and storage challenges) until shutdown.
    fn run(
        self,
        mut network_event_receiver: Receiver<NetworkEvent>,
        mut shutdown_rx: watch::Receiver<bool>,
    ) {
        let mut rng = StdRng::from_entropy();

        let peers_connected = Arc::new(AtomicUsize::new(0));

        let _node_task = spawn(async move {
            // use a random replication interval so that nodes do not all replicate in sync
            // while messages are being transmitted.
            let replication_interval: u64 = rng.gen_range(
                PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
            );
            let replication_interval_time = Duration::from_secs(replication_interval);
            debug!("Replication interval set to {replication_interval_time:?}");

            let mut replication_interval = tokio::time::interval(replication_interval_time);
            let _ = replication_interval.tick().await; // first tick completes immediately

            let mut uptime_metrics_update_interval =
                tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
            let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately

            // use a random cleanup interval so that nodes do not all do this work at the
            // same time, causing an overall CPU spike.
            let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
                UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
                    ..UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
            );
            let irrelevant_records_cleanup_interval_time =
                Duration::from_secs(irrelevant_records_cleanup_interval);
            let mut irrelevant_records_cleanup_interval =
                tokio::time::interval(irrelevant_records_cleanup_interval_time);
            let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately

            // use a random storage challenge interval so that neighbours
            // do not carry out challenges at the same time
            let storage_challenge_interval: u64 =
                rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
            let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
            debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");

            let mut storage_challenge_interval =
                tokio::time::interval(storage_challenge_interval_time);
            let _ = storage_challenge_interval.tick().await; // first tick completes immediately

            loop {
                let peers_connected = &peers_connected;

                tokio::select! {
                    // Check for a shutdown command.
                    result = shutdown_rx.changed() => {
                        if (result.is_ok() && *shutdown_rx.borrow()) || result.is_err() {
                            info!("Shutdown signal received or sender dropped. Exiting network events loop.");
                            break;
                        }
                    },
                    net_event = network_event_receiver.recv() => {
                        match net_event {
                            Some(event) => {
                                let start = Instant::now();
                                let event_string = format!("{event:?}");

                                self.handle_network_event(event, peers_connected);
                                trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);

                            }
                            None => {
                                error!("The `NetworkEvent` channel is closed");
                                self.events_channel().broadcast(NodeEvent::ChannelClosed);
                                break;
                            }
                        }
                    }
                    // runs every replication_interval time
                    _ = replication_interval.tick() => {
                        let start = Instant::now();
                        let network = self.network().clone();
                        self.record_metrics(Marker::IntervalReplicationTriggered);

                        let _handle = spawn(async move {
                            Self::try_interval_replication(network);
                            trace!("Periodic replication took {:?}", start.elapsed());
                        });
                    }
                    _ = uptime_metrics_update_interval.tick() => {
                        #[cfg(feature = "open-metrics")]
                        if let Some(metrics_recorder) = self.metrics_recorder() {
                            let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
                        }
                    }
                    _ = irrelevant_records_cleanup_interval.tick() => {
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::trigger_irrelevant_record_cleanup(network);
                        });
                    }
                    // runs every storage_challenge_interval time
                    _ = storage_challenge_interval.tick() => {
                        let start = Instant::now();
                        debug!("Periodic storage challenge triggered");
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::storage_challenge(network).await;
                            trace!("Periodic storage challenge took {:?}", start.elapsed());
                        });
                    }
                }
            }
        });
    }

    /// Calls Marker::log() to insert the marker into the log files.
    /// Also calls NodeMetrics::record() to record the metric if the `open-metrics` feature flag is enabled.
    pub(crate) fn record_metrics(&self, marker: Marker) {
        marker.log();
        #[cfg(feature = "open-metrics")]
        if let Some(metrics_recorder) = self.metrics_recorder() {
            metrics_recorder.record(marker)
        }
    }

    // **** Private helpers *****

    /// Handle a network event.
    /// Spawns a task for any likely long-running work
    fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
        let start = Instant::now();
        let event_string = format!("{event:?}");
        let event_header;

        // Reducing non-mandatory logging
        if let NetworkEvent::QueryRequestReceived {
            query: Query::GetVersion { .. },
            ..
        } = event
        {
            trace!("Handling NetworkEvent {event_string}");
        } else {
            debug!("Handling NetworkEvent {event_string}");
        }

        match event {
            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
                event_header = "PeerAdded";
                // increment peers_connected and send the ConnectedToNetwork event once we have
                // connected to CLOSE_GROUP_SIZE peers
                let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
                if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
                    self.events_channel()
                        .broadcast(NodeEvent::ConnectedToNetwork);
                }

                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));

                // try query peer version
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::try_query_peer_version(network, peer_id, Default::default()).await;
                });

                // try replication here
                let network = self.network().clone();
                self.record_metrics(Marker::IntervalReplicationTriggered);
                let _handle = spawn(async move {
                    Self::try_interval_replication(network);
                });
            }
            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
                event_header = "PeerRemoved";
                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));

                let self_id = self.network().peer_id();
                let distance =
                    NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
                info!(
                    "Node {self_id:?} removed peer from routing table: {peer_id:?}. It has a {:?} distance to us.",
                    distance.ilog2()
                );

                let network = self.network().clone();
                self.record_metrics(Marker::IntervalReplicationTriggered);
                let _handle = spawn(async move {
                    Self::try_interval_replication(network);
                });
            }
            NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
                event_header = "PeerWithUnsupportedProtocol";
            }
            NetworkEvent::NewListenAddr(_) => {
                event_header = "NewListenAddr";
            }
            NetworkEvent::ResponseReceived { res } => {
                event_header = "ResponseReceived";
                if let Err(err) = self.handle_response(res) {
                    error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
                }
            }
            NetworkEvent::KeysToFetchForReplication(keys) => {
                event_header = "KeysToFetchForReplication";
                self.record_metrics(Marker::fetching_keys_for_replication(&keys));

                if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
                    error!("Error while trying to fetch replicated data {err:?}");
                }
            }
            NetworkEvent::QueryRequestReceived { query, channel } => {
                event_header = "QueryRequestReceived";
                let node = self.clone();
                let payment_address = *self.reward_address();

                let _handle = spawn(async move {
                    let network = node.network().clone();
                    let res = Self::handle_query(node, query, payment_address).await;

                    // Reducing non-mandatory logging
                    if let Response::Query(QueryResponse::GetVersion { .. }) = res {
                        trace!("Sending response {res:?}");
                    } else {
                        debug!("Sending response {res:?}");
                    }

                    network.send_response(res, channel);
                });
            }
            NetworkEvent::UnverifiedRecord(record) => {
                event_header = "UnverifiedRecord";
                // record validation can be long running, so we spawn a task to handle it
                let self_clone = self.clone();
                let _handle = spawn(async move {
                    let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                    match self_clone.validate_and_store_record(record).await {
                        Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
                        Err(err) => {
                            self_clone.record_metrics(Marker::RecordRejected(&key, &err));
                        }
                    }
                });
            }
            NetworkEvent::TerminateNode { reason } => {
                event_header = "TerminateNode";
                error!("Received termination from swarm_driver due to {reason:?}");
                self.events_channel()
                    .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
            }
            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
                event_header = "FailedToFetchHolders";
                let network = self.network().clone();
                let pretty_log: Vec<_> = bad_nodes
                    .iter()
                    .map(|(peer_id, record_key)| {
                        let pretty_key = PrettyPrintRecordKey::from(record_key);
                        (peer_id, pretty_key)
                    })
                    .collect();
                // Note: this log is checked in CI and is expected NOT to appear.
                //       Any change to the keyword `failed to fetch` requires a
                //       corresponding CI script change as well.
                debug!(
                    "Received notification from replication_fetcher, notifying {pretty_log:?} failed to fetch replication copies from."
                );
                let _handle = spawn(async move {
                    for (peer_id, record_key) in bad_nodes {
                        // An obsolete fetch request (e.g. flooded out by fresh replicates) could
                        // result in a peer being claimed as bad, as the local copy blocks the
                        // entry from being cleared.
                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
                        {
                            error!(
                                "From peer {peer_id:?}, failed to fetch record {:?}",
                                PrettyPrintRecordKey::from(&record_key)
                            );
                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
                        }
                    }
                });
            }
            NetworkEvent::QuoteVerification { quotes } => {
                event_header = "QuoteVerification";
                let network = self.network().clone();

                let _handle = spawn(async move {
                    quotes_verification(&network, quotes).await;
                });
            }
            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
                event_header = "FreshReplicateToFetch";
                self.fresh_replicate_to_fetch(holder, keys);
            }
            NetworkEvent::PeersForVersionQuery(peers) => {
                event_header = "PeersForVersionQuery";
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::query_peers_version(network, peers).await;
                });
            }
            NetworkEvent::NetworkWideReplication { keys } => {
                event_header = "NetworkWideReplication";
                self.perform_network_wide_replication(keys);
            }
        }

        trace!(
            "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
            start.elapsed()
        );
    }

    // Handle a response that was not awaited at the call site
    fn handle_response(&self, response: Response) -> Result<()> {
        match response {
            Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
                // This should actually have been short-circuited when received
                warn!("Mishandled replicate response, should be handled earlier");
            }
            Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
                error!(
                    "Response to replication shall be handled by the caller, not by the common handler, {resp:?}"
                );
            }
            Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
                // No need to handle
            }
            other => {
                warn!("handle_response not implemented for {other:?}");
            }
        };

        Ok(())
    }

    async fn handle_query(node: Self, query: Query, payment_address: RewardsAddress) -> Response {
        let network = node.network();
        let resp: QueryResponse = match query {
            Query::GetStoreQuote {
                key,
                data_type,
                data_size,
                nonce,
                difficulty,
            } => {
                let record_key = key.to_record_key();
                let self_id = network.peer_id();

                let maybe_quoting_metrics = network
                    .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
                    .await;

                let storage_proofs = if let Some(nonce) = nonce {
                    Self::respond_x_closest_record_proof(
                        network,
                        key.clone(),
                        nonce,
                        difficulty,
                        false,
                    )
                    .await
                } else {
                    vec![]
                };

                match maybe_quoting_metrics {
                    Ok((quoting_metrics, is_already_stored)) => {
                        if is_already_stored {
                            QueryResponse::GetStoreQuote {
                                quote: Err(ProtocolError::RecordExists(
                                    PrettyPrintRecordKey::from(&record_key).into_owned(),
                                )),
                                peer_address: NetworkAddress::from(self_id),
                                storage_proofs,
                            }
                        } else {
                            QueryResponse::GetStoreQuote {
                                quote: Self::create_quote_for_storecost(
                                    network,
                                    &key,
                                    &quoting_metrics,
                                    &payment_address,
                                ),
                                peer_address: NetworkAddress::from(self_id),
                                storage_proofs,
                            }
                        }
                    }
                    Err(err) => {
                        warn!("GetStoreQuote failed for {key:?}: {err}");
                        QueryResponse::GetStoreQuote {
                            quote: Err(ProtocolError::GetStoreQuoteFailed),
                            peer_address: NetworkAddress::from(self_id),
                            storage_proofs,
                        }
                    }
                }
            }
            Query::GetReplicatedRecord { requester: _, key } => {
                let our_address = NetworkAddress::from(network.peer_id());
                let record_key = key.to_record_key();

                let result = match network.get_local_record(&record_key).await {
                    Ok(Some(record)) => Ok((our_address, Bytes::from(record.value))),
                    Ok(None) => Err(ProtocolError::ReplicatedRecordNotFound {
                        holder: Box::new(our_address),
                        key: Box::new(key.clone()),
                    }),
                    // Use `PutRecordFailed` as a placeholder
                    Err(err) => Err(ProtocolError::PutRecordFailed(format!(
                        "Failed to fetch local record for GetReplicatedRecord {err:?}"
                    ))),
                };

                QueryResponse::GetReplicatedRecord(result)
            }
            Query::GetChunkExistenceProof {
                key,
                nonce,
                difficulty,
            } => QueryResponse::GetChunkExistenceProof(
                Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
            ),
            Query::CheckNodeInProblem(target_address) => {
                debug!("Got CheckNodeInProblem for peer {target_address:?}");

                let is_in_trouble =
                    if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
                        result
                    } else {
                        debug!("Could not get status of {target_address:?}.");
                        false
                    };

                QueryResponse::CheckNodeInProblem {
                    reporter_address: NetworkAddress::from(network.peer_id()),
                    target_address,
                    is_in_trouble,
                }
            }
            Query::GetClosestPeers {
                key,
                num_of_peers,
                range,
                sign_result,
            } => {
                debug!(
                    "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
                );
                Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
                    .await
            }
            Query::GetVersion(_) => QueryResponse::GetVersion {
                peer: NetworkAddress::from(network.peer_id()),
                version: ant_build_info::package_version(),
            },
            Query::PutRecord {
                holder,
                address,
                serialized_record,
            } => {
                let record = Record {
                    key: address.to_record_key(),
                    value: serialized_record,
                    publisher: None,
                    expires: None,
                };

                let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                let result = match node.validate_and_store_record(record).await {
                    Ok(()) => Ok(()),
                    Err(PutValidationError::OutdatedRecordCounter { counter, expected }) => {
                        node.record_metrics(Marker::RecordRejected(
                            &key,
                            &PutValidationError::OutdatedRecordCounter { counter, expected },
                        ));
                        Err(ProtocolError::OutdatedRecordCounter { counter, expected })
                    }
                    Err(err) => {
                        node.record_metrics(Marker::RecordRejected(&key, &err));
                        Err(ProtocolError::PutRecordFailed(format!("{err:?}")))
                    }
                };
                QueryResponse::PutRecord {
                    result,
                    peer_address: holder,
                    record_addr: address,
                }
            }
        };
        Response::Query(resp)
    }

    async fn respond_get_closest_peers(
        network: &Network,
        target: NetworkAddress,
        num_of_peers: Option<usize>,
        range: Option<[u8; 32]>,
        sign_result: bool,
    ) -> QueryResponse {
        let local_peers = network.get_local_peers_with_multiaddr().await;
        let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
            Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
        } else {
            vec![]
        };

        let signature = if sign_result {
            let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
            bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
            network.sign(&bytes).ok()
        } else {
            None
        };

        QueryResponse::GetClosestPeers {
            target,
            peers,
            signature,
        }
    }

    fn calculate_get_closest_peers(
        peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
        target: NetworkAddress,
        num_of_peers: Option<usize>,
        range: Option<[u8; 32]>,
    ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
        match (num_of_peers, range) {
            (_, Some(value)) => {
                let distance = U256::from_big_endian(&value);
                peer_addrs
                    .iter()
                    .filter_map(|(peer_id, multi_addrs)| {
                        let addr = NetworkAddress::from(*peer_id);
                        if target.distance(&addr).0 <= distance {
                            Some((addr, multi_addrs.clone()))
                        } else {
                            None
                        }
                    })
                    .collect()
            }
            (Some(num_of_peers), _) => {
                let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
                    .iter()
                    .map(|(peer_id, multi_addrs)| {
                        let addr = NetworkAddress::from(*peer_id);
                        (addr, multi_addrs.clone())
                    })
                    .collect();
                result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
                result.into_iter().take(num_of_peers).collect()
            }
            (None, None) => vec![],
        }
    }

    // Nodes only check ChunkProofs against each other, to avoid the `multi-version` issue.
    // Clients check the proof against all records, as they have to fetch from the network anyway.
    async fn respond_x_closest_record_proof(
        network: &Network,
        key: NetworkAddress,
        nonce: Nonce,
        difficulty: usize,
        chunk_only: bool,
    ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
        let start = Instant::now();
        let mut results = vec![];
        if difficulty == 1 {
            // Client checking existence of published chunk.
            let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
            if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
                let proof = ChunkProof::new(&record.value, nonce);
                debug!("Chunk proof for {key:?} is {proof:?}");
                result = Ok(proof)
            } else {
                debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
            }

            results.push((key.clone(), result));
        } else {
            let all_local_records = network.get_all_local_record_addresses().await;

            if let Ok(all_local_records) = all_local_records {
                let mut all_chunk_addrs: Vec<_> = if chunk_only {
                    all_local_records
                        .iter()
                        .filter_map(|(addr, record_type)| {
                            if *record_type == ValidationType::Chunk {
                                Some(addr.clone())
                            } else {
                                None
                            }
                        })
                        .collect()
                } else {
                    all_local_records.keys().cloned().collect()
                };

                // Sort by distance and only take first X closest entries
                all_chunk_addrs.sort_by_key(|addr| key.distance(addr));

                // TODO: this shall be deduced from resource usage dynamically
                let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);

                for addr in all_chunk_addrs.iter().take(workload_factor) {
                    if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
                    {
                        let proof = ChunkProof::new(&record.value, nonce);
                        debug!("Chunk proof for {key:?} is {proof:?}");
                        results.push((addr.clone(), Ok(proof)));
                    }
                }
            }

            info!(
                "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
                results.len(),
                start.elapsed()
            );
        }

        results
    }

    /// Check among all chunk type records that we have,
    /// and randomly pick one as the verification candidate.
    /// This will challenge all closest peers at once.
    async fn storage_challenge(network: Network) {
        let start = Instant::now();
        let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
            network.get_k_closest_local_peers_to_the_target(None).await
        {
            closest_peers
                .into_iter()
                .take(CLOSE_GROUP_SIZE)
                .collect_vec()
        } else {
            error!("Cannot get local neighbours");
            return;
        };
        if closest_peers.len() < CLOSE_GROUP_SIZE {
            debug!(
                "Not enough neighbours ({}/{}) to carry out storage challenge.",
                closest_peers.len(),
                CLOSE_GROUP_SIZE
            );
            return;
        }

        let mut verify_candidates: Vec<NetworkAddress> =
            if let Ok(all_keys) = network.get_all_local_record_addresses().await {
                all_keys
                    .iter()
                    .filter_map(|(addr, record_type)| {
                        if ValidationType::Chunk == *record_type {
                            Some(addr.clone())
                        } else {
                            None
                        }
                    })
                    .collect()
            } else {
                error!("Failed to get local record addresses.");
                return;
            };
        let num_of_targets = verify_candidates.len();
        if num_of_targets < 50 {
            debug!("Not enough candidates ({num_of_targets}/50) to be checked against neighbours.");
            return;
        }

        // To ensure the neighbours share the same knowledge as us,
        // the target is chosen to be not far from us.
        let self_addr = NetworkAddress::from(network.peer_id());
        verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
        let index: usize = OsRng.gen_range(0..num_of_targets / 2);
        let target = verify_candidates[index].clone();
        // TODO: workload shall be dynamically deduced from resource usage
        let difficulty = CLOSE_GROUP_SIZE;
        verify_candidates.sort_by_key(|addr| target.distance(addr));
        let expected_targets = verify_candidates.into_iter().take(difficulty);
        let nonce: Nonce = thread_rng().r#gen::<u64>();
        let mut expected_proofs = HashMap::new();
        for addr in expected_targets {
            if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
                let expected_proof = ChunkProof::new(&record.value, nonce);
                let _ = expected_proofs.insert(addr, expected_proof);
            } else {
                error!("Local record {addr:?} can't be loaded from disk.");
            }
        }
        let request = Request::Query(Query::GetChunkExistenceProof {
            key: target.clone(),
            nonce,
            difficulty,
        });

        let mut tasks = JoinSet::new();
        for (peer_id, addresses) in closest_peers {
            if peer_id == network.peer_id() {
                continue;
            }
            let network_clone = network.clone();
            let request_clone = request.clone();
            let expected_proofs_clone = expected_proofs.clone();
            let _ = tasks.spawn(async move {
                let res = scoring_peer(
                    network_clone,
                    (peer_id, addresses),
                    request_clone,
                    expected_proofs_clone,
                )
                .await;
                (peer_id, res)
            });
        }

        let mut peer_scores = vec![];
        while let Some(res) = tasks.join_next().await {
            match res {
                Ok((peer_id, score)) => {
                    let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
                    if !is_healthy {
                        info!(
                            "Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."
                        );
                        // TODO: shall a challenge failure immediately trigger the node to be removed?
                        network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
                    }
                    peer_scores.push((peer_id, is_healthy));
                }
                Err(e) => {
                    info!("StorageChallenge task completed with error {e:?}");
                }
            }
        }
        if !peer_scores.is_empty() {
            network.notify_peer_scores(peer_scores);
        }

        info!(
            "Completed node StorageChallenge against neighbours in {:?}!",
            start.elapsed()
        );
    }

    /// Query peers' versions and update local knowledge.
    async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
        // To avoid choking, carry out the queries one by one
        for (peer_id, addrs) in peers {
            Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
        }
    }

    /// Query a peer's version and update local knowledge.
    async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
        let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
        // We can skip passing `addrs` here as the new peer should be part of the kad::RT and the swarm can get the addr.
        let version = match network.send_request(request, peer, addrs).await {
            Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
                trace!("Fetched peer version {peer:?} as {version:?}");
                version
            }
            Ok(other) => {
                info!("Unexpected response while fetching peer version {peer:?}: {other:?}");
                "none".to_string()
            }
            Err(err) => {
                info!("Failed to fetch peer version {peer:?} with error {err:?}");
                // A failed version fetch (which itself includes a dial and re-attempt)
                // with a `DialFailure` error indicates the peer is very likely dead.
                // In that case, the peer shall be removed from the routing table.
                if let NetworkError::OutboundError(OutboundFailure::DialFailure) = err {
                    network.remove_peer(peer);
                    return;
                }
                "old".to_string()
            }
        };
        network.notify_node_version(peer, version);
    }
}

async fn scoring_peer(
    network: Network,
    peer: (PeerId, Addresses),
    request: Request,
    expected_proofs: HashMap<NetworkAddress, ChunkProof>,
) -> usize {
    let peer_id = peer.0;
    let start = Instant::now();
    let responses = network
        .send_and_get_responses(&[peer], &request, true)
        .await;

    if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
        responses.get(&peer_id)
    {
        if answers.is_empty() {
            info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
            return 0;
        }
        let elapsed = start.elapsed();

        let mut received_proofs = vec![];
        for (addr, proof) in answers {
            if let Ok(proof) = proof {
                received_proofs.push((addr.clone(), proof.clone()));
            }
        }

        let score = mark_peer(elapsed, received_proofs, &expected_proofs);
        info!(
            "Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.",
            answers.len()
        );
        score
    } else {
        info!("Peer {peer_id:?} didn't reply to the ChunkProofChallenge, or replied with an error.");
        0
    }
}

// Based on the following metrics:
//   * the duration
//   * whether there is any false answer
//   * the percentage of correct answers among the expected closest
// The higher the score, the higher the confidence in the peer.
fn mark_peer(
    duration: Duration,
    answers: Vec<(NetworkAddress, ChunkProof)>,
    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
) -> usize {
    let duration_score = duration_score_scheme(duration);
    let challenge_score = challenge_score_scheme(answers, expected_proofs);

    duration_score * challenge_score
}

// A shorter duration gets a higher score
fn duration_score_scheme(duration: Duration) -> usize {
    // So far just a simple stepped scheme, capped by HIGHEST_SCORE
    let in_ms = if let Some(value) = duration.as_millis().to_usize() {
        value
    } else {
        info!("Cannot get milliseconds from {duration:?}, using a default value of 1000ms.");
        1000
    };

    let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
    HIGHEST_SCORE - step
}

// Any false answer shall result in a 0 score immediately
fn challenge_score_scheme(
    answers: Vec<(NetworkAddress, ChunkProof)>,
    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
) -> usize {
    let mut correct_answers = 0;
    for (addr, chunk_proof) in answers {
        if let Some(expected_proof) = expected_proofs.get(&addr) {
            if expected_proof.verify(&chunk_proof) {
                correct_answers += 1;
            } else {
                info!("Spotted a false answer to the challenge regarding {addr:?}");
                // Any false answer shall result in a 0 score immediately
                return 0;
            }
        }
    }
    // TODO: For those answers not among the expected_proofs, the peer could simply have
    //       different knowledge of the records than us. Shall we:
    //         * set the target close to us, so that neighbours are more likely to share the same knowledge
    //         * fetch from local storage to verify
    //         * fetch from the network to verify
    std::cmp::min(
        HIGHEST_SCORE,
        HIGHEST_SCORE * correct_answers / expected_proofs.len(),
    )
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    #[test]
    fn test_no_local_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(5);
        let range = None;
        let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);

        assert_eq!(result, vec![]);
    }

    #[test]
    fn test_fewer_local_peers_than_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(2);
        let range = None;
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Result shall be sorted and truncated
        let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .iter()
            .map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(*peer_id);
                (addr, multi_addrs.clone())
            })
            .collect();
        expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
        let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();

        assert_eq!(expected_result, result);
    }

    #[test]
    fn test_with_range_and_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(0);
        let range_value = [128; 32];
        let range = Some(range_value);
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Range shall be preferred, i.e. the resulting peers shall all be within the range
        let distance = U256::from_big_endian(&range_value);
        let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .into_iter()
            .filter_map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(peer_id);
                if target.distance(&addr).0 <= distance {
                    Some((addr, multi_addrs.clone()))
                } else {
                    None
                }
            })
            .collect();

        assert_eq!(expected_result, result);
    }
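
    // Illustrative sanity checks of the scoring helpers above (not part of the original
    // suite); they rely only on HIGHEST_SCORE, TIME_STEP and the ChunkProof usage already
    // present in this file.
    #[test]
    fn test_duration_score_scheme_steps() {
        // ~250ms (the expected average) loses 250 / TIME_STEP = 12 points.
        assert_eq!(
            duration_score_scheme(Duration::from_millis(250)),
            HIGHEST_SCORE - 250 / TIME_STEP
        );
        // Anything taking HIGHEST_SCORE * TIME_STEP ms (2s) or longer is capped at a score of 0.
        assert_eq!(duration_score_scheme(Duration::from_secs(5)), 0);
        // An instant answer keeps the full score.
        assert_eq!(duration_score_scheme(Duration::ZERO), HIGHEST_SCORE);
    }

    #[test]
    fn test_challenge_score_scheme_no_answers() {
        let nonce: Nonce = 42;
        let data: Vec<u8> = vec![1, 2, 3];
        let addr = NetworkAddress::from(PeerId::random());
        let expected_proofs = HashMap::from([(addr, ChunkProof::new(&data, nonce))]);

        // With no answers at all, none of the expected proofs are matched, so the score is 0.
        assert_eq!(challenge_score_scheme(vec![], &expected_proofs), 0);
    }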
}