ant_networking/
cmd.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::{
10    config::GetRecordCfg,
11    driver::{PendingGetClosestType, SwarmDriver},
12    error::{NetworkError, Result},
13    event::TerminateNodeReason,
14    log_markers::Marker,
15    Addresses, GetRecordError, MsgResponder, NetworkEvent, ResponseQuorum, CLOSE_GROUP_SIZE,
16};
17use ant_evm::{PaymentQuote, QuotingMetrics};
18use ant_protocol::{
19    messages::{Cmd, Request, Response},
20    storage::{DataTypes, RecordHeader, RecordKind, ValidationType},
21    NetworkAddress, PrettyPrintRecordKey,
22};
23use libp2p::{
24    kad::{
25        store::{Error as StoreError, RecordStore},
26        KBucketDistance as Distance, Record, RecordKey, K_VALUE,
27    },
28    swarm::dial_opts::{DialOpts, PeerCondition},
29    Multiaddr, PeerId,
30};
31use std::{
32    collections::{BTreeMap, HashMap},
33    fmt::Debug,
34    time::Duration,
35};
36use tokio::sync::oneshot;
37use xor_name::XorName;
38
39use crate::time::Instant;
40
// NOTE(review): presumably the number of consecutive disk-write failures that is tolerated;
// the code reading this constant is not part of this section of the file — confirm.
const MAX_CONTINUOUS_HDD_WRITE_ERROR: usize = 5;

// Replication timeout (45 seconds).
// Shall be kept in sync with `ant_node::PERIODIC_REPLICATION_INTERVAL_MAX_S`.
const REPLICATION_TIMEOUT: Duration = Duration::from_secs(45);

// Throttles replication to at most once every 30 seconds.
// Despite the `_S` suffix the value is a `Duration`, not a bare number of seconds.
const MIN_REPLICATION_INTERVAL_S: Duration = Duration::from_secs(30);
48
/// The kinds of issue that can be recorded against a peer.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NodeIssue {
    /// Trouble on a connection; some connections might be considered critical and are tracked.
    ConnectionIssue,
    /// Replicating data failed.
    ReplicationFailure,
    /// Close nodes have reported this peer as bad.
    CloseNodesShunning,
    /// The peer provided a bad quote.
    BadQuoting,
    /// The peer failed to pass the chunk proof verification.
    FailedChunkProofCheck,
}
62
63impl std::fmt::Display for NodeIssue {
64    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
65        match self {
66            NodeIssue::ConnectionIssue => write!(f, "CriticalConnectionIssue"),
67            NodeIssue::ReplicationFailure => write!(f, "ReplicationFailure"),
68            NodeIssue::CloseNodesShunning => write!(f, "CloseNodesShunning"),
69            NodeIssue::BadQuoting => write!(f, "BadQuoting"),
70            NodeIssue::FailedChunkProofCheck => write!(f, "FailedChunkProofCheck"),
71        }
72    }
73}
74
/// Commands to send to the Swarm; these are the ones processed by
/// `SwarmDriver::handle_local_cmd`.
///
/// Variants carrying a `sender` hand their result back through that oneshot channel.
pub enum LocalSwarmCmd {
    /// Get a list of all peers in local RT, with correspondent Multiaddr info attached as well.
    GetPeersWithMultiaddr {
        sender: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
    },
    /// Get a map where each key is the ilog2 distance of that Kbucket
    /// and each value is a vector of peers in that bucket.
    GetKBuckets {
        sender: oneshot::Sender<BTreeMap<u32, Vec<PeerId>>>,
    },
    /// Returns up to K_VALUE peers from all the k-buckets from the local Routing Table,
    /// and our PeerId as well.
    GetClosestKLocalPeers {
        sender: oneshot::Sender<Vec<(PeerId, Addresses)>>,
    },
    /// Get the `num_of_peers` closest peers to `key` from the local RoutingTable,
    /// self not included.
    GetCloseLocalPeersToTarget {
        key: NetworkAddress,
        num_of_peers: usize,
        sender: oneshot::Sender<Vec<(PeerId, Addresses)>>,
    },
    /// Get a `SwarmLocalState` snapshot (connected peers and listener addresses).
    GetSwarmLocalState(oneshot::Sender<SwarmLocalState>),
    /// Check if the local RecordStore contains the provided key
    RecordStoreHasKey {
        key: RecordKey,
        sender: oneshot::Sender<Result<bool>>,
    },
    /// Get the Addresses of all the Records held locally
    GetAllLocalRecordAddresses {
        sender: oneshot::Sender<Result<HashMap<NetworkAddress, ValidationType>>>,
    },
    /// Get data from the local RecordStore
    GetLocalRecord {
        key: RecordKey,
        sender: oneshot::Sender<Option<Record>>,
    },
    /// Get the local quoting metrics for this node.
    /// Returns the quoting metrics and whether the record at `key` is already stored locally
    GetLocalQuotingMetrics {
        key: RecordKey,
        data_type: u32,
        data_size: usize,
        sender: oneshot::Sender<Result<(QuotingMetrics, bool)>>,
    },
    /// Notify the node received a payment.
    PaymentReceived,
    /// Put record to the local RecordStore
    PutLocalRecord {
        record: Record,
        is_client_put: bool,
    },
    /// Remove a local record from the RecordStore
    /// Typically because the write failed
    RemoveFailedLocalRecord {
        key: RecordKey,
    },
    /// Add a local record to the RecordStore's HashSet of stored records
    /// This should be done after the record has been stored to disk
    AddLocalRecordAsStored {
        key: RecordKey,
        record_type: ValidationType,
        data_type: DataTypes,
    },
    /// Add a peer to the blocklist
    AddPeerToBlockList {
        peer_id: PeerId,
    },
    /// Notify whether peer is in trouble, by recording `issue` against `peer_id`
    RecordNodeIssue {
        peer_id: PeerId,
        issue: NodeIssue,
    },
    /// Whether the peer is considered as `in trouble` by self
    IsPeerShunned {
        target: NetworkAddress,
        sender: oneshot::Sender<bool>,
    },
    /// Quote verification against historically collected quotes
    QuoteVerification {
        quotes: Vec<(PeerId, PaymentQuote)>,
    },
    /// Notify a fetch completion
    FetchCompleted((RecordKey, ValidationType)),
    /// Triggers interval replication
    /// NOTE: This does result in outgoing messages, but is produced locally
    TriggerIntervalReplication,
    /// Triggers irrelevant record cleanup
    TriggerIrrelevantRecordCleanup,
    /// Add a network density sample
    AddNetworkDensitySample {
        distance: Distance,
    },
    /// Send peer scores (collected from storage challenge) to replication_fetcher
    NotifyPeerScores {
        peer_scores: Vec<(PeerId, bool)>,
    },
    /// Add fresh replicate records into replication_fetcher
    AddFreshReplicateRecords {
        holder: NetworkAddress,
        keys: Vec<(NetworkAddress, ValidationType)>,
    },
    /// Notify a fetched peer's version
    NotifyPeerVersion {
        peer: PeerId,
        version: String,
    },
}
183
/// Commands to send to the Swarm; these are the ones processed by
/// `SwarmDriver::handle_network_cmd`.
pub enum NetworkSwarmCmd {
    /// Get closest peers from the network
    GetClosestPeersToAddressFromNetwork {
        key: NetworkAddress,
        sender: oneshot::Sender<Vec<(PeerId, Addresses)>>,
    },

