ant_networking/cmd.rs

// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::time::Instant;
use crate::{
    driver::{PendingGetClosestType, SwarmDriver},
    error::{NetworkError, Result},
    event::TerminateNodeReason,
    log_markers::Marker,
    Addresses, MsgResponder, NetworkEvent, CLOSE_GROUP_SIZE,
};
use ant_evm::{PaymentQuote, QuotingMetrics};
use ant_protocol::messages::ConnectionInfo;
use ant_protocol::{
    messages::{Cmd, Request, Response},
    storage::{DataTypes, RecordHeader, RecordKind, ValidationType},
    NetworkAddress, PrettyPrintRecordKey,
};
use libp2p::{
    kad::{
        store::{Error as StoreError, RecordStore},
        KBucketDistance as Distance, Record, RecordKey,
    },
    swarm::dial_opts::{DialOpts, PeerCondition},
    Multiaddr, PeerId,
};
use std::{
    collections::{BTreeMap, HashMap},
    fmt::Debug,
    time::Duration,
};
use tokio::sync::oneshot;
use xor_name::XorName;

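/// Maximum number of consecutive local-store write failures tolerated before the
/// node asks to be terminated (see the `RemoveFailedLocalRecord` handling below).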
const MAX_CONTINUOUS_HDD_WRITE_ERROR: usize = 5;

// Shall be kept in sync with `ant_node::PERIODIC_REPLICATION_INTERVAL_MAX_S`
const REPLICATION_TIMEOUT: Duration = Duration::from_secs(45);

// Throttles replication to at most once every 30 seconds
const MIN_REPLICATION_INTERVAL_S: Duration = Duration::from_secs(30);

#[derive(Debug, Eq, PartialEq, Clone)]
pub enum NodeIssue {
    /// Some connections are considered critical and should be tracked.
    ConnectionIssue,
    /// Data replication failed
    ReplicationFailure,
    /// Close nodes have reported this peer as bad
    CloseNodesShunning,
    /// Provided a bad quote
    BadQuoting,
    /// Peer failed to pass the chunk proof verification
    FailedChunkProofCheck,
}

impl std::fmt::Display for NodeIssue {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            NodeIssue::ConnectionIssue => write!(f, "CriticalConnectionIssue"),
            NodeIssue::ReplicationFailure => write!(f, "ReplicationFailure"),
            NodeIssue::CloseNodesShunning => write!(f, "CloseNodesShunning"),
            NodeIssue::BadQuoting => write!(f, "BadQuoting"),
            NodeIssue::FailedChunkProofCheck => write!(f, "FailedChunkProofCheck"),
        }
    }
}

/// Commands to send to the Swarm
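///
/// These commands are handled by the `SwarmDriver` against local state
/// (routing table, record store, replication fetcher); they do not send
/// network messages directly unless noted otherwise on the variant.
/// Query-style commands carry a `oneshot` reply channel, roughly as in this
/// non-compiled sketch (the `local_cmd_sender` handle and its channel type
/// are assumptions, not part of this module):
///
/// ```ignore
/// // Hypothetical caller: `local_cmd_sender` is assumed to be an async
/// // mpsc-style Sender<LocalSwarmCmd> held by the network handle.
/// let (tx, rx) = tokio::sync::oneshot::channel();
/// local_cmd_sender
///     .send(LocalSwarmCmd::RecordStoreHasKey { key, sender: tx })
///     .await?;
/// let has_key: bool = rx.await?;
/// ```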
pub enum LocalSwarmCmd {
    /// Get a list of all peers in the local RT, with their corresponding Multiaddr info attached.
    GetPeersWithMultiaddr {
        sender: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
    },
    /// Get a map where each key is the ilog2 distance of that Kbucket
    /// and each value is a vector of peers in that bucket.
    GetKBuckets {
        sender: oneshot::Sender<BTreeMap<u32, Vec<PeerId>>>,
    },
    // Get the K closest peers to the target from the local RoutingTable, self included
    GetKCloseLocalPeersToTarget {
        key: NetworkAddress,
        sender: oneshot::Sender<Vec<(PeerId, Addresses)>>,
    },
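    /// Get a snapshot of the Swarm's local state (connected peers, RT size, listeners)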
    GetSwarmLocalState(oneshot::Sender<SwarmLocalState>),
    /// Check if the local RecordStore contains the provided key
    RecordStoreHasKey {
        key: RecordKey,
        sender: oneshot::Sender<bool>,
    },
    /// Get the Addresses of all the Records held locally
    GetAllLocalRecordAddresses {
        sender: oneshot::Sender<HashMap<NetworkAddress, ValidationType>>,
    },
    /// Get data from the local RecordStore
    GetLocalRecord {
        key: RecordKey,
        sender: oneshot::Sender<Option<Record>>,
    },
    /// GetLocalQuotingMetrics for this node
    /// Returns the quoting metrics and whether the record at `key` is already stored locally
    GetLocalQuotingMetrics {
        key: RecordKey,
        data_type: u32,
        data_size: usize,
        sender: oneshot::Sender<(QuotingMetrics, bool)>,
    },
    /// Notify the node that it received a payment.
    PaymentReceived,
    /// Put record to the local RecordStore
    PutLocalRecord {
        record: Record,
        is_client_put: bool,
    },
    /// Remove a local record from the RecordStore
    /// Typically because the write failed
    RemoveFailedLocalRecord {
        key: RecordKey,
    },
    /// Add a local record to the RecordStore's HashSet of stored records
    /// This should be done after the record has been stored to disk
    AddLocalRecordAsStored {
        key: RecordKey,
        record_type: ValidationType,
        data_type: DataTypes,
    },
    /// Add a peer to the blocklist
    AddPeerToBlockList {
        peer_id: PeerId,
    },
    /// Notify whether a peer is in trouble
    RecordNodeIssue {
        peer_id: PeerId,
        issue: NodeIssue,
    },
    // Whether the peer is considered `in trouble` by self
    IsPeerShunned {
        target: NetworkAddress,
        sender: oneshot::Sender<bool>,
    },
    // Quote verification against historically collected quotes
    QuoteVerification {
        quotes: Vec<(PeerId, PaymentQuote)>,
    },
    // Notify a fetch completion
    FetchCompleted((RecordKey, ValidationType)),
    /// Triggers interval replication
    /// NOTE: This does result in outgoing messages, but is produced locally
    TriggerIntervalReplication,
    /// Triggers irrelevant record cleanup
    TriggerIrrelevantRecordCleanup,
    /// Send peer scores (collected from the storage challenge) to the replication_fetcher
    NotifyPeerScores {
        peer_scores: Vec<(PeerId, bool)>,
    },
    /// Add fresh replicate records into the replication_fetcher
    AddFreshReplicateRecords {
        holder: NetworkAddress,
        keys: Vec<(NetworkAddress, ValidationType)>,
    },
    /// Notify a fetched peer's version
    NotifyPeerVersion {
        peer: PeerId,
        version: String,
    },
    /// Get the responsible distance range.
    GetNetworkDensity {
        sender: oneshot::Sender<Option<Distance>>,
    },
    /// Remove a peer from the routing table
    RemovePeer {
        peer: PeerId,
    },
}

/// Commands to send to the Swarm
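///
/// Unlike [`LocalSwarmCmd`], these commands produce network activity:
/// Kademlia queries, request/response messages, and dials.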
pub enum NetworkSwarmCmd {
    // Get closest peers from the network
    GetClosestPeersToAddressFromNetwork {
        key: NetworkAddress,
        sender: oneshot::Sender<Vec<(PeerId, Addresses)>>,
    },

    // Send a Request to the PeerId.
    SendRequest {
        req: Request,
        peer: PeerId,
        /// If addresses are provided, we will try to dial the peer before sending the request.
        addrs: Option<Addresses>,

        // If a `sender` is provided, the requesting node will await a `Response` from the
        // peer. The result is then returned at the call site.
        //
        // If a `sender` is not provided, the requesting node will not wait for the peer's
        // response. Instead we trigger a `NetworkEvent::ResponseReceived` which calls the common
        // `response_handler`.
        #[allow(clippy::type_complexity)]
        sender: Option<oneshot::Sender<Result<(Response, Option<ConnectionInfo>)>>>,
    },
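    /// Send a Response back through the provided response channel.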
    SendResponse {
        resp: Response,
        channel: MsgResponder,
    },
    // Attempt to dial a specific peer.
    DialPeer {
        peer: PeerId,
        addrs: Addresses,
    },
}

/// Debug impl for LocalSwarmCmd to avoid printing the full Record; instead only the RecordKey
/// and RecordKind are printed.
impl Debug for LocalSwarmCmd {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            LocalSwarmCmd::PutLocalRecord {
                record,
                is_client_put,
            } => {
                write!(
                    f,
                    "LocalSwarmCmd::PutLocalRecord {{ key: {:?}, is_client_put: {is_client_put:?} }}",
                    PrettyPrintRecordKey::from(&record.key)
                )
            }
            LocalSwarmCmd::RemoveFailedLocalRecord { key } => {
                write!(
                    f,
                    "LocalSwarmCmd::RemoveFailedLocalRecord {{ key: {:?} }}",
                    PrettyPrintRecordKey::from(key)
                )
            }
            LocalSwarmCmd::AddLocalRecordAsStored {
                key,
                record_type,
                data_type,
            } => {
                write!(
                    f,
                    "LocalSwarmCmd::AddLocalRecordAsStored {{ key: {:?}, record_type: {record_type:?}, data_type: {data_type:?} }}",
                    PrettyPrintRecordKey::from(key)
                )
            }
            LocalSwarmCmd::GetKCloseLocalPeersToTarget { key, .. } => {
                write!(
                    f,
                    "LocalSwarmCmd::GetKCloseLocalPeersToTarget {{ key: {key:?} }}"
                )
            }
            LocalSwarmCmd::GetLocalQuotingMetrics { .. } => {
                write!(f, "LocalSwarmCmd::GetLocalQuotingMetrics")
            }
            LocalSwarmCmd::PaymentReceived => {
                write!(f, "LocalSwarmCmd::PaymentReceived")
            }
            LocalSwarmCmd::GetLocalRecord { key, .. } => {
                write!(
                    f,
                    "LocalSwarmCmd::GetLocalRecord {{ key: {:?} }}",
                    PrettyPrintRecordKey::from(key)
                )
            }
            LocalSwarmCmd::GetAllLocalRecordAddresses { .. } => {
                write!(f, "LocalSwarmCmd::GetAllLocalRecordAddresses")
            }
            LocalSwarmCmd::GetPeersWithMultiaddr { .. } => {
                write!(f, "LocalSwarmCmd::GetPeersWithMultiaddr")
            }
            LocalSwarmCmd::GetKBuckets { .. } => {
                write!(f, "LocalSwarmCmd::GetKBuckets")
            }
            LocalSwarmCmd::GetSwarmLocalState { .. } => {
                write!(f, "LocalSwarmCmd::GetSwarmLocalState")
            }
            LocalSwarmCmd::RecordStoreHasKey { key, .. } => {
                write!(
                    f,
                    "LocalSwarmCmd::RecordStoreHasKey {:?}",
                    PrettyPrintRecordKey::from(key)
                )
            }
            LocalSwarmCmd::AddPeerToBlockList { peer_id } => {
                write!(f, "LocalSwarmCmd::AddPeerToBlockList {peer_id:?}")
            }
            LocalSwarmCmd::RecordNodeIssue { peer_id, issue } => {
                write!(
                    f,
                    "LocalSwarmCmd::RecordNodeIssue peer {peer_id:?}, issue: {issue:?}"
                )
            }
            LocalSwarmCmd::IsPeerShunned { target, .. } => {
                write!(f, "LocalSwarmCmd::IsPeerShunned target: {target:?}")
            }
            LocalSwarmCmd::QuoteVerification { quotes } => {
                write!(
                    f,
                    "LocalSwarmCmd::QuoteVerification of {} quotes",
                    quotes.len()
                )
            }
            LocalSwarmCmd::FetchCompleted((key, record_type)) => {
                write!(
                    f,
                    "LocalSwarmCmd::FetchCompleted({record_type:?} : {:?})",
                    PrettyPrintRecordKey::from(key)
                )
            }
            LocalSwarmCmd::TriggerIntervalReplication => {
                write!(f, "LocalSwarmCmd::TriggerIntervalReplication")
            }
            LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
                write!(f, "LocalSwarmCmd::TriggerIrrelevantRecordCleanup")
            }
            LocalSwarmCmd::NotifyPeerScores { peer_scores } => {
                write!(f, "LocalSwarmCmd::NotifyPeerScores({peer_scores:?})")
            }
            LocalSwarmCmd::AddFreshReplicateRecords { holder, keys } => {
                write!(
                    f,
                    "LocalSwarmCmd::AddFreshReplicateRecords({holder:?}, {keys:?})"
                )
            }
            LocalSwarmCmd::NotifyPeerVersion { peer, version } => {
                write!(f, "LocalSwarmCmd::NotifyPeerVersion({peer:?}, {version:?})")
            }
            LocalSwarmCmd::GetNetworkDensity { .. } => {
                write!(f, "LocalSwarmCmd::GetNetworkDensity")
            }
            LocalSwarmCmd::RemovePeer { peer } => {
                write!(f, "LocalSwarmCmd::RemovePeer({peer:?})")
            }
        }
    }
}

/// Debug impl for NetworkSwarmCmd to avoid printing the full Record; instead only the RecordKey
/// and RecordKind are printed.
impl Debug for NetworkSwarmCmd {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { key, .. } => {
                write!(f, "NetworkSwarmCmd::GetClosestPeers {{ key: {key:?} }}")
            }
            NetworkSwarmCmd::SendResponse { resp, .. } => {
                write!(f, "NetworkSwarmCmd::SendResponse resp: {resp:?}")
            }
            NetworkSwarmCmd::SendRequest { req, peer, .. } => {
                write!(
                    f,
                    "NetworkSwarmCmd::SendRequest req: {req:?}, peer: {peer:?}"
                )
            }
            NetworkSwarmCmd::DialPeer { peer, .. } => {
                write!(f, "NetworkSwarmCmd::DialPeer peer: {peer:?}")
            }
        }
    }
}

/// Snapshot of information kept in the Swarm's local state
#[derive(Debug, Clone)]
pub struct SwarmLocalState {
    /// List of peers that we have an established connection with.
    pub connected_peers: Vec<PeerId>,
    /// The number of peers in the routing table
    pub peers_in_routing_table: usize,
    /// List of addresses the node is currently listening on
    pub listeners: Vec<Multiaddr>,
}

impl SwarmDriver {
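    /// Handle a `NetworkSwarmCmd` by driving the libp2p swarm (Kademlia queries,
    /// request/response messages and dials), then log how long the handling took.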
    pub(crate) fn handle_network_cmd(&mut self, cmd: NetworkSwarmCmd) -> Result<(), NetworkError> {
        let start = Instant::now();
        let cmd_string;
        match cmd {
            NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { key, sender } => {
                cmd_string = "GetClosestPeersToAddressFromNetwork";
                let query_id = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .get_closest_peers(key.as_bytes());
                let _ = self.pending_get_closest_peers.insert(
                    query_id,
                    (
                        PendingGetClosestType::FunctionCall(sender),
                        Default::default(),
                    ),
                );
            }
            NetworkSwarmCmd::SendRequest {
                req,
                peer,
                addrs,
                sender,
            } => {
                cmd_string = "SendRequest";
                // If `self` is the recipient, forward the request directly to our upper layer to
                // be handled. `self` then handles the request and sends a response back to itself.
                if peer == *self.swarm.local_peer_id() {
                    trace!("Sending query request to self");
                    if let Request::Query(query) = req {
                        self.send_event(NetworkEvent::QueryRequestReceived {
                            query,
                            channel: MsgResponder::FromSelf(sender),
                        });
                    } else {
                        // We should never receive a Replicate request from ourselves;
                        // we already hold this data if we do, so we can ignore it.
                        trace!("Replicate cmd to self received, ignoring");
                    }
                } else {
                    if let Some(addrs) = addrs {
                        // dial the peer and send the request
                        if addrs.0.is_empty() {
                            info!("No addresses for peer {peer:?} to send the request to. This could cause a dial failure if the swarm cannot find the peer's addrs.");
                        } else {
                            let opts = DialOpts::peer_id(peer).addresses(addrs.0.clone()).build();

                            match self.swarm.dial(opts) {
                                Ok(()) => {
                                    debug!("Dialing peer {peer:?} for req_resp with address: {addrs:?}",);
                                }
                                Err(err) => {
                                    error!("Failed to dial peer {peer:?} for req_resp with address: {addrs:?} error: {err}",);
                                }
                            }
                        }
                    }

                    let request_id = self
                        .swarm
                        .behaviour_mut()
                        .request_response
                        .send_request(&peer, req);
                    trace!("Sending request {request_id:?} to peer {peer:?}");
                    let _ = self.pending_requests.insert(request_id, sender);

                    trace!("Pending Requests now: {:?}", self.pending_requests.len());
                }
            }
            NetworkSwarmCmd::SendResponse { resp, channel } => {
                cmd_string = "SendResponse";
                match channel {
                    // If the response is for `self`, send it directly through the oneshot channel.
                    MsgResponder::FromSelf(channel) => {
                        trace!("Sending response to self");
                        match channel {
                            Some(channel) => {
                                channel
                                    .send(Ok((resp, None)))
                                    .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
                            }
                            None => {
                                // Responses that are not awaited at the call site must be handled
                                // separately.
                                self.send_event(NetworkEvent::ResponseReceived { res: resp });
                            }
                        }
                    }
                    MsgResponder::FromPeer(channel) => {
                        self.swarm
                            .behaviour_mut()
                            .request_response
                            .send_response(channel, resp)
                            .map_err(NetworkError::OutgoingResponseDropped)?;
                    }
                }
            }
            NetworkSwarmCmd::DialPeer { peer, addrs } => {
                cmd_string = "DialPeer";
                let opts = DialOpts::peer_id(peer)
                    // If we have a peer ID, we can prevent simultaneous dials.
                    .condition(PeerCondition::NotDialing)
                    .addresses(addrs.0.clone())
                    .build();

                match self.swarm.dial(opts) {
                    Ok(()) => {
                        info!("Manually dialing peer {peer:?} with address: {addrs:?}",);
                    }
                    Err(err) => {
                        error!("Failed to manually dial peer {peer:?} with address: {addrs:?} error: {err}",);
                    }
                }
            }
        }

        self.log_handling(cmd_string.to_string(), start.elapsed());

        Ok(())
    }
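
    /// Handle a `LocalSwarmCmd`, which operates on the node's local state
    /// (routing table, record store, replication fetcher and issue tracking),
    /// then log how long the handling took.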
    pub(crate) fn handle_local_cmd(&mut self, cmd: LocalSwarmCmd) -> Result<(), NetworkError> {
        let start = Instant::now();
        let mut cmd_string;
        match cmd {
            LocalSwarmCmd::TriggerIntervalReplication => {
                cmd_string = "TriggerIntervalReplication";
                self.try_interval_replication()?;
            }
            LocalSwarmCmd::GetLocalQuotingMetrics {
                key,
                data_type,
                data_size,
                sender,
            } => {
                cmd_string = "GetLocalQuotingMetrics";
                let kbucket_status = self.get_kbuckets_status();
                self.update_on_kbucket_status(&kbucket_status);
                let (quoting_metrics, is_already_stored) = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .quoting_metrics(
                        &key,
                        data_type,
                        data_size,
                        Some(kbucket_status.estimated_network_size as u64),
                    );
                self.record_metrics(Marker::QuotingMetrics {
                    quoting_metrics: &quoting_metrics,
                });

                // To avoid sending the entire list to the client, only keep the bad nodes that are
                // closer to the target than the CLOSE_GROUP_SIZE-th closest node.
                let mut bad_nodes: Vec<_> = self
                    .bad_nodes
                    .iter()
                    .filter_map(|(peer_id, (_issue_list, is_bad))| {
                        if *is_bad {
                            Some(NetworkAddress::from(*peer_id))
                        } else {
                            None
                        }
                    })
                    .collect();

                // The list is already ordered, hence the last one is always the one wanted
                let kbucket_key = NetworkAddress::from(&key).as_kbucket_key();
                let closest_peers: Vec<_> = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .get_closest_local_peers(&kbucket_key)
                    .map(|peer| peer.into_preimage())
                    .take(CLOSE_GROUP_SIZE)
                    .collect();
                // In case there are not enough closest_peers, send the entire list
                if closest_peers.len() >= CLOSE_GROUP_SIZE {
                    let boundary_peer = closest_peers[CLOSE_GROUP_SIZE - 1];
                    let key_address = NetworkAddress::from(&key);
                    let boundary_distance =
                        key_address.distance(&NetworkAddress::from(boundary_peer));
                    bad_nodes
                        .retain(|peer_addr| key_address.distance(peer_addr) < boundary_distance);
                }

                let _res = sender.send((quoting_metrics, is_already_stored));
            }
            LocalSwarmCmd::PaymentReceived => {
                cmd_string = "PaymentReceived";
                self.swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .payment_received();
            }
            LocalSwarmCmd::GetLocalRecord { key, sender } => {
                cmd_string = "GetLocalRecord";
                let record = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .get(&key)
                    .map(|rec| rec.into_owned());
                let _ = sender.send(record);
            }

            LocalSwarmCmd::PutLocalRecord {
                record,
                is_client_put,
            } => {
                cmd_string = "PutLocalRecord";
                let key = record.key.clone();
                let record_key = PrettyPrintRecordKey::from(&key);

                let record_type = match RecordHeader::from_record(&record) {
                    Ok(record_header) => {
                        match record_header.kind {
                            RecordKind::DataOnly(DataTypes::Chunk) => ValidationType::Chunk,
                            RecordKind::DataOnly(_) => {
                                let content_hash = XorName::from_content(&record.value);
                                ValidationType::NonChunk(content_hash)
                            }
                            RecordKind::DataWithPayment(_) => {
                                error!("Record {record_key:?} with payment shall not be stored locally.");
                                return Err(NetworkError::InCorrectRecordHeader);
                            }
                        }
                    }
                    Err(err) => {
                        error!("For record {record_key:?}, failed to parse record_header {err:?}");
                        return Err(NetworkError::InCorrectRecordHeader);
                    }
                };

                let result = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .put_verified(record, record_type.clone(), is_client_put);

                match result {
                    Ok(_) => {
                        // `replication_fetcher.farthest_acceptable_distance` shall only shrink,
                        // never expand, even as more nodes join to share the responsibility.
                        // Hence there is no need to reset it.
                        // Also, as the `record_store` prunes one record per successful put,
                        // once capacity reaches max_records the count can only rise slowly,
                        // due to the async/parallel handling in replication_fetcher & record_store.
                    }
                    Err(StoreError::MaxRecords) => {
                        // In case the capacity is full, restrict the replication_fetcher to
                        // only fetch entries no farther than the current farthest record.
                        let farthest = self
                            .swarm
                            .behaviour_mut()
                            .kademlia
                            .store_mut()
                            .get_farthest();
                        self.replication_fetcher.set_farthest_on_full(farthest);
                    }
                    Err(_) => {
                        // Nothing special to do for these errors;
                        // all error cases are further logged and bubbled up below.
                    }
                }

                // Whether or not storing the record succeeded,
                // the entry shall be removed from the `replication_fetcher`.
                // In case of a local store error, a re-attempt will be carried out
                // within the next replication round.
                let new_keys_to_fetch = self
                    .replication_fetcher
                    .notify_about_new_put(key.clone(), record_type);

                if !new_keys_to_fetch.is_empty() {
                    self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
                }

                // The record_store will prune far records and set up a `distance range`,
                // once the `max_records` cap is reached.
                if let Some(distance) = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .get_responsible_distance_range()
                {
                    self.replication_fetcher
                        .set_replication_distance_range(distance);
                }

                if let Err(err) = result {
                    error!("Can't store verified record {record_key:?} locally: {err:?}");
                    cmd_string = "PutLocalRecord error";
                    self.log_handling(cmd_string.to_string(), start.elapsed());
                    return Err(err.into());
                };
            }
            LocalSwarmCmd::AddLocalRecordAsStored {
                key,
                record_type,
                data_type,
            } => {
                cmd_string = "AddLocalRecordAsStored";
                self.swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .mark_as_stored(key, record_type, data_type);
                // Reset the counter on any successful HDD write.
                self.hard_disk_write_error = 0;
            }
            LocalSwarmCmd::RemoveFailedLocalRecord { key } => {
                info!("Removing Record locally, for {key:?}");
                cmd_string = "RemoveFailedLocalRecord";
                self.swarm.behaviour_mut().kademlia.store_mut().remove(&key);
                self.hard_disk_write_error = self.hard_disk_write_error.saturating_add(1);
                // After a certain number of continuous HDD write errors,
                // the hard disk is considered full, and the node shall be terminated.
                if self.hard_disk_write_error > MAX_CONTINUOUS_HDD_WRITE_ERROR {
                    self.send_event(NetworkEvent::TerminateNode {
                        reason: TerminateNodeReason::HardDiskWriteError,
                    });
                }
            }
            LocalSwarmCmd::RecordStoreHasKey { key, sender } => {
                cmd_string = "RecordStoreHasKey";
                let has_key = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .contains(&key);
                let _ = sender.send(has_key);
            }
            LocalSwarmCmd::GetAllLocalRecordAddresses { sender } => {
                cmd_string = "GetAllLocalRecordAddresses";
                #[allow(clippy::mutable_key_type)] // for the Bytes in NetworkAddress
                let addresses = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .record_addresses();
                let _ = sender.send(addresses);
            }
            LocalSwarmCmd::GetKBuckets { sender } => {
                cmd_string = "GetKBuckets";
                let mut ilog2_kbuckets = BTreeMap::new();
                for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
                    let range = kbucket.range();
                    if let Some(distance) = range.0.ilog2() {
                        let peers_in_kbucket = kbucket
                            .iter()
                            .map(|peer_entry| peer_entry.node.key.into_preimage())
                            .collect::<Vec<PeerId>>();
                        let _ = ilog2_kbuckets.insert(distance, peers_in_kbucket);
                    } else {
                        // This shall never happen.
                        error!("bucket is ourself ???!!!");
                    }
                }
                let _ = sender.send(ilog2_kbuckets);
            }
            LocalSwarmCmd::GetPeersWithMultiaddr { sender } => {
                cmd_string = "GetPeersWithMultiAddr";
                let mut result: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
                for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
                    let peers_in_kbucket = kbucket
                        .iter()
                        .map(|peer_entry| {
                            (
                                peer_entry.node.key.into_preimage(),
                                peer_entry.node.value.clone().into_vec(),
                            )
                        })
                        .collect::<Vec<(PeerId, Vec<Multiaddr>)>>();
                    result.extend(peers_in_kbucket);
                }
                let _ = sender.send(result);
            }
            LocalSwarmCmd::GetKCloseLocalPeersToTarget { key, sender } => {
                cmd_string = "GetKCloseLocalPeersToTarget";
                let closest_peers = self.get_closest_k_local_peers_to_target(&key, true);

                let _ = sender.send(closest_peers);
            }
            LocalSwarmCmd::GetSwarmLocalState(sender) => {
                cmd_string = "GetSwarmLocalState";
                let current_state = SwarmLocalState {
                    connected_peers: self.swarm.connected_peers().cloned().collect(),
                    peers_in_routing_table: self.peers_in_rt,
                    listeners: self.swarm.listeners().cloned().collect(),
                };

                sender
                    .send(current_state)
                    .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
            }
            LocalSwarmCmd::AddPeerToBlockList { peer_id } => {
                cmd_string = "AddPeerToBlockList";
                self.swarm.behaviour_mut().blocklist.block_peer(peer_id);
            }
            LocalSwarmCmd::RecordNodeIssue { peer_id, issue } => {
                cmd_string = "RecordNodeIssues";
                self.record_node_issue(peer_id, issue);
            }
            LocalSwarmCmd::IsPeerShunned { target, sender } => {
                cmd_string = "IsPeerInTrouble";
                let is_bad = if let Some(peer_id) = target.as_peer_id() {
                    if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
                        *is_bad
                    } else {
                        false
                    }
                } else {
                    false
                };
                let _ = sender.send(is_bad);
            }
            LocalSwarmCmd::QuoteVerification { quotes } => {
                cmd_string = "QuoteVerification";
                for (peer_id, quote) in quotes {
                    // Do nothing if the peer is already marked as bad
                    if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
                        if *is_bad {
                            continue;
                        }
                    }
                    self.verify_peer_quote(peer_id, quote);
                }
            }
            LocalSwarmCmd::FetchCompleted((key, record_type)) => {
                info!(
                    "Fetch of {record_type:?} {:?} completed early; an old version of the record may have been fetched.",
                    PrettyPrintRecordKey::from(&key)
                );
                cmd_string = "FetchCompleted";
                let new_keys_to_fetch = self
                    .replication_fetcher
                    .notify_fetch_early_completed(key, record_type);
                if !new_keys_to_fetch.is_empty() {
                    self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
                }
            }
            LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
                cmd_string = "TriggerIrrelevantRecordCleanup";
                self.swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .cleanup_irrelevant_records();
            }
            LocalSwarmCmd::NotifyPeerScores { peer_scores } => {
                cmd_string = "NotifyPeerScores";
                self.replication_fetcher.add_peer_scores(peer_scores);
            }
            LocalSwarmCmd::AddFreshReplicateRecords { holder, keys } => {
                cmd_string = "AddFreshReplicateRecords";
                let _ = self.add_keys_to_replication_fetcher(holder, keys, true);
            }
            LocalSwarmCmd::NotifyPeerVersion { peer, version } => {
                cmd_string = "NotifyPeerVersion";
                self.record_node_version(peer, version);
            }
            LocalSwarmCmd::GetNetworkDensity { sender } => {
                cmd_string = "GetNetworkDensity";
                let density = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .get_responsible_distance_range();
                let _ = sender.send(density);
            }
            LocalSwarmCmd::RemovePeer { peer } => {
                cmd_string = "RemovePeer";
                if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer) {
                    self.update_on_peer_removal(*dead_peer.node.key.preimage());
                }
            }
        }

        self.log_handling(cmd_string.to_string(), start.elapsed());

        Ok(())
    }

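    /// Record the advertised software version of a fetched peer.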
    fn record_node_version(&mut self, peer_id: PeerId, version: String) {
        let _ = self.peers_version.insert(peer_id, version);
    }

    pub(crate) fn record_node_issue(&mut self, peer_id: PeerId, issue: NodeIssue) {
        info!("Peer {peer_id:?} is reported as having issue {issue:?}");
        let (issue_vec, is_bad) = self.bad_nodes.entry(peer_id).or_default();
        let mut new_bad_behaviour = None;
        let mut is_connection_issue = false;

        // If the peer is already considered bad, skip the bookkeeping below
        if !(*is_bad) {
            // Remove outdated entries
            issue_vec.retain(|(_, timestamp)| timestamp.elapsed().as_secs() < 300);

            // Check whether the vec is already 10 entries long; if so, remove the oldest issue.
            // We only track 10 issues to avoid memory leaks.
            if issue_vec.len() == 10 {
                issue_vec.remove(0);
            }

            // To avoid being too sensitive, only consider this a new issue
            // if a certain amount of time has passed since the last one.
            let is_new_issue = if let Some((_issue, timestamp)) = issue_vec.last() {
                timestamp.elapsed().as_secs() > 10
            } else {
                true
            };

            if is_new_issue {
                issue_vec.push((issue, Instant::now()));
            } else {
                return;
            }

            // Only consider a candidate as a bad node when it has
            // accumulated THREE issues of the same kind within a certain period.
            for (issue, _timestamp) in issue_vec.iter() {
                let issue_counts = issue_vec
                    .iter()
                    .filter(|(i, _timestamp)| *issue == *i)
                    .count();
                if issue_counts >= 3 {
                    // If it is a connection issue, we don't need to consider it as a bad node
                    if matches!(issue, NodeIssue::ConnectionIssue) {
                        is_connection_issue = true;
                    }
                    // TODO: the blocklist is currently disabled;
                    //       re-enable it once more statistics are gathered from a large-scale network.
                    // else {
                    //     *is_bad = true;
                    // }
                    new_bad_behaviour = Some(issue.clone());
                    info!("Peer {peer_id:?} accumulated {issue_counts} occurrences of issue {issue:?}. Considering it a bad node now.");
                    // Once a bad behaviour is detected, there is no point in continuing
                    break;
                }
            }
        }

        // Give the node with the faulty connection more chances by removing the issue from the list.
        // It is still evicted from the routing table.
        if is_connection_issue {
            issue_vec.retain(|(issue, _timestamp)| !matches!(issue, NodeIssue::ConnectionIssue));
            info!("Evicting bad peer {peer_id:?} due to connection issue from RT.");
            if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
                self.update_on_peer_removal(*dead_peer.node.key.preimage());
            }
            return;
        }

        if *is_bad {
            info!("Evicting bad peer {peer_id:?} from RT.");
            let addrs = if let Some(dead_peer) =
                self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id)
            {
                self.update_on_peer_removal(*dead_peer.node.key.preimage());
                Addresses(dead_peer.node.value.into_vec())
            } else {
                Addresses(Vec::new())
            };

            if let Some(bad_behaviour) = new_bad_behaviour {
                // Inform the bad node about it and add it to the blocklist afterwards (not for connection issues).
                self.record_metrics(Marker::PeerConsideredAsBad { bad_peer: &peer_id });

                warn!("Peer {peer_id:?} is considered as bad due to {bad_behaviour:?}. Informing the peer and adding to blocklist.");
                // response handling
                let (tx, rx) = oneshot::channel();
                let local_swarm_cmd_sender = self.local_cmd_sender.clone();
                tokio::spawn(async move {
                    match rx.await {
                        Ok(result) => {
                            debug!("Got response for Cmd::PeerConsideredAsBad from {peer_id:?} {result:?}");
                            if let Err(err) = local_swarm_cmd_sender
                                .send(LocalSwarmCmd::AddPeerToBlockList { peer_id })
                                .await
                            {
                                error!("SwarmDriver failed to send LocalSwarmCmd: {err}");
                            }
                        }
                        Err(err) => {
                            error!("Failed to get response from one shot channel for Cmd::PeerConsideredAsBad : {err:?}");
                        }
                    }
                });

                // request
                let request = Request::Cmd(Cmd::PeerConsideredAsBad {
                    detected_by: NetworkAddress::from(self.self_peer_id),
                    bad_peer: NetworkAddress::from(peer_id),
                    bad_behaviour: bad_behaviour.to_string(),
                });
                self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
                    req: request,
                    addrs: Some(addrs),
                    peer: peer_id,
                    sender: Some(tx),
                });
            }
        }
    }

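    /// Verify a peer's quote against its quote history: a mismatch records a
    /// `NodeIssue::BadQuoting` against the peer, otherwise the newest quote is kept.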
    fn verify_peer_quote(&mut self, peer_id: PeerId, quote: PaymentQuote) {
        if let Some(history_quote) = self.quotes_history.get(&peer_id) {
            if !history_quote.historical_verify(&quote) {
                info!("From {peer_id:?}, detected a bad quote {quote:?} against history_quote {history_quote:?}");
                self.record_node_issue(peer_id, NodeIssue::BadQuoting);
                return;
            }

            if history_quote.is_newer_than(&quote) {
                return;
            }
        }

        let _ = self.quotes_history.insert(peer_id, quote);
    }

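    /// Carry out one round of periodic replication: throttled to at most once per
    /// `MIN_REPLICATION_INTERVAL_S`, it sends the full list of locally held record
    /// addresses to close-group candidates that have not been replicated to recently.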
    fn try_interval_replication(&mut self) -> Result<()> {
        // Skip if the minimum interval since the last replication has not yet elapsed.
        if let Some(last_replication) = self.last_replication {
            if last_replication.elapsed() < MIN_REPLICATION_INTERVAL_S {
                info!("Skipping replication as minimum interval hasn't elapsed");
                return Ok(());
            }
        }
        // Store the current time as the last replication time
        self.last_replication = Some(Instant::now());

        let self_addr = NetworkAddress::from(self.self_peer_id);
        let mut replicate_targets = self.get_replicate_candidates(&self_addr)?;

        let now = Instant::now();
        self.replication_targets
            .retain(|_peer_id, timestamp| *timestamp > now);
        // Only carry out replication to peers that we have not replicated to recently
        replicate_targets
            .retain(|(peer_id, _addresses)| !self.replication_targets.contains_key(peer_id));
        if replicate_targets.is_empty() {
            return Ok(());
        }

        let all_records: Vec<_> = self
            .swarm
            .behaviour_mut()
            .kademlia
            .store_mut()
            .record_addresses_ref()
            .values()
            .cloned()
            .collect();

        if !all_records.is_empty() {
            debug!(
                "Sending a replication list of {} keys to {replicate_targets:?} ",
                all_records.len()
            );
            let request = Request::Cmd(Cmd::Replicate {
                holder: NetworkAddress::from(self.self_peer_id),
                keys: all_records
                    .into_iter()
                    .map(|(addr, val_type, _data_type)| (addr, val_type))
                    .collect(),
            });
            for (peer_id, _addrs) in replicate_targets {
                self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
                    req: request.clone(),
                    peer: peer_id,
                    // Replicate targets are part of the RT, so there is no need to dial them manually.
                    addrs: None,
                    sender: None,
                });

                let _ = self
                    .replication_targets
                    .insert(peer_id, now + REPLICATION_TIMEOUT);
            }
        }

        Ok(())
    }


    // Replies with in-range replicate candidates.
    // Falls back to `expected_candidates` peers if the range is too narrow.
    // Note that:
    //   * For general replication, the candidates shall be the closest to self, but within a wider range
    //   * For replicating fresh records, the candidates shall be the closest to the data
    pub(crate) fn get_replicate_candidates(
        &mut self,
        target: &NetworkAddress,
    ) -> Result<Vec<(PeerId, Addresses)>> {
        let is_periodic_replicate = target.as_peer_id().is_some();
        let expected_candidates = if is_periodic_replicate {
            CLOSE_GROUP_SIZE * 2
        } else {
            CLOSE_GROUP_SIZE
        };

        // Get the closest peers from the buckets
        let closest_k_peers = self.get_closest_k_local_peers_to_target(target, false);

        if let Some(responsible_range) = self
            .swarm
            .behaviour_mut()
            .kademlia
            .store_mut()
            .get_responsible_distance_range()
        {
            let peers_in_range = get_peers_in_range(&closest_k_peers, target, responsible_range);

            if peers_in_range.len() >= expected_candidates {
                return Ok(peers_in_range);
            }
        }

        // In case the range is too narrow, fall back to at least `expected_candidates` peers.
        Ok(closest_k_peers
            .iter()
            .take(expected_candidates)
            .cloned()
            .collect())
    }
}

/// Returns the nodes that are within the defined distance of the given address.
fn get_peers_in_range(
    peers: &[(PeerId, Addresses)],
    address: &NetworkAddress,
    range: Distance,
) -> Vec<(PeerId, Addresses)> {
    peers
        .iter()
        .filter_map(|(peer_id, addresses)| {
            if address.distance(&NetworkAddress::from(*peer_id)) <= range {
                Some((*peer_id, addresses.clone()))
            } else {
                None
            }
        })
        .collect()
}