ant_networking/
cmd.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::{
10    config::GetRecordCfg,
11    driver::{PendingGetClosestType, SwarmDriver},
12    error::{NetworkError, Result},
13    event::TerminateNodeReason,
14    log_markers::Marker,
15    GetRecordError, MsgResponder, NetworkEvent, ResponseQuorum, CLOSE_GROUP_SIZE,
16};
17use ant_evm::{PaymentQuote, QuotingMetrics};
18use ant_protocol::{
19    messages::{Cmd, Request, Response},
20    storage::{DataTypes, RecordHeader, RecordKind, ValidationType},
21    NetworkAddress, PrettyPrintRecordKey,
22};
23use libp2p::{
24    kad::{
25        store::{Error as StoreError, RecordStore},
26        KBucketDistance as Distance, Record, RecordKey, K_VALUE,
27    },
28    Multiaddr, PeerId,
29};
30use std::{
31    collections::{BTreeMap, HashMap},
32    fmt::Debug,
33    time::Duration,
34};
35use tokio::sync::oneshot;
36use xor_name::XorName;
37
38use crate::time::Instant;
39
// Presumably the number of consecutive disk (HDD) write errors tolerated before the node
// takes action; the use site is not visible in this section — TODO confirm.
const MAX_CONTINUOUS_HDD_WRITE_ERROR: usize = 5;

// Timeout applied to replication (45 seconds).
// Shall be synced with `ant_node::PERIODIC_REPLICATION_INTERVAL_MAX_S`
const REPLICATION_TIMEOUT: Duration = Duration::from_secs(45);

// Throttles replication to at most once every 30 seconds.
// NOTE: despite the `_S` suffix this is a `Duration`, not a raw number of seconds.
const MIN_REPLICATION_INTERVAL_S: Duration = Duration::from_secs(30);
47
/// Kinds of issue that can be recorded against a peer.
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum NodeIssue {
    /// A connection that is considered critical (and hence tracked) ran into trouble.
    ConnectionIssue,
    /// Replicating data failed.
    ReplicationFailure,
    /// Nodes close to this peer have reported it as bad.
    CloseNodesShunning,
    /// The peer handed out a bad quote.
    BadQuoting,
    /// The peer did not pass the chunk proof verification.
    FailedChunkProofCheck,
}

impl std::fmt::Display for NodeIssue {
    /// Writes the textual label of the issue.
    ///
    /// Note that `ConnectionIssue` is rendered as `CriticalConnectionIssue`; every other
    /// variant is rendered as its own name.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let label = match self {
            Self::ConnectionIssue => "CriticalConnectionIssue",
            Self::ReplicationFailure => "ReplicationFailure",
            Self::CloseNodesShunning => "CloseNodesShunning",
            Self::BadQuoting => "BadQuoting",
            Self::FailedChunkProofCheck => "FailedChunkProofCheck",
        };
        f.write_str(label)
    }
}
73
/// Commands to send to the Swarm that are processed by `SwarmDriver::handle_local_cmd`.
///
/// Variants carrying a `sender` hand their result back to the caller through that
/// oneshot channel.
pub enum LocalSwarmCmd {
    /// Get a list of all peers in the local routing table (RT), with the corresponding
    /// Multiaddr info attached as well.
    GetPeersWithMultiaddr {
        sender: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
    },
    /// Get a map where each key is the ilog2 distance of that Kbucket
    /// and each value is a vector of peers in that bucket.
    GetKBuckets {
        sender: oneshot::Sender<BTreeMap<u32, Vec<PeerId>>>,
    },
    /// Returns the replicate candidates in range.
    /// In case the range is too narrow, returns at least CLOSE_GROUP_SIZE peers.
    GetReplicateCandidates {
        data_addr: NetworkAddress,
        sender: oneshot::Sender<Result<Vec<PeerId>>>,
    },
    /// Returns up to K_VALUE peers from all the k-buckets from the local Routing Table.
    /// And our PeerId as well.
    GetClosestKLocalPeers {
        sender: oneshot::Sender<Vec<PeerId>>,
    },
    /// Get the peers closest to `key` from the local RoutingTable
    GetCloseGroupLocalPeers {
        key: NetworkAddress,
        sender: oneshot::Sender<Vec<PeerId>>,
    },
    /// Get a `SwarmLocalState` snapshot of the swarm's local state
    GetSwarmLocalState(oneshot::Sender<SwarmLocalState>),
    /// Check if the local RecordStore contains the provided key
    RecordStoreHasKey {
        key: RecordKey,
        sender: oneshot::Sender<Result<bool>>,
    },
    /// Get the Addresses of all the Records held locally
    GetAllLocalRecordAddresses {
        sender: oneshot::Sender<Result<HashMap<NetworkAddress, ValidationType>>>,
    },
    /// Get data from the local RecordStore
    GetLocalRecord {
        key: RecordKey,
        sender: oneshot::Sender<Option<Record>>,
    },
    /// GetLocalQuotingMetrics for this node
    /// Returns the quoting metrics and whether the record at `key` is already stored locally
    GetLocalQuotingMetrics {
        key: RecordKey,
        data_type: u32,
        data_size: usize,
        sender: oneshot::Sender<Result<(QuotingMetrics, bool)>>,
    },
    /// Notify that the node received a payment.
    PaymentReceived,
    /// Put record to the local RecordStore
    PutLocalRecord {
        record: Record,
        is_client_put: bool,
    },
    /// Remove a local record from the RecordStore
    /// Typically because the write failed
    RemoveFailedLocalRecord {
        key: RecordKey,
    },
    /// Add a local record to the RecordStore's HashSet of stored records
    /// This should be done after the record has been stored to disk
    AddLocalRecordAsStored {
        key: RecordKey,
        record_type: ValidationType,
        data_type: DataTypes,
    },
    /// Add a peer to the blocklist
    AddPeerToBlockList {
        peer_id: PeerId,
    },
    /// Record an issue observed with a peer, i.e. notify that the peer is in trouble
    RecordNodeIssue {
        peer_id: PeerId,
        issue: NodeIssue,
    },
    /// Whether the peer is considered as `in trouble` by self
    IsPeerShunned {
        target: NetworkAddress,
        sender: oneshot::Sender<bool>,
    },
    /// Quote verification against historically collected quotes
    QuoteVerification {
        quotes: Vec<(PeerId, PaymentQuote)>,
    },
    /// Notify a fetch completion
    FetchCompleted((RecordKey, ValidationType)),
    /// Triggers interval replication
    /// NOTE: This does result in outgoing messages, but is produced locally
    TriggerIntervalReplication,
    /// Triggers irrelevant record cleanup
    TriggerIrrelevantRecordCleanup,
    /// Add a network density sample
    AddNetworkDensitySample {
        distance: Distance,
    },
    /// Send peer scores (collected from storage challenge) to replication_fetcher
    NotifyPeerScores {
        peer_scores: Vec<(PeerId, bool)>,
    },
    /// Add fresh replicate records into replication_fetcher
    AddFreshReplicateRecords {
        holder: NetworkAddress,
        keys: Vec<(NetworkAddress, ValidationType)>,
    },
    /// Notify a fetched peer's version
    NotifyPeerVersion {
        peer: PeerId,
        version: String,
    },
}
187
/// Commands to send to the Swarm that are processed by `SwarmDriver::handle_network_cmd`.
pub enum NetworkSwarmCmd {
    /// Get the peers closest to `key` from the network
    GetClosestPeersToAddressFromNetwork {
        key: NetworkAddress,
        sender: oneshot::Sender<Vec<PeerId>>,
    },