    /// Send Request to the PeerId.
    SendRequest {
        req: Request,
        peer: PeerId,
        /// If the address is provided, we will try to perform a dial before sending the request.
        addrs: Option<Addresses>,

        /// If a `sender` is provided, the requesting node will await for a `Response` from the
        /// Peer. The result is then returned at the call site.
        ///
        /// If a `sender` is not provided, the requesting node will not wait for the Peer's
        /// response. Instead we trigger a `NetworkEvent::ResponseReceived` which calls the common
        /// `response_handler`
        sender: Option<oneshot::Sender<Result<Response>>>,
    },
    /// Send `resp` back through `channel`, which addresses either ourselves or a remote peer.
    SendResponse {
        resp: Response,
        channel: MsgResponder,
    },

    /// Get Record from the Kad network
    GetNetworkRecord {
        key: RecordKey,
        sender: oneshot::Sender<std::result::Result<Record, GetRecordError>>,
        cfg: GetRecordCfg,
    },

    /// Put record to network
    PutRecord {
        record: Record,
        sender: oneshot::Sender<Result<()>>,
        quorum: ResponseQuorum,
    },
    /// Put record to the specific nodes listed in `peers`
    PutRecordTo {
        peers: Vec<PeerId>,
        record: Record,
        sender: oneshot::Sender<Result<()>>,
        quorum: ResponseQuorum,
    },

