ant_node/node.rs

// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use super::{
    error::Result, event::NodeEventsChannel, quote::quotes_verification, Marker, NodeEvent,
};
#[cfg(feature = "open-metrics")]
use crate::metrics::NodeMetricsRecorder;
use crate::RunningNode;
use ant_bootstrap::BootstrapCacheStore;
use ant_evm::EvmNetwork;
use ant_evm::RewardsAddress;
use ant_networking::Addresses;
#[cfg(feature = "open-metrics")]
use ant_networking::MetricsRegistries;
use ant_networking::{
    Instant, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue, SwarmDriver,
};
use ant_protocol::{
    error::Error as ProtocolError,
    messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
    storage::ValidationType,
    NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
};
use bytes::Bytes;
use itertools::Itertools;
use libp2p::{identity::Keypair, kad::U256, request_response::OutboundFailure, Multiaddr, PeerId};
use num_traits::cast::ToPrimitive;
use rand::{
    rngs::{OsRng, StdRng},
    thread_rng, Rng, SeedableRng,
};
use std::{
    collections::HashMap,
    net::SocketAddr,
    path::PathBuf,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    time::Duration,
};
use tokio::sync::watch;
use tokio::{
    sync::mpsc::Receiver,
    task::{spawn, JoinSet},
};
/// Interval to trigger replication of all records to all peers.
/// This is the maximum interval; the actual interval at any node will be at least half of this.
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;

/// Interval to trigger a storage challenge.
/// This is the maximum interval; the actual interval at any node will be at least half of this.
const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;

/// Interval to update the node's uptime metric.
const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

/// Interval to clean up irrelevant records.
/// This is the maximum interval; the actual interval at any node will be at least half of this.
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;

/// Highest score achievable from each metric sub-sector during a StorageChallenge.
const HIGHEST_SCORE: usize = 100;

/// Any node scoring below this shall be considered bad.
/// The maximum possible score is 100 * 100.
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;

/// Scoring step size in ms; the average StorageChallenge completion time is expected to be around 250ms.
const TIME_STEP: usize = 20;
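
// Worked example of how the scoring constants interact (see `mark_peer` and the
// `*_score_scheme` helpers below): a peer's score is duration_score * challenge_score,
// each factor capped at HIGHEST_SCORE (100), so the maximum score is 10_000. With a
// perfect challenge score of 100, a peer stays above MIN_ACCEPTABLE_HEALTHY_SCORE
// (3000) as long as its duration score exceeds 30, i.e. it answers within roughly
// 1.4 seconds (100 - 1400 / TIME_STEP = 30).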

/// Helper to build and run a Node
pub struct NodeBuilder {
    addr: SocketAddr,
    bootstrap_cache: Option<BootstrapCacheStore>,
    evm_address: RewardsAddress,
    evm_network: EvmNetwork,
    initial_peers: Vec<Multiaddr>,
    identity_keypair: Keypair,
    local: bool,
    #[cfg(feature = "open-metrics")]
    /// Set to Some to enable the metrics server
    metrics_server_port: Option<u16>,
    no_upnp: bool,
    relay_client: bool,
    root_dir: PathBuf,
}

impl NodeBuilder {
    /// Instantiate the builder. The initial peers can either be supplied via the `initial_peers`
    /// argument or fetched from the bootstrap cache set via the `bootstrap_cache` method.
    pub fn new(
        identity_keypair: Keypair,
        initial_peers: Vec<Multiaddr>,
        evm_address: RewardsAddress,
        evm_network: EvmNetwork,
        addr: SocketAddr,
        root_dir: PathBuf,
    ) -> Self {
        Self {
            addr,
            bootstrap_cache: None,
            evm_address,
            evm_network,
            initial_peers,
            identity_keypair,
            local: false,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: None,
            no_upnp: false,
            relay_client: false,
            root_dir,
        }
    }

    /// Set the flag to indicate if the node is running in local mode
    pub fn local(&mut self, local: bool) {
        self.local = local;
    }

    #[cfg(feature = "open-metrics")]
    /// Set the port for the OpenMetrics server. Defaults to a random port if not set.
    pub fn metrics_server_port(&mut self, port: Option<u16>) {
        self.metrics_server_port = port;
    }

    /// Set the initialized bootstrap cache.
    pub fn bootstrap_cache(&mut self, cache: BootstrapCacheStore) {
        self.bootstrap_cache = Some(cache);
    }

    /// Set the flag to make the node act as a relay client
    pub fn relay_client(&mut self, relay_client: bool) {
        self.relay_client = relay_client;
    }

    /// Set the flag to disable UPnP for the node
    pub fn no_upnp(&mut self, no_upnp: bool) {
        self.no_upnp = no_upnp;
    }

    /// Asynchronously runs a new node instance, setting up the swarm driver,
    /// creating the data storage, and handling network events. Returns the
    /// created `RunningNode`, which contains a `NodeEventsChannel` for listening
    /// to node-related events.
    ///
    /// # Returns
    ///
    /// A `RunningNode` instance.
    ///
    /// # Errors
    ///
    /// Returns an error if there is a problem initializing the `SwarmDriver`.
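    ///
    /// # Example
    ///
    /// Illustrative sketch only (marked `ignore`, not compiled as a doctest): how the
    /// `Keypair`, `RewardsAddress`, `EvmNetwork` and paths are obtained depends on the
    /// caller; the values below are placeholders.
    ///
    /// ```ignore
    /// let mut builder = NodeBuilder::new(
    ///     Keypair::generate_ed25519(),
    ///     vec![],            // initial peers, if any are known up front
    ///     rewards_address,   // caller-provided RewardsAddress
    ///     evm_network,       // caller-provided EvmNetwork
    ///     "0.0.0.0:0".parse()?,
    ///     root_dir.into(),
    /// );
    /// builder.local(false);
    /// builder.no_upnp(false);
    /// let running_node = builder.build_and_run()?;
    /// ```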
    pub fn build_and_run(self) -> Result<RunningNode> {
        let mut network_builder =
            NetworkBuilder::new(self.identity_keypair, self.local, self.initial_peers);

        #[cfg(feature = "open-metrics")]
        let metrics_recorder = if self.metrics_server_port.is_some() {
            // metadata registry
            let mut metrics_registries = MetricsRegistries::default();
            let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);

            network_builder.metrics_registries(metrics_registries);

            Some(metrics_recorder)
        } else {
            None
        };

        network_builder.listen_addr(self.addr);
        #[cfg(feature = "open-metrics")]
        network_builder.metrics_server_port(self.metrics_server_port);
        network_builder.relay_client(self.relay_client);
        if let Some(cache) = self.bootstrap_cache {
            network_builder.bootstrap_cache(cache);
        }

        network_builder.no_upnp(self.no_upnp);

        let (network, network_event_receiver, swarm_driver) =
            network_builder.build_node(self.root_dir.clone())?;

        let node_events_channel = NodeEventsChannel::default();

        let node = NodeInner {
            network: network.clone(),
            events_channel: node_events_channel.clone(),
            reward_address: self.evm_address,
            #[cfg(feature = "open-metrics")]
            metrics_recorder,
            evm_network: self.evm_network,
        };

        let node = Node {
            inner: Arc::new(node),
        };

        // Create a shutdown signal channel
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        // Run the node
        node.run(swarm_driver, network_event_receiver, shutdown_rx);

        let running_node = RunningNode {
            shutdown_sender: shutdown_tx,
            network,
            node_events_channel,
            root_dir_path: self.root_dir,
            rewards_address: self.evm_address,
        };

        Ok(running_node)
    }
}

/// `Node` represents a single node in the distributed network. It handles
/// network events, processes incoming requests, interacts with the data
/// storage, and broadcasts node-related events.
#[derive(Clone)]
pub(crate) struct Node {
    inner: Arc<NodeInner>,
}

/// The actual implementation of the Node. The other is just a wrapper around this, so that we don't expose
/// the Arc from the interface.
struct NodeInner {
    events_channel: NodeEventsChannel,
    network: Network,
    #[cfg(feature = "open-metrics")]
    metrics_recorder: Option<NodeMetricsRecorder>,
    reward_address: RewardsAddress,
    evm_network: EvmNetwork,
}

impl Node {
    /// Returns the NodeEventsChannel
    pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
        &self.inner.events_channel
    }

    /// Returns the instance of Network
    pub(crate) fn network(&self) -> &Network {
        &self.inner.network
    }

    #[cfg(feature = "open-metrics")]
    /// Returns a reference to the NodeMetricsRecorder if the `open-metrics` feature flag is enabled
    /// This is used to record various metrics for the node.
    pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
        self.inner.metrics_recorder.as_ref()
    }

    /// Returns the reward address of the node
    pub(crate) fn reward_address(&self) -> &RewardsAddress {
        &self.inner.reward_address
    }

    pub(crate) fn evm_network(&self) -> &EvmNetwork {
        &self.inner.evm_network
    }

    /// Spawns a task to run the provided `SwarmDriver` and another task to process `NetworkEvent`s.
    /// Both tasks exit when the shutdown signal is received.
    fn run(
        self,
        swarm_driver: SwarmDriver,
        mut network_event_receiver: Receiver<NetworkEvent>,
        mut shutdown_rx: watch::Receiver<bool>,
    ) {
        let mut rng = StdRng::from_entropy();

        let peers_connected = Arc::new(AtomicUsize::new(0));

        let _swarm_driver_task = spawn(swarm_driver.run(shutdown_rx.clone()));
        let _node_task = spawn(async move {
            // Use a randomised replication interval so that nodes do not all replicate
            // at the same time while messages are being transmitted.
            let replication_interval: u64 = rng.gen_range(
                PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
            );
            let replication_interval_time = Duration::from_secs(replication_interval);
            debug!("Replication interval set to {replication_interval_time:?}");

            let mut replication_interval = tokio::time::interval(replication_interval_time);
            let _ = replication_interval.tick().await; // first tick completes immediately

            let mut uptime_metrics_update_interval =
                tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
            let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately

            // Use a randomised cleanup interval so that nodes do not all carry out the work
            // at the same time, causing an overall CPU spike.
            let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
                UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
                    ..UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
            );
            let irrelevant_records_cleanup_interval_time =
                Duration::from_secs(irrelevant_records_cleanup_interval);
            let mut irrelevant_records_cleanup_interval =
                tokio::time::interval(irrelevant_records_cleanup_interval_time);
            let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately

            // Use a randomised storage challenge interval so that neighbours do not
            // carry out challenges at the same time.
            let storage_challenge_interval: u64 =
                rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
            let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
            debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");

            let mut storage_challenge_interval =
                tokio::time::interval(storage_challenge_interval_time);
            let _ = storage_challenge_interval.tick().await; // first tick completes immediately

            loop {
                let peers_connected = &peers_connected;

                tokio::select! {
                    // Check for a shutdown command.
                    result = shutdown_rx.changed() => {
                        if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
                            info!("Shutdown signal received or sender dropped. Exiting network events loop.");
                            break;
                        }
                    },
                    net_event = network_event_receiver.recv() => {
                        match net_event {
                            Some(event) => {
                                let start = Instant::now();
                                let event_string = format!("{event:?}");

                                self.handle_network_event(event, peers_connected);
                                trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);

                            }
                            None => {
                                error!("The `NetworkEvent` channel is closed");
                                self.events_channel().broadcast(NodeEvent::ChannelClosed);
                                break;
                            }
                        }
                    }
                    // runs every replication_interval time
                    _ = replication_interval.tick() => {
                        let start = Instant::now();
                        let network = self.network().clone();
                        self.record_metrics(Marker::IntervalReplicationTriggered);

                        let _handle = spawn(async move {
                            Self::try_interval_replication(network);
                            trace!("Periodic replication took {:?}", start.elapsed());
                        });
                    }
                    _ = uptime_metrics_update_interval.tick() => {
                        #[cfg(feature = "open-metrics")]
                        if let Some(metrics_recorder) = self.metrics_recorder() {
                            let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
                        }
                    }
                    _ = irrelevant_records_cleanup_interval.tick() => {
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::trigger_irrelevant_record_cleanup(network);
                        });
                    }
                    // runs every storage_challenge_interval time
                    _ = storage_challenge_interval.tick() => {
                        let start = Instant::now();
                        debug!("Periodic storage challenge triggered");
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::storage_challenge(network).await;
                            trace!("Periodic storage challenge took {:?}", start.elapsed());
                        });
                    }
                }
            }
        });
    }

    /// Calls Marker::log() to insert the marker into the log files.
    /// Also calls NodeMetrics::record() to record the metric if the `open-metrics` feature flag is enabled.
    pub(crate) fn record_metrics(&self, marker: Marker) {
        marker.log();
        #[cfg(feature = "open-metrics")]
        if let Some(metrics_recorder) = self.metrics_recorder() {
            metrics_recorder.record(marker)
        }
    }

    // **** Private helpers *****

    /// Handle a network event.
    /// Spawns a task for any likely long-running work.
    fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
        let start = Instant::now();
        let event_string = format!("{event:?}");
        let event_header;

        // Reducing non-mandatory logging
        if let NetworkEvent::QueryRequestReceived {
            query: Query::GetVersion { .. },
            ..
        } = event
        {
            trace!("Handling NetworkEvent {event_string}");
        } else {
            debug!("Handling NetworkEvent {event_string}");
        }

        match event {
            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
                event_header = "PeerAdded";
                // Increment peers_connected and send the ConnectedToNetwork event once we
                // have connected to CLOSE_GROUP_SIZE peers.
                let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
                if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
                    self.events_channel()
                        .broadcast(NodeEvent::ConnectedToNetwork);
                }

                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));

                // try query peer version
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::try_query_peer_version(network, peer_id, Default::default()).await;
                });

                // try replication here
                let network = self.network().clone();
                self.record_metrics(Marker::IntervalReplicationTriggered);
                let _handle = spawn(async move {
                    Self::try_interval_replication(network);
                });
            }
            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
                event_header = "PeerRemoved";
                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));

                let self_id = self.network().peer_id();
                let distance =
                    NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
                info!("Node {self_id:?} removed peer from routing table: {peer_id:?}. It has a {:?} distance to us.", distance.ilog2());

                let network = self.network().clone();
                self.record_metrics(Marker::IntervalReplicationTriggered);
                let _handle = spawn(async move {
                    Self::try_interval_replication(network);
                });
            }
            NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
                event_header = "PeerWithUnsupportedProtocol";
            }
            NetworkEvent::NewListenAddr(_) => {
                event_header = "NewListenAddr";
            }
            NetworkEvent::ResponseReceived { res } => {
                event_header = "ResponseReceived";
                if let Err(err) = self.handle_response(res) {
                    error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
                }
            }
            NetworkEvent::KeysToFetchForReplication(keys) => {
                event_header = "KeysToFetchForReplication";
                self.record_metrics(Marker::fetching_keys_for_replication(&keys));

                if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
                    error!("Error while trying to fetch replicated data {err:?}");
                }
            }
            NetworkEvent::QueryRequestReceived { query, channel } => {
                event_header = "QueryRequestReceived";
                let network = self.network().clone();
                let payment_address = *self.reward_address();

                let _handle = spawn(async move {
                    let res = Self::handle_query(&network, query, payment_address).await;

                    // Reducing non-mandatory logging
                    if let Response::Query(QueryResponse::GetVersion { .. }) = res {
                        trace!("Sending response {res:?}");
                    } else {
                        debug!("Sending response {res:?}");
                    }

                    network.send_response(res, channel);
                });
            }
            NetworkEvent::UnverifiedRecord(record) => {
                event_header = "UnverifiedRecord";
                // Record validation and storage can be long-running, so we spawn a task to handle it.
                let self_clone = self.clone();
                let _handle = spawn(async move {
                    let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                    match self_clone.validate_and_store_record(record).await {
                        Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
                        Err(err) => {
                            self_clone.record_metrics(Marker::RecordRejected(&key, &err));
                        }
                    }
                });
            }
            NetworkEvent::TerminateNode { reason } => {
                event_header = "TerminateNode";
                error!("Received termination from swarm_driver due to {reason:?}");
                self.events_channel()
                    .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
            }
            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
                event_header = "FailedToFetchHolders";
                let network = self.network().clone();
                let pretty_log: Vec<_> = bad_nodes
                    .iter()
                    .map(|(peer_id, record_key)| {
                        let pretty_key = PrettyPrintRecordKey::from(record_key);
                        (peer_id, pretty_key)
                    })
                    .collect();
                // Note: this log is checked in CI and is expected NOT to appear.
                //       Any change to the keyword `failed to fetch` requires a
                //       corresponding change to the CI script as well.
                debug!("Received notification from replication_fetcher, notifying {pretty_log:?} failed to fetch replication copies from.");
                let _handle = spawn(async move {
                    for (peer_id, record_key) in bad_nodes {
                        // An obsolete fetch request (due to being flooded with fresh replicates) could
                        // result in a peer being claimed as bad, as the local copy blocks the entry
                        // from being cleared.
                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
                        {
                            error!(
                                "From peer {peer_id:?}, failed to fetch record {:?}",
                                PrettyPrintRecordKey::from(&record_key)
                            );
                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
                        }
                    }
                });
            }
            NetworkEvent::QuoteVerification { quotes } => {
                event_header = "QuoteVerification";
                let network = self.network().clone();

                let _handle = spawn(async move {
                    quotes_verification(&network, quotes).await;
                });
            }
            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
                event_header = "FreshReplicateToFetch";
                self.fresh_replicate_to_fetch(holder, keys);
            }
            NetworkEvent::PeersForVersionQuery(peers) => {
                event_header = "PeersForVersionQuery";
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::query_peers_version(network, peers).await;
                });
            }
        }

        trace!(
            "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
            start.elapsed()
        );
    }

    // Handle the response that was not awaited at the call site
    fn handle_response(&self, response: Response) -> Result<()> {
        match response {
            Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
                // This should actually have been short-circuited when received
                warn!("Mishandled replicate response, should be handled earlier");
            }
            Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
                error!("Response to replication shall be handled by the caller, not by the common handler, {resp:?}");
            }
            Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
                // No need to handle
            }
            other => {
                warn!("handle_response not implemented for {other:?}");
            }
        };

        Ok(())
    }

    async fn handle_query(
        network: &Network,
        query: Query,
        payment_address: RewardsAddress,
    ) -> Response {
        let resp: QueryResponse = match query {
            Query::GetStoreQuote {
                key,
                data_type,
                data_size,
                nonce,
                difficulty,
            } => {
                let record_key = key.to_record_key();
                let self_id = network.peer_id();

                let maybe_quoting_metrics = network
                    .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
                    .await;

                let storage_proofs = if let Some(nonce) = nonce {
                    Self::respond_x_closest_record_proof(
                        network,
                        key.clone(),
                        nonce,
                        difficulty,
                        false,
                    )
                    .await
                } else {
                    vec![]
                };

                match maybe_quoting_metrics {
                    Ok((quoting_metrics, is_already_stored)) => {
                        if is_already_stored {
                            QueryResponse::GetStoreQuote {
                                quote: Err(ProtocolError::RecordExists(
                                    PrettyPrintRecordKey::from(&record_key).into_owned(),
                                )),
                                peer_address: NetworkAddress::from(self_id),
                                storage_proofs,
                            }
                        } else {
                            QueryResponse::GetStoreQuote {
                                quote: Self::create_quote_for_storecost(
                                    network,
                                    &key,
                                    &quoting_metrics,
                                    &payment_address,
                                ),
                                peer_address: NetworkAddress::from(self_id),
                                storage_proofs,
                            }
                        }
                    }
                    Err(err) => {
                        warn!("GetStoreQuote failed for {key:?}: {err}");
                        QueryResponse::GetStoreQuote {
                            quote: Err(ProtocolError::GetStoreQuoteFailed),
                            peer_address: NetworkAddress::from(self_id),
                            storage_proofs,
                        }
                    }
                }
            }
            Query::GetReplicatedRecord { requester: _, key } => {
                let our_address = NetworkAddress::from(network.peer_id());
                let mut result = Err(ProtocolError::ReplicatedRecordNotFound {
                    holder: Box::new(our_address.clone()),
                    key: Box::new(key.clone()),
                });
                let record_key = key.as_record_key();

                if let Some(record_key) = record_key {
                    if let Ok(Some(record)) = network.get_local_record(&record_key).await {
                        result = Ok((our_address, Bytes::from(record.value)));
                    }
                }

                QueryResponse::GetReplicatedRecord(result)
            }
            Query::GetChunkExistenceProof {
                key,
                nonce,
                difficulty,
            } => QueryResponse::GetChunkExistenceProof(
                Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
            ),
            Query::CheckNodeInProblem(target_address) => {
                debug!("Got CheckNodeInProblem for peer {target_address:?}");

                let is_in_trouble =
                    if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
                        result
                    } else {
                        debug!("Could not get status of {target_address:?}.");
                        false
                    };

                QueryResponse::CheckNodeInProblem {
                    reporter_address: NetworkAddress::from(network.peer_id()),
                    target_address,
                    is_in_trouble,
                }
            }
            Query::GetClosestPeers {
                key,
                num_of_peers,
                range,
                sign_result,
            } => {
                debug!(
                    "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
                );
                Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
                    .await
            }
            Query::GetVersion(_) => QueryResponse::GetVersion {
                peer: NetworkAddress::from(network.peer_id()),
                version: ant_build_info::package_version(),
            },
        };
        Response::Query(resp)
    }

    async fn respond_get_closest_peers(
        network: &Network,
        target: NetworkAddress,
        num_of_peers: Option<usize>,
        range: Option<[u8; 32]>,
        sign_result: bool,
    ) -> QueryResponse {
        let local_peers = network.get_local_peers_with_multiaddr().await;
        let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
            Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
        } else {
            vec![]
        };

        let signature = if sign_result {
            let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
            bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
            network.sign(&bytes).ok()
        } else {
            None
        };

        QueryResponse::GetClosestPeers {
            target,
            peers,
            signature,
        }
    }

    fn calculate_get_closest_peers(
        peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
        target: NetworkAddress,
        num_of_peers: Option<usize>,
        range: Option<[u8; 32]>,
    ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
        match (num_of_peers, range) {
            (_, Some(value)) => {
                let distance = U256::from_big_endian(&value);
                peer_addrs
                    .iter()
                    .filter_map(|(peer_id, multi_addrs)| {
                        let addr = NetworkAddress::from(*peer_id);
                        if target.distance(&addr).0 <= distance {
                            Some((addr, multi_addrs.clone()))
                        } else {
                            None
                        }
                    })
                    .collect()
            }
            (Some(num_of_peers), _) => {
                let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
                    .iter()
                    .map(|(peer_id, multi_addrs)| {
                        let addr = NetworkAddress::from(*peer_id);
                        (addr, multi_addrs.clone())
                    })
                    .collect();
                result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
                result.into_iter().take(num_of_peers).collect()
            }
            (None, None) => vec![],
        }
    }

    // Nodes only check ChunkProofs against each other, to avoid the `multi-version` issue.
    // Clients check the proof against all records, as they have to fetch from the network anyway.
    async fn respond_x_closest_record_proof(
        network: &Network,
        key: NetworkAddress,
        nonce: Nonce,
        difficulty: usize,
        chunk_only: bool,
    ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
        let start = Instant::now();
        let mut results = vec![];
        if difficulty == 1 {
            // Client checking existence of published chunk.
            let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
            if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
                let proof = ChunkProof::new(&record.value, nonce);
                debug!("Chunk proof for {key:?} is {proof:?}");
                result = Ok(proof)
            } else {
                debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
            }

            results.push((key.clone(), result));
        } else {
            let all_local_records = network.get_all_local_record_addresses().await;

            if let Ok(all_local_records) = all_local_records {
                let mut all_chunk_addrs: Vec<_> = if chunk_only {
                    all_local_records
                        .iter()
                        .filter_map(|(addr, record_type)| {
                            if *record_type == ValidationType::Chunk {
                                Some(addr.clone())
                            } else {
                                None
                            }
                        })
                        .collect()
                } else {
                    all_local_records.keys().cloned().collect()
                };

                // Sort by distance and only take the first X closest entries
                all_chunk_addrs.sort_by_key(|addr| key.distance(addr));

                // TODO: this shall be deduced from resource usage dynamically
                let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);

                for addr in all_chunk_addrs.iter().take(workload_factor) {
                    if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
                    {
                        let proof = ChunkProof::new(&record.value, nonce);
                        debug!("Chunk proof for {key:?} is {proof:?}");
                        results.push((addr.clone(), Ok(proof)));
                    }
                }
            }

            info!(
                "Responding with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
                results.len(), start.elapsed()
            );
        }

        results
    }

    /// Check among all chunk type records that we have,
    /// and randomly pick one as the verification candidate.
    /// This will challenge all closest peers at once.
    async fn storage_challenge(network: Network) {
        let start = Instant::now();
        let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
            network.get_k_closest_local_peers_to_the_target(None).await
        {
            closest_peers
                .into_iter()
                .take(CLOSE_GROUP_SIZE)
                .collect_vec()
        } else {
            error!("Cannot get local neighbours");
            return;
        };
        if closest_peers.len() < CLOSE_GROUP_SIZE {
            debug!(
                "Not enough neighbours ({}/{}) to carry out storage challenge.",
                closest_peers.len(),
                CLOSE_GROUP_SIZE
            );
            return;
        }

        let mut verify_candidates: Vec<NetworkAddress> =
            if let Ok(all_keys) = network.get_all_local_record_addresses().await {
                all_keys
                    .iter()
                    .filter_map(|(addr, record_type)| {
                        if ValidationType::Chunk == *record_type {
                            Some(addr.clone())
                        } else {
                            None
                        }
                    })
                    .collect()
            } else {
                error!("Failed to get local record addresses.");
                return;
            };
        let num_of_targets = verify_candidates.len();
        if num_of_targets < 50 {
            debug!("Not enough candidates ({num_of_targets}/50) to be checked against neighbours.");
            return;
        }

        // To ensure the neighbours share the same knowledge as us,
        // the target is chosen to be not far from us.
        let self_addr = NetworkAddress::from(network.peer_id());
        verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
        let index: usize = OsRng.gen_range(0..num_of_targets / 2);
        let target = verify_candidates[index].clone();
        // TODO: workload shall be dynamically deduced from resource usage
        let difficulty = CLOSE_GROUP_SIZE;
        verify_candidates.sort_by_key(|addr| target.distance(addr));
        let expected_targets = verify_candidates.into_iter().take(difficulty);
        let nonce: Nonce = thread_rng().gen::<u64>();
        let mut expected_proofs = HashMap::new();
        for addr in expected_targets {
            if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
                let expected_proof = ChunkProof::new(&record.value, nonce);
                let _ = expected_proofs.insert(addr, expected_proof);
            } else {
                error!("Local record {addr:?} can't be loaded from disk.");
            }
        }
        let request = Request::Query(Query::GetChunkExistenceProof {
            key: target.clone(),
            nonce,
            difficulty,
        });

        let mut tasks = JoinSet::new();
        for (peer_id, addresses) in closest_peers {
            if peer_id == network.peer_id() {
                continue;
            }
            let network_clone = network.clone();
            let request_clone = request.clone();
            let expected_proofs_clone = expected_proofs.clone();
            let _ = tasks.spawn(async move {
                let res = scoring_peer(
                    network_clone,
                    (peer_id, addresses),
                    request_clone,
                    expected_proofs_clone,
                )
                .await;
                (peer_id, res)
            });
        }

        let mut peer_scores = vec![];
        while let Some(res) = tasks.join_next().await {
            match res {
                Ok((peer_id, score)) => {
                    let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
                    if !is_healthy {
                        info!("Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}.");
                        // TODO: shall the challenge failure immediately trigger the node to be removed?
                        network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
                    }
                    peer_scores.push((peer_id, is_healthy));
                }
                Err(e) => {
                    info!("StorageChallenge task completed with error {e:?}");
                }
            }
        }
        if !peer_scores.is_empty() {
            network.notify_peer_scores(peer_scores);
        }

        info!(
            "Completed node StorageChallenge against neighbours in {:?}!",
            start.elapsed()
        );
    }

    /// Query peers' versions and update local knowledge.
    async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
        // To avoid choking, carry out the queries one by one
        for (peer_id, addrs) in peers {
            Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
        }
    }

    /// Query a peer's version and update local knowledge.
    async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
        let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
        // We can skip passing `addrs` here as the new peer should be part of the kad::RT and the swarm can get the addr.
        let version = match network.send_request(request, peer, addrs).await {
            Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
                trace!("Fetched peer version {peer:?} as {version:?}");
                version
            }
            Ok(other) => {
                info!("Unexpected response when fetching the version of peer {peer:?}: {other:?}");
                "none".to_string()
            }
            Err(err) => {
                info!("Failed to fetch peer version {peer:?} with error {err:?}");
                // A failed version fetch (which dials and then re-attempts by itself)
                // with a `DialFailure` error indicates the peer is most likely dead.
                // In that case, the peer shall be removed from the routing table.
                if let NetworkError::OutboundError(OutboundFailure::DialFailure) = err {
                    network.remove_peer(peer);
                    return;
                }
                "old".to_string()
            }
        };
        network.notify_node_version(peer, version);
    }
}

async fn scoring_peer(
    network: Network,
    peer: (PeerId, Addresses),
    request: Request,
    expected_proofs: HashMap<NetworkAddress, ChunkProof>,
) -> usize {
    let peer_id = peer.0;
    let start = Instant::now();
    let responses = network
        .send_and_get_responses(&[peer], &request, true)
        .await;

    if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
        responses.get(&peer_id)
    {
        if answers.is_empty() {
            info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
            return 0;
        }
        let elapsed = start.elapsed();

        let mut received_proofs = vec![];
        for (addr, proof) in answers {
            if let Ok(proof) = proof {
                received_proofs.push((addr.clone(), proof.clone()));
            }
        }

        let score = mark_peer(elapsed, received_proofs, &expected_proofs);
        info!(
            "Received {} answers from peer {peer_id:?} after {elapsed:?}, scoring it as {score}.",
            answers.len()
        );
        score
    } else {
        info!("Peer {peer_id:?} didn't reply to the ChunkProofChallenge, or replied with an error.");
        0
    }
}

// The score is based on the following metrics:
//   * the duration of the response
//   * whether any answer is false
//   * the percentage of correct answers among the expected closest
// The higher the score, the higher the confidence in the peer.
fn mark_peer(
    duration: Duration,
    answers: Vec<(NetworkAddress, ChunkProof)>,
    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
) -> usize {
    let duration_score = duration_score_scheme(duration);
    let challenge_score = challenge_score_scheme(answers, expected_proofs);

    duration_score * challenge_score
}
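
// Worked example of the scheme above: a peer answering in 250ms gets a duration score
// of 100 - 250 / TIME_STEP = 88; if 4 of the 6 expected proofs are answered correctly
// (and none are false), the challenge score is 100 * 4 / 6 = 66, giving a total score
// of 88 * 66 = 5808, which is above MIN_ACCEPTABLE_HEALTHY_SCORE (3000).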

// A shorter duration gets a higher score.
fn duration_score_scheme(duration: Duration) -> usize {
    // So far just a simple stepped scheme, capped by HIGHEST_SCORE
    let in_ms = if let Some(value) = duration.as_millis().to_usize() {
        value
    } else {
        info!("Cannot get milliseconds from {duration:?}, using a default value of 1000ms.");
        1000
    };

    let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
    HIGHEST_SCORE - step
}

// Any false answer results in a score of 0 immediately.
fn challenge_score_scheme(
    answers: Vec<(NetworkAddress, ChunkProof)>,
    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
) -> usize {
    let mut correct_answers = 0;
    for (addr, chunk_proof) in answers {
        if let Some(expected_proof) = expected_proofs.get(&addr) {
            if expected_proof.verify(&chunk_proof) {
                correct_answers += 1;
            } else {
                info!("Spotted a false answer to the challenge regarding {addr:?}");
                // Any false answer results in a score of 0 immediately.
                return 0;
            }
        }
    }
    // TODO: Answers not among the expected_proofs could be due to the peer having
    //       different knowledge of records than us. Shall we:
    //         * set the target close to us, so that neighbours are more likely to share the same knowledge
    //         * fetch from local storage to verify
    //         * fetch from the network to verify
    std::cmp::min(
        HIGHEST_SCORE,
        HIGHEST_SCORE * correct_answers / expected_proofs.len(),
    )
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    #[test]
    fn test_no_local_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(5);
        let range = None;
        let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);

        assert_eq!(result, vec![]);
    }

    #[test]
    fn test_fewer_local_peers_than_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(2);
        let range = None;
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Result shall be sorted and truncated
        let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .iter()
            .map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(*peer_id);
                (addr, multi_addrs.clone())
            })
            .collect();
        expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
        let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();

        assert_eq!(expected_result, result);
    }

    #[test]
    fn test_with_range_and_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(0);
        let range_value = [128; 32];
        let range = Some(range_value);
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // The range shall be preferred, i.e. the resulting peers shall all be within the range
        let distance = U256::from_big_endian(&range_value);
        let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .into_iter()
            .filter_map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(peer_id);
                if target.distance(&addr).0 <= distance {
                    Some((addr, multi_addrs.clone()))
                } else {
                    None
                }
            })
            .collect();

        assert_eq!(expected_result, result);
    }
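
    // The following tests exercise the StorageChallenge scoring helpers defined above.
    // They only rely on items already used in this file (ChunkProof::new, ChunkProof::verify,
    // HIGHEST_SCORE, TIME_STEP, MIN_ACCEPTABLE_HEALTHY_SCORE); the concrete data and nonce
    // values are arbitrary.

    #[test]
    fn test_duration_score_scheme_steps() {
        // An instant answer gets the full score.
        assert_eq!(duration_score_scheme(Duration::from_millis(0)), HIGHEST_SCORE);
        // A 250ms answer loses 250 / TIME_STEP = 12 points.
        assert_eq!(duration_score_scheme(Duration::from_millis(250)), HIGHEST_SCORE - 12);
        // Very slow answers are capped at a zero score.
        assert_eq!(duration_score_scheme(Duration::from_secs(10)), 0);
    }

    #[test]
    fn test_challenge_score_scheme() {
        let nonce: Nonce = 42;
        let addr_a = NetworkAddress::from(PeerId::random());
        let addr_b = NetworkAddress::from(PeerId::random());
        let proof_a = ChunkProof::new(b"record a", nonce);
        let proof_b = ChunkProof::new(b"record b", nonce);
        let expected_proofs: HashMap<_, _> = [
            (addr_a.clone(), proof_a.clone()),
            (addr_b.clone(), proof_b.clone()),
        ]
        .into_iter()
        .collect();

        // All expected proofs answered correctly gives the full score.
        let answers = vec![(addr_a.clone(), proof_a.clone()), (addr_b.clone(), proof_b)];
        assert_eq!(challenge_score_scheme(answers, &expected_proofs), HIGHEST_SCORE);

        // Answering only one of the two expected proofs gives half the score.
        let answers = vec![(addr_a.clone(), proof_a)];
        assert_eq!(
            challenge_score_scheme(answers, &expected_proofs),
            HIGHEST_SCORE / 2
        );

        // A single false answer (proof built with a different nonce) zeroes the score.
        let false_proof = ChunkProof::new(b"record a", nonce + 1);
        let answers = vec![(addr_a, false_proof)];
        assert_eq!(challenge_score_scheme(answers, &expected_proofs), 0);
    }

    #[test]
    fn test_mark_peer_healthy_threshold() {
        let nonce: Nonce = 7;
        let addr = NetworkAddress::from(PeerId::random());
        let proof = ChunkProof::new(b"some chunk", nonce);
        let expected_proofs: HashMap<_, _> = [(addr.clone(), proof.clone())].into_iter().collect();

        // A fast, fully correct answer scores well above the healthy threshold.
        let score = mark_peer(Duration::from_millis(100), vec![(addr, proof)], &expected_proofs);
        assert!(score > MIN_ACCEPTABLE_HEALTHY_SCORE);
    }

    #[test]
    fn test_calculate_get_closest_peers_without_criteria() {
        // With neither a peer count nor a range, no peers are returned.
        let local_peers = vec![(
            PeerId::random(),
            vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
        )];
        let target = NetworkAddress::from(PeerId::random());
        let result = Node::calculate_get_closest_peers(local_peers, target, None, None);
        assert_eq!(result, vec![]);
    }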
}