    /// Send Request to the PeerId.
    SendRequest {
        req: Request,
        peer: PeerId,

        /// If a `sender` is provided, the requesting node will await for a `Response` from the
        /// Peer. The result is then returned at the call site.
        ///
        /// If a `sender` is not provided, the requesting node will not wait for the Peer's
        /// response. Instead we trigger a `NetworkEvent::ResponseReceived` which calls the common
        /// `response_handler`
        sender: Option<oneshot::Sender<Result<Response>>>,
    },
    /// Send `resp` back through `channel`, which is either a peer's response channel or a
    /// local oneshot channel when the request originated from ourselves.
    SendResponse {
        resp: Response,
        channel: MsgResponder,
    },

    /// Get Record from the Kad network
    GetNetworkRecord {
        key: RecordKey,
        sender: oneshot::Sender<std::result::Result<Record, GetRecordError>>,
        cfg: GetRecordCfg,
    },

    /// Put record to network
    PutRecord {
        record: Record,
        sender: oneshot::Sender<Result<()>>,
        quorum: ResponseQuorum,
    },
    /// Put record to the specified peers
    PutRecordTo {
        peers: Vec<PeerId>,
        record: Record,
        sender: oneshot::Sender<Result<()>>,
        quorum: ResponseQuorum,
    },
}
235
236/// Debug impl for LocalSwarmCmd to avoid printing full Record, instead only RecodKey
237/// and RecordKind are printed.
238impl Debug for LocalSwarmCmd {
239    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240        match self {
241            LocalSwarmCmd::PutLocalRecord {
242                record,
243                is_client_put,
244            } => {
245                write!(
246                    f,
247                    "LocalSwarmCmd::PutLocalRecord {{ key: {:?}, is_client_put: {is_client_put:?} }}",
248                    PrettyPrintRecordKey::from(&record.key)
249                )
250            }
251            LocalSwarmCmd::RemoveFailedLocalRecord { key } => {
252                write!(
253                    f,
254                    "LocalSwarmCmd::RemoveFailedLocalRecord {{ key: {:?} }}",
255                    PrettyPrintRecordKey::from(key)
256                )
257            }
258            LocalSwarmCmd::AddLocalRecordAsStored {
259                key,
260                record_type,
261                data_type,
262            } => {
263                write!(
264                    f,
265                    "LocalSwarmCmd::AddLocalRecordAsStored {{ key: {:?}, record_type: {record_type:?}, data_type: {data_type:?} }}",
266                    PrettyPrintRecordKey::from(key)
267                )
268            }
269            LocalSwarmCmd::GetReplicateCandidates { .. } => {
270                write!(f, "LocalSwarmCmd::GetReplicateCandidates")
271            }
272            LocalSwarmCmd::GetClosestKLocalPeers { .. } => {
273                write!(f, "LocalSwarmCmd::GetClosestKLocalPeers")
274            }
275            LocalSwarmCmd::GetCloseGroupLocalPeers { key, .. } => {
276                write!(
277                    f,
278                    "LocalSwarmCmd::GetCloseGroupLocalPeers {{ key: {key:?} }}"
279                )
280            }
281            LocalSwarmCmd::GetLocalQuotingMetrics { .. } => {
282                write!(f, "LocalSwarmCmd::GetLocalQuotingMetrics")
283            }
284            LocalSwarmCmd::PaymentReceived => {
285                write!(f, "LocalSwarmCmd::PaymentReceived")
286            }
287            LocalSwarmCmd::GetLocalRecord { key, .. } => {
288                write!(
289                    f,
290                    "LocalSwarmCmd::GetLocalRecord {{ key: {:?} }}",
291                    PrettyPrintRecordKey::from(key)
292                )
293            }
294            LocalSwarmCmd::GetAllLocalRecordAddresses { .. } => {
295                write!(f, "LocalSwarmCmd::GetAllLocalRecordAddresses")
296            }
297            LocalSwarmCmd::GetPeersWithMultiaddr { .. } => {
298                write!(f, "LocalSwarmCmd::GetPeersWithMultiaddr")
299            }
300            LocalSwarmCmd::GetKBuckets { .. } => {
301                write!(f, "LocalSwarmCmd::GetKBuckets")
302            }
303            LocalSwarmCmd::GetSwarmLocalState { .. } => {
304                write!(f, "LocalSwarmCmd::GetSwarmLocalState")
305            }
306            LocalSwarmCmd::RecordStoreHasKey { key, .. } => {
307                write!(
308                    f,
309                    "LocalSwarmCmd::RecordStoreHasKey {:?}",
310                    PrettyPrintRecordKey::from(key)
311                )
312            }
313            LocalSwarmCmd::AddPeerToBlockList { peer_id } => {
314                write!(f, "LocalSwarmCmd::AddPeerToBlockList {peer_id:?}")
315            }
316            LocalSwarmCmd::RecordNodeIssue { peer_id, issue } => {
317                write!(
318                    f,
319                    "LocalSwarmCmd::SendNodeStatus peer {peer_id:?}, issue: {issue:?}"
320                )
321            }
322            LocalSwarmCmd::IsPeerShunned { target, .. } => {
323                write!(f, "LocalSwarmCmd::IsPeerInTrouble target: {target:?}")
324            }
325            LocalSwarmCmd::QuoteVerification { quotes } => {
326                write!(
327                    f,
328                    "LocalSwarmCmd::QuoteVerification of {} quotes",
329                    quotes.len()
330                )
331            }
332            LocalSwarmCmd::FetchCompleted((key, record_type)) => {
333                write!(
334                    f,
335                    "LocalSwarmCmd::FetchCompleted({record_type:?} : {:?})",
336                    PrettyPrintRecordKey::from(key)
337                )
338            }
339            LocalSwarmCmd::TriggerIntervalReplication => {
340                write!(f, "LocalSwarmCmd::TriggerIntervalReplication")
341            }
342            LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
343                write!(f, "LocalSwarmCmd::TriggerUnrelevantRecordCleanup")
344            }
345            LocalSwarmCmd::AddNetworkDensitySample { distance } => {
346                write!(f, "LocalSwarmCmd::AddNetworkDensitySample({distance:?})")
347            }
348            LocalSwarmCmd::NotifyPeerScores { peer_scores } => {
349                write!(f, "LocalSwarmCmd::NotifyPeerScores({peer_scores:?})")
350            }
351            LocalSwarmCmd::AddFreshReplicateRecords { holder, keys } => {
352                write!(
353                    f,
354                    "LocalSwarmCmd::AddFreshReplicateRecords({holder:?}, {keys:?})"
355                )
356            }
357            LocalSwarmCmd::NotifyPeerVersion { peer, version } => {
358                write!(f, "LocalSwarmCmd::NotifyPeerVersion({peer:?}, {version:?})")
359            }
360        }
361    }
362}
363
364/// Debug impl for NetworkSwarmCmd to avoid printing full Record, instead only RecodKey
365/// and RecordKind are printed.
366impl Debug for NetworkSwarmCmd {
367    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
368        match self {
369            NetworkSwarmCmd::GetNetworkRecord { key, cfg, .. } => {
370                write!(
371                    f,
372                    "NetworkSwarmCmd::GetNetworkRecord {{ key: {:?}, cfg: {cfg:?}",
373                    PrettyPrintRecordKey::from(key)
374                )
375            }
376            NetworkSwarmCmd::PutRecord { record, .. } => {
377                write!(
378                    f,
379                    "NetworkSwarmCmd::PutRecord {{ key: {:?} }}",
380                    PrettyPrintRecordKey::from(&record.key)
381                )
382            }
383            NetworkSwarmCmd::PutRecordTo { peers, record, .. } => {
384                write!(
385                    f,
386                    "NetworkSwarmCmd::PutRecordTo {{ peers: {peers:?}, key: {:?} }}",
387                    PrettyPrintRecordKey::from(&record.key)
388                )
389            }
390            NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { key, .. } => {
391                write!(f, "NetworkSwarmCmd::GetClosestPeers {{ key: {key:?} }}")
392            }
393            NetworkSwarmCmd::SendResponse { resp, .. } => {
394                write!(f, "NetworkSwarmCmd::SendResponse resp: {resp:?}")
395            }
396            NetworkSwarmCmd::SendRequest { req, peer, .. } => {
397                write!(
398                    f,
399                    "NetworkSwarmCmd::SendRequest req: {req:?}, peer: {peer:?}"
400                )
401            }
402        }
403    }
404}
/// Snapshot of information kept in the Swarm's local state.
///
/// This is the payload sent back through the channel carried by
/// `LocalSwarmCmd::GetSwarmLocalState`.
#[derive(Debug, Clone)]
pub struct SwarmLocalState {
    /// List of currently connected peers
    pub connected_peers: Vec<PeerId>,
    /// List of addresses the node is currently listening on
    pub listeners: Vec<Multiaddr>,
}
413
414impl SwarmDriver {
415    pub(crate) fn handle_network_cmd(&mut self, cmd: NetworkSwarmCmd) -> Result<(), NetworkError> {
416        let start = Instant::now();
417        let cmd_string;
418        match cmd {
419            NetworkSwarmCmd::GetNetworkRecord { key, sender, cfg } => {
420                cmd_string = "GetNetworkRecord";
421
422                for (pending_query, (inflight_record_query_key, senders, _, _)) in
423                    self.pending_get_record.iter_mut()
424                {
425                    if *inflight_record_query_key == key {
426                        debug!(
427                            "GetNetworkRecord for {:?} is already in progress. Adding sender to {pending_query:?}",
428                            PrettyPrintRecordKey::from(&key)
429                        );
430                        senders.push(sender);
431
432                        // early exit as we're already processing this query
433                        return Ok(());
434                    }
435                }
436
437                let query_id = self.swarm.behaviour_mut().kademlia.get_record(key.clone());
438
439                debug!(
440                    "Record {:?} with task {query_id:?} expected to be held by {:?}",
441                    PrettyPrintRecordKey::from(&key),
442                    cfg.expected_holders
443                );
444
445                if self
446                    .pending_get_record
447                    .insert(query_id, (key, vec![sender], Default::default(), cfg))
448                    .is_some()
449                {
450                    warn!("An existing get_record task {query_id:?} got replaced");
451                }
452                // Logging the status of the `pending_get_record`.
453                // We also interested in the status of `result_map` (which contains record) inside.
454                let total_records: usize = self
455                    .pending_get_record
456                    .iter()
457                    .map(|(_, (_, _, result_map, _))| result_map.len())
458                    .sum();
459                info!("We now have {} pending get record attempts and cached {total_records} fetched copies",
460                      self.pending_get_record.len());
461            }
462            NetworkSwarmCmd::PutRecord {
463                record,
464                sender,
465                quorum,
466            } => {
467                cmd_string = "PutRecord";
468                let record_key = PrettyPrintRecordKey::from(&record.key).into_owned();
469                debug!(
470                    "Putting record sized: {:?} to network {:?}",
471                    record.value.len(),
472                    record_key
473                );
474                let res = match self
475                    .swarm
476                    .behaviour_mut()
477                    .kademlia
478                    .put_record(record, quorum.get_kad_quorum())
479                {
480                    Ok(request_id) => {
481                        debug!("Sent record {record_key:?} to network. Request id: {request_id:?} to network");
482                        Ok(())
483                    }
484                    Err(error) => {
485                        error!("Error sending record {record_key:?} to network");
486                        Err(NetworkError::from(error))
487                    }
488                };
489
490                if let Err(err) = sender.send(res) {
491                    error!("Could not send response to PutRecord cmd: {:?}", err);
492                }
493            }
494            NetworkSwarmCmd::PutRecordTo {
495                peers,
496                record,
497                sender,
498                quorum,
499            } => {
500                cmd_string = "PutRecordTo";
501                let record_key = PrettyPrintRecordKey::from(&record.key).into_owned();
502                debug!(
503                    "Putting record {record_key:?} sized: {:?} to {peers:?}",
504                    record.value.len(),
505                );
506                let peers_count = peers.len();
507                let request_id = self.swarm.behaviour_mut().kademlia.put_record_to(
508                    record,
509                    peers.into_iter(),
510                    quorum.get_kad_quorum(),
511                );
512                debug!("Sent record {record_key:?} to {peers_count:?} peers. Request id: {request_id:?}");
513
514                if let Err(err) = sender.send(Ok(())) {
515                    error!("Could not send response to PutRecordTo cmd: {:?}", err);
516                }
517            }
518            NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { key, sender } => {
519                cmd_string = "GetClosestPeersToAddressFromNetwork";
520                let query_id = self
521                    .swarm
522                    .behaviour_mut()
523                    .kademlia
524                    .get_closest_peers(key.as_bytes());
525                let _ = self.pending_get_closest_peers.insert(
526                    query_id,
527                    (
528                        PendingGetClosestType::FunctionCall(sender),
529                        Default::default(),
530                    ),
531                );
532            }
533
534            NetworkSwarmCmd::SendRequest { req, peer, sender } => {
535                cmd_string = "SendRequest";
536                // If `self` is the recipient, forward the request directly to our upper layer to
537                // be handled.
538                // `self` then handles the request and sends a response back again to itself.
539                if peer == *self.swarm.local_peer_id() {
540                    trace!("Sending query request to self");
541                    if let Request::Query(query) = req {
542                        self.send_event(NetworkEvent::QueryRequestReceived {
543                            query,
544                            channel: MsgResponder::FromSelf(sender),
545                        });
546                    } else {
547                        // We should never receive a Replicate request from ourselves.
548                        // we already hold this data if we do... so we can ignore
549                        trace!("Replicate cmd to self received, ignoring");
550                    }
551                } else {
552                    let request_id = self
553                        .swarm
554                        .behaviour_mut()
555                        .request_response
556                        .send_request(&peer, req);
557                    trace!("Sending request {request_id:?} to peer {peer:?}");
558                    let _ = self.pending_requests.insert(request_id, sender);
559
560                    trace!("Pending Requests now: {:?}", self.pending_requests.len());
561                }
562            }
563            NetworkSwarmCmd::SendResponse { resp, channel } => {
564                cmd_string = "SendResponse";
565                match channel {
566                    // If the response is for `self`, send it directly through the oneshot channel.
567                    MsgResponder::FromSelf(channel) => {
568                        trace!("Sending response to self");
569                        match channel {
570                            Some(channel) => {
571                                channel
572                                    .send(Ok(resp))
573                                    .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
574                            }
575                            None => {
576                                // responses that are not awaited at the call site must be handled
577                                // separately
578                                self.send_event(NetworkEvent::ResponseReceived { res: resp });
579                            }
580                        }
581                    }
582                    MsgResponder::FromPeer(channel) => {
583                        self.swarm
584                            .behaviour_mut()
585                            .request_response
586                            .send_response(channel, resp)
587                            .map_err(NetworkError::OutgoingResponseDropped)?;
588                    }
589                }
590            }
591        }
592
593        self.log_handling(cmd_string.to_string(), start.elapsed());
594
595        Ok(())
596    }
597    pub(crate) fn handle_local_cmd(&mut self, cmd: LocalSwarmCmd) -> Result<(), NetworkError> {
598        let start = Instant::now();
599        let mut cmd_string;
600        match cmd {
601            LocalSwarmCmd::TriggerIntervalReplication => {
602                cmd_string = "TriggerIntervalReplication";
603                self.try_interval_replication()?;
604            }
605            LocalSwarmCmd::GetLocalQuotingMetrics {
606                key,
607                data_type,
608                data_size,
609                sender,
610            } => {
611                cmd_string = "GetLocalQuotingMetrics";
612                let kbucket_status = self.get_kbuckets_status();
613                self.update_on_kbucket_status(&kbucket_status);
614                let (quoting_metrics, is_already_stored) = match self
615                    .swarm
616                    .behaviour_mut()
617                    .kademlia
618                    .store_mut()
619                    .quoting_metrics(
620                        &key,
621                        data_type,
622                        data_size,
623                        Some(kbucket_status.estimated_network_size as u64),
624                    ) {
625                    Ok(res) => res,
626                    Err(err) => {
627                        let _res = sender.send(Err(err));
628                        return Ok(());
629                    }
630                };
631                self.record_metrics(Marker::QuotingMetrics {
632                    quoting_metrics: &quoting_metrics,
633                });
634
635                // To avoid sending entire list to client, sending those that:
636                //     closer than the CLOSE_GROUP_SIZEth closest node to the target
637                let mut bad_nodes: Vec<_> = self
638                    .bad_nodes
639                    .iter()
640                    .filter_map(|(peer_id, (_issue_list, is_bad))| {
641                        if *is_bad {
642                            Some(NetworkAddress::from_peer(*peer_id))
643                        } else {
644                            None
645                        }
646                    })
647                    .collect();
648
649                // List is ordered already, hence the last one is always the one wanted
650                let kbucket_key = NetworkAddress::from_record_key(&key).as_kbucket_key();
651                let closest_peers: Vec<_> = self
652                    .swarm
653                    .behaviour_mut()
654                    .kademlia
655                    .get_closest_local_peers(&kbucket_key)
656                    .map(|peer| peer.into_preimage())
657                    .take(CLOSE_GROUP_SIZE)
658                    .collect();
                // In case of not enough closest_peers, send the entire list
660                if closest_peers.len() >= CLOSE_GROUP_SIZE {
661                    let boundary_peer = closest_peers[CLOSE_GROUP_SIZE - 1];
662                    let key_address = NetworkAddress::from_record_key(&key);
663                    let boundary_distance =
664                        key_address.distance(&NetworkAddress::from_peer(boundary_peer));
665                    bad_nodes
666                        .retain(|peer_addr| key_address.distance(peer_addr) < boundary_distance);
667                }
668
669                let _res = sender.send(Ok((quoting_metrics, is_already_stored)));
670            }
671            LocalSwarmCmd::PaymentReceived => {
672                cmd_string = "PaymentReceived";
673                self.swarm
674                    .behaviour_mut()
675                    .kademlia
676                    .store_mut()
677                    .payment_received();
678            }
679            LocalSwarmCmd::GetLocalRecord { key, sender } => {
680                cmd_string = "GetLocalRecord";
681                let record = self
682                    .swarm
683                    .behaviour_mut()
684                    .kademlia
685                    .store_mut()
686                    .get(&key)
687                    .map(|rec| rec.into_owned());
688                let _ = sender.send(record);
689            }
690
691            LocalSwarmCmd::PutLocalRecord {
692                record,
693                is_client_put,
694            } => {
695                cmd_string = "PutLocalRecord";
696                let key = record.key.clone();
697                let record_key = PrettyPrintRecordKey::from(&key);
698
699                let record_type = match RecordHeader::from_record(&record) {
700                    Ok(record_header) => {
701                        match record_header.kind {
702                            RecordKind::DataOnly(DataTypes::Chunk) => ValidationType::Chunk,
703                            RecordKind::DataOnly(_) => {
704                                let content_hash = XorName::from_content(&record.value);
705                                ValidationType::NonChunk(content_hash)
706                            }
707                            RecordKind::DataWithPayment(_) => {
708                                error!("Record {record_key:?} with payment shall not be stored locally.");
709                                return Err(NetworkError::InCorrectRecordHeader);
710                            }
711                        }
712                    }
713                    Err(err) => {
714                        error!("For record {record_key:?}, failed to parse record_header {err:?}");
715                        return Err(NetworkError::InCorrectRecordHeader);
716                    }
717                };
718
719                let result = self
720                    .swarm
721                    .behaviour_mut()
722                    .kademlia
723                    .store_mut()
724                    .put_verified(record, record_type.clone(), is_client_put);
725
726                match result {
727                    Ok(_) => {
                        // `replication_fetcher.farthest_acceptable_distance` shall only ever
                        // shrink, never expand, even when more nodes join to share
                        // the responsibility. Hence no need to reset it.
                        // Also, `record_store` prunes one record per successful put, so
                        // once capacity reaches max_records the count can only rise slowly,
                        // due to the async/parallel handling in replication_fetcher & record_store.
734                    }
735                    Err(StoreError::MaxRecords) => {
736                        // In case the capacity reaches full, restrict replication_fetcher to
737                        // only fetch entries not farther than the current farthest record
738                        let farthest = self
739                            .swarm
740                            .behaviour_mut()
741                            .kademlia
742                            .store_mut()
743                            .get_farthest()?;
744                        self.replication_fetcher.set_farthest_on_full(farthest);
745                    }
746                    Err(_) => {
747                        // Nothing special to do for these errors,
748                        // All error cases are further logged and bubbled up below
749                    }
750                }
751
752                // No matter storing the record succeeded or not,
753                // the entry shall be removed from the `replication_fetcher`.
754                // In case of local store error, re-attempt will be carried out
755                // within the next replication round.
756                let new_keys_to_fetch = self
757                    .replication_fetcher
758                    .notify_about_new_put(key.clone(), record_type);
759
760                if !new_keys_to_fetch.is_empty() {
761                    self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
762                }
763
764                // The record_store will prune far records and setup a `distance range`,
765                // once reached the `max_records` cap.
766                if let Some(distance) = self
767                    .swarm
768                    .behaviour_mut()
769                    .kademlia
770                    .store_mut()
771                    .get_farthest_replication_distance()?
772                {
773                    self.replication_fetcher
774                        .set_replication_distance_range(distance);
775                }
776
777                if let Err(err) = result {
778                    error!("Can't store verified record {record_key:?} locally: {err:?}");
779                    cmd_string = "PutLocalRecord error";
780                    self.log_handling(cmd_string.to_string(), start.elapsed());
781                    return Err(err.into());
782                };
783            }
784            LocalSwarmCmd::AddLocalRecordAsStored {
785                key,
786                record_type,
787                data_type,
788            } => {
789                cmd_string = "AddLocalRecordAsStored";
790                self.swarm
791                    .behaviour_mut()
792                    .kademlia
793                    .store_mut()
794                    .mark_as_stored(key, record_type, data_type);
795                // Reset counter on any success HDD write.
796                self.hard_disk_write_error = 0;
797            }
798            LocalSwarmCmd::RemoveFailedLocalRecord { key } => {
799                info!("Removing Record locally, for {key:?}");
800                cmd_string = "RemoveFailedLocalRecord";
801                self.swarm.behaviour_mut().kademlia.store_mut().remove(&key);
802                self.hard_disk_write_error = self.hard_disk_write_error.saturating_add(1);
803                // When there is certain amount of continuous HDD write error,
804                // the hard disk is considered as full, and the node shall be terminated.
805                if self.hard_disk_write_error > MAX_CONTINUOUS_HDD_WRITE_ERROR {
806                    self.send_event(NetworkEvent::TerminateNode {
807                        reason: TerminateNodeReason::HardDiskWriteError,
808                    });
809                }
810            }
811            LocalSwarmCmd::RecordStoreHasKey { key, sender } => {
812                cmd_string = "RecordStoreHasKey";
813                let has_key = self
814                    .swarm
815                    .behaviour_mut()
816                    .kademlia
817                    .store_mut()
818                    .contains(&key);
819                let _ = sender.send(has_key);
820            }
821            LocalSwarmCmd::GetAllLocalRecordAddresses { sender } => {
822                cmd_string = "GetAllLocalRecordAddresses";
823                #[allow(clippy::mutable_key_type)] // for the Bytes in NetworkAddress
824                let addresses = self
825                    .swarm
826                    .behaviour_mut()
827                    .kademlia
828                    .store_mut()
829                    .record_addresses();
830                let _ = sender.send(addresses);
831            }
832            LocalSwarmCmd::GetKBuckets { sender } => {
833                cmd_string = "GetKBuckets";
834                let mut ilog2_kbuckets = BTreeMap::new();
835                for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
836                    let range = kbucket.range();
837                    if let Some(distance) = range.0.ilog2() {
838                        let peers_in_kbucket = kbucket
839                            .iter()
840                            .map(|peer_entry| peer_entry.node.key.into_preimage())
841                            .collect::<Vec<PeerId>>();
842                        let _ = ilog2_kbuckets.insert(distance, peers_in_kbucket);
843                    } else {
844                        // This shall never happen.
845                        error!("bucket is ourself ???!!!");
846                    }
847                }
848                let _ = sender.send(ilog2_kbuckets);
849            }
850            LocalSwarmCmd::GetPeersWithMultiaddr { sender } => {
851                cmd_string = "GetPeersWithMultiAddr";
852                let mut result: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
853                for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
854                    let peers_in_kbucket = kbucket
855                        .iter()
856                        .map(|peer_entry| {
857                            (
858                                peer_entry.node.key.into_preimage(),
859                                peer_entry.node.value.clone().into_vec(),
860                            )
861                        })
862                        .collect::<Vec<(PeerId, Vec<Multiaddr>)>>();
863                    result.extend(peers_in_kbucket);
864                }
865                let _ = sender.send(result);
866            }
867            LocalSwarmCmd::GetCloseGroupLocalPeers { key, sender } => {
868                cmd_string = "GetCloseGroupLocalPeers";
869                let key = key.as_kbucket_key();
870                // calls `kbuckets.closest_keys(key)` internally, which orders the peers by
871                // increasing distance
872                // Note it will return all peers, heance a chop down is required.
873                let closest_peers = self
874                    .swarm
875                    .behaviour_mut()
876                    .kademlia
877                    .get_closest_local_peers(&key)
878                    .map(|peer| peer.into_preimage())
879                    .take(CLOSE_GROUP_SIZE)
880                    .collect();
881
882                let _ = sender.send(closest_peers);
883            }
884            LocalSwarmCmd::GetClosestKLocalPeers { sender } => {
885                cmd_string = "GetClosestKLocalPeers";
886                let _ = sender.send(self.get_closest_k_value_local_peers());
887            }
888            LocalSwarmCmd::GetReplicateCandidates { data_addr, sender } => {
889                cmd_string = "GetReplicateCandidates";
890                let _ = sender.send(self.get_replicate_candidates(&data_addr));
891            }
892            LocalSwarmCmd::GetSwarmLocalState(sender) => {
893                cmd_string = "GetSwarmLocalState";
894                let current_state = SwarmLocalState {
895                    connected_peers: self.swarm.connected_peers().cloned().collect(),
896                    listeners: self.swarm.listeners().cloned().collect(),
897                };
898
899                sender
900                    .send(current_state)
901                    .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
902            }
903            LocalSwarmCmd::AddPeerToBlockList { peer_id } => {
904                cmd_string = "AddPeerToBlockList";
905                self.swarm.behaviour_mut().blocklist.block_peer(peer_id);
906            }
907            LocalSwarmCmd::RecordNodeIssue { peer_id, issue } => {
908                cmd_string = "RecordNodeIssues";
909                self.record_node_issue(peer_id, issue);
910            }
911            LocalSwarmCmd::IsPeerShunned { target, sender } => {
912                cmd_string = "IsPeerInTrouble";
913                let is_bad = if let Some(peer_id) = target.as_peer_id() {
914                    if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
915                        *is_bad
916                    } else {
917                        false
918                    }
919                } else {
920                    false
921                };
922                let _ = sender.send(is_bad);
923            }
924            LocalSwarmCmd::QuoteVerification { quotes } => {
925                cmd_string = "QuoteVerification";
926                for (peer_id, quote) in quotes {
927                    // Do nothing if already being bad
928                    if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
929                        if *is_bad {
930                            continue;
931                        }
932                    }
933                    self.verify_peer_quote(peer_id, quote);
934                }
935            }
936            LocalSwarmCmd::FetchCompleted((key, record_type)) => {
937                info!(
938                    "Fetch of {record_type:?} {:?} early completed, may have fetched an old version of the record.",
939                    PrettyPrintRecordKey::from(&key)
940                );
941                cmd_string = "FetchCompleted";
942                let new_keys_to_fetch = self
943                    .replication_fetcher
944                    .notify_fetch_early_completed(key, record_type);
945                if !new_keys_to_fetch.is_empty() {
946                    self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
947                }
948            }
949            LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
950                cmd_string = "TriggerIrrelevantRecordCleanup";
951                self.swarm
952                    .behaviour_mut()
953                    .kademlia
954                    .store_mut()
955                    .cleanup_irrelevant_records();
956            }
957            LocalSwarmCmd::AddNetworkDensitySample { distance } => {
958                cmd_string = "AddNetworkDensitySample";
959                self.network_density_samples.add(distance);
960            }
961            LocalSwarmCmd::NotifyPeerScores { peer_scores } => {
962                cmd_string = "NotifyPeerScores";
963                self.replication_fetcher.add_peer_scores(peer_scores);
964            }
965            LocalSwarmCmd::AddFreshReplicateRecords { holder, keys } => {
966                cmd_string = "AddFreshReplicateRecords";
967                let _ = self.add_keys_to_replication_fetcher(holder, keys, true);
968            }
969            LocalSwarmCmd::NotifyPeerVersion { peer, version } => {
970                cmd_string = "NotifyPeerVersion";
971                self.record_node_version(peer, version);
972            }
973        }
974
975        self.log_handling(cmd_string.to_string(), start.elapsed());
976
977        Ok(())
978    }
979
980    fn record_node_version(&mut self, peer_id: PeerId, version: String) {
981        let _ = self.peers_version.insert(peer_id, version);
982
983        // Collect all peers_in_non_full_buckets
984        let mut peers_in_non_full_buckets = vec![];
985        for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
986            let num_entires = kbucket.num_entries();
987            if num_entires >= K_VALUE.get() {
988                continue;
989            } else {
990                let peers_in_kbucket = kbucket
991                    .iter()
992                    .map(|peer_entry| peer_entry.node.key.into_preimage())
993                    .collect::<Vec<PeerId>>();
994                peers_in_non_full_buckets.extend(peers_in_kbucket);
995            }
996        }
997
998        // Ensure all existing node_version records are for those peers_in_non_full_buckets
999        self.peers_version
1000            .retain(|peer_id, _version| peers_in_non_full_buckets.contains(peer_id));
1001    }
1002
    /// Records an `issue` reported against `peer_id` and acts once enough issues of the
    /// same kind have accumulated.
    ///
    /// Behaviour, as implemented below:
    /// * Issues older than 300 seconds are discarded and at most 10 issues are tracked
    ///   per peer.
    /// * A report arriving no more than 10 seconds after the previously recorded issue
    ///   is ignored (the function returns early).
    /// * When three or more tracked issues of one kind exist:
    ///   - for `NodeIssue::ConnectionIssue`, the peer's connection issues are cleared and
    ///     the peer is removed from the routing table;
    ///   - for any other kind, only a log line is emitted, because flagging the peer as
    ///     bad is currently disabled (see the TODO in the loop).
    /// * If the peer is already flagged as bad on entry, it is removed from the routing
    ///   table without recording the new issue.
    pub(crate) fn record_node_issue(&mut self, peer_id: PeerId, issue: NodeIssue) {
        info!("Peer {peer_id:?} is reported as having issue {issue:?}");
        // Fetch (or create with defaults) the tracked issue list and bad flag of this peer.
        let (issue_vec, is_bad) = self.bad_nodes.entry(peer_id).or_default();
        let mut new_bad_behaviour = None;
        let mut is_connection_issue = false;

        // If being considered as bad already, skip the bookkeeping and detection below.
        if !(*is_bad) {
            // Remove outdated entries (older than 300 seconds).
            issue_vec.retain(|(_, timestamp)| timestamp.elapsed().as_secs() < 300);

            // Check if the vec is already 10 long; if so, remove the oldest issue.
            // We only track 10 issues to avoid mem leaks.
            // Note this happens before the freshness check below, so the oldest entry is
            // dropped even when the incoming issue ends up being ignored.
            if issue_vec.len() == 10 {
                issue_vec.remove(0);
            }

            // To avoid being too sensitive, only consider this as a new issue when more
            // than 10 seconds have passed since the last recorded one.
            let is_new_issue = if let Some((_issue, timestamp)) = issue_vec.last() {
                timestamp.elapsed().as_secs() > 10
            } else {
                true
            };

            if is_new_issue {
                issue_vec.push((issue, Instant::now()));
            } else {
                return;
            }

            // Only consider the candidate as a bad node when it has accumulated
            // THREE issues of the same kind among the tracked (non-expired) ones.
            for (issue, _timestamp) in issue_vec.iter() {
                let issue_counts = issue_vec
                    .iter()
                    .filter(|(i, _timestamp)| *issue == *i)
                    .count();
                if issue_counts >= 3 {
                    // If it is a connection issue, we don't need to consider it as a bad node
                    if matches!(issue, NodeIssue::ConnectionIssue) {
                        is_connection_issue = true;
                    }
                    // TODO: disable black_list currently.
                    //       re-enable once got more statistics from large scaled network
                    // else {
                    //     *is_bad = true;
                    // }
                    new_bad_behaviour = Some(issue.clone());
                    info!("Peer {peer_id:?} accumulated {issue_counts} times of issue {issue:?}. Consider it as a bad node now.");
                    // Once a bad behaviour is detected, there is no point in continuing.
                    break;
                }
            }
        }

        // Give the faulty connection node more chances by removing its connection issues
        // from the list. It is still evicted from the routing table.
        if is_connection_issue {
            issue_vec.retain(|(issue, _timestamp)| !matches!(issue, NodeIssue::ConnectionIssue));
            info!("Evicting bad peer {peer_id:?} due to connection issue from RT.");
            if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
                self.update_on_peer_removal(*dead_peer.node.key.preimage());
            }
            return;
        }

        // NOTE(review): this function never sets `*is_bad` (the assignment above is
        // commented out), so this branch only runs for peers flagged elsewhere. In that
        // case the detection block was skipped and `new_bad_behaviour` is `None`, which
        // makes the notify/blocklist path below unreachable as written.
        if *is_bad {
            info!("Evicting bad peer {peer_id:?} from RT.");
            if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
                self.update_on_peer_removal(*dead_peer.node.key.preimage());
            }

            if let Some(bad_behaviour) = new_bad_behaviour {
                // Inform the bad node about it and add it to the blocklist after that
                // (not for connection issues).
                self.record_metrics(Marker::PeerConsideredAsBad { bad_peer: &peer_id });

                warn!("Peer {peer_id:?} is considered as bad due to {bad_behaviour:?}. Informing the peer and adding to blocklist.");
                // Response handling: a spawned task waits on the oneshot receiver and,
                // once a response arrives, asks the driver to blocklist the peer.
                let (tx, rx) = oneshot::channel();
                let local_swarm_cmd_sender = self.local_cmd_sender.clone();
                tokio::spawn(async move {
                    match rx.await {
                        Ok(result) => {
                            debug!("Got response for Cmd::PeerConsideredAsBad from {peer_id:?} {result:?}");
                            if let Err(err) = local_swarm_cmd_sender
                                .send(LocalSwarmCmd::AddPeerToBlockList { peer_id })
                                .await
                            {
                                error!("SwarmDriver failed to send LocalSwarmCmd: {err}");
                            }
                        }
                        Err(err) => {
                            error!("Failed to get response from one shot channel for Cmd::PeerConsideredAsBad : {err:?}");
                        }
                    }
                });

                // Request: tell the peer it has been considered as bad; the response is
                // routed to the task above through `tx`.
                let request = Request::Cmd(Cmd::PeerConsideredAsBad {
                    detected_by: NetworkAddress::from_peer(self.self_peer_id),
                    bad_peer: NetworkAddress::from_peer(peer_id),
                    bad_behaviour: bad_behaviour.to_string(),
                });
                self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
                    req: request,
                    peer: peer_id,
                    sender: Some(tx),
                });
            }
        }
    }
