ant_networking/cmd.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::time::Instant;
10use crate::{
11    config::GetRecordCfg,
12    driver::{PendingGetClosestType, SwarmDriver},
13    error::{NetworkError, Result},
14    event::TerminateNodeReason,
15    log_markers::Marker,
16    Addresses, GetRecordError, MsgResponder, NetworkEvent, ResponseQuorum, CLOSE_GROUP_SIZE,
17};
18use ant_evm::{PaymentQuote, QuotingMetrics};
19use ant_protocol::messages::ConnectionInfo;
20use ant_protocol::{
21    messages::{Cmd, Request, Response},
22    storage::{DataTypes, RecordHeader, RecordKind, ValidationType},
23    NetworkAddress, PrettyPrintRecordKey,
24};
25use libp2p::{
26    kad::{
27        store::{Error as StoreError, RecordStore},
28        KBucketDistance as Distance, Record, RecordKey, K_VALUE,
29    },
30    swarm::dial_opts::{DialOpts, PeerCondition},
31    Multiaddr, PeerId,
32};
33use std::{
34    collections::{BTreeMap, HashMap},
35    fmt::Debug,
36    time::Duration,
37};
38use tokio::sync::oneshot;
39use xor_name::XorName;
40
41const MAX_CONTINUOUS_HDD_WRITE_ERROR: usize = 5;
42
43// Must be kept in sync with `ant_node::PERIODIC_REPLICATION_INTERVAL_MAX_S`
44const REPLICATION_TIMEOUT: Duration = Duration::from_secs(45);
45
46// Throttles replication to at most once every 30 seconds
47const MIN_REPLICATION_INTERVAL_S: Duration = Duration::from_secs(30);
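// Taken together (see `try_interval_replication` below): a replication round is attempted at
// most once every MIN_REPLICATION_INTERVAL_S, and a peer that was just replicated to is
// skipped again until its REPLICATION_TIMEOUT entry expires.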
48
49#[derive(Debug, Eq, PartialEq, Clone)]
50pub enum NodeIssue {
51    /// Some connections might be considered critical and should be tracked.
52    ConnectionIssue,
53    /// Data Replication failed
54    ReplicationFailure,
55    /// Close nodes have reported this peer as bad
56    CloseNodesShunning,
57    /// Provided a bad quote
58    BadQuoting,
59    /// Peer failed to pass the chunk proof verification
60    FailedChunkProofCheck,
61}
62
63impl std::fmt::Display for NodeIssue {
64    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
65        match self {
66            NodeIssue::ConnectionIssue => write!(f, "CriticalConnectionIssue"),
67            NodeIssue::ReplicationFailure => write!(f, "ReplicationFailure"),
68            NodeIssue::CloseNodesShunning => write!(f, "CloseNodesShunning"),
69            NodeIssue::BadQuoting => write!(f, "BadQuoting"),
70            NodeIssue::FailedChunkProofCheck => write!(f, "FailedChunkProofCheck"),
71        }
72    }
73}
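
// A minimal usage sketch (not part of the driver): the Display strings above are what end up
// in `Cmd::PeerConsideredAsBad { bad_behaviour, .. }` via `to_string()`, e.g.:
//     assert_eq!(NodeIssue::BadQuoting.to_string(), "BadQuoting");
//     assert_eq!(NodeIssue::ConnectionIssue.to_string(), "CriticalConnectionIssue");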
74
75/// Commands to send to the Swarm
76pub enum LocalSwarmCmd {
77    /// Get a list of all peers in the local RT, with their corresponding Multiaddr info attached.
78    GetPeersWithMultiaddr {
79        sender: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
80    },
81    /// Get a map where each key is the ilog2 distance of that Kbucket
82    /// and each value is a vector of peers in that bucket.
83    GetKBuckets {
84        sender: oneshot::Sender<BTreeMap<u32, Vec<PeerId>>>,
85    },
86    // Returns up to K_VALUE peers from all the k-buckets of the local Routing Table,
87    // along with our own PeerId.
88    GetClosestKLocalPeers {
89        sender: oneshot::Sender<Vec<(PeerId, Addresses)>>,
90    },
91    // Get the X closest peers to the target from the local Routing Table, self not included
92    GetCloseLocalPeersToTarget {
93        key: NetworkAddress,
94        num_of_peers: usize,
95        sender: oneshot::Sender<Vec<(PeerId, Addresses)>>,
96    },
97    GetSwarmLocalState(oneshot::Sender<SwarmLocalState>),
98    /// Check if the local RecordStore contains the provided key
99    RecordStoreHasKey {
100        key: RecordKey,
101        sender: oneshot::Sender<Result<bool>>,
102    },
103    /// Get the Addresses of all the Records held locally
104    GetAllLocalRecordAddresses {
105        sender: oneshot::Sender<Result<HashMap<NetworkAddress, ValidationType>>>,
106    },
107    /// Get data from the local RecordStore
108    GetLocalRecord {
109        key: RecordKey,
110        sender: oneshot::Sender<Option<Record>>,
111    },
112    /// GetLocalQuotingMetrics for this node
113    /// Returns the quoting metrics and whether the record at `key` is already stored locally
114    GetLocalQuotingMetrics {
115        key: RecordKey,
116        data_type: u32,
117        data_size: usize,
118        sender: oneshot::Sender<Result<(QuotingMetrics, bool)>>,
119    },
120    /// Notify that the node received a payment.
121    PaymentReceived,
122    /// Put record to the local RecordStore
123    PutLocalRecord {
124        record: Record,
125        is_client_put: bool,
126    },
127    /// Remove a local record from the RecordStore
128    /// Typically because the write failed
129    RemoveFailedLocalRecord {
130        key: RecordKey,
131    },
132    /// Add a local record to the RecordStore's HashSet of stored records
133    /// This should be done after the record has been stored to disk
134    AddLocalRecordAsStored {
135        key: RecordKey,
136        record_type: ValidationType,
137        data_type: DataTypes,
138    },
139    /// Add a peer to the blocklist
140    AddPeerToBlockList {
141        peer_id: PeerId,
142    },
143    /// Notify whether peer is in trouble
144    RecordNodeIssue {
145        peer_id: PeerId,
146        issue: NodeIssue,
147    },
148    // Whether the peer is considered `in trouble` by self
149    IsPeerShunned {
150        target: NetworkAddress,
151        sender: oneshot::Sender<bool>,
152    },
153    // Quote verification against historically collected quotes
154    QuoteVerification {
155        quotes: Vec<(PeerId, PaymentQuote)>,
156    },
157    // Notify a fetch completion
158    FetchCompleted((RecordKey, ValidationType)),
159    /// Triggers interval replication
160    /// NOTE: This does result in outgoing messages, but is produced locally
161    TriggerIntervalReplication,
162    /// Triggers irrelevant record cleanup
163    TriggerIrrelevantRecordCleanup,
164    /// Add a network density sample
165    AddNetworkDensitySample {
166        distance: Distance,
167    },
168    /// Send peer scores (collected from storage challenge) to replication_fetcher
169    NotifyPeerScores {
170        peer_scores: Vec<(PeerId, bool)>,
171    },
172    /// Add fresh replicate records into replication_fetcher
173    AddFreshReplicateRecords {
174        holder: NetworkAddress,
175        keys: Vec<(NetworkAddress, ValidationType)>,
176    },
177    /// Notify a fetched peer's version
178    NotifyPeerVersion {
179        peer: PeerId,
180        version: String,
181    },
182    /// Get responsible distance range.
183    GetNetworkDensity {
184        sender: oneshot::Sender<Option<Distance>>,
185    },
186    /// Remove peer from the routing table
187    RemovePeer {
188        peer: PeerId,
189    },
190}
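
// Illustrative sketch of issuing one of these commands from outside the driver, assuming a
// tokio mpsc sender of `LocalSwarmCmd` such as the driver's own `local_cmd_sender` (the same
// send/await pattern used further below in this module):
//     let (sender, receiver) = tokio::sync::oneshot::channel();
//     local_cmd_sender
//         .send(LocalSwarmCmd::GetKBuckets { sender })
//         .await?;
//     let kbuckets: BTreeMap<u32, Vec<PeerId>> = receiver.await?;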
191
192/// Commands to send to the Swarm
193pub enum NetworkSwarmCmd {
194    // Get closest peers from the network
195    GetClosestPeersToAddressFromNetwork {
196        key: NetworkAddress,
197        sender: oneshot::Sender<Vec<(PeerId, Addresses)>>,
198    },
199
200    // Send Request to the PeerId.
201    SendRequest {
202        req: Request,
203        peer: PeerId,
204        /// If addresses are provided, we will try to dial the peer before sending the request.
205        addrs: Option<Addresses>,
206
207        // If a `sender` is provided, the requesting node will await a `Response` from the
208        // Peer. The result is then returned at the call site.
209        //
210        // If a `sender` is not provided, the requesting node will not wait for the Peer's
211        // response. Instead we trigger a `NetworkEvent::ResponseReceived` which calls the common
212        // `response_handler`
213        #[allow(clippy::type_complexity)]
214        sender: Option<oneshot::Sender<Result<(Response, Option<ConnectionInfo>)>>>,
215    },
216    SendResponse {
217        resp: Response,
218        channel: MsgResponder,
219    },
220
221    /// Get Record from the Kad network
222    GetNetworkRecord {
223        key: RecordKey,
224        sender: oneshot::Sender<std::result::Result<Record, GetRecordError>>,
225        cfg: GetRecordCfg,
226    },
227
228    /// Put record to network
229    PutRecord {
230        record: Record,
231        sender: oneshot::Sender<Result<()>>,
232        quorum: ResponseQuorum,
233    },
234    /// Put record to specific node
235    PutRecordTo {
236        peers: Vec<PeerId>,
237        record: Record,
238        sender: oneshot::Sender<Result<()>>,
239        quorum: ResponseQuorum,
240    },
241
242    // Attempt to dial specific peer.
243    DialPeer {
244        peer: PeerId,
245        addrs: Addresses,
246    },
247}
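
// Illustrative sketch of a fire-and-forget request, mirroring how the driver queues
// replication requests in `try_interval_replication` below: with `sender: None`, the peer's
// reply surfaces as a `NetworkEvent::ResponseReceived` instead of being awaited.
//     self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
//         req: request.clone(),
//         peer: peer_id,
//         addrs: None,   // the peer is already in the RT, no manual dial needed
//         sender: None,  // do not await the Response at the call site
//     });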
248
249/// Debug impl for LocalSwarmCmd that avoids printing the full Record; only the RecordKey
250/// and RecordKind are printed.
251impl Debug for LocalSwarmCmd {
252    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253        match self {
254            LocalSwarmCmd::PutLocalRecord {
255                record,
256                is_client_put,
257            } => {
258                write!(
259                    f,
260                    "LocalSwarmCmd::PutLocalRecord {{ key: {:?}, is_client_put: {is_client_put:?} }}",
261                    PrettyPrintRecordKey::from(&record.key)
262                )
263            }
264            LocalSwarmCmd::RemoveFailedLocalRecord { key } => {
265                write!(
266                    f,
267                    "LocalSwarmCmd::RemoveFailedLocalRecord {{ key: {:?} }}",
268                    PrettyPrintRecordKey::from(key)
269                )
270            }
271            LocalSwarmCmd::AddLocalRecordAsStored {
272                key,
273                record_type,
274                data_type,
275            } => {
276                write!(
277                    f,
278                    "LocalSwarmCmd::AddLocalRecordAsStored {{ key: {:?}, record_type: {record_type:?}, data_type: {data_type:?} }}",
279                    PrettyPrintRecordKey::from(key)
280                )
281            }
282            LocalSwarmCmd::GetClosestKLocalPeers { .. } => {
283                write!(f, "LocalSwarmCmd::GetClosestKLocalPeers")
284            }
285            LocalSwarmCmd::GetCloseLocalPeersToTarget {
286                key, num_of_peers, ..
287            } => {
288                write!(
289                    f,
290                    "LocalSwarmCmd::GetCloseLocalPeersToTarget {{ key: {key:?}, num_of_peers: {num_of_peers} }}"
291                )
292            }
293            LocalSwarmCmd::GetLocalQuotingMetrics { .. } => {
294                write!(f, "LocalSwarmCmd::GetLocalQuotingMetrics")
295            }
296            LocalSwarmCmd::PaymentReceived => {
297                write!(f, "LocalSwarmCmd::PaymentReceived")
298            }
299            LocalSwarmCmd::GetLocalRecord { key, .. } => {
300                write!(
301                    f,
302                    "LocalSwarmCmd::GetLocalRecord {{ key: {:?} }}",
303                    PrettyPrintRecordKey::from(key)
304                )
305            }
306            LocalSwarmCmd::GetAllLocalRecordAddresses { .. } => {
307                write!(f, "LocalSwarmCmd::GetAllLocalRecordAddresses")
308            }
309            LocalSwarmCmd::GetPeersWithMultiaddr { .. } => {
310                write!(f, "LocalSwarmCmd::GetPeersWithMultiaddr")
311            }
312            LocalSwarmCmd::GetKBuckets { .. } => {
313                write!(f, "LocalSwarmCmd::GetKBuckets")
314            }
315            LocalSwarmCmd::GetSwarmLocalState { .. } => {
316                write!(f, "LocalSwarmCmd::GetSwarmLocalState")
317            }
318            LocalSwarmCmd::RecordStoreHasKey { key, .. } => {
319                write!(
320                    f,
321                    "LocalSwarmCmd::RecordStoreHasKey {:?}",
322                    PrettyPrintRecordKey::from(key)
323                )
324            }
325            LocalSwarmCmd::AddPeerToBlockList { peer_id } => {
326                write!(f, "LocalSwarmCmd::AddPeerToBlockList {peer_id:?}")
327            }
328            LocalSwarmCmd::RecordNodeIssue { peer_id, issue } => {
329                write!(
330                    f,
331                    "LocalSwarmCmd::SendNodeStatus peer {peer_id:?}, issue: {issue:?}"
332                )
333            }
334            LocalSwarmCmd::IsPeerShunned { target, .. } => {
335                write!(f, "LocalSwarmCmd::IsPeerInTrouble target: {target:?}")
336            }
337            LocalSwarmCmd::QuoteVerification { quotes } => {
338                write!(
339                    f,
340                    "LocalSwarmCmd::QuoteVerification of {} quotes",
341                    quotes.len()
342                )
343            }
344            LocalSwarmCmd::FetchCompleted((key, record_type)) => {
345                write!(
346                    f,
347                    "LocalSwarmCmd::FetchCompleted({record_type:?} : {:?})",
348                    PrettyPrintRecordKey::from(key)
349                )
350            }
351            LocalSwarmCmd::TriggerIntervalReplication => {
352                write!(f, "LocalSwarmCmd::TriggerIntervalReplication")
353            }
354            LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
355                write!(f, "LocalSwarmCmd::TriggerIrrelevantRecordCleanup")
356            }
357            LocalSwarmCmd::AddNetworkDensitySample { distance } => {
358                write!(f, "LocalSwarmCmd::AddNetworkDensitySample({distance:?})")
359            }
360            LocalSwarmCmd::NotifyPeerScores { peer_scores } => {
361                write!(f, "LocalSwarmCmd::NotifyPeerScores({peer_scores:?})")
362            }
363            LocalSwarmCmd::AddFreshReplicateRecords { holder, keys } => {
364                write!(
365                    f,
366                    "LocalSwarmCmd::AddFreshReplicateRecords({holder:?}, {keys:?})"
367                )
368            }
369            LocalSwarmCmd::NotifyPeerVersion { peer, version } => {
370                write!(f, "LocalSwarmCmd::NotifyPeerVersion({peer:?}, {version:?})")
371            }
372            LocalSwarmCmd::GetNetworkDensity { .. } => {
373                write!(f, "LocalSwarmCmd::GetNetworkDensity")
374            }
375            LocalSwarmCmd::RemovePeer { peer } => {
376                write!(f, "LocalSwarmCmd::RemovePeer({peer:?})")
377            }
378        }
379    }
380}
381
382/// Debug impl for NetworkSwarmCmd that avoids printing the full Record; only the RecordKey
383/// and RecordKind are printed.
384impl Debug for NetworkSwarmCmd {
385    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386        match self {
387            NetworkSwarmCmd::GetNetworkRecord { key, cfg, .. } => {
388                write!(
389                    f,
390                    "NetworkSwarmCmd::GetNetworkRecord {{ key: {:?}, cfg: {cfg:?} }}",
391                    PrettyPrintRecordKey::from(key)
392                )
393            }
394            NetworkSwarmCmd::PutRecord { record, .. } => {
395                write!(
396                    f,
397                    "NetworkSwarmCmd::PutRecord {{ key: {:?} }}",
398                    PrettyPrintRecordKey::from(&record.key)
399                )
400            }
401            NetworkSwarmCmd::PutRecordTo { peers, record, .. } => {
402                write!(
403                    f,
404                    "NetworkSwarmCmd::PutRecordTo {{ peers: {peers:?}, key: {:?} }}",
405                    PrettyPrintRecordKey::from(&record.key)
406                )
407            }
408            NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { key, .. } => {
409                write!(f, "NetworkSwarmCmd::GetClosestPeers {{ key: {key:?} }}")
410            }
411            NetworkSwarmCmd::SendResponse { resp, .. } => {
412                write!(f, "NetworkSwarmCmd::SendResponse resp: {resp:?}")
413            }
414            NetworkSwarmCmd::SendRequest { req, peer, .. } => {
415                write!(
416                    f,
417                    "NetworkSwarmCmd::SendRequest req: {req:?}, peer: {peer:?}"
418                )
419            }
420            NetworkSwarmCmd::DialPeer { peer, .. } => {
421                write!(f, "NetworkSwarmCmd::DialPeer peer: {peer:?}")
422            }
423        }
424    }
425}

426/// Snapshot of information kept in the Swarm's local state
427#[derive(Debug, Clone)]
428pub struct SwarmLocalState {
429    /// List of peers that we have an established connection with.
430    pub connected_peers: Vec<PeerId>,
431    /// The number of peers in the routing table
432    pub peers_in_routing_table: usize,
433    /// List of addresses the node is currently listening on
434    pub listeners: Vec<Multiaddr>,
435}
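
// Sketch of retrieving this snapshot over the local command channel (hypothetical caller,
// assuming the same mpsc + oneshot pattern used elsewhere in this module):
//     let (tx, rx) = tokio::sync::oneshot::channel();
//     local_cmd_sender
//         .send(LocalSwarmCmd::GetSwarmLocalState(tx))
//         .await?;
//     let state: SwarmLocalState = rx.await?;
//     info!(
//         "connected: {}, in RT: {}",
//         state.connected_peers.len(),
//         state.peers_in_routing_table
//     );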
436
437impl SwarmDriver {
438    pub(crate) fn handle_network_cmd(&mut self, cmd: NetworkSwarmCmd) -> Result<(), NetworkError> {
439        let start = Instant::now();
440        let cmd_string;
441        match cmd {
442            NetworkSwarmCmd::GetNetworkRecord { key, sender, cfg } => {
443                cmd_string = "GetNetworkRecord";
444
445                for (pending_query, (inflight_record_query_key, senders, _, _)) in
446                    self.pending_get_record.iter_mut()
447                {
448                    if *inflight_record_query_key == key {
449                        debug!(
450                            "GetNetworkRecord for {:?} is already in progress. Adding sender to {pending_query:?}",
451                            PrettyPrintRecordKey::from(&key)
452                        );
453                        senders.push(sender);
454
455                        // early exit as we're already processing this query
456                        return Ok(());
457                    }
458                }
459
460                let query_id = self.swarm.behaviour_mut().kademlia.get_record(key.clone());
461
462                debug!(
463                    "Record {:?} with task {query_id:?} expected to be held by {:?}",
464                    PrettyPrintRecordKey::from(&key),
465                    cfg.expected_holders
466                );
467
468                if self
469                    .pending_get_record
470                    .insert(query_id, (key, vec![sender], Default::default(), cfg))
471                    .is_some()
472                {
473                    warn!("An existing get_record task {query_id:?} got replaced");
474                }
475                // Log the status of the `pending_get_record`.
476                // We are also interested in the status of the `result_map` (which contains the records) inside.
477                let total_records: usize = self
478                    .pending_get_record
479                    .iter()
480                    .map(|(_, (_, _, result_map, _))| result_map.len())
481                    .sum();
482                info!("We now have {} pending get record attempts and cached {total_records} fetched copies",
483                      self.pending_get_record.len());
484            }
485            NetworkSwarmCmd::PutRecord {
486                record,
487                sender,
488                quorum,
489            } => {
490                cmd_string = "PutRecord";
491                let record_key = PrettyPrintRecordKey::from(&record.key).into_owned();
492                debug!(
493                    "Putting record sized: {:?} to network {:?}",
494                    record.value.len(),
495                    record_key
496                );
497                let res = match self
498                    .swarm
499                    .behaviour_mut()
500                    .kademlia
501                    .put_record(record, quorum.get_kad_quorum())
502                {
503                    Ok(request_id) => {
504                        debug!("Sent record {record_key:?} to network. Request id: {request_id:?}");
505                        Ok(())
506                    }
507                    Err(error) => {
508                        error!("Error sending record {record_key:?} to network");
509                        Err(NetworkError::from(error))
510                    }
511                };
512
513                if let Err(err) = sender.send(res) {
514                    error!("Could not send response to PutRecord cmd: {:?}", err);
515                }
516            }
517            NetworkSwarmCmd::PutRecordTo {
518                peers,
519                record,
520                sender,
521                quorum,
522            } => {
523                cmd_string = "PutRecordTo";
524                let record_key = PrettyPrintRecordKey::from(&record.key).into_owned();
525                debug!(
526                    "Putting record {record_key:?} sized: {:?} to {peers:?}",
527                    record.value.len(),
528                );
529                let peers_count = peers.len();
530                let request_id = self.swarm.behaviour_mut().kademlia.put_record_to(
531                    record,
532                    peers.into_iter(),
533                    quorum.get_kad_quorum(),
534                );
535                info!("Sent record {record_key:?} to {peers_count:?} peers. Request id: {request_id:?}");
536
537                if let Err(err) = sender.send(Ok(())) {
538                    error!("Could not send response to PutRecordTo cmd: {:?}", err);
539                }
540            }
541            NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { key, sender } => {
542                cmd_string = "GetClosestPeersToAddressFromNetwork";
543                let query_id = self
544                    .swarm
545                    .behaviour_mut()
546                    .kademlia
547                    .get_closest_peers(key.as_bytes());
548                let _ = self.pending_get_closest_peers.insert(
549                    query_id,
550                    (
551                        PendingGetClosestType::FunctionCall(sender),
552                        Default::default(),
553                    ),
554                );
555            }
556
557            NetworkSwarmCmd::SendRequest {
558                req,
559                peer,
560                addrs,
561                sender,
562            } => {
563                cmd_string = "SendRequest";
564                // If `self` is the recipient, forward the request directly to our upper layer to
565                // be handled.
566                // `self` then handles the request and sends a response back to itself.
567                if peer == *self.swarm.local_peer_id() {
568                    trace!("Sending query request to self");
569                    if let Request::Query(query) = req {
570                        self.send_event(NetworkEvent::QueryRequestReceived {
571                            query,
572                            channel: MsgResponder::FromSelf(sender),
573                        });
574                    } else {
575                        // We should never receive a Replicate request from ourselves.
576                        // If we do, we already hold this data, so we can ignore it.
577                        trace!("Replicate cmd to self received, ignoring");
578                    }
579                } else {
580                    if let Some(addrs) = addrs {
581                        // dial the peer and send the request
582                        if addrs.0.is_empty() {
583                            info!("No addresses provided for peer {peer:?} to send the request. This could cause a dial failure if the swarm cannot find the peer's addrs.");
584                        } else {
585                            let opts = DialOpts::peer_id(peer)
586                                // If we have a peer ID, we can prevent simultaneous dials.
587                                .condition(PeerCondition::NotDialing)
588                                .addresses(addrs.0.clone())
589                                .build();
590
591                            match self.swarm.dial(opts) {
592                                Ok(()) => {
593                                    info!("Dialing peer {peer:?} for req_resp with address: {addrs:?}",);
594                                }
595                                Err(err) => {
596                                    error!("Failed to dial peer {peer:?} for req_resp with address: {addrs:?} error: {err}",);
597                                }
598                            }
599                        }
600                    }
601
602                    let request_id = self
603                        .swarm
604                        .behaviour_mut()
605                        .request_response
606                        .send_request(&peer, req);
607                    trace!("Sending request {request_id:?} to peer {peer:?}");
608                    let _ = self.pending_requests.insert(request_id, sender);
609
610                    trace!("Pending Requests now: {:?}", self.pending_requests.len());
611                }
612            }
613            NetworkSwarmCmd::SendResponse { resp, channel } => {
614                cmd_string = "SendResponse";
615                match channel {
616                    // If the response is for `self`, send it directly through the oneshot channel.
617                    MsgResponder::FromSelf(channel) => {
618                        trace!("Sending response to self");
619                        match channel {
620                            Some(channel) => {
621                                channel
622                                    .send(Ok((resp, None)))
623                                    .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
624                            }
625                            None => {
626                                // responses that are not awaited at the call site must be handled
627                                // separately
628                                self.send_event(NetworkEvent::ResponseReceived { res: resp });
629                            }
630                        }
631                    }
632                    MsgResponder::FromPeer(channel) => {
633                        self.swarm
634                            .behaviour_mut()
635                            .request_response
636                            .send_response(channel, resp)
637                            .map_err(NetworkError::OutgoingResponseDropped)?;
638                    }
639                }
640            }
641            NetworkSwarmCmd::DialPeer { peer, addrs } => {
642                cmd_string = "DialPeer";
643                let opts = DialOpts::peer_id(peer)
644                    // If we have a peer ID, we can prevent simultaneous dials.
645                    .condition(PeerCondition::NotDialing)
646                    .addresses(addrs.0.clone())
647                    .build();
648
649                match self.swarm.dial(opts) {
650                    Ok(()) => {
651                        info!("Manual dialing peer {peer:?} with address: {addrs:?}",);
652                    }
653                    Err(err) => {
654                        error!("Failed to manual dial peer {peer:?} with address: {addrs:?} error: {err}",);
655                    }
656                }
657            }
658        }
659
660        self.log_handling(cmd_string.to_string(), start.elapsed());
661
662        Ok(())
663    }
664    pub(crate) fn handle_local_cmd(&mut self, cmd: LocalSwarmCmd) -> Result<(), NetworkError> {
665        let start = Instant::now();
666        let mut cmd_string;
667        match cmd {
668            LocalSwarmCmd::TriggerIntervalReplication => {
669                cmd_string = "TriggerIntervalReplication";
670                self.try_interval_replication()?;
671            }
672            LocalSwarmCmd::GetLocalQuotingMetrics {
673                key,
674                data_type,
675                data_size,
676                sender,
677            } => {
678                cmd_string = "GetLocalQuotingMetrics";
679                let kbucket_status = self.get_kbuckets_status();
680                self.update_on_kbucket_status(&kbucket_status);
681                let (quoting_metrics, is_already_stored) = match self
682                    .swarm
683                    .behaviour_mut()
684                    .kademlia
685                    .store_mut()
686                    .quoting_metrics(
687                        &key,
688                        data_type,
689                        data_size,
690                        Some(kbucket_status.estimated_network_size as u64),
691                    ) {
692                    Ok(res) => res,
693                    Err(err) => {
694                        let _res = sender.send(Err(err));
695                        return Ok(());
696                    }
697                };
698                self.record_metrics(Marker::QuotingMetrics {
699                    quoting_metrics: &quoting_metrics,
700                });
701
702                // To avoid sending the entire list to the client, only send those that are
703                //     closer to the target than the CLOSE_GROUP_SIZEth closest node
704                let mut bad_nodes: Vec<_> = self
705                    .bad_nodes
706                    .iter()
707                    .filter_map(|(peer_id, (_issue_list, is_bad))| {
708                        if *is_bad {
709                            Some(NetworkAddress::from(*peer_id))
710                        } else {
711                            None
712                        }
713                    })
714                    .collect();
715
716                // The list is already ordered, hence the last one is always the one wanted
717                let kbucket_key = NetworkAddress::from(&key).as_kbucket_key();
718                let closest_peers: Vec<_> = self
719                    .swarm
720                    .behaviour_mut()
721                    .kademlia
722                    .get_closest_local_peers(&kbucket_key)
723                    .map(|peer| peer.into_preimage())
724                    .take(CLOSE_GROUP_SIZE)
725                    .collect();
726                // In case there are not enough closest_peers, send the entire list
727                if closest_peers.len() >= CLOSE_GROUP_SIZE {
728                    let boundary_peer = closest_peers[CLOSE_GROUP_SIZE - 1];
729                    let key_address = NetworkAddress::from(&key);
730                    let boundary_distance =
731                        key_address.distance(&NetworkAddress::from(boundary_peer));
732                    bad_nodes
733                        .retain(|peer_addr| key_address.distance(peer_addr) < boundary_distance);
734                }
735
736                let _res = sender.send(Ok((quoting_metrics, is_already_stored)));
737            }
738            LocalSwarmCmd::PaymentReceived => {
739                cmd_string = "PaymentReceived";
740                self.swarm
741                    .behaviour_mut()
742                    .kademlia
743                    .store_mut()
744                    .payment_received();
745            }
746            LocalSwarmCmd::GetLocalRecord { key, sender } => {
747                cmd_string = "GetLocalRecord";
748                let record = self
749                    .swarm
750                    .behaviour_mut()
751                    .kademlia
752                    .store_mut()
753                    .get(&key)
754                    .map(|rec| rec.into_owned());
755                let _ = sender.send(record);
756            }
757
758            LocalSwarmCmd::PutLocalRecord {
759                record,
760                is_client_put,
761            } => {
762                cmd_string = "PutLocalRecord";
763                let key = record.key.clone();
764                let record_key = PrettyPrintRecordKey::from(&key);
765
766                let record_type = match RecordHeader::from_record(&record) {
767                    Ok(record_header) => {
768                        match record_header.kind {
769                            RecordKind::DataOnly(DataTypes::Chunk) => ValidationType::Chunk,
770                            RecordKind::DataOnly(_) => {
771                                let content_hash = XorName::from_content(&record.value);
772                                ValidationType::NonChunk(content_hash)
773                            }
774                            RecordKind::DataWithPayment(_) => {
775                                error!("Record {record_key:?} with payment shall not be stored locally.");
776                                return Err(NetworkError::InCorrectRecordHeader);
777                            }
778                        }
779                    }
780                    Err(err) => {
781                        error!("For record {record_key:?}, failed to parse record_header {err:?}");
782                        return Err(NetworkError::InCorrectRecordHeader);
783                    }
784                };
785
786                let result = self
787                    .swarm
788                    .behaviour_mut()
789                    .kademlia
790                    .store_mut()
791                    .put_verified(record, record_type.clone(), is_client_put);
792
793                match result {
794                    Ok(_) => {
795                        // `replication_fetcher.farthest_acceptable_distance` shall only
796                        // shrink, never expand, even when more nodes join to share
797                        // the responsibility. Hence there is no need to reset it.
798                        // Also, as the `record_store` prunes one record per successful put, once
799                        // capacity has reached max_records the record count can only rise slowly,
800                        // due to the async/parallel handling in replication_fetcher & record_store.
801                    }
802                    Err(StoreError::MaxRecords) => {
803                        // In case the capacity becomes full, restrict the replication_fetcher to
804                        // only fetch entries not farther than the current farthest record
805                        let farthest = self
806                            .swarm
807                            .behaviour_mut()
808                            .kademlia
809                            .store_mut()
810                            .get_farthest()?;
811                        self.replication_fetcher.set_farthest_on_full(farthest);
812                    }
813                    Err(_) => {
814                        // Nothing special to do for these errors;
815                        // all error cases are further logged and bubbled up below
816                    }
817                }
818
819                // No matter whether storing the record succeeded or not,
820                // the entry shall be removed from the `replication_fetcher`.
821                // In case of a local store error, a re-attempt will be carried out
822                // within the next replication round.
823                let new_keys_to_fetch = self
824                    .replication_fetcher
825                    .notify_about_new_put(key.clone(), record_type);
826
827                if !new_keys_to_fetch.is_empty() {
828                    self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
829                }
830
831                // The record_store will prune far records and set up a `distance range`,
832                // once the `max_records` cap is reached.
833                if let Some(distance) = self
834                    .swarm
835                    .behaviour_mut()
836                    .kademlia
837                    .store_mut()
838                    .get_farthest_replication_distance()?
839                {
840                    self.replication_fetcher
841                        .set_replication_distance_range(distance);
842                }
843
844                if let Err(err) = result {
845                    error!("Can't store verified record {record_key:?} locally: {err:?}");
846                    cmd_string = "PutLocalRecord error";
847                    self.log_handling(cmd_string.to_string(), start.elapsed());
848                    return Err(err.into());
849                };
850            }
851            LocalSwarmCmd::AddLocalRecordAsStored {
852                key,
853                record_type,
854                data_type,
855            } => {
856                cmd_string = "AddLocalRecordAsStored";
857                self.swarm
858                    .behaviour_mut()
859                    .kademlia
860                    .store_mut()
861                    .mark_as_stored(key, record_type, data_type);
862                // Reset counter on any successful HDD write.
863                self.hard_disk_write_error = 0;
864            }
865            LocalSwarmCmd::RemoveFailedLocalRecord { key } => {
866                info!("Removing Record locally, for {key:?}");
867                cmd_string = "RemoveFailedLocalRecord";
868                self.swarm.behaviour_mut().kademlia.store_mut().remove(&key);
869                self.hard_disk_write_error = self.hard_disk_write_error.saturating_add(1);
870                // When there is a certain number of continuous HDD write errors,
871                // the hard disk is considered full, and the node shall be terminated.
872                if self.hard_disk_write_error > MAX_CONTINUOUS_HDD_WRITE_ERROR {
873                    self.send_event(NetworkEvent::TerminateNode {
874                        reason: TerminateNodeReason::HardDiskWriteError,
875                    });
876                }
877            }
878            LocalSwarmCmd::RecordStoreHasKey { key, sender } => {
879                cmd_string = "RecordStoreHasKey";
880                let has_key = self
881                    .swarm
882                    .behaviour_mut()
883                    .kademlia
884                    .store_mut()
885                    .contains(&key);
886                let _ = sender.send(has_key);
887            }
888            LocalSwarmCmd::GetAllLocalRecordAddresses { sender } => {
889                cmd_string = "GetAllLocalRecordAddresses";
890                #[allow(clippy::mutable_key_type)] // for the Bytes in NetworkAddress
891                let addresses = self
892                    .swarm
893                    .behaviour_mut()
894                    .kademlia
895                    .store_mut()
896                    .record_addresses();
897                let _ = sender.send(addresses);
898            }
899            LocalSwarmCmd::GetKBuckets { sender } => {
900                cmd_string = "GetKBuckets";
901                let mut ilog2_kbuckets = BTreeMap::new();
902                for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
903                    let range = kbucket.range();
904                    if let Some(distance) = range.0.ilog2() {
905                        let peers_in_kbucket = kbucket
906                            .iter()
907                            .map(|peer_entry| peer_entry.node.key.into_preimage())
908                            .collect::<Vec<PeerId>>();
909                        let _ = ilog2_kbuckets.insert(distance, peers_in_kbucket);
910                    } else {
911                        // This shall never happen.
912                        error!("bucket is ourself ???!!!");
913                    }
914                }
915                let _ = sender.send(ilog2_kbuckets);
916            }
917            LocalSwarmCmd::GetPeersWithMultiaddr { sender } => {
918                cmd_string = "GetPeersWithMultiAddr";
919                let mut result: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
920                for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
921                    let peers_in_kbucket = kbucket
922                        .iter()
923                        .map(|peer_entry| {
924                            (
925                                peer_entry.node.key.into_preimage(),
926                                peer_entry.node.value.clone().into_vec(),
927                            )
928                        })
929                        .collect::<Vec<(PeerId, Vec<Multiaddr>)>>();
930                    result.extend(peers_in_kbucket);
931                }
932                let _ = sender.send(result);
933            }
934            LocalSwarmCmd::GetCloseLocalPeersToTarget {
935                key,
936                num_of_peers,
937                sender,
938            } => {
939                cmd_string = "GetCloseLocalPeersToTarget";
940                let closest_peers = self.get_closest_local_peers_to_target(&key, num_of_peers);
941
942                let _ = sender.send(closest_peers);
943            }
944            LocalSwarmCmd::GetClosestKLocalPeers { sender } => {
945                cmd_string = "GetClosestKLocalPeers";
946                let _ = sender.send(self.get_closest_k_value_local_peers());
947            }
948            LocalSwarmCmd::GetSwarmLocalState(sender) => {
949                cmd_string = "GetSwarmLocalState";
950                let current_state = SwarmLocalState {
951                    connected_peers: self.swarm.connected_peers().cloned().collect(),
952                    peers_in_routing_table: self.peers_in_rt,
953                    listeners: self.swarm.listeners().cloned().collect(),
954                };
955
956                sender
957                    .send(current_state)
958                    .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
959            }
960            LocalSwarmCmd::AddPeerToBlockList { peer_id } => {
961                cmd_string = "AddPeerToBlockList";
962                self.swarm.behaviour_mut().blocklist.block_peer(peer_id);
963            }
964            LocalSwarmCmd::RecordNodeIssue { peer_id, issue } => {
965                cmd_string = "RecordNodeIssues";
966                self.record_node_issue(peer_id, issue);
967            }
968            LocalSwarmCmd::IsPeerShunned { target, sender } => {
969                cmd_string = "IsPeerInTrouble";
970                let is_bad = if let Some(peer_id) = target.as_peer_id() {
971                    if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
972                        *is_bad
973                    } else {
974                        false
975                    }
976                } else {
977                    false
978                };
979                let _ = sender.send(is_bad);
980            }
981            LocalSwarmCmd::QuoteVerification { quotes } => {
982                cmd_string = "QuoteVerification";
983                for (peer_id, quote) in quotes {
984                    // Do nothing if the peer is already considered bad
985                    if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
986                        if *is_bad {
987                            continue;
988                        }
989                    }
990                    self.verify_peer_quote(peer_id, quote);
991                }
992            }
993            LocalSwarmCmd::FetchCompleted((key, record_type)) => {
994                info!(
995                    "Fetch of {record_type:?} {:?} completed early; an old version of the record may have been fetched.",
996                    PrettyPrintRecordKey::from(&key)
997                );
998                cmd_string = "FetchCompleted";
999                let new_keys_to_fetch = self
1000                    .replication_fetcher
1001                    .notify_fetch_early_completed(key, record_type);
1002                if !new_keys_to_fetch.is_empty() {
1003                    self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
1004                }
1005            }
1006            LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
1007                cmd_string = "TriggerIrrelevantRecordCleanup";
1008                self.swarm
1009                    .behaviour_mut()
1010                    .kademlia
1011                    .store_mut()
1012                    .cleanup_irrelevant_records();
1013            }
1014            LocalSwarmCmd::AddNetworkDensitySample { distance } => {
1015                cmd_string = "AddNetworkDensitySample";
1016                self.network_density_samples.add(distance);
1017            }
1018            LocalSwarmCmd::NotifyPeerScores { peer_scores } => {
1019                cmd_string = "NotifyPeerScores";
1020                self.replication_fetcher.add_peer_scores(peer_scores);
1021            }
1022            LocalSwarmCmd::AddFreshReplicateRecords { holder, keys } => {
1023                cmd_string = "AddFreshReplicateRecords";
1024                let _ = self.add_keys_to_replication_fetcher(holder, keys, true);
1025            }
1026            LocalSwarmCmd::NotifyPeerVersion { peer, version } => {
1027                cmd_string = "NotifyPeerVersion";
1028                self.record_node_version(peer, version);
1029            }
1030            LocalSwarmCmd::GetNetworkDensity { sender } => {
1031                cmd_string = "GetNetworkDensity";
1032                let density = self
1033                    .swarm
1034                    .behaviour_mut()
1035                    .kademlia
1036                    .store_mut()
1037                    .get_farthest_replication_distance()
1038                    .unwrap_or_default();
1039                let _ = sender.send(density);
1040            }
1041            LocalSwarmCmd::RemovePeer { peer } => {
1042                cmd_string = "RemovePeer";
1043                if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer) {
1044                    self.update_on_peer_removal(*dead_peer.node.key.preimage());
1045                }
1046            }
1047        }
1048
1049        self.log_handling(cmd_string.to_string(), start.elapsed());
1050
1051        Ok(())
1052    }
1053
1054    fn record_node_version(&mut self, peer_id: PeerId, version: String) {
1055        let _ = self.peers_version.insert(peer_id, version);
1056    }
1057
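    // Worked example of the thresholds in `record_node_issue` below: with the 300 s retention
    // window and the >10 s spacing check, a peer reported for `BadQuoting` at t=0 s, t=15 s and
    // t=40 s accumulates three same-kind issues and trips the threshold, whereas reports at
    // t=0 s, t=5 s and t=8 s collapse into a single tracked entry.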
1058    pub(crate) fn record_node_issue(&mut self, peer_id: PeerId, issue: NodeIssue) {
1059        info!("Peer {peer_id:?} is reported as having issue {issue:?}");
1060        let (issue_vec, is_bad) = self.bad_nodes.entry(peer_id).or_default();
1061        let mut new_bad_behaviour = None;
1062        let mut is_connection_issue = false;
1063
1064        // If already considered bad, skip certain operations
1065        if !(*is_bad) {
1066            // Remove outdated entries
1067            issue_vec.retain(|(_, timestamp)| timestamp.elapsed().as_secs() < 300);
1068
1069            // Check if the vec is already 10 entries long; if so, remove the oldest issue.
1070            // We only track 10 issues to avoid memory leaks.
1071            if issue_vec.len() == 10 {
1072                issue_vec.remove(0);
1073            }
1074
1075            // To avoid being too sensitive, only count this as a new issue
1076            // when a certain amount of time has passed since the last one
1077            let is_new_issue = if let Some((_issue, timestamp)) = issue_vec.last() {
1078                timestamp.elapsed().as_secs() > 10
1079            } else {
1080                true
1081            };
1082
1083            if is_new_issue {
1084                issue_vec.push((issue, Instant::now()));
1085            } else {
1086                return;
1087            }
1088
1089            // Only consider the candidate as a bad node when it has:
1090            //   accumulated THREE issues of the same kind within a certain period
1091            for (issue, _timestamp) in issue_vec.iter() {
1092                let issue_counts = issue_vec
1093                    .iter()
1094                    .filter(|(i, _timestamp)| *issue == *i)
1095                    .count();
1096                if issue_counts >= 3 {
1097                    // If it is a connection issue, we don't need to consider it as a bad node
1098                    if matches!(issue, NodeIssue::ConnectionIssue) {
1099                        is_connection_issue = true;
1100                    }
1101                    // TODO: the black_list is currently disabled.
1102                    //       Re-enable it once more statistics are gathered from a large-scale network.
1103                    // else {
1104                    //     *is_bad = true;
1105                    // }
1106                    new_bad_behaviour = Some(issue.clone());
1107                    info!("Peer {peer_id:?} accumulated {issue_counts} instances of issue {issue:?}. Considering it as a bad node now.");
1108                    // Once a bad behaviour is detected, there is no point in continuing
1109                    break;
1110                }
1111            }
1112        }
1113
1114        // Give the node with connection issues more chances by removing the issue from the list. It is still evicted from
1115        // the routing table.
1116        if is_connection_issue {
1117            issue_vec.retain(|(issue, _timestamp)| !matches!(issue, NodeIssue::ConnectionIssue));
1118            info!("Evicting bad peer {peer_id:?} due to connection issue from RT.");
1119            if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
1120                self.update_on_peer_removal(*dead_peer.node.key.preimage());
1121            }
1122            return;
1123        }
1124
1125        if *is_bad {
1126            info!("Evicting bad peer {peer_id:?} from RT.");
1127            let addrs = if let Some(dead_peer) =
1128                self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id)
1129            {
1130                self.update_on_peer_removal(*dead_peer.node.key.preimage());
1131                Addresses(dead_peer.node.value.into_vec())
1132            } else {
1133                Addresses(Vec::new())
1134            };
1135
1136            if let Some(bad_behaviour) = new_bad_behaviour {
1137                // inform the bad node about it and add to the blocklist after that (not for connection issues)
1138                self.record_metrics(Marker::PeerConsideredAsBad { bad_peer: &peer_id });
1139
1140                warn!("Peer {peer_id:?} is considered as bad due to {bad_behaviour:?}. Informing the peer and adding to blocklist.");
1141                // response handling
1142                let (tx, rx) = oneshot::channel();
1143                let local_swarm_cmd_sender = self.local_cmd_sender.clone();
1144                tokio::spawn(async move {
1145                    match rx.await {
1146                        Ok(result) => {
1147                            debug!("Got response for Cmd::PeerConsideredAsBad from {peer_id:?} {result:?}");
1148                            if let Err(err) = local_swarm_cmd_sender
1149                                .send(LocalSwarmCmd::AddPeerToBlockList { peer_id })
1150                                .await
1151                            {
1152                                error!("SwarmDriver failed to send LocalSwarmCmd: {err}");
1153                            }
1154                        }
1155                        Err(err) => {
1156                            error!("Failed to get response from one shot channel for Cmd::PeerConsideredAsBad : {err:?}");
1157                        }
1158                    }
1159                });
1160
1161                // request
1162                let request = Request::Cmd(Cmd::PeerConsideredAsBad {
1163                    detected_by: NetworkAddress::from(self.self_peer_id),
1164                    bad_peer: NetworkAddress::from(peer_id),
1165                    bad_behaviour: bad_behaviour.to_string(),
1166                });
1167                self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
1168                    req: request,
1169                    addrs: Some(addrs),
1170                    peer: peer_id,
1171                    sender: Some(tx),
1172                });
1173            }
1174        }
1175    }
1176
1177    fn verify_peer_quote(&mut self, peer_id: PeerId, quote: PaymentQuote) {
1178        if let Some(history_quote) = self.quotes_history.get(&peer_id) {
1179            if !history_quote.historical_verify(&quote) {
1180                info!("From {peer_id:?}, detected a bad quote {quote:?} against history_quote {history_quote:?}");
1181                self.record_node_issue(peer_id, NodeIssue::BadQuoting);
1182                return;
1183            }
1184
1185            if history_quote.is_newer_than(&quote) {
1186                return;
1187            }
1188        }
1189
1190        let _ = self.quotes_history.insert(peer_id, quote);
1191    }
1192
1193    fn try_interval_replication(&mut self) -> Result<()> {
1194        // `last_replication` tracks the last time replication was performed
1195        if let Some(last_replication) = self.last_replication {
1196            if last_replication.elapsed() < MIN_REPLICATION_INTERVAL_S {
1197                info!("Skipping replication as minimum interval hasn't elapsed");
1198                return Ok(());
1199            }
1200        }
1201        // Store the current time as the last replication time
1202        self.last_replication = Some(Instant::now());
1203
1204        let self_addr = NetworkAddress::from(self.self_peer_id);
1205        let mut replicate_targets = self.get_replicate_candidates(&self_addr)?;
1206
1207        let now = Instant::now();
1208        self.replication_targets
1209            .retain(|_peer_id, timestamp| *timestamp > now);
1210        // Only carry out replication to peers that we have not replicated to recently
1211        replicate_targets
1212            .retain(|(peer_id, _addresses)| !self.replication_targets.contains_key(peer_id));
1213        if replicate_targets.is_empty() {
1214            return Ok(());
1215        }
1216
1217        let all_records: Vec<_> = self
1218            .swarm
1219            .behaviour_mut()
1220            .kademlia
1221            .store_mut()
1222            .record_addresses_ref()?
1223            .values()
1224            .cloned()
1225            .collect();
1226
1227        if !all_records.is_empty() {
1228            debug!(
1229                "Sending a replication list of {} keys to {replicate_targets:?} ",
1230                all_records.len()
1231            );
1232            let request = Request::Cmd(Cmd::Replicate {
1233                holder: NetworkAddress::from(self.self_peer_id),
1234                keys: all_records
1235                    .into_iter()
1236                    .map(|(addr, val_type, _data_type)| (addr, val_type))
1237                    .collect(),
1238            });
1239            for (peer_id, _addrs) in replicate_targets {
1240                self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
1241                    req: request.clone(),
1242                    peer: peer_id,
1243                    // replicate targets are part of the RT, so no need to dial them manually.
1244                    addrs: None,
1245                    sender: None,
1246                });
1247
1248                let _ = self
1249                    .replication_targets
1250                    .insert(peer_id, now + REPLICATION_TIMEOUT);
1251            }
1252        }
1253
1254        Ok(())
1255    }
1256
1257    // Replies with in-range replicate candidates.
1258    // Falls back to expected_candidates peers if the range is too narrow.
1259    // Note that:
1260    //   * For general replication, the replicate candidates shall be the closest to self, but within a wider range
1261    //   * For replicating fresh records, the replicate candidates shall be those closest to the data
1262    pub(crate) fn get_replicate_candidates(
1263        &mut self,
1264        target: &NetworkAddress,
1265    ) -> Result<Vec<(PeerId, Addresses)>> {
1266        let is_periodic_replicate = target.as_peer_id().is_some();
1267        let expected_candidates = if is_periodic_replicate {
1268            CLOSE_GROUP_SIZE * 2
1269        } else {
1270            CLOSE_GROUP_SIZE
1271        };
1272
1273        // get closest peers from buckets, sorted by increasing distance to the target
1274        let closest_k_peers = self.get_closest_local_peers_to_target(target, K_VALUE.get());
1275
1276        if let Some(responsible_range) = self
1277            .swarm
1278            .behaviour_mut()
1279            .kademlia
1280            .store_mut()
1281            .get_farthest_replication_distance()?
1282        {
1283            let peers_in_range = get_peers_in_range(&closest_k_peers, target, responsible_range);
1284
1285            if peers_in_range.len() >= expected_candidates {
1286                return Ok(peers_in_range);
1287            }
1288        }
1289
1290        // In case the range is too narrow, fall back to at least expected_candidates peers.
1291        Ok(closest_k_peers
1292            .iter()
1293            .take(expected_candidates)
1294            .cloned()
1295            .collect())
1296    }
1297}
1298
1299/// Returns the nodes that are within the defined distance.
1300fn get_peers_in_range(
1301    peers: &[(PeerId, Addresses)],
1302    address: &NetworkAddress,
1303    range: Distance,
1304) -> Vec<(PeerId, Addresses)> {
1305    peers
1306        .iter()
1307        .filter_map(|(peer_id, addresses)| {
1308            if address.distance(&NetworkAddress::from(*peer_id)) <= range {
1309                Some((*peer_id, addresses.clone()))
1310            } else {
1311                None
1312            }
1313        })
1314        .collect()
1315}
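
// A minimal test sketch for `get_peers_in_range` (it assumes only constructors already used in
// this module: `NetworkAddress::from(PeerId)`, `Addresses(Vec::new())`, and
// `NetworkAddress::distance`). The boundary peer sits exactly at `range`, so the `<=` filter
// must keep it.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn boundary_peer_is_within_range() {
        let target = NetworkAddress::from(PeerId::random());
        let boundary_peer = PeerId::random();
        // The range is the distance from the target to the boundary peer itself.
        let range = target.distance(&NetworkAddress::from(boundary_peer));

        let peers = vec![(boundary_peer, Addresses(Vec::new()))];
        let in_range = get_peers_in_range(&peers, &target, range);

        assert_eq!(in_range.len(), 1);
        assert_eq!(in_range[0].0, boundary_peer);
    }
}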