ant_node/node.rs

// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use super::{
    Marker, NodeEvent, error::Result, event::NodeEventsChannel, quote::quotes_verification,
};
#[cfg(feature = "open-metrics")]
use crate::metrics::NodeMetricsRecorder;
#[cfg(feature = "open-metrics")]
use crate::networking::MetricsRegistries;
use crate::networking::{Addresses, Network, NetworkConfig, NetworkError, NetworkEvent, NodeIssue};
use crate::{PutValidationError, RunningNode};
use ant_bootstrap::bootstrap::Bootstrap;
use ant_evm::EvmNetwork;
use ant_evm::RewardsAddress;
use ant_protocol::{
    CLOSE_GROUP_SIZE, NetworkAddress, PrettyPrintRecordKey,
    error::Error as ProtocolError,
    messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
    storage::ValidationType,
};
use bytes::Bytes;
use itertools::Itertools;
use libp2p::{
    Multiaddr, PeerId,
    identity::Keypair,
    kad::{Record, U256},
    request_response::OutboundFailure,
};
use num_traits::cast::ToPrimitive;
use rand::{
    Rng, SeedableRng,
    rngs::{OsRng, StdRng},
    thread_rng,
};
use std::{
    collections::HashMap,
    net::SocketAddr,
    path::PathBuf,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    time::{Duration, Instant},
};
use tokio::sync::watch;
use tokio::{
    sync::mpsc::Receiver,
    task::{JoinSet, spawn},
};

/// Interval to trigger replication of all records to all peers.
/// This is the max time it should take. The minimum interval at any node will be half this.
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;

/// Interval to trigger the storage challenge.
/// This is the max time it should take. The minimum interval at any node will be half this.
const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;

/// Interval to update the node's uptime metric.
const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

/// Interval to clean up irrelevant records.
/// This is the max time it should take. The minimum interval at any node will be half this.
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S: u64 = 7200;

/// Highest score to achieve from each metric sub-sector during a StorageChallenge.
const HIGHEST_SCORE: usize = 100;

/// Any node scoring below this shall be considered bad.
/// The maximum possible score is 100 * 100.
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 3000;

/// Step size in ms for the duration score; the average StorageChallenge completion time is
/// expected to be around 250ms.
const TIME_STEP: usize = 20;
/// Helper to build and run a Node
pub struct NodeBuilder {
    addr: SocketAddr,
    bootstrap: Bootstrap,
    evm_address: RewardsAddress,
    evm_network: EvmNetwork,
    identity_keypair: Keypair,
    local: bool,
    #[cfg(feature = "open-metrics")]
    /// Set to Some to enable the metrics server
    metrics_server_port: Option<u16>,
    no_upnp: bool,
    relay_client: bool,
    root_dir: PathBuf,
}

impl NodeBuilder {
    /// Instantiate the builder. The initial peers are obtained through the supplied `Bootstrap`
    /// flow.
    pub fn new(
        identity_keypair: Keypair,
        bootstrap_flow: Bootstrap,
        evm_address: RewardsAddress,
        evm_network: EvmNetwork,
        addr: SocketAddr,
        root_dir: PathBuf,
    ) -> Self {
        Self {
            addr,
            bootstrap: bootstrap_flow,
            evm_address,
            evm_network,
            identity_keypair,
            local: false,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: None,
            no_upnp: false,
            relay_client: false,
            root_dir,
        }
    }

    /// Set the flag to indicate if the node is running in local mode
    pub fn local(&mut self, local: bool) {
        self.local = local;
    }

    #[cfg(feature = "open-metrics")]
    /// Set the port for the OpenMetrics server. Defaults to a random port if not set
    pub fn metrics_server_port(&mut self, port: Option<u16>) {
        self.metrics_server_port = port;
    }

    /// Set the flag to make the node act as a relay client
    pub fn relay_client(&mut self, relay_client: bool) {
        self.relay_client = relay_client;
    }

    /// Set the flag to disable UPnP for the node
    pub fn no_upnp(&mut self, no_upnp: bool) {
        self.no_upnp = no_upnp;
    }

    /// Asynchronously runs a new node instance, setting up the swarm driver,
    /// creating the data storage, and handling network events. Returns the
    /// created `RunningNode`, which contains a `NodeEventsChannel` for listening
    /// to node-related events.
    ///
    /// # Returns
    ///
    /// A `RunningNode` instance.
    ///
    /// # Errors
    ///
    /// Returns an error if there is a problem initializing the Network.
    pub fn build_and_run(self) -> Result<RunningNode> {
        // setup metrics
        #[cfg(feature = "open-metrics")]
        let (metrics_recorder, metrics_registries) = if self.metrics_server_port.is_some() {
            // metadata registry
            let mut metrics_registries = MetricsRegistries::default();
            let metrics_recorder = NodeMetricsRecorder::new(&mut metrics_registries);

            (Some(metrics_recorder), metrics_registries)
        } else {
            (None, MetricsRegistries::default())
        };

        // create a shutdown signal channel
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        // init network
        let network_config = NetworkConfig {
            keypair: self.identity_keypair,
            local: self.local,
            listen_addr: self.addr,
            root_dir: self.root_dir.clone(),
            shutdown_rx: shutdown_rx.clone(),
            bootstrap: self.bootstrap,
            no_upnp: self.no_upnp,
            relay_client: self.relay_client,
            custom_request_timeout: None,
            #[cfg(feature = "open-metrics")]
            metrics_registries,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: self.metrics_server_port,
        };
        let (network, network_event_receiver) = Network::init(network_config)?;

        // init node
        let node_events_channel = NodeEventsChannel::default();
        let node = NodeInner {
            network: network.clone(),
            events_channel: node_events_channel.clone(),
            reward_address: self.evm_address,
            #[cfg(feature = "open-metrics")]
            metrics_recorder,
            evm_network: self.evm_network,
        };
        let node = Node {
            inner: Arc::new(node),
        };

        // Run the node
        node.run(network_event_receiver, shutdown_rx);
        let running_node = RunningNode {
            shutdown_sender: shutdown_tx,
            network,
            node_events_channel,
            root_dir_path: self.root_dir,
            rewards_address: self.evm_address,
        };

        Ok(running_node)
    }
}
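
// A minimal usage sketch of the builder (the `keypair`, `bootstrap`, `rewards_addr`,
// `evm_network` and `root_dir` values are assumed to have been obtained elsewhere and are
// placeholders here):
//
//     let mut builder = NodeBuilder::new(
//         keypair,
//         bootstrap,
//         rewards_addr,
//         evm_network,
//         "0.0.0.0:12000".parse()?,
//         root_dir,
//     );
//     builder.no_upnp(false);
//     let running_node = builder.build_and_run()?;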

/// `Node` represents a single node in the distributed network. It handles
/// network events, processes incoming requests, interacts with the data
/// storage, and broadcasts node-related events.
#[derive(Clone)]
pub(crate) struct Node {
    inner: Arc<NodeInner>,
}

/// The actual implementation of the Node. The other is just a wrapper around this, so that we don't expose
/// the Arc from the interface.
struct NodeInner {
    events_channel: NodeEventsChannel,
    network: Network,
    #[cfg(feature = "open-metrics")]
    metrics_recorder: Option<NodeMetricsRecorder>,
    reward_address: RewardsAddress,
    evm_network: EvmNetwork,
}

impl Node {
    /// Returns the NodeEventsChannel
    pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
        &self.inner.events_channel
    }

    /// Returns the instance of Network
    pub(crate) fn network(&self) -> &Network {
        &self.inner.network
    }

    #[cfg(feature = "open-metrics")]
    /// Returns a reference to the NodeMetricsRecorder if the `open-metrics` feature flag is enabled
    /// This is used to record various metrics for the node.
    pub(crate) fn metrics_recorder(&self) -> Option<&NodeMetricsRecorder> {
        self.inner.metrics_recorder.as_ref()
    }

    /// Returns the reward address of the node
    pub(crate) fn reward_address(&self) -> &RewardsAddress {
        &self.inner.reward_address
    }

    pub(crate) fn evm_network(&self) -> &EvmNetwork {
        &self.inner.evm_network
    }

    /// Spawns a task to process `NetworkEvent`s and run the node's periodic maintenance loops
    /// (replication, uptime metrics, record cleanup and storage challenges).
    fn run(
        self,
        mut network_event_receiver: Receiver<NetworkEvent>,
        mut shutdown_rx: watch::Receiver<bool>,
    ) {
        let mut rng = StdRng::from_entropy();

        let peers_connected = Arc::new(AtomicUsize::new(0));

        let _node_task = spawn(async move {
            // use a random replication interval to ensure that the nodes do not sync when messages
            // are being transmitted.
            let replication_interval: u64 = rng.gen_range(
                PERIODIC_REPLICATION_INTERVAL_MAX_S / 2..PERIODIC_REPLICATION_INTERVAL_MAX_S,
            );
            let replication_interval_time = Duration::from_secs(replication_interval);
            debug!("Replication interval set to {replication_interval_time:?}");

            let mut replication_interval = tokio::time::interval(replication_interval_time);
            let _ = replication_interval.tick().await; // first tick completes immediately

            let mut uptime_metrics_update_interval =
                tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
            let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately

            // use a random cleanup interval to ensure that the nodes do not sync on work,
            // causing an overall CPU spike.
            let irrelevant_records_cleanup_interval: u64 = rng.gen_range(
                UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S / 2
                    ..UNRELEVANT_RECORDS_CLEANUP_INTERVAL_MAX_S,
            );
            let irrelevant_records_cleanup_interval_time =
                Duration::from_secs(irrelevant_records_cleanup_interval);
            let mut irrelevant_records_cleanup_interval =
                tokio::time::interval(irrelevant_records_cleanup_interval_time);
            let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately

            // use a random storage challenge interval to ensure
            // neighbours do not carry out challenges at the same time
            let storage_challenge_interval: u64 =
                rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S);
            let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval);
            debug!("Storage challenge interval set to {storage_challenge_interval_time:?}");

            let mut storage_challenge_interval =
                tokio::time::interval(storage_challenge_interval_time);
            let _ = storage_challenge_interval.tick().await; // first tick completes immediately

            loop {
                let peers_connected = &peers_connected;

                tokio::select! {
                    // Check for a shutdown command.
                    result = shutdown_rx.changed() => {
                        if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
                            info!("Shutdown signal received or sender dropped. Exiting network events loop.");
                            break;
                        }
                    },
                    net_event = network_event_receiver.recv() => {
                        match net_event {
                            Some(event) => {
                                let start = Instant::now();
                                let event_string = format!("{event:?}");

                                self.handle_network_event(event, peers_connected);
                                trace!("Handled non-blocking network event in {:?}: {:?}", start.elapsed(), event_string);

                            }
                            None => {
                                error!("The `NetworkEvent` channel is closed");
                                self.events_channel().broadcast(NodeEvent::ChannelClosed);
                                break;
                            }
                        }
                    }
                    // runs every replication_interval time
                    _ = replication_interval.tick() => {
                        let start = Instant::now();
                        let network = self.network().clone();
                        self.record_metrics(Marker::IntervalReplicationTriggered);

                        let _handle = spawn(async move {
                            Self::try_interval_replication(network);
                            trace!("Periodic replication took {:?}", start.elapsed());
                        });
                    }
                    _ = uptime_metrics_update_interval.tick() => {
                        #[cfg(feature = "open-metrics")]
                        if let Some(metrics_recorder) = self.metrics_recorder() {
                            let _ = metrics_recorder.uptime.set(metrics_recorder.started_instant.elapsed().as_secs() as i64);
                        }
                    }
                    _ = irrelevant_records_cleanup_interval.tick() => {
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::trigger_irrelevant_record_cleanup(network);
                        });
                    }
                    // runs every storage_challenge_interval time
                    _ = storage_challenge_interval.tick() => {
                        let start = Instant::now();
                        debug!("Periodic storage challenge triggered");
                        let network = self.network().clone();

                        let _handle = spawn(async move {
                            Self::storage_challenge(network).await;
                            trace!("Periodic storage challenge took {:?}", start.elapsed());
                        });
                    }
                }
            }
        });
    }

    /// Calls Marker::log() to insert the marker into the log files.
    /// Also calls NodeMetrics::record() to record the metric if the `open-metrics` feature flag is enabled.
    pub(crate) fn record_metrics(&self, marker: Marker) {
        marker.log();
        #[cfg(feature = "open-metrics")]
        if let Some(metrics_recorder) = self.metrics_recorder() {
            metrics_recorder.record(marker)
        }
    }

    // **** Private helpers *****

    /// Handle a network event.
    /// Spawns a task for any likely long-running work.
    fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
        let start = Instant::now();
        let event_string = format!("{event:?}");
        let event_header;

        // Reducing non-mandatory logging
        if let NetworkEvent::QueryRequestReceived {
            query: Query::GetVersion { .. },
            ..
        } = event
        {
            trace!("Handling NetworkEvent {event_string}");
        } else {
            debug!("Handling NetworkEvent {event_string}");
        }

        match event {
            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
                event_header = "PeerAdded";
                // increment peers_connected and send a ConnectedToNetwork event once we have
                // connected to CLOSE_GROUP_SIZE peers
                let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
                if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
                    self.events_channel()
                        .broadcast(NodeEvent::ConnectedToNetwork);
                }

                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerAddedToRoutingTable(&peer_id));

                // try query peer version
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::try_query_peer_version(network, peer_id, Default::default()).await;
                });

                // try replication here
                let network = self.network().clone();
                self.record_metrics(Marker::IntervalReplicationTriggered);
                let _handle = spawn(async move {
                    Self::try_interval_replication(network);
                });
            }
            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
                event_header = "PeerRemoved";
                self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                self.record_metrics(Marker::PeerRemovedFromRoutingTable(&peer_id));

                let self_id = self.network().peer_id();
                let distance =
                    NetworkAddress::from(self_id).distance(&NetworkAddress::from(peer_id));
                info!(
                    "Node {self_id:?} removed peer from routing table: {peer_id:?}. It has a {:?} distance to us.",
                    distance.ilog2()
                );

                let network = self.network().clone();
                self.record_metrics(Marker::IntervalReplicationTriggered);
                let _handle = spawn(async move {
                    Self::try_interval_replication(network);
                });
            }
            NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
                event_header = "PeerWithUnsupportedProtocol";
            }
            NetworkEvent::NewListenAddr(_) => {
                event_header = "NewListenAddr";
            }
            NetworkEvent::ResponseReceived { res } => {
                event_header = "ResponseReceived";
                if let Err(err) = self.handle_response(res) {
                    error!("Error while handling NetworkEvent::ResponseReceived {err:?}");
                }
            }
            NetworkEvent::KeysToFetchForReplication(keys) => {
                event_header = "KeysToFetchForReplication";
                self.record_metrics(Marker::fetching_keys_for_replication(&keys));

                if let Err(err) = self.fetch_replication_keys_without_wait(keys) {
                    error!("Error while trying to fetch replicated data {err:?}");
                }
            }
            NetworkEvent::QueryRequestReceived { query, channel } => {
                event_header = "QueryRequestReceived";
                let node = self.clone();
                let payment_address = *self.reward_address();

                let _handle = spawn(async move {
                    let network = node.network().clone();
                    let res = Self::handle_query(node, query, payment_address).await;

                    // Reducing non-mandatory logging
                    if let Response::Query(QueryResponse::GetVersion { .. }) = res {
                        trace!("Sending response {res:?}");
                    } else {
                        debug!("Sending response {res:?}");
                    }

                    network.send_response(res, channel);
                });
            }
            NetworkEvent::UnverifiedRecord(record) => {
                event_header = "UnverifiedRecord";
                // record validation can be long running, so we spawn a task to handle it
                let self_clone = self.clone();
                let _handle = spawn(async move {
                    let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                    match self_clone.validate_and_store_record(record).await {
                        Ok(()) => debug!("UnverifiedRecord {key} has been stored"),
                        Err(err) => {
                            self_clone.record_metrics(Marker::RecordRejected(&key, &err));
                        }
                    }
                });
            }
            NetworkEvent::TerminateNode { reason } => {
                event_header = "TerminateNode";
                error!("Received termination from swarm_driver due to {reason:?}");
                self.events_channel()
                    .broadcast(NodeEvent::TerminateNode(format!("{reason}")));
            }
            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
                event_header = "FailedToFetchHolders";
                let network = self.network().clone();
                let pretty_log: Vec<_> = bad_nodes
                    .iter()
                    .map(|(peer_id, record_key)| {
                        let pretty_key = PrettyPrintRecordKey::from(record_key);
                        (peer_id, pretty_key)
                    })
                    .collect();
                // Note: this log is checked in CI and is expected to `not appear`.
                //       Any change to the keyword `failed to fetch` requires a
                //       corresponding CI script change as well.
                debug!(
                    "Received notification from replication_fetcher, notifying {pretty_log:?} failed to fetch replication copies from."
                );
                let _handle = spawn(async move {
                    for (peer_id, record_key) in bad_nodes {
                        // An obsolete fetch request (due to being flooded by fresh replicates) could result
                        // in a peer being claimed as bad, as the local copy blocks the entry from being cleared.
                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
                        {
                            error!(
                                "From peer {peer_id:?}, failed to fetch record {:?}",
                                PrettyPrintRecordKey::from(&record_key)
                            );
                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
                        }
                    }
                });
            }
            NetworkEvent::QuoteVerification { quotes } => {
                event_header = "QuoteVerification";
                let network = self.network().clone();

                let _handle = spawn(async move {
                    quotes_verification(&network, quotes).await;
                });
            }
            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
                event_header = "FreshReplicateToFetch";
                self.fresh_replicate_to_fetch(holder, keys);
            }
            NetworkEvent::PeersForVersionQuery(peers) => {
                event_header = "PeersForVersionQuery";
                let network = self.network().clone();
                let _handle = spawn(async move {
                    Self::query_peers_version(network, peers).await;
                });
            }
        }

        trace!(
            "Network handling statistics, Event {event_header:?} handled in {:?} : {event_string:?}",
            start.elapsed()
        );
    }

    // Handle the response that was not awaited at the call site
    fn handle_response(&self, response: Response) -> Result<()> {
        match response {
            Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
                // This should actually have been short-circuited when received
                warn!("Mishandled replicate response, should be handled earlier");
            }
            Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
                error!(
                    "Response to replication shall be handled by the caller, not by the common handler, {resp:?}"
                );
            }
            Response::Cmd(CmdResponse::FreshReplicate(Ok(()))) => {
                // No need to handle
            }
            other => {
                warn!("handle_response not implemented for {other:?}");
            }
        };

        Ok(())
    }

    async fn handle_query(node: Self, query: Query, payment_address: RewardsAddress) -> Response {
        let network = node.network();
        let resp: QueryResponse = match query {
            Query::GetStoreQuote {
                key,
                data_type,
                data_size,
                nonce,
                difficulty,
            } => {
                let record_key = key.to_record_key();
                let self_id = network.peer_id();

                let maybe_quoting_metrics = network
                    .get_local_quoting_metrics(record_key.clone(), data_type, data_size)
                    .await;

                let storage_proofs = if let Some(nonce) = nonce {
                    Self::respond_x_closest_record_proof(
                        network,
                        key.clone(),
                        nonce,
                        difficulty,
                        false,
                    )
                    .await
                } else {
                    vec![]
                };

                match maybe_quoting_metrics {
                    Ok((quoting_metrics, is_already_stored)) => {
                        if is_already_stored {
                            QueryResponse::GetStoreQuote {
                                quote: Err(ProtocolError::RecordExists(
                                    PrettyPrintRecordKey::from(&record_key).into_owned(),
                                )),
                                peer_address: NetworkAddress::from(self_id),
                                storage_proofs,
                            }
                        } else {
                            QueryResponse::GetStoreQuote {
                                quote: Self::create_quote_for_storecost(
                                    network,
                                    &key,
                                    &quoting_metrics,
                                    &payment_address,
                                ),
                                peer_address: NetworkAddress::from(self_id),
                                storage_proofs,
                            }
                        }
                    }
                    Err(err) => {
                        warn!("GetStoreQuote failed for {key:?}: {err}");
                        QueryResponse::GetStoreQuote {
                            quote: Err(ProtocolError::GetStoreQuoteFailed),
                            peer_address: NetworkAddress::from(self_id),
                            storage_proofs,
                        }
                    }
                }
            }
            Query::GetReplicatedRecord { requester: _, key } => {
                let our_address = NetworkAddress::from(network.peer_id());
                let mut result = Err(ProtocolError::ReplicatedRecordNotFound {
                    holder: Box::new(our_address.clone()),
                    key: Box::new(key.clone()),
                });
                let record_key = key.to_record_key();

                if let Ok(Some(record)) = network.get_local_record(&record_key).await {
                    result = Ok((our_address, Bytes::from(record.value)));
                }

                QueryResponse::GetReplicatedRecord(result)
            }
            Query::GetChunkExistenceProof {
                key,
                nonce,
                difficulty,
            } => QueryResponse::GetChunkExistenceProof(
                Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true).await,
            ),
            Query::CheckNodeInProblem(target_address) => {
                debug!("Got CheckNodeInProblem for peer {target_address:?}");

                let is_in_trouble =
                    if let Ok(result) = network.is_peer_shunned(target_address.clone()).await {
                        result
                    } else {
                        debug!("Could not get status of {target_address:?}.");
                        false
                    };

                QueryResponse::CheckNodeInProblem {
                    reporter_address: NetworkAddress::from(network.peer_id()),
                    target_address,
                    is_in_trouble,
                }
            }
            Query::GetClosestPeers {
                key,
                num_of_peers,
                range,
                sign_result,
            } => {
                debug!(
                    "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required."
                );
                Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result)
                    .await
            }
            Query::GetVersion(_) => QueryResponse::GetVersion {
                peer: NetworkAddress::from(network.peer_id()),
                version: ant_build_info::package_version(),
            },
            Query::PutRecord {
                holder,
                address,
                serialized_record,
            } => {
                let record = Record {
                    key: address.to_record_key(),
                    value: serialized_record,
                    publisher: None,
                    expires: None,
                };

                let key = PrettyPrintRecordKey::from(&record.key).into_owned();
                let result = match node.validate_and_store_record(record).await {
                    Ok(()) => Ok(()),
                    Err(PutValidationError::OutdatedRecordCounter { counter, expected }) => {
                        node.record_metrics(Marker::RecordRejected(
                            &key,
                            &PutValidationError::OutdatedRecordCounter { counter, expected },
                        ));
                        Err(ProtocolError::OutdatedRecordCounter { counter, expected })
                    }
                    Err(err) => {
                        node.record_metrics(Marker::RecordRejected(&key, &err));
                        Err(ProtocolError::PutRecordFailed(format!("{err:?}")))
                    }
                };
                QueryResponse::PutRecord {
                    result,
                    peer_address: holder,
                    record_addr: address,
                }
            }
        };
        Response::Query(resp)
    }

    async fn respond_get_closest_peers(
        network: &Network,
        target: NetworkAddress,
        num_of_peers: Option<usize>,
        range: Option<[u8; 32]>,
        sign_result: bool,
    ) -> QueryResponse {
        let local_peers = network.get_local_peers_with_multiaddr().await;
        let peers: Vec<(NetworkAddress, Vec<Multiaddr>)> = if let Ok(local_peers) = local_peers {
            Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range)
        } else {
            vec![]
        };

        let signature = if sign_result {
            let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default();
            bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default());
            network.sign(&bytes).ok()
        } else {
            None
        };

        QueryResponse::GetClosestPeers {
            target,
            peers,
            signature,
        }
    }

    fn calculate_get_closest_peers(
        peer_addrs: Vec<(PeerId, Vec<Multiaddr>)>,
        target: NetworkAddress,
        num_of_peers: Option<usize>,
        range: Option<[u8; 32]>,
    ) -> Vec<(NetworkAddress, Vec<Multiaddr>)> {
        match (num_of_peers, range) {
            (_, Some(value)) => {
                let distance = U256::from_big_endian(&value);
                peer_addrs
                    .iter()
                    .filter_map(|(peer_id, multi_addrs)| {
                        let addr = NetworkAddress::from(*peer_id);
                        if target.distance(&addr).0 <= distance {
                            Some((addr, multi_addrs.clone()))
                        } else {
                            None
                        }
                    })
                    .collect()
            }
            (Some(num_of_peers), _) => {
                let mut result: Vec<(NetworkAddress, Vec<Multiaddr>)> = peer_addrs
                    .iter()
                    .map(|(peer_id, multi_addrs)| {
                        let addr = NetworkAddress::from(*peer_id);
                        (addr, multi_addrs.clone())
                    })
                    .collect();
                result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
                result.into_iter().take(num_of_peers).collect()
            }
            (None, None) => vec![],
        }
    }

    // Nodes only check ChunkProof against each other, to avoid the `multi-version` issue.
    // Clients check the proof against all records, as they have to fetch them from the network anyway.
    async fn respond_x_closest_record_proof(
        network: &Network,
        key: NetworkAddress,
        nonce: Nonce,
        difficulty: usize,
        chunk_only: bool,
    ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
        let start = Instant::now();
        let mut results = vec![];
        if difficulty == 1 {
            // A client is checking the existence of a published chunk.
            let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
            if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
                let proof = ChunkProof::new(&record.value, nonce);
                debug!("Chunk proof for {key:?} is {proof:?}");
                result = Ok(proof)
            } else {
                debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
            }

            results.push((key.clone(), result));
        } else {
            let all_local_records = network.get_all_local_record_addresses().await;

            if let Ok(all_local_records) = all_local_records {
                let mut all_chunk_addrs: Vec<_> = if chunk_only {
                    all_local_records
                        .iter()
                        .filter_map(|(addr, record_type)| {
                            if *record_type == ValidationType::Chunk {
                                Some(addr.clone())
                            } else {
                                None
                            }
                        })
                        .collect()
                } else {
                    all_local_records.keys().cloned().collect()
                };

                // Sort by distance and only take first X closest entries
                all_chunk_addrs.sort_by_key(|addr| key.distance(addr));

                // TODO: this shall be deduced from resource usage dynamically
                let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE);

                for addr in all_chunk_addrs.iter().take(workload_factor) {
                    if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
                    {
                        let proof = ChunkProof::new(&record.value, nonce);
                        debug!("Chunk proof for {key:?} is {proof:?}");
                        results.push((addr.clone(), Ok(proof)));
                    }
                }
            }

            info!(
                "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
                results.len(),
                start.elapsed()
            );
        }

        results
    }

    /// Check among all chunk type records that we have,
    /// and randomly pick one as the verification candidate.
    /// This will challenge all closest peers at once.
    async fn storage_challenge(network: Network) {
        let start = Instant::now();
        let closest_peers: Vec<(PeerId, Addresses)> = if let Ok(closest_peers) =
            network.get_k_closest_local_peers_to_the_target(None).await
        {
            closest_peers
                .into_iter()
                .take(CLOSE_GROUP_SIZE)
                .collect_vec()
        } else {
            error!("Cannot get local neighbours");
            return;
        };
        if closest_peers.len() < CLOSE_GROUP_SIZE {
            debug!(
                "Not enough neighbours ({}/{}) to carry out storage challenge.",
                closest_peers.len(),
                CLOSE_GROUP_SIZE
            );
            return;
        }

        let mut verify_candidates: Vec<NetworkAddress> =
            if let Ok(all_keys) = network.get_all_local_record_addresses().await {
                all_keys
                    .iter()
                    .filter_map(|(addr, record_type)| {
                        if ValidationType::Chunk == *record_type {
                            Some(addr.clone())
                        } else {
                            None
                        }
                    })
                    .collect()
            } else {
                error!("Failed to get local record addresses.");
                return;
            };
        let num_of_targets = verify_candidates.len();
        if num_of_targets < 50 {
            debug!("Not enough candidates ({num_of_targets}/50) to be checked against neighbours.");
            return;
        }

        // To ensure the neighbours share the same knowledge as us,
        // the target is chosen to be not far from us.
        let self_addr = NetworkAddress::from(network.peer_id());
        verify_candidates.sort_by_key(|addr| self_addr.distance(addr));
        let index: usize = OsRng.gen_range(0..num_of_targets / 2);
        let target = verify_candidates[index].clone();
        // TODO: workload shall be dynamically deduced from resource usage
        let difficulty = CLOSE_GROUP_SIZE;
        verify_candidates.sort_by_key(|addr| target.distance(addr));
        let expected_targets = verify_candidates.into_iter().take(difficulty);
        let nonce: Nonce = thread_rng().r#gen::<u64>();
        let mut expected_proofs = HashMap::new();
        for addr in expected_targets {
            if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
                let expected_proof = ChunkProof::new(&record.value, nonce);
                let _ = expected_proofs.insert(addr, expected_proof);
            } else {
                error!("Local record {addr:?} can't be loaded from disk.");
            }
        }
        let request = Request::Query(Query::GetChunkExistenceProof {
            key: target.clone(),
            nonce,
            difficulty,
        });

        let mut tasks = JoinSet::new();
        for (peer_id, addresses) in closest_peers {
            if peer_id == network.peer_id() {
                continue;
            }
            let network_clone = network.clone();
            let request_clone = request.clone();
            let expected_proofs_clone = expected_proofs.clone();
            let _ = tasks.spawn(async move {
                let res = scoring_peer(
                    network_clone,
                    (peer_id, addresses),
                    request_clone,
                    expected_proofs_clone,
                )
                .await;
                (peer_id, res)
            });
        }

        let mut peer_scores = vec![];
        while let Some(res) = tasks.join_next().await {
            match res {
                Ok((peer_id, score)) => {
                    let is_healthy = score > MIN_ACCEPTABLE_HEALTHY_SCORE;
                    if !is_healthy {
                        info!(
                            "Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."
                        );
                        // TODO: shall the challenge failure immediately trigger the node to be removed?
                        network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
                    }
                    peer_scores.push((peer_id, is_healthy));
                }
                Err(e) => {
                    info!("StorageChallenge task completed with error {e:?}");
                }
            }
        }
        if !peer_scores.is_empty() {
            network.notify_peer_scores(peer_scores);
        }

        info!(
            "Completed node StorageChallenge against neighbours in {:?}!",
            start.elapsed()
        );
    }

    /// Query peers' versions and update local knowledge.
    async fn query_peers_version(network: Network, peers: Vec<(PeerId, Addresses)>) {
        // To avoid choking, carry out the queries one by one
        for (peer_id, addrs) in peers {
            Self::try_query_peer_version(network.clone(), peer_id, addrs).await;
        }
    }

    /// Query a peer's version and update local knowledge.
    async fn try_query_peer_version(network: Network, peer: PeerId, addrs: Addresses) {
        let request = Request::Query(Query::GetVersion(NetworkAddress::from(peer)));
        // We can skip passing `addrs` here as the new peer should be part of the kad::RT and swarm can get the addr.
        let version = match network.send_request(request, peer, addrs).await {
            Ok((Response::Query(QueryResponse::GetVersion { version, .. }), _conn_info)) => {
                trace!("Fetched peer version {peer:?} as {version:?}");
                version
            }
            Ok(other) => {
                info!("Unexpected response when fetching version from peer {peer:?}, {other:?}");
                "none".to_string()
            }
            Err(err) => {
                info!("Failed to fetch peer version {peer:?} with error {err:?}");
                // A failed version fetch (which itself includes a dial and re-attempt)
                // with a `DialFailure` error indicates the peer is very likely dead.
                // In that case, the peer shall be removed from the routing table.
                if let NetworkError::OutboundError(OutboundFailure::DialFailure) = err {
                    network.remove_peer(peer);
                    return;
                }
                "old".to_string()
            }
        };
        network.notify_node_version(peer, version);
    }
}

async fn scoring_peer(
    network: Network,
    peer: (PeerId, Addresses),
    request: Request,
    expected_proofs: HashMap<NetworkAddress, ChunkProof>,
) -> usize {
    let peer_id = peer.0;
    let start = Instant::now();
    let responses = network
        .send_and_get_responses(&[peer], &request, true)
        .await;

    if let Some(Ok((Response::Query(QueryResponse::GetChunkExistenceProof(answers)), _conn_info))) =
        responses.get(&peer_id)
    {
        if answers.is_empty() {
            info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
            return 0;
        }
        let elapsed = start.elapsed();

        let mut received_proofs = vec![];
        for (addr, proof) in answers {
            if let Ok(proof) = proof {
                received_proofs.push((addr.clone(), proof.clone()));
            }
        }

        let score = mark_peer(elapsed, received_proofs, &expected_proofs);
        info!(
            "Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.",
            answers.len()
        );
        score
    } else {
        info!("Peer {peer_id:?} didn't reply to the ChunkProofChallenge, or replied with an error.");
        0
    }
}

// Based on the following metrics:
//   * the duration
//   * whether there is any false answer
//   * the percentage of correct answers among the expected closest
// The higher the score, the higher the confidence in the peer.
fn mark_peer(
    duration: Duration,
    answers: Vec<(NetworkAddress, ChunkProof)>,
    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
) -> usize {
    let duration_score = duration_score_scheme(duration);
    let challenge_score = challenge_score_scheme(answers, expected_proofs);

    duration_score * challenge_score
}

// A shorter duration gets a higher score.
fn duration_score_scheme(duration: Duration) -> usize {
    // So far just a simple stepped scheme, capped by HIGHEST_SCORE
    let in_ms = if let Some(value) = duration.as_millis().to_usize() {
        value
    } else {
        info!("Cannot get milliseconds from {duration:?}, using a default value of 1000ms.");
        1000
    };

    let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
    HIGHEST_SCORE - step
}

// Any false answer shall result in 0 score immediately
fn challenge_score_scheme(
    answers: Vec<(NetworkAddress, ChunkProof)>,
    expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
) -> usize {
    let mut correct_answers = 0;
    for (addr, chunk_proof) in answers {
        if let Some(expected_proof) = expected_proofs.get(&addr) {
            if expected_proof.verify(&chunk_proof) {
                correct_answers += 1;
            } else {
                info!("Spotted a false answer to the challenge regarding {addr:?}");
                // Any false answer shall result in 0 score immediately
                return 0;
            }
        }
    }
    // TODO: For those answers not among the expected_proofs,
    //       it could be due to the peer having different knowledge of records than us.
    //       Shall we:
    //         * set the target close to us, so that neighbours are more likely to share the same knowledge
    //         * fetch from local storage to verify
    //         * fetch from the network to verify
    std::cmp::min(
        HIGHEST_SCORE,
        HIGHEST_SCORE * correct_answers / expected_proofs.len(),
    )
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    #[test]
    fn test_no_local_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(5);
        let range = None;
        let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range);

        assert_eq!(result, vec![]);
    }

    #[test]
    fn test_fewer_local_peers_than_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(2);
        let range = None;
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Result shall be sorted and truncated
        let mut expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .iter()
            .map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(*peer_id);
                (addr, multi_addrs.clone())
            })
            .collect();
        expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr));
        let expected_result: Vec<_> = expected_result.into_iter().take(2).collect();

        assert_eq!(expected_result, result);
    }
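
    // A small sanity check of the stepped duration scoring used by the storage challenge
    // (a sketch of expected values derived from HIGHEST_SCORE and TIME_STEP, added for
    // illustration rather than taken from an upstream spec).
    #[test]
    fn test_duration_score_scheme_steps() {
        // An instant answer gets the full score.
        assert_eq!(duration_score_scheme(Duration::from_millis(0)), HIGHEST_SCORE);
        // Around the expected average of 250ms, the score drops by 250 / TIME_STEP steps.
        assert_eq!(
            duration_score_scheme(Duration::from_millis(250)),
            HIGHEST_SCORE - 250 / TIME_STEP
        );
        // Anything taking 2s or longer bottoms out at zero.
        assert_eq!(duration_score_scheme(Duration::from_secs(5)), 0);
    }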

    #[test]
    fn test_with_range_and_num_of_peers() {
        let local_peers: Vec<(PeerId, Vec<Multiaddr>)> = vec![
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
            (
                PeerId::random(),
                vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()],
            ),
        ];
        let target = NetworkAddress::from(PeerId::random());
        let num_of_peers = Some(0);
        let range_value = [128; 32];
        let range = Some(range_value);
        let result = Node::calculate_get_closest_peers(
            local_peers.clone(),
            target.clone(),
            num_of_peers,
            range,
        );

        // Range shall be preferred, i.e. the result peers shall all be within the range
        let distance = U256::from_big_endian(&range_value);
        let expected_result: Vec<(NetworkAddress, Vec<Multiaddr>)> = local_peers
            .into_iter()
            .filter_map(|(peer_id, multi_addrs)| {
                let addr = NetworkAddress::from(peer_id);
                if target.distance(&addr).0 <= distance {
                    Some((addr, multi_addrs.clone()))
                } else {
                    None
                }
            })
            .collect();

        assert_eq!(expected_result, result);
    }
}