1115
1116    fn verify_peer_quote(&mut self, peer_id: PeerId, quote: PaymentQuote) {
1117        if let Some(history_quote) = self.quotes_history.get(&peer_id) {
1118            if !history_quote.historical_verify(&quote) {
1119                info!("From {peer_id:?}, detected a bad quote {quote:?} against history_quote {history_quote:?}");
1120                self.record_node_issue(peer_id, NodeIssue::BadQuoting);
1121                return;
1122            }
1123
1124            if history_quote.is_newer_than(&quote) {
1125                return;
1126            }
1127        }
1128
1129        let _ = self.quotes_history.insert(peer_id, quote);
1130    }
1131
1132    fn try_interval_replication(&mut self) -> Result<()> {
1133        // Add a last_replication field to track the last time replication was performed
1134        if let Some(last_replication) = self.last_replication {
1135            if last_replication.elapsed() < MIN_REPLICATION_INTERVAL_S {
1136                info!("Skipping replication as minimum interval hasn't elapsed");
1137                return Ok(());
1138            }
1139        }
1140        // Store the current time as the last replication time
1141        self.last_replication = Some(Instant::now());
1142
1143        let self_addr = NetworkAddress::from_peer(self.self_peer_id);
1144        let mut replicate_targets = self.get_replicate_candidates(&self_addr)?;
1145
1146        let now = Instant::now();
1147        self.replication_targets
1148            .retain(|_peer_id, timestamp| *timestamp > now);
1149        // Only carry out replication to peer that not replicated to it recently
1150        replicate_targets.retain(|peer_id| !self.replication_targets.contains_key(peer_id));
1151        if replicate_targets.is_empty() {
1152            return Ok(());
1153        }
1154
1155        let all_records: Vec<_> = self
1156            .swarm
1157            .behaviour_mut()
1158            .kademlia
1159            .store_mut()
1160            .record_addresses_ref()?
1161            .values()
1162            .cloned()
1163            .collect();
1164
1165        if !all_records.is_empty() {
1166            debug!(
1167                "Sending a replication list of {} keys to {replicate_targets:?} ",
1168                all_records.len()
1169            );
1170            let request = Request::Cmd(Cmd::Replicate {
1171                holder: NetworkAddress::from_peer(self.self_peer_id),
1172                keys: all_records
1173                    .into_iter()
1174                    .map(|(addr, val_type, _data_type)| (addr, val_type))
1175                    .collect(),
1176            });
1177            for peer_id in replicate_targets {
1178                self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
1179                    req: request.clone(),
1180                    peer: peer_id,
1181                    sender: None,
1182                });
1183
1184                let _ = self
1185                    .replication_targets
1186                    .insert(peer_id, now + REPLICATION_TIMEOUT);
1187            }
1188        }
1189
1190        Ok(())
1191    }
1192
1193    // Replies with in-range replicate candidates
1194    // Fall back to expected_candidates peers if range is too narrow.
1195    // Note that:
1196    //   * For general replication, replicate candidates shall be closest to self, but with wider range
1197    //   * For replicate fresh records, the replicate candidates shall be the closest to data
1198
1199    pub(crate) fn get_replicate_candidates(
1200        &mut self,
1201        target: &NetworkAddress,
1202    ) -> Result<Vec<PeerId>> {
1203        let is_periodic_replicate = target.as_peer_id().is_some();
1204        let expected_candidates = if is_periodic_replicate {
1205            CLOSE_GROUP_SIZE * 2
1206        } else {
1207            CLOSE_GROUP_SIZE
1208        };
1209
1210        // get closest peers from buckets, sorted by increasing distance to the target
1211        let kbucket_key = target.as_kbucket_key();
1212        let closest_k_peers: Vec<PeerId> = self
1213            .swarm
1214            .behaviour_mut()
1215            .kademlia
1216            .get_closest_local_peers(&kbucket_key)
1217            // Map KBucketKey<PeerId> to PeerId.
1218            .map(|key| key.into_preimage())
1219            .take(K_VALUE.get())
1220            .collect();
1221
1222        if let Some(responsible_range) = self
1223            .swarm
1224            .behaviour_mut()
1225            .kademlia
1226            .store_mut()
1227            .get_farthest_replication_distance()?
1228        {
1229            let peers_in_range = get_peers_in_range(&closest_k_peers, target, responsible_range);
1230
1231            if peers_in_range.len() >= expected_candidates {
1232                return Ok(peers_in_range);
1233            }
1234        }
1235
1236        // In case the range is too narrow, fall back to at least expected_candidates peers.
1237        Ok(closest_k_peers
1238            .iter()
1239            .take(expected_candidates)
1240            .cloned()
1241            .collect())
1242    }
1243}
1244
1245/// Returns the nodes that within the defined distance.
1246fn get_peers_in_range(peers: &[PeerId], address: &NetworkAddress, range: Distance) -> Vec<PeerId> {
1247    peers
1248        .iter()
1249        .filter_map(|peer_id| {
1250            if address.distance(&NetworkAddress::from_peer(*peer_id)) <= range {
1251                Some(*peer_id)
1252            } else {
1253                None
1254            }
1255        })
1256        .collect()
1257}