    /// Attempt to dial specific peer.
    DialPeer {
        peer: PeerId,
        addrs: Addresses,
    },
}
239
240/// Debug impl for LocalSwarmCmd to avoid printing full Record, instead only RecodKey
241/// and RecordKind are printed.
242impl Debug for LocalSwarmCmd {
243    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244        match self {
245            LocalSwarmCmd::PutLocalRecord {
246                record,
247                is_client_put,
248            } => {
249                write!(
250                    f,
251                    "LocalSwarmCmd::PutLocalRecord {{ key: {:?}, is_client_put: {is_client_put:?} }}",
252                    PrettyPrintRecordKey::from(&record.key)
253                )
254            }
255            LocalSwarmCmd::RemoveFailedLocalRecord { key } => {
256                write!(
257                    f,
258                    "LocalSwarmCmd::RemoveFailedLocalRecord {{ key: {:?} }}",
259                    PrettyPrintRecordKey::from(key)
260                )
261            }
262            LocalSwarmCmd::AddLocalRecordAsStored {
263                key,
264                record_type,
265                data_type,
266            } => {
267                write!(
268                    f,
269                    "LocalSwarmCmd::AddLocalRecordAsStored {{ key: {:?}, record_type: {record_type:?}, data_type: {data_type:?} }}",
270                    PrettyPrintRecordKey::from(key)
271                )
272            }
273            LocalSwarmCmd::GetClosestKLocalPeers { .. } => {
274                write!(f, "LocalSwarmCmd::GetClosestKLocalPeers")
275            }
276            LocalSwarmCmd::GetCloseLocalPeersToTarget {
277                key, num_of_peers, ..
278            } => {
279                write!(
280                    f,
281                    "LocalSwarmCmd::GetCloseLocalPeersToTarget {{ key: {key:?}, num_of_peers: {num_of_peers} }}"
282                )
283            }
284            LocalSwarmCmd::GetLocalQuotingMetrics { .. } => {
285                write!(f, "LocalSwarmCmd::GetLocalQuotingMetrics")
286            }
287            LocalSwarmCmd::PaymentReceived => {
288                write!(f, "LocalSwarmCmd::PaymentReceived")
289            }
290            LocalSwarmCmd::GetLocalRecord { key, .. } => {
291                write!(
292                    f,
293                    "LocalSwarmCmd::GetLocalRecord {{ key: {:?} }}",
294                    PrettyPrintRecordKey::from(key)
295                )
296            }
297            LocalSwarmCmd::GetAllLocalRecordAddresses { .. } => {
298                write!(f, "LocalSwarmCmd::GetAllLocalRecordAddresses")
299            }
300            LocalSwarmCmd::GetPeersWithMultiaddr { .. } => {
301                write!(f, "LocalSwarmCmd::GetPeersWithMultiaddr")
302            }
303            LocalSwarmCmd::GetKBuckets { .. } => {
304                write!(f, "LocalSwarmCmd::GetKBuckets")
305            }
306            LocalSwarmCmd::GetSwarmLocalState { .. } => {
307                write!(f, "LocalSwarmCmd::GetSwarmLocalState")
308            }
309            LocalSwarmCmd::RecordStoreHasKey { key, .. } => {
310                write!(
311                    f,
312                    "LocalSwarmCmd::RecordStoreHasKey {:?}",
313                    PrettyPrintRecordKey::from(key)
314                )
315            }
316            LocalSwarmCmd::AddPeerToBlockList { peer_id } => {
317                write!(f, "LocalSwarmCmd::AddPeerToBlockList {peer_id:?}")
318            }
319            LocalSwarmCmd::RecordNodeIssue { peer_id, issue } => {
320                write!(
321                    f,
322                    "LocalSwarmCmd::SendNodeStatus peer {peer_id:?}, issue: {issue:?}"
323                )
324            }
325            LocalSwarmCmd::IsPeerShunned { target, .. } => {
326                write!(f, "LocalSwarmCmd::IsPeerInTrouble target: {target:?}")
327            }
328            LocalSwarmCmd::QuoteVerification { quotes } => {
329                write!(
330                    f,
331                    "LocalSwarmCmd::QuoteVerification of {} quotes",
332                    quotes.len()
333                )
334            }
335            LocalSwarmCmd::FetchCompleted((key, record_type)) => {
336                write!(
337                    f,
338                    "LocalSwarmCmd::FetchCompleted({record_type:?} : {:?})",
339                    PrettyPrintRecordKey::from(key)
340                )
341            }
342            LocalSwarmCmd::TriggerIntervalReplication => {
343                write!(f, "LocalSwarmCmd::TriggerIntervalReplication")
344            }
345            LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
346                write!(f, "LocalSwarmCmd::TriggerUnrelevantRecordCleanup")
347            }
348            LocalSwarmCmd::AddNetworkDensitySample { distance } => {
349                write!(f, "LocalSwarmCmd::AddNetworkDensitySample({distance:?})")
350            }
351            LocalSwarmCmd::NotifyPeerScores { peer_scores } => {
352                write!(f, "LocalSwarmCmd::NotifyPeerScores({peer_scores:?})")
353            }
354            LocalSwarmCmd::AddFreshReplicateRecords { holder, keys } => {
355                write!(
356                    f,
357                    "LocalSwarmCmd::AddFreshReplicateRecords({holder:?}, {keys:?})"
358                )
359            }
360            LocalSwarmCmd::NotifyPeerVersion { peer, version } => {
361                write!(f, "LocalSwarmCmd::NotifyPeerVersion({peer:?}, {version:?})")
362            }
363        }
364    }
365}
366
367/// Debug impl for NetworkSwarmCmd to avoid printing full Record, instead only RecodKey
368/// and RecordKind are printed.
369impl Debug for NetworkSwarmCmd {
370    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
371        match self {
372            NetworkSwarmCmd::GetNetworkRecord { key, cfg, .. } => {
373                write!(
374                    f,
375                    "NetworkSwarmCmd::GetNetworkRecord {{ key: {:?}, cfg: {cfg:?}",
376                    PrettyPrintRecordKey::from(key)
377                )
378            }
379            NetworkSwarmCmd::PutRecord { record, .. } => {
380                write!(
381                    f,
382                    "NetworkSwarmCmd::PutRecord {{ key: {:?} }}",
383                    PrettyPrintRecordKey::from(&record.key)
384                )
385            }
386            NetworkSwarmCmd::PutRecordTo { peers, record, .. } => {
387                write!(
388                    f,
389                    "NetworkSwarmCmd::PutRecordTo {{ peers: {peers:?}, key: {:?} }}",
390                    PrettyPrintRecordKey::from(&record.key)
391                )
392            }
393            NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { key, .. } => {
394                write!(f, "NetworkSwarmCmd::GetClosestPeers {{ key: {key:?} }}")
395            }
396            NetworkSwarmCmd::SendResponse { resp, .. } => {
397                write!(f, "NetworkSwarmCmd::SendResponse resp: {resp:?}")
398            }
399            NetworkSwarmCmd::SendRequest { req, peer, .. } => {
400                write!(
401                    f,
402                    "NetworkSwarmCmd::SendRequest req: {req:?}, peer: {peer:?}"
403                )
404            }
405            NetworkSwarmCmd::DialPeer { peer, .. } => {
406                write!(f, "NetworkSwarmCmd::DialPeer peer: {peer:?}")
407            }
408        }
409    }
410}
/// Snapshot of information kept in the Swarm's local state.
///
/// This is the payload type of the reply channel carried by
/// `LocalSwarmCmd::GetSwarmLocalState`.
#[derive(Debug, Clone)]
pub struct SwarmLocalState {
    /// List of currently connected peers
    pub connected_peers: Vec<PeerId>,
    /// List of addresses the node is currently listening on
    pub listeners: Vec<Multiaddr>,
}
419
420impl SwarmDriver {
421    pub(crate) fn handle_network_cmd(&mut self, cmd: NetworkSwarmCmd) -> Result<(), NetworkError> {
422        let start = Instant::now();
423        let cmd_string;
424        match cmd {
425            NetworkSwarmCmd::GetNetworkRecord { key, sender, cfg } => {
426                cmd_string = "GetNetworkRecord";
427
428                for (pending_query, (inflight_record_query_key, senders, _, _)) in
429                    self.pending_get_record.iter_mut()
430                {
431                    if *inflight_record_query_key == key {
432                        debug!(
433                            "GetNetworkRecord for {:?} is already in progress. Adding sender to {pending_query:?}",
434                            PrettyPrintRecordKey::from(&key)
435                        );
436                        senders.push(sender);
437
438                        // early exit as we're already processing this query
439                        return Ok(());
440                    }
441                }
442
443                let query_id = self.swarm.behaviour_mut().kademlia.get_record(key.clone());
444
445                debug!(
446                    "Record {:?} with task {query_id:?} expected to be held by {:?}",
447                    PrettyPrintRecordKey::from(&key),
448                    cfg.expected_holders
449                );
450
451                if self
452                    .pending_get_record
453                    .insert(query_id, (key, vec![sender], Default::default(), cfg))
454                    .is_some()
455                {
456                    warn!("An existing get_record task {query_id:?} got replaced");
457                }
458                // Logging the status of the `pending_get_record`.
459                // We also interested in the status of `result_map` (which contains record) inside.
460                let total_records: usize = self
461                    .pending_get_record
462                    .iter()
463                    .map(|(_, (_, _, result_map, _))| result_map.len())
464                    .sum();
465                info!("We now have {} pending get record attempts and cached {total_records} fetched copies",
466                      self.pending_get_record.len());
467            }
468            NetworkSwarmCmd::PutRecord {
469                record,
470                sender,
471                quorum,
472            } => {
473                cmd_string = "PutRecord";
474                let record_key = PrettyPrintRecordKey::from(&record.key).into_owned();
475                debug!(
476                    "Putting record sized: {:?} to network {:?}",
477                    record.value.len(),
478                    record_key
479                );
480                let res = match self
481                    .swarm
482                    .behaviour_mut()
483                    .kademlia
484                    .put_record(record, quorum.get_kad_quorum())
485                {
486                    Ok(request_id) => {
487                        debug!("Sent record {record_key:?} to network. Request id: {request_id:?} to network");
488                        Ok(())
489                    }
490                    Err(error) => {
491                        error!("Error sending record {record_key:?} to network");
492                        Err(NetworkError::from(error))
493                    }
494                };
495
496                if let Err(err) = sender.send(res) {
497                    error!("Could not send response to PutRecord cmd: {:?}", err);
498                }
499            }
500            NetworkSwarmCmd::PutRecordTo {
501                peers,
502                record,
503                sender,
504                quorum,
505            } => {
506                cmd_string = "PutRecordTo";
507                let record_key = PrettyPrintRecordKey::from(&record.key).into_owned();
508                debug!(
509                    "Putting record {record_key:?} sized: {:?} to {peers:?}",
510                    record.value.len(),
511                );
512                let peers_count = peers.len();
513                let request_id = self.swarm.behaviour_mut().kademlia.put_record_to(
514                    record,
515                    peers.into_iter(),
516                    quorum.get_kad_quorum(),
517                );
518                info!("Sent record {record_key:?} to {peers_count:?} peers. Request id: {request_id:?}");
519
520                if let Err(err) = sender.send(Ok(())) {
521                    error!("Could not send response to PutRecordTo cmd: {:?}", err);
522                }
523            }
524            NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { key, sender } => {
525                cmd_string = "GetClosestPeersToAddressFromNetwork";
526                let query_id = self
527                    .swarm
528                    .behaviour_mut()
529                    .kademlia
530                    .get_closest_peers(key.as_bytes());
531                let _ = self.pending_get_closest_peers.insert(
532                    query_id,
533                    (
534                        PendingGetClosestType::FunctionCall(sender),
535                        Default::default(),
536                    ),
537                );
538            }
539
540            NetworkSwarmCmd::SendRequest {
541                req,
542                peer,
543                addrs,
544                sender,
545            } => {
546                cmd_string = "SendRequest";
547                // If `self` is the recipient, forward the request directly to our upper layer to
548                // be handled.
549                // `self` then handles the request and sends a response back again to itself.
550                if peer == *self.swarm.local_peer_id() {
551                    trace!("Sending query request to self");
552                    if let Request::Query(query) = req {
553                        self.send_event(NetworkEvent::QueryRequestReceived {
554                            query,
555                            channel: MsgResponder::FromSelf(sender),
556                        });
557                    } else {
558                        // We should never receive a Replicate request from ourselves.
559                        // we already hold this data if we do... so we can ignore
560                        trace!("Replicate cmd to self received, ignoring");
561                    }
562                } else {
563                    if let Some(addrs) = addrs {
564                        // dial the peer and send the request
565                        if addrs.0.is_empty() {
566                            info!("No addresses for peer {peer:?} to send request. This could cause dial failure if swarm could not find the peer's addrs.");
567                        } else {
568                            let opts = DialOpts::peer_id(peer)
569                                // If we have a peer ID, we can prevent simultaneous dials.
570                                .condition(PeerCondition::NotDialing)
571                                .addresses(addrs.0.clone())
572                                .build();
573
574                            match self.swarm.dial(opts) {
575                                Ok(()) => {
576                                    info!("Dialing peer {peer:?} for req_resp with address: {addrs:?}",);
577                                }
578                                Err(err) => {
579                                    error!("Failed to dial peer {peer:?} for req_resp with address: {addrs:?} error: {err}",);
580                                }
581                            }
582                        }
583                    }
584
585                    let request_id = self
586                        .swarm
587                        .behaviour_mut()
588                        .request_response
589                        .send_request(&peer, req);
590                    trace!("Sending request {request_id:?} to peer {peer:?}");
591                    let _ = self.pending_requests.insert(request_id, sender);
592
593                    trace!("Pending Requests now: {:?}", self.pending_requests.len());
594                }
595            }
596            NetworkSwarmCmd::SendResponse { resp, channel } => {
597                cmd_string = "SendResponse";
598                match channel {
599                    // If the response is for `self`, send it directly through the oneshot channel.
600                    MsgResponder::FromSelf(channel) => {
601                        trace!("Sending response to self");
602                        match channel {
603                            Some(channel) => {
604                                channel
605                                    .send(Ok(resp))
606                                    .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
607                            }
608                            None => {
609                                // responses that are not awaited at the call site must be handled
610                                // separately
611                                self.send_event(NetworkEvent::ResponseReceived { res: resp });
612                            }
613                        }
614                    }
615                    MsgResponder::FromPeer(channel) => {
616                        self.swarm
617                            .behaviour_mut()
618                            .request_response
619                            .send_response(channel, resp)
620                            .map_err(NetworkError::OutgoingResponseDropped)?;
621                    }
622                }
623            }
624            NetworkSwarmCmd::DialPeer { peer, addrs } => {
625                cmd_string = "DialPeer";
626                let opts = DialOpts::peer_id(peer)
627                    // If we have a peer ID, we can prevent simultaneous dials.
628                    .condition(PeerCondition::NotDialing)
629                    .addresses(addrs.0.clone())
630                    .build();
631
632                match self.swarm.dial(opts) {
633                    Ok(()) => {
634                        info!("Manual dialing peer {peer:?} with address: {addrs:?}",);
635                    }
636                    Err(err) => {
637                        error!("Failed to manual dial peer {peer:?} with address: {addrs:?} error: {err}",);
638                    }
639                }
640            }
641        }
642
643        self.log_handling(cmd_string.to_string(), start.elapsed());
644
645        Ok(())
646    }
647    pub(crate) fn handle_local_cmd(&mut self, cmd: LocalSwarmCmd) -> Result<(), NetworkError> {
648        let start = Instant::now();
649        let mut cmd_string;
650        match cmd {
651            LocalSwarmCmd::TriggerIntervalReplication => {
652                cmd_string = "TriggerIntervalReplication";
653                self.try_interval_replication()?;
654            }
655            LocalSwarmCmd::GetLocalQuotingMetrics {
656                key,
657                data_type,
658                data_size,
659                sender,
660            } => {
661                cmd_string = "GetLocalQuotingMetrics";
662                let kbucket_status = self.get_kbuckets_status();
663                self.update_on_kbucket_status(&kbucket_status);
664                let (quoting_metrics, is_already_stored) = match self
665                    .swarm
666                    .behaviour_mut()
667                    .kademlia
668                    .store_mut()
669                    .quoting_metrics(
670                        &key,
671                        data_type,
672                        data_size,
673                        Some(kbucket_status.estimated_network_size as u64),
674                    ) {
675                    Ok(res) => res,
676                    Err(err) => {
677                        let _res = sender.send(Err(err));
678                        return Ok(());
679                    }
680                };
681                self.record_metrics(Marker::QuotingMetrics {
682                    quoting_metrics: &quoting_metrics,
683                });
684
685                // To avoid sending entire list to client, sending those that:
686                //     closer than the CLOSE_GROUP_SIZEth closest node to the target
687                let mut bad_nodes: Vec<_> = self
688                    .bad_nodes
689                    .iter()
690                    .filter_map(|(peer_id, (_issue_list, is_bad))| {
691                        if *is_bad {
692                            Some(NetworkAddress::from_peer(*peer_id))
693                        } else {
694                            None
695                        }
696                    })
697                    .collect();
698
699                // List is ordered already, hence the last one is always the one wanted
700                let kbucket_key = NetworkAddress::from_record_key(&key).as_kbucket_key();
701                let closest_peers: Vec<_> = self
702                    .swarm
703                    .behaviour_mut()
704                    .kademlia
705                    .get_closest_local_peers(&kbucket_key)
706                    .map(|peer| peer.into_preimage())
707                    .take(CLOSE_GROUP_SIZE)
708                    .collect();
709                // In case of not enough clsest_peers, send the entire list
710                if closest_peers.len() >= CLOSE_GROUP_SIZE {
711                    let boundary_peer = closest_peers[CLOSE_GROUP_SIZE - 1];
712                    let key_address = NetworkAddress::from_record_key(&key);
713                    let boundary_distance =
714                        key_address.distance(&NetworkAddress::from_peer(boundary_peer));
715                    bad_nodes
716                        .retain(|peer_addr| key_address.distance(peer_addr) < boundary_distance);
717                }
718
719                let _res = sender.send(Ok((quoting_metrics, is_already_stored)));
720            }
721            LocalSwarmCmd::PaymentReceived => {
722                cmd_string = "PaymentReceived";
723                self.swarm
724                    .behaviour_mut()
725                    .kademlia
726                    .store_mut()
727                    .payment_received();
728            }
729            LocalSwarmCmd::GetLocalRecord { key, sender } => {
730                cmd_string = "GetLocalRecord";
731                let record = self
732                    .swarm
733                    .behaviour_mut()
734                    .kademlia
735                    .store_mut()
736                    .get(&key)
737                    .map(|rec| rec.into_owned());
738                let _ = sender.send(record);
739            }
740
741            LocalSwarmCmd::PutLocalRecord {
742                record,
743                is_client_put,
744            } => {
745                cmd_string = "PutLocalRecord";
746                let key = record.key.clone();
747                let record_key = PrettyPrintRecordKey::from(&key);
748
749                let record_type = match RecordHeader::from_record(&record) {
750                    Ok(record_header) => {
751                        match record_header.kind {
752                            RecordKind::DataOnly(DataTypes::Chunk) => ValidationType::Chunk,
753                            RecordKind::DataOnly(_) => {
754                                let content_hash = XorName::from_content(&record.value);
755                                ValidationType::NonChunk(content_hash)
756                            }
757                            RecordKind::DataWithPayment(_) => {
758                                error!("Record {record_key:?} with payment shall not be stored locally.");
759                                return Err(NetworkError::InCorrectRecordHeader);
760                            }
761                        }
762                    }
763                    Err(err) => {
764                        error!("For record {record_key:?}, failed to parse record_header {err:?}");
765                        return Err(NetworkError::InCorrectRecordHeader);
766                    }
767                };
768
769                let result = self
770                    .swarm
771                    .behaviour_mut()
772                    .kademlia
773                    .store_mut()
774                    .put_verified(record, record_type.clone(), is_client_put);
775
776                match result {
777                    Ok(_) => {
778                        // `replication_fetcher.farthest_acceptable_distance` shall only get
779                        // shrinked, instead of expanding, even with more nodes joined to share
780                        // the responsibility. Hence no need to reset it.
781                        // Also, as `record_store` is `prune 1 on 1 success put`, which means
782                        // once capacity reached max_records, there is only chance of rising slowly.
783                        // Due to the async/parrellel handling in replication_fetcher & record_store.
784                    }
785                    Err(StoreError::MaxRecords) => {
786                        // In case the capacity reaches full, restrict replication_fetcher to
787                        // only fetch entries not farther than the current farthest record
788                        let farthest = self
789                            .swarm
790                            .behaviour_mut()
791                            .kademlia
792                            .store_mut()
793                            .get_farthest()?;
794                        self.replication_fetcher.set_farthest_on_full(farthest);
795                    }
796                    Err(_) => {
797                        // Nothing special to do for these errors,
798                        // All error cases are further logged and bubbled up below
799                    }
800                }
801
802                // No matter storing the record succeeded or not,
803                // the entry shall be removed from the `replication_fetcher`.
804                // In case of local store error, re-attempt will be carried out
805                // within the next replication round.
806                let new_keys_to_fetch = self
807                    .replication_fetcher
808                    .notify_about_new_put(key.clone(), record_type);
809
810                if !new_keys_to_fetch.is_empty() {
811                    self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
812                }
813
814                // The record_store will prune far records and setup a `distance range`,
815                // once reached the `max_records` cap.
816                if let Some(distance) = self
817                    .swarm
818                    .behaviour_mut()
819                    .kademlia
820                    .store_mut()
821                    .get_farthest_replication_distance()?
822                {
823                    self.replication_fetcher
824                        .set_replication_distance_range(distance);
825                }
826
827                if let Err(err) = result {
828                    error!("Can't store verified record {record_key:?} locally: {err:?}");
829                    cmd_string = "PutLocalRecord error";
830                    self.log_handling(cmd_string.to_string(), start.elapsed());
831                    return Err(err.into());
832                };
833            }
834            LocalSwarmCmd::AddLocalRecordAsStored {
835                key,
836                record_type,
837                data_type,
838            } => {
839                cmd_string = "AddLocalRecordAsStored";
840                self.swarm
841                    .behaviour_mut()
842                    .kademlia
843                    .store_mut()
844                    .mark_as_stored(key, record_type, data_type);
845                // Reset counter on any success HDD write.
846                self.hard_disk_write_error = 0;
847            }
848            LocalSwarmCmd::RemoveFailedLocalRecord { key } => {
849                info!("Removing Record locally, for {key:?}");
850                cmd_string = "RemoveFailedLocalRecord";
851                self.swarm.behaviour_mut().kademlia.store_mut().remove(&key);
852                self.hard_disk_write_error = self.hard_disk_write_error.saturating_add(1);
853                // When there is certain amount of continuous HDD write error,
854                // the hard disk is considered as full, and the node shall be terminated.
855                if self.hard_disk_write_error > MAX_CONTINUOUS_HDD_WRITE_ERROR {
856                    self.send_event(NetworkEvent::TerminateNode {
857                        reason: TerminateNodeReason::HardDiskWriteError,
858                    });
859                }
860            }
861            LocalSwarmCmd::RecordStoreHasKey { key, sender } => {
862                cmd_string = "RecordStoreHasKey";
863                let has_key = self
864                    .swarm
865                    .behaviour_mut()
866                    .kademlia
867                    .store_mut()
868                    .contains(&key);
869                let _ = sender.send(has_key);
870            }
871            LocalSwarmCmd::GetAllLocalRecordAddresses { sender } => {
872                cmd_string = "GetAllLocalRecordAddresses";
873                #[allow(clippy::mutable_key_type)] // for the Bytes in NetworkAddress
874                let addresses = self
875                    .swarm
876                    .behaviour_mut()
877                    .kademlia
878                    .store_mut()
879                    .record_addresses();
880                let _ = sender.send(addresses);
881            }
882            LocalSwarmCmd::GetKBuckets { sender } => {
883                cmd_string = "GetKBuckets";
884                let mut ilog2_kbuckets = BTreeMap::new();
885                for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
886                    let range = kbucket.range();
887                    if let Some(distance) = range.0.ilog2() {
888                        let peers_in_kbucket = kbucket
889                            .iter()
890                            .map(|peer_entry| peer_entry.node.key.into_preimage())
891                            .collect::<Vec<PeerId>>();
892                        let _ = ilog2_kbuckets.insert(distance, peers_in_kbucket);
893                    } else {
894                        // This shall never happen.
895                        error!("bucket is ourself ???!!!");
896                    }
897                }
898                let _ = sender.send(ilog2_kbuckets);
899            }
900            LocalSwarmCmd::GetPeersWithMultiaddr { sender } => {
901                cmd_string = "GetPeersWithMultiAddr";
902                let mut result: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
903                for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
904                    let peers_in_kbucket = kbucket
905                        .iter()
906                        .map(|peer_entry| {
907                            (
908                                peer_entry.node.key.into_preimage(),
909                                peer_entry.node.value.clone().into_vec(),
910                            )
911                        })
912                        .collect::<Vec<(PeerId, Vec<Multiaddr>)>>();
913                    result.extend(peers_in_kbucket);
914                }
915                let _ = sender.send(result);
916            }
917            LocalSwarmCmd::GetCloseLocalPeersToTarget {
918                key,
919                num_of_peers,
920                sender,
921            } => {
922                cmd_string = "GetCloseLocalPeersToTarget";
923                let closest_peers = self.get_closest_local_peers_to_target(&key, num_of_peers);
924
925                let _ = sender.send(closest_peers);
926            }
927            LocalSwarmCmd::GetClosestKLocalPeers { sender } => {
928                cmd_string = "GetClosestKLocalPeers";
929                let _ = sender.send(self.get_closest_k_value_local_peers());
930            }
931            LocalSwarmCmd::GetSwarmLocalState(sender) => {
932                cmd_string = "GetSwarmLocalState";
933                let current_state = SwarmLocalState {
934                    connected_peers: self.swarm.connected_peers().cloned().collect(),
935                    listeners: self.swarm.listeners().cloned().collect(),
936                };
937
938                sender
939                    .send(current_state)
940                    .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
941            }
942            LocalSwarmCmd::AddPeerToBlockList { peer_id } => {
943                cmd_string = "AddPeerToBlockList";
944                self.swarm.behaviour_mut().blocklist.block_peer(peer_id);
945            }
946            LocalSwarmCmd::RecordNodeIssue { peer_id, issue } => {
947                cmd_string = "RecordNodeIssues";
948                self.record_node_issue(peer_id, issue);
949            }
950            LocalSwarmCmd::IsPeerShunned { target, sender } => {
951                cmd_string = "IsPeerInTrouble";
952                let is_bad = if let Some(peer_id) = target.as_peer_id() {
953                    if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
954                        *is_bad
955                    } else {
956                        false
957                    }
958                } else {
959                    false
960                };
961                let _ = sender.send(is_bad);
962            }
963            LocalSwarmCmd::QuoteVerification { quotes } => {
964                cmd_string = "QuoteVerification";
965                for (peer_id, quote) in quotes {
966                    // Do nothing if already being bad
967                    if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
968                        if *is_bad {
969                            continue;
970                        }
971                    }
972                    self.verify_peer_quote(peer_id, quote);
973                }
974            }
975            LocalSwarmCmd::FetchCompleted((key, record_type)) => {
976                info!(
977                    "Fetch of {record_type:?} {:?} early completed, may have fetched an old version of the record.",
978                    PrettyPrintRecordKey::from(&key)
979                );
980                cmd_string = "FetchCompleted";
981                let new_keys_to_fetch = self
982                    .replication_fetcher
983                    .notify_fetch_early_completed(key, record_type);
984                if !new_keys_to_fetch.is_empty() {
985                    self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
986                }
987            }
988            LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
989                cmd_string = "TriggerIrrelevantRecordCleanup";
990                self.swarm
991                    .behaviour_mut()
992                    .kademlia
993                    .store_mut()
994                    .cleanup_irrelevant_records();
995            }
996            LocalSwarmCmd::AddNetworkDensitySample { distance } => {
997                cmd_string = "AddNetworkDensitySample";
998                self.network_density_samples.add(distance);
999            }
1000            LocalSwarmCmd::NotifyPeerScores { peer_scores } => {
1001                cmd_string = "NotifyPeerScores";
1002                self.replication_fetcher.add_peer_scores(peer_scores);
1003            }
1004            LocalSwarmCmd::AddFreshReplicateRecords { holder, keys } => {
1005                cmd_string = "AddFreshReplicateRecords";
1006                let _ = self.add_keys_to_replication_fetcher(holder, keys, true);
1007            }
1008            LocalSwarmCmd::NotifyPeerVersion { peer, version } => {
1009                cmd_string = "NotifyPeerVersion";
1010                self.record_node_version(peer, version);
1011            }
1012        }
1013
1014        self.log_handling(cmd_string.to_string(), start.elapsed());
1015
1016        Ok(())
1017    }
1018
1019    fn record_node_version(&mut self, peer_id: PeerId, version: String) {
1020        let _ = self.peers_version.insert(peer_id, version);
1021
1022        // Collect all peers_in_non_full_buckets
1023        let mut peers_in_non_full_buckets = vec![];
1024        for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
1025            let num_entires = kbucket.num_entries();
1026            if num_entires >= K_VALUE.get() {
1027                continue;
1028            } else {
1029                let peers_in_kbucket = kbucket
1030                    .iter()
1031                    .map(|peer_entry| peer_entry.node.key.into_preimage())
1032                    .collect::<Vec<PeerId>>();
1033                peers_in_non_full_buckets.extend(peers_in_kbucket);
1034            }
1035        }
1036
1037        // Ensure all existing node_version records are for those peers_in_non_full_buckets
1038        self.peers_version
1039            .retain(|peer_id, _version| peers_in_non_full_buckets.contains(peer_id));
1040    }
1041
1042    pub(crate) fn record_node_issue(&mut self, peer_id: PeerId, issue: NodeIssue) {
1043        info!("Peer {peer_id:?} is reported as having issue {issue:?}");
1044        let (issue_vec, is_bad) = self.bad_nodes.entry(peer_id).or_default();
1045        let mut new_bad_behaviour = None;
1046        let mut is_connection_issue = false;
1047
1048        // If being considered as bad already, skip certain operations
1049        if !(*is_bad) {
1050            // Remove outdated entries
1051            issue_vec.retain(|(_, timestamp)| timestamp.elapsed().as_secs() < 300);
1052
1053            // check if vec is already 10 long, if so, remove the oldest issue
1054            // we only track 10 issues to avoid mem leaks
1055            if issue_vec.len() == 10 {
1056                issue_vec.remove(0);
1057            }
1058
1059            // To avoid being too sensitive, only consider as a new issue
1060            // when after certain while since the last one
1061            let is_new_issue = if let Some((_issue, timestamp)) = issue_vec.last() {
1062                timestamp.elapsed().as_secs() > 10
1063            } else {
1064                true
1065            };
1066
1067            if is_new_issue {
1068                issue_vec.push((issue, Instant::now()));
1069            } else {
1070                return;
1071            }
1072
1073            // Only consider candidate as a bad node when:
1074            //   accumulated THREE same kind issues within certain period
1075            for (issue, _timestamp) in issue_vec.iter() {
1076                let issue_counts = issue_vec
1077                    .iter()
1078                    .filter(|(i, _timestamp)| *issue == *i)
1079                    .count();
1080                if issue_counts >= 3 {
1081                    // If it is a connection issue, we don't need to consider it as a bad node
1082                    if matches!(issue, NodeIssue::ConnectionIssue) {
1083                        is_connection_issue = true;
1084                    }
1085                    // TODO: disable black_list currently.
1086                    //       re-enable once got more statistics from large scaled network
1087                    // else {
1088                    //     *is_bad = true;
1089                    // }
1090                    new_bad_behaviour = Some(issue.clone());
1091                    info!("Peer {peer_id:?} accumulated {issue_counts} times of issue {issue:?}. Consider it as a bad node now.");
1092                    // Once a bad behaviour detected, no point to continue
1093                    break;
1094                }
1095            }
1096        }
1097
1098        // Give the faulty connection node more chances by removing the issue from the list. It is still evicted from
1099        // the routing table.
1100        if is_connection_issue {
1101            issue_vec.retain(|(issue, _timestamp)| !matches!(issue, NodeIssue::ConnectionIssue));
1102            info!("Evicting bad peer {peer_id:?} due to connection issue from RT.");
1103            if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
1104                self.update_on_peer_removal(*dead_peer.node.key.preimage());
1105            }
1106            return;
1107        }
1108
1109        if *is_bad {
1110            info!("Evicting bad peer {peer_id:?} from RT.");
1111            let addrs = if let Some(dead_peer) =
1112                self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id)
1113            {
1114                self.update_on_peer_removal(*dead_peer.node.key.preimage());
1115                Addresses(dead_peer.node.value.into_vec())
1116            } else {
1117                Addresses(Vec::new())
1118            };
1119
1120            if let Some(bad_behaviour) = new_bad_behaviour {
1121                // inform the bad node about it and add to the blocklist after that (not for connection issues)
1122                self.record_metrics(Marker::PeerConsideredAsBad { bad_peer: &peer_id });
1123
1124                warn!("Peer {peer_id:?} is considered as bad due to {bad_behaviour:?}. Informing the peer and adding to blocklist.");
1125                // response handling
1126                let (tx, rx) = oneshot::channel();
1127                let local_swarm_cmd_sender = self.local_cmd_sender.clone();
1128                tokio::spawn(async move {
1129                    match rx.await {
1130                        Ok(result) => {
1131                            debug!("Got response for Cmd::PeerConsideredAsBad from {peer_id:?} {result:?}");
1132                            if let Err(err) = local_swarm_cmd_sender
1133                                .send(LocalSwarmCmd::AddPeerToBlockList { peer_id })
1134                                .await
1135                            {
1136                                error!("SwarmDriver failed to send LocalSwarmCmd: {err}");
1137                            }
1138                        }
1139                        Err(err) => {
1140                            error!("Failed to get response from one shot channel for Cmd::PeerConsideredAsBad : {err:?}");
1141                        }
1142                    }
1143                });
1144
1145                // request
1146                let request = Request::Cmd(Cmd::PeerConsideredAsBad {
1147                    detected_by: NetworkAddress::from_peer(self.self_peer_id),
1148                    bad_peer: NetworkAddress::from_peer(peer_id),
1149                    bad_behaviour: bad_behaviour.to_string(),
1150                });
1151                self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
1152                    req: request,
1153                    addrs: Some(addrs),
1154                    peer: peer_id,
1155                    sender: Some(tx),
1156                });
1157            }
1158        }
1159    }
1160
1161    fn verify_peer_quote(&mut self, peer_id: PeerId, quote: PaymentQuote) {
1162        if let Some(history_quote) = self.quotes_history.get(&peer_id) {
1163            if !history_quote.historical_verify(&quote) {
1164                info!("From {peer_id:?}, detected a bad quote {quote:?} against history_quote {history_quote:?}");
1165                self.record_node_issue(peer_id, NodeIssue::BadQuoting);
1166                return;
1167            }
1168
1169            if history_quote.is_newer_than(&quote) {
1170                return;
1171            }
1172        }
1173
1174        let _ = self.quotes_history.insert(peer_id, quote);
1175    }
1176
1177    fn try_interval_replication(&mut self) -> Result<()> {
1178        // Add a last_replication field to track the last time replication was performed
1179        if let Some(last_replication) = self.last_replication {
1180            if last_replication.elapsed() < MIN_REPLICATION_INTERVAL_S {
1181                info!("Skipping replication as minimum interval hasn't elapsed");
1182                return Ok(());
1183            }
1184        }
1185        // Store the current time as the last replication time
1186        self.last_replication = Some(Instant::now());
1187
1188        let self_addr = NetworkAddress::from_peer(self.self_peer_id);
1189        let mut replicate_targets = self.get_replicate_candidates(&self_addr)?;
1190
1191        let now = Instant::now();
1192        self.replication_targets
1193            .retain(|_peer_id, timestamp| *timestamp > now);
1194        // Only carry out replication to peer that not replicated to it recently
1195        replicate_targets
1196            .retain(|(peer_id, _addresses)| !self.replication_targets.contains_key(peer_id));
1197        if replicate_targets.is_empty() {
1198            return Ok(());
1199        }
1200
1201        let all_records: Vec<_> = self
1202            .swarm
1203            .behaviour_mut()
1204            .kademlia
1205            .store_mut()
1206            .record_addresses_ref()?
1207            .values()
1208            .cloned()
1209            .collect();
1210
1211        if !all_records.is_empty() {
1212            debug!(
1213                "Sending a replication list of {} keys to {replicate_targets:?} ",
1214                all_records.len()
1215            );
1216            let request = Request::Cmd(Cmd::Replicate {
1217                holder: NetworkAddress::from_peer(self.self_peer_id),
1218                keys: all_records
1219                    .into_iter()
1220                    .map(|(addr, val_type, _data_type)| (addr, val_type))
1221                    .collect(),
1222            });
1223            for (peer_id, _addrs) in replicate_targets {
1224                self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
1225                    req: request.clone(),
1226                    peer: peer_id,
1227                    // replicate targets are part of the RT, so no need to dial them manually.
1228                    addrs: None,
1229                    sender: None,
1230                });
1231
1232                let _ = self
1233                    .replication_targets
1234                    .insert(peer_id, now + REPLICATION_TIMEOUT);
1235            }
1236        }
1237
1238        Ok(())
1239    }
1240
1241    // Replies with in-range replicate candidates
1242    // Fall back to expected_candidates peers if range is too narrow.
1243    // Note that:
1244    //   * For general replication, replicate candidates shall be closest to self, but with wider range
1245    //   * For replicate fresh records, the replicate candidates shall be the closest to data
1246    pub(crate) fn get_replicate_candidates(
1247        &mut self,
1248        target: &NetworkAddress,
1249    ) -> Result<Vec<(PeerId, Addresses)>> {
1250        let is_periodic_replicate = target.as_peer_id().is_some();
1251        let expected_candidates = if is_periodic_replicate {
1252            CLOSE_GROUP_SIZE * 2
1253        } else {
1254            CLOSE_GROUP_SIZE
1255        };
1256
1257        // get closest peers from buckets, sorted by increasing distance to the target
1258        let closest_k_peers = self.get_closest_local_peers_to_target(target, K_VALUE.get());
1259
1260        if let Some(responsible_range) = self
1261            .swarm
1262            .behaviour_mut()
1263            .kademlia
1264            .store_mut()
1265            .get_farthest_replication_distance()?
1266        {
1267            let peers_in_range = get_peers_in_range(&closest_k_peers, target, responsible_range);
1268
1269            if peers_in_range.len() >= expected_candidates {
1270                return Ok(peers_in_range);
1271            }
1272        }
1273
1274        // In case the range is too narrow, fall back to at least expected_candidates peers.
1275        Ok(closest_k_peers
1276            .iter()
1277            .take(expected_candidates)
1278            .cloned()
1279            .collect())
1280    }
1281}
1282
1283/// Returns the nodes that within the defined distance.
1284fn get_peers_in_range(
1285    peers: &[(PeerId, Addresses)],
1286    address: &NetworkAddress,
1287    range: Distance,
1288) -> Vec<(PeerId, Addresses)> {
1289    peers
1290        .iter()
1291        .filter_map(|(peer_id, addresses)| {
1292            if address.distance(&NetworkAddress::from_peer(*peer_id)) <= range {
1293                Some((*peer_id, addresses.clone()))
1294            } else {
1295                None
1296            }
1297        })
1298        .collect()
1299}