1use crate::time::Instant;
10use crate::{
11 driver::{PendingGetClosestType, SwarmDriver},
12 error::{NetworkError, Result},
13 event::TerminateNodeReason,
14 log_markers::Marker,
15 Addresses, MsgResponder, NetworkEvent, CLOSE_GROUP_SIZE,
16};
17use ant_evm::{PaymentQuote, QuotingMetrics};
18use ant_protocol::messages::ConnectionInfo;
19use ant_protocol::{
20 messages::{Cmd, Request, Response},
21 storage::{DataTypes, RecordHeader, RecordKind, ValidationType},
22 NetworkAddress, PrettyPrintRecordKey,
23};
24use libp2p::{
25 kad::{
26 store::{Error as StoreError, RecordStore},
27 KBucketDistance as Distance, Record, RecordKey,
28 },
29 swarm::dial_opts::{DialOpts, PeerCondition},
30 Multiaddr, PeerId,
31};
32use std::{
33 collections::{BTreeMap, HashMap},
34 fmt::Debug,
35 time::Duration,
36};
37use tokio::sync::oneshot;
38use xor_name::XorName;
39
/// Number of consecutive failed local record writes that is still tolerated.
/// The failure counter is compared with `>`, so node termination is requested
/// once the counter exceeds this value.
const MAX_CONTINUOUS_HDD_WRITE_ERROR: usize = 5;

/// How long a peer stays registered in `replication_targets` after a
/// replication list was queued for it. While its entry has not expired the
/// peer is skipped by interval replication.
const REPLICATION_TIMEOUT: Duration = Duration::from_secs(45);

/// Minimum time that has to pass between two interval replication rounds.
const MIN_REPLICATION_INTERVAL_S: Duration = Duration::from_secs(30);
47
/// Kinds of misbehaviour that can be recorded against a peer.
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum NodeIssue {
    /// Connectivity problem with the peer. Rendered as
    /// `CriticalConnectionIssue` by the `Display` impl.
    ConnectionIssue,
    /// Replication-related failure (reported by code outside this block).
    ReplicationFailure,
    /// Shunning reported by close nodes (reported by code outside this block).
    CloseNodesShunning,
    /// The peer produced a quote that is inconsistent with its quote history.
    BadQuoting,
    /// The peer failed a chunk proof check (reported by code outside this block).
    FailedChunkProofCheck,
}

impl std::fmt::Display for NodeIssue {
    /// Writes the fixed textual label of the issue.
    ///
    /// Note that `ConnectionIssue` is labelled `CriticalConnectionIssue`; all
    /// other variants use their own name.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let label = match self {
            Self::ConnectionIssue => "CriticalConnectionIssue",
            Self::ReplicationFailure => "ReplicationFailure",
            Self::CloseNodesShunning => "CloseNodesShunning",
            Self::BadQuoting => "BadQuoting",
            Self::FailedChunkProofCheck => "FailedChunkProofCheck",
        };
        f.write_str(label)
    }
}
73
/// Commands processed by `SwarmDriver::handle_local_cmd`.
///
/// Variants carrying a `sender` reply through that oneshot channel; the
/// remaining variants only mutate driver state.
pub enum LocalSwarmCmd {
    /// Collect every peer held in the kbuckets together with its addresses.
    GetPeersWithMultiaddr {
        /// Receives the `(peer, addresses)` list.
        sender: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
    },
    /// Collect the peers of each kbucket, keyed by the `ilog2` of the lower
    /// bound of the bucket's distance range.
    GetKBuckets {
        /// Receives the `ilog2 distance -> peers` map.
        sender: oneshot::Sender<BTreeMap<u32, Vec<PeerId>>>,
    },
    /// Look up the locally known peers closest to `key`.
    GetKCloseLocalPeersToTarget {
        /// Address the peers should be close to.
        key: NetworkAddress,
        /// Receives the closest peers with their addresses.
        sender: oneshot::Sender<Vec<(PeerId, Addresses)>>,
    },
    /// Snapshot the swarm's connected peers, routing-table size and listeners.
    GetSwarmLocalState(oneshot::Sender<SwarmLocalState>),
    /// Ask the local record store whether it contains `key`.
    RecordStoreHasKey {
        /// Key to look for.
        key: RecordKey,
        /// Receives `true` when the store contains the key.
        sender: oneshot::Sender<bool>,
    },
    /// Fetch the record store's map of record addresses.
    GetAllLocalRecordAddresses {
        /// Receives the address map.
        sender: oneshot::Sender<HashMap<NetworkAddress, ValidationType>>,
    },
    /// Read a record from the local record store.
    GetLocalRecord {
        /// Key of the record to read.
        key: RecordKey,
        /// Receives the record, or `None` when the store has no such key.
        sender: oneshot::Sender<Option<Record>>,
    },
    /// Ask the record store for quoting metrics regarding a record.
    GetLocalQuotingMetrics {
        /// Key of the record the quote is for.
        key: RecordKey,
        /// Data type identifier forwarded to the record store.
        data_type: u32,
        /// Data size forwarded to the record store.
        data_size: usize,
        /// Receives the metrics and the store's "already stored" flag.
        sender: oneshot::Sender<(QuotingMetrics, bool)>,
    },
    /// Tell the record store that a payment was received.
    PaymentReceived,
    /// Store a record locally through the store's `put_verified`.
    PutLocalRecord {
        /// Record to store; its header must parse as a `DataOnly` kind.
        record: Record,
        /// Forwarded unchanged to `put_verified`.
        is_client_put: bool,
    },
    /// Remove a record from the local store and count one disk write failure.
    RemoveFailedLocalRecord {
        /// Key of the record to remove.
        key: RecordKey,
    },
    /// Mark a record as stored and reset the disk write failure counter.
    AddLocalRecordAsStored {
        /// Key of the stored record.
        key: RecordKey,
        /// Validation type forwarded to the store.
        record_type: ValidationType,
        /// Data type forwarded to the store.
        data_type: DataTypes,
    },
    /// Block a peer through the swarm's blocklist behaviour.
    AddPeerToBlockList {
        /// Peer to block.
        peer_id: PeerId,
    },
    /// Record an issue against a peer (see `SwarmDriver::record_node_issue`).
    RecordNodeIssue {
        /// Peer the issue is reported for.
        peer_id: PeerId,
        /// Kind of issue being reported.
        issue: NodeIssue,
    },
    /// Query the "bad" flag tracked for a peer.
    IsPeerShunned {
        /// Address to check; anything that is not a peer id yields `false`.
        target: NetworkAddress,
        /// Receives the flag; `false` when the peer is not tracked.
        sender: oneshot::Sender<bool>,
    },
    /// Check quotes against the quote history kept per peer. Quotes from
    /// peers already flagged as bad are skipped.
    QuoteVerification {
        /// Quotes to verify, paired with the peer that issued them.
        quotes: Vec<(PeerId, PaymentQuote)>,
    },
    /// Notify the replication fetcher that a fetch completed early.
    FetchCompleted((RecordKey, ValidationType)),
    /// Run one interval replication round, subject to the minimum interval.
    TriggerIntervalReplication,
    /// Ask the record store to clean up irrelevant records.
    TriggerIrrelevantRecordCleanup,
    /// Forward peer scores to the replication fetcher.
    NotifyPeerScores {
        /// Scores as `(peer, flag)` pairs, passed through unchanged.
        peer_scores: Vec<(PeerId, bool)>,
    },
    /// Add keys announced by `holder` to the replication fetcher.
    AddFreshReplicateRecords {
        /// Peer that holds the records.
        holder: NetworkAddress,
        /// Keys to add, with their validation types.
        keys: Vec<(NetworkAddress, ValidationType)>,
    },
    /// Remember the version string reported by a peer.
    NotifyPeerVersion {
        /// Peer the version belongs to.
        peer: PeerId,
        /// Reported version.
        version: String,
    },
    /// Fetch the record store's responsible distance range.
    GetNetworkDensity {
        /// Receives the range, or `None` when the store has none.
        sender: oneshot::Sender<Option<Distance>>,
    },
    /// Remove a peer from the Kademlia routing table.
    RemovePeer {
        /// Peer to remove.
        peer: PeerId,
    },
}
180
/// Commands processed by `SwarmDriver::handle_network_cmd`.
pub enum NetworkSwarmCmd {
    /// Start a Kademlia closest-peers query for `key`.
    GetClosestPeersToAddressFromNetwork {
        /// Address whose closest peers are requested.
        key: NetworkAddress,
        /// Stored with the pending query so the result can be delivered later.
        sender: oneshot::Sender<Vec<(PeerId, Addresses)>>,
    },

    /// Send a request to a peer. A request addressed to the local peer is
    /// not sent over the network (see the handler).
    SendRequest {
        /// Request to send.
        req: Request,
        /// Destination peer.
        peer: PeerId,
        /// Optional addresses; when present and non-empty the peer is dialed
        /// with them before the request is sent.
        addrs: Option<Addresses>,

        /// Optional channel for the response; stored with the pending request.
        #[allow(clippy::type_complexity)]
        sender: Option<oneshot::Sender<Result<(Response, Option<ConnectionInfo>)>>>,
    },
    /// Send a response, either back to a peer or to a local requester.
    SendResponse {
        /// Response to deliver.
        resp: Response,
        /// Where to deliver it.
        channel: MsgResponder,
    },
    /// Dial a peer at the given addresses unless it is already being dialed.
    DialPeer {
        /// Peer to dial.
        peer: PeerId,
        /// Addresses to dial.
        addrs: Addresses,
    },
}
215
216impl Debug for LocalSwarmCmd {
219 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
220 match self {
221 LocalSwarmCmd::PutLocalRecord {
222 record,
223 is_client_put,
224 } => {
225 write!(
226 f,
227 "LocalSwarmCmd::PutLocalRecord {{ key: {:?}, is_client_put: {is_client_put:?} }}",
228 PrettyPrintRecordKey::from(&record.key)
229 )
230 }
231 LocalSwarmCmd::RemoveFailedLocalRecord { key } => {
232 write!(
233 f,
234 "LocalSwarmCmd::RemoveFailedLocalRecord {{ key: {:?} }}",
235 PrettyPrintRecordKey::from(key)
236 )
237 }
238 LocalSwarmCmd::AddLocalRecordAsStored {
239 key,
240 record_type,
241 data_type,
242 } => {
243 write!(
244 f,
245 "LocalSwarmCmd::AddLocalRecordAsStored {{ key: {:?}, record_type: {record_type:?}, data_type: {data_type:?} }}",
246 PrettyPrintRecordKey::from(key)
247 )
248 }
249 LocalSwarmCmd::GetKCloseLocalPeersToTarget { key, .. } => {
250 write!(
251 f,
252 "LocalSwarmCmd::GetKCloseLocalPeersToTarget {{ key: {key:?} }}"
253 )
254 }
255 LocalSwarmCmd::GetLocalQuotingMetrics { .. } => {
256 write!(f, "LocalSwarmCmd::GetLocalQuotingMetrics")
257 }
258 LocalSwarmCmd::PaymentReceived => {
259 write!(f, "LocalSwarmCmd::PaymentReceived")
260 }
261 LocalSwarmCmd::GetLocalRecord { key, .. } => {
262 write!(
263 f,
264 "LocalSwarmCmd::GetLocalRecord {{ key: {:?} }}",
265 PrettyPrintRecordKey::from(key)
266 )
267 }
268 LocalSwarmCmd::GetAllLocalRecordAddresses { .. } => {
269 write!(f, "LocalSwarmCmd::GetAllLocalRecordAddresses")
270 }
271 LocalSwarmCmd::GetPeersWithMultiaddr { .. } => {
272 write!(f, "LocalSwarmCmd::GetPeersWithMultiaddr")
273 }
274 LocalSwarmCmd::GetKBuckets { .. } => {
275 write!(f, "LocalSwarmCmd::GetKBuckets")
276 }
277 LocalSwarmCmd::GetSwarmLocalState { .. } => {
278 write!(f, "LocalSwarmCmd::GetSwarmLocalState")
279 }
280 LocalSwarmCmd::RecordStoreHasKey { key, .. } => {
281 write!(
282 f,
283 "LocalSwarmCmd::RecordStoreHasKey {:?}",
284 PrettyPrintRecordKey::from(key)
285 )
286 }
287 LocalSwarmCmd::AddPeerToBlockList { peer_id } => {
288 write!(f, "LocalSwarmCmd::AddPeerToBlockList {peer_id:?}")
289 }
290 LocalSwarmCmd::RecordNodeIssue { peer_id, issue } => {
291 write!(
292 f,
293 "LocalSwarmCmd::SendNodeStatus peer {peer_id:?}, issue: {issue:?}"
294 )
295 }
296 LocalSwarmCmd::IsPeerShunned { target, .. } => {
297 write!(f, "LocalSwarmCmd::IsPeerInTrouble target: {target:?}")
298 }
299 LocalSwarmCmd::QuoteVerification { quotes } => {
300 write!(
301 f,
302 "LocalSwarmCmd::QuoteVerification of {} quotes",
303 quotes.len()
304 )
305 }
306 LocalSwarmCmd::FetchCompleted((key, record_type)) => {
307 write!(
308 f,
309 "LocalSwarmCmd::FetchCompleted({record_type:?} : {:?})",
310 PrettyPrintRecordKey::from(key)
311 )
312 }
313 LocalSwarmCmd::TriggerIntervalReplication => {
314 write!(f, "LocalSwarmCmd::TriggerIntervalReplication")
315 }
316 LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
317 write!(f, "LocalSwarmCmd::TriggerUnrelevantRecordCleanup")
318 }
319 LocalSwarmCmd::NotifyPeerScores { peer_scores } => {
320 write!(f, "LocalSwarmCmd::NotifyPeerScores({peer_scores:?})")
321 }
322 LocalSwarmCmd::AddFreshReplicateRecords { holder, keys } => {
323 write!(
324 f,
325 "LocalSwarmCmd::AddFreshReplicateRecords({holder:?}, {keys:?})"
326 )
327 }
328 LocalSwarmCmd::NotifyPeerVersion { peer, version } => {
329 write!(f, "LocalSwarmCmd::NotifyPeerVersion({peer:?}, {version:?})")
330 }
331 LocalSwarmCmd::GetNetworkDensity { .. } => {
332 write!(f, "LocalSwarmCmd::GetNetworkDensity")
333 }
334 LocalSwarmCmd::RemovePeer { peer } => {
335 write!(f, "LocalSwarmCmd::RemovePeer({peer:?})")
336 }
337 }
338 }
339}
340
341impl Debug for NetworkSwarmCmd {
344 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
345 match self {
346 NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { key, .. } => {
347 write!(f, "NetworkSwarmCmd::GetClosestPeers {{ key: {key:?} }}")
348 }
349 NetworkSwarmCmd::SendResponse { resp, .. } => {
350 write!(f, "NetworkSwarmCmd::SendResponse resp: {resp:?}")
351 }
352 NetworkSwarmCmd::SendRequest { req, peer, .. } => {
353 write!(
354 f,
355 "NetworkSwarmCmd::SendRequest req: {req:?}, peer: {peer:?}"
356 )
357 }
358 NetworkSwarmCmd::DialPeer { peer, .. } => {
359 write!(f, "NetworkSwarmCmd::DialPeer peer: {peer:?}")
360 }
361 }
362 }
363}
/// Snapshot of the swarm's local state, produced in reply to
/// `LocalSwarmCmd::GetSwarmLocalState`.
#[derive(Debug, Clone)]
pub struct SwarmLocalState {
    /// Peers the swarm reported as connected when the snapshot was taken.
    pub connected_peers: Vec<PeerId>,
    /// The driver's count of peers in the routing table.
    pub peers_in_routing_table: usize,
    /// Addresses the swarm reported as listeners when the snapshot was taken.
    pub listeners: Vec<Multiaddr>,
}
374
375impl SwarmDriver {
376 pub(crate) fn handle_network_cmd(&mut self, cmd: NetworkSwarmCmd) -> Result<(), NetworkError> {
377 let start = Instant::now();
378 let cmd_string;
379 match cmd {
380 NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { key, sender } => {
381 cmd_string = "GetClosestPeersToAddressFromNetwork";
382 let query_id = self
383 .swarm
384 .behaviour_mut()
385 .kademlia
386 .get_closest_peers(key.as_bytes());
387 let _ = self.pending_get_closest_peers.insert(
388 query_id,
389 (
390 PendingGetClosestType::FunctionCall(sender),
391 Default::default(),
392 ),
393 );
394 }
395 NetworkSwarmCmd::SendRequest {
396 req,
397 peer,
398 addrs,
399 sender,
400 } => {
401 cmd_string = "SendRequest";
402 if peer == *self.swarm.local_peer_id() {
406 trace!("Sending query request to self");
407 if let Request::Query(query) = req {
408 self.send_event(NetworkEvent::QueryRequestReceived {
409 query,
410 channel: MsgResponder::FromSelf(sender),
411 });
412 } else {
413 trace!("Replicate cmd to self received, ignoring");
416 }
417 } else {
418 if let Some(addrs) = addrs {
419 if addrs.0.is_empty() {
421 info!("No addresses for peer {peer:?} to send request. This could cause dial failure if swarm could not find the peer's addrs.");
422 } else {
423 let opts = DialOpts::peer_id(peer).addresses(addrs.0.clone()).build();
424
425 match self.swarm.dial(opts) {
426 Ok(()) => {
427 debug!("Dialing peer {peer:?} for req_resp with address: {addrs:?}",);
428 }
429 Err(err) => {
430 error!("Failed to dial peer {peer:?} for req_resp with address: {addrs:?} error: {err}",);
431 }
432 }
433 }
434 }
435
436 let request_id = self
437 .swarm
438 .behaviour_mut()
439 .request_response
440 .send_request(&peer, req);
441 trace!("Sending request {request_id:?} to peer {peer:?}");
442 let _ = self.pending_requests.insert(request_id, sender);
443
444 trace!("Pending Requests now: {:?}", self.pending_requests.len());
445 }
446 }
447 NetworkSwarmCmd::SendResponse { resp, channel } => {
448 cmd_string = "SendResponse";
449 match channel {
450 MsgResponder::FromSelf(channel) => {
452 trace!("Sending response to self");
453 match channel {
454 Some(channel) => {
455 channel
456 .send(Ok((resp, None)))
457 .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
458 }
459 None => {
460 self.send_event(NetworkEvent::ResponseReceived { res: resp });
463 }
464 }
465 }
466 MsgResponder::FromPeer(channel) => {
467 self.swarm
468 .behaviour_mut()
469 .request_response
470 .send_response(channel, resp)
471 .map_err(NetworkError::OutgoingResponseDropped)?;
472 }
473 }
474 }
475 NetworkSwarmCmd::DialPeer { peer, addrs } => {
476 cmd_string = "DialPeer";
477 let opts = DialOpts::peer_id(peer)
478 .condition(PeerCondition::NotDialing)
480 .addresses(addrs.0.clone())
481 .build();
482
483 match self.swarm.dial(opts) {
484 Ok(()) => {
485 info!("Manual dialing peer {peer:?} with address: {addrs:?}",);
486 }
487 Err(err) => {
488 error!("Failed to manual dial peer {peer:?} with address: {addrs:?} error: {err}",);
489 }
490 }
491 }
492 }
493
494 self.log_handling(cmd_string.to_string(), start.elapsed());
495
496 Ok(())
497 }
498 pub(crate) fn handle_local_cmd(&mut self, cmd: LocalSwarmCmd) -> Result<(), NetworkError> {
499 let start = Instant::now();
500 let mut cmd_string;
501 match cmd {
502 LocalSwarmCmd::TriggerIntervalReplication => {
503 cmd_string = "TriggerIntervalReplication";
504 self.try_interval_replication()?;
505 }
506 LocalSwarmCmd::GetLocalQuotingMetrics {
507 key,
508 data_type,
509 data_size,
510 sender,
511 } => {
512 cmd_string = "GetLocalQuotingMetrics";
513 let kbucket_status = self.get_kbuckets_status();
514 self.update_on_kbucket_status(&kbucket_status);
515 let (quoting_metrics, is_already_stored) = self
516 .swarm
517 .behaviour_mut()
518 .kademlia
519 .store_mut()
520 .quoting_metrics(
521 &key,
522 data_type,
523 data_size,
524 Some(kbucket_status.estimated_network_size as u64),
525 );
526 self.record_metrics(Marker::QuotingMetrics {
527 quoting_metrics: "ing_metrics,
528 });
529
530 let mut bad_nodes: Vec<_> = self
533 .bad_nodes
534 .iter()
535 .filter_map(|(peer_id, (_issue_list, is_bad))| {
536 if *is_bad {
537 Some(NetworkAddress::from(*peer_id))
538 } else {
539 None
540 }
541 })
542 .collect();
543
544 let kbucket_key = NetworkAddress::from(&key).as_kbucket_key();
546 let closest_peers: Vec<_> = self
547 .swarm
548 .behaviour_mut()
549 .kademlia
550 .get_closest_local_peers(&kbucket_key)
551 .map(|peer| peer.into_preimage())
552 .take(CLOSE_GROUP_SIZE)
553 .collect();
554 if closest_peers.len() >= CLOSE_GROUP_SIZE {
556 let boundary_peer = closest_peers[CLOSE_GROUP_SIZE - 1];
557 let key_address = NetworkAddress::from(&key);
558 let boundary_distance =
559 key_address.distance(&NetworkAddress::from(boundary_peer));
560 bad_nodes
561 .retain(|peer_addr| key_address.distance(peer_addr) < boundary_distance);
562 }
563
564 let _res = sender.send((quoting_metrics, is_already_stored));
565 }
566 LocalSwarmCmd::PaymentReceived => {
567 cmd_string = "PaymentReceived";
568 self.swarm
569 .behaviour_mut()
570 .kademlia
571 .store_mut()
572 .payment_received();
573 }
574 LocalSwarmCmd::GetLocalRecord { key, sender } => {
575 cmd_string = "GetLocalRecord";
576 let record = self
577 .swarm
578 .behaviour_mut()
579 .kademlia
580 .store_mut()
581 .get(&key)
582 .map(|rec| rec.into_owned());
583 let _ = sender.send(record);
584 }
585
586 LocalSwarmCmd::PutLocalRecord {
587 record,
588 is_client_put,
589 } => {
590 cmd_string = "PutLocalRecord";
591 let key = record.key.clone();
592 let record_key = PrettyPrintRecordKey::from(&key);
593
594 let record_type = match RecordHeader::from_record(&record) {
595 Ok(record_header) => {
596 match record_header.kind {
597 RecordKind::DataOnly(DataTypes::Chunk) => ValidationType::Chunk,
598 RecordKind::DataOnly(_) => {
599 let content_hash = XorName::from_content(&record.value);
600 ValidationType::NonChunk(content_hash)
601 }
602 RecordKind::DataWithPayment(_) => {
603 error!("Record {record_key:?} with payment shall not be stored locally.");
604 return Err(NetworkError::InCorrectRecordHeader);
605 }
606 }
607 }
608 Err(err) => {
609 error!("For record {record_key:?}, failed to parse record_header {err:?}");
610 return Err(NetworkError::InCorrectRecordHeader);
611 }
612 };
613
614 let result = self
615 .swarm
616 .behaviour_mut()
617 .kademlia
618 .store_mut()
619 .put_verified(record, record_type.clone(), is_client_put);
620
621 match result {
622 Ok(_) => {
623 }
630 Err(StoreError::MaxRecords) => {
631 let farthest = self
634 .swarm
635 .behaviour_mut()
636 .kademlia
637 .store_mut()
638 .get_farthest();
639 self.replication_fetcher.set_farthest_on_full(farthest);
640 }
641 Err(_) => {
642 }
645 }
646
647 let new_keys_to_fetch = self
652 .replication_fetcher
653 .notify_about_new_put(key.clone(), record_type);
654
655 if !new_keys_to_fetch.is_empty() {
656 self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
657 }
658
659 if let Some(distance) = self
662 .swarm
663 .behaviour_mut()
664 .kademlia
665 .store_mut()
666 .get_responsible_distance_range()
667 {
668 self.replication_fetcher
669 .set_replication_distance_range(distance);
670 }
671
672 if let Err(err) = result {
673 error!("Can't store verified record {record_key:?} locally: {err:?}");
674 cmd_string = "PutLocalRecord error";
675 self.log_handling(cmd_string.to_string(), start.elapsed());
676 return Err(err.into());
677 };
678 }
679 LocalSwarmCmd::AddLocalRecordAsStored {
680 key,
681 record_type,
682 data_type,
683 } => {
684 cmd_string = "AddLocalRecordAsStored";
685 self.swarm
686 .behaviour_mut()
687 .kademlia
688 .store_mut()
689 .mark_as_stored(key, record_type, data_type);
690 self.hard_disk_write_error = 0;
692 }
693 LocalSwarmCmd::RemoveFailedLocalRecord { key } => {
694 info!("Removing Record locally, for {key:?}");
695 cmd_string = "RemoveFailedLocalRecord";
696 self.swarm.behaviour_mut().kademlia.store_mut().remove(&key);
697 self.hard_disk_write_error = self.hard_disk_write_error.saturating_add(1);
698 if self.hard_disk_write_error > MAX_CONTINUOUS_HDD_WRITE_ERROR {
701 self.send_event(NetworkEvent::TerminateNode {
702 reason: TerminateNodeReason::HardDiskWriteError,
703 });
704 }
705 }
706 LocalSwarmCmd::RecordStoreHasKey { key, sender } => {
707 cmd_string = "RecordStoreHasKey";
708 let has_key = self
709 .swarm
710 .behaviour_mut()
711 .kademlia
712 .store_mut()
713 .contains(&key);
714 let _ = sender.send(has_key);
715 }
716 LocalSwarmCmd::GetAllLocalRecordAddresses { sender } => {
717 cmd_string = "GetAllLocalRecordAddresses";
718 #[allow(clippy::mutable_key_type)] let addresses = self
720 .swarm
721 .behaviour_mut()
722 .kademlia
723 .store_mut()
724 .record_addresses();
725 let _ = sender.send(addresses);
726 }
727 LocalSwarmCmd::GetKBuckets { sender } => {
728 cmd_string = "GetKBuckets";
729 let mut ilog2_kbuckets = BTreeMap::new();
730 for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
731 let range = kbucket.range();
732 if let Some(distance) = range.0.ilog2() {
733 let peers_in_kbucket = kbucket
734 .iter()
735 .map(|peer_entry| peer_entry.node.key.into_preimage())
736 .collect::<Vec<PeerId>>();
737 let _ = ilog2_kbuckets.insert(distance, peers_in_kbucket);
738 } else {
739 error!("bucket is ourself ???!!!");
741 }
742 }
743 let _ = sender.send(ilog2_kbuckets);
744 }
745 LocalSwarmCmd::GetPeersWithMultiaddr { sender } => {
746 cmd_string = "GetPeersWithMultiAddr";
747 let mut result: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
748 for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
749 let peers_in_kbucket = kbucket
750 .iter()
751 .map(|peer_entry| {
752 (
753 peer_entry.node.key.into_preimage(),
754 peer_entry.node.value.clone().into_vec(),
755 )
756 })
757 .collect::<Vec<(PeerId, Vec<Multiaddr>)>>();
758 result.extend(peers_in_kbucket);
759 }
760 let _ = sender.send(result);
761 }
762 LocalSwarmCmd::GetKCloseLocalPeersToTarget { key, sender } => {
763 cmd_string = "GetKCloseLocalPeersToTarget";
764 let closest_peers = self.get_closest_k_local_peers_to_target(&key, true);
765
766 let _ = sender.send(closest_peers);
767 }
768 LocalSwarmCmd::GetSwarmLocalState(sender) => {
769 cmd_string = "GetSwarmLocalState";
770 let current_state = SwarmLocalState {
771 connected_peers: self.swarm.connected_peers().cloned().collect(),
772 peers_in_routing_table: self.peers_in_rt,
773 listeners: self.swarm.listeners().cloned().collect(),
774 };
775
776 sender
777 .send(current_state)
778 .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
779 }
780 LocalSwarmCmd::AddPeerToBlockList { peer_id } => {
781 cmd_string = "AddPeerToBlockList";
782 self.swarm.behaviour_mut().blocklist.block_peer(peer_id);
783 }
784 LocalSwarmCmd::RecordNodeIssue { peer_id, issue } => {
785 cmd_string = "RecordNodeIssues";
786 self.record_node_issue(peer_id, issue);
787 }
788 LocalSwarmCmd::IsPeerShunned { target, sender } => {
789 cmd_string = "IsPeerInTrouble";
790 let is_bad = if let Some(peer_id) = target.as_peer_id() {
791 if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
792 *is_bad
793 } else {
794 false
795 }
796 } else {
797 false
798 };
799 let _ = sender.send(is_bad);
800 }
801 LocalSwarmCmd::QuoteVerification { quotes } => {
802 cmd_string = "QuoteVerification";
803 for (peer_id, quote) in quotes {
804 if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
806 if *is_bad {
807 continue;
808 }
809 }
810 self.verify_peer_quote(peer_id, quote);
811 }
812 }
813 LocalSwarmCmd::FetchCompleted((key, record_type)) => {
814 info!(
815 "Fetch of {record_type:?} {:?} early completed, may have fetched an old version of the record.",
816 PrettyPrintRecordKey::from(&key)
817 );
818 cmd_string = "FetchCompleted";
819 let new_keys_to_fetch = self
820 .replication_fetcher
821 .notify_fetch_early_completed(key, record_type);
822 if !new_keys_to_fetch.is_empty() {
823 self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
824 }
825 }
826 LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
827 cmd_string = "TriggerIrrelevantRecordCleanup";
828 self.swarm
829 .behaviour_mut()
830 .kademlia
831 .store_mut()
832 .cleanup_irrelevant_records();
833 }
834 LocalSwarmCmd::NotifyPeerScores { peer_scores } => {
835 cmd_string = "NotifyPeerScores";
836 self.replication_fetcher.add_peer_scores(peer_scores);
837 }
838 LocalSwarmCmd::AddFreshReplicateRecords { holder, keys } => {
839 cmd_string = "AddFreshReplicateRecords";
840 let _ = self.add_keys_to_replication_fetcher(holder, keys, true);
841 }
842 LocalSwarmCmd::NotifyPeerVersion { peer, version } => {
843 cmd_string = "NotifyPeerVersion";
844 self.record_node_version(peer, version);
845 }
846 LocalSwarmCmd::GetNetworkDensity { sender } => {
847 cmd_string = "GetNetworkDensity";
848 let density = self
849 .swarm
850 .behaviour_mut()
851 .kademlia
852 .store_mut()
853 .get_responsible_distance_range();
854 let _ = sender.send(density);
855 }
856 LocalSwarmCmd::RemovePeer { peer } => {
857 cmd_string = "RemovePeer";
858 if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer) {
859 self.update_on_peer_removal(*dead_peer.node.key.preimage());
860 }
861 }
862 }
863
864 self.log_handling(cmd_string.to_string(), start.elapsed());
865
866 Ok(())
867 }
868
869 fn record_node_version(&mut self, peer_id: PeerId, version: String) {
870 let _ = self.peers_version.insert(peer_id, version);
871 }
872
    /// Records `issue` against `peer_id` and acts on the accumulated history.
    ///
    /// For a peer that is not flagged as bad:
    /// * issues older than 300 seconds are forgotten and at most 10 are kept;
    /// * the report is dropped when the most recent recorded issue (of any
    ///   kind) is not more than 10 seconds old;
    /// * once one kind of issue has been recorded 3 times, that kind becomes
    ///   the detected bad behaviour. For `ConnectionIssue` the peer is
    ///   removed from the routing table, its connection issues are cleared
    ///   and the function returns.
    ///
    /// For a peer that is flagged as bad, the peer is removed from the
    /// routing table; when a bad behaviour was detected in this call the peer
    /// is additionally informed and, after it answered, put on the blocklist.
    ///
    /// NOTE(review): nothing in this function sets `*is_bad` to `true`, and a
    /// bad behaviour is only detected while `*is_bad` is `false`, so the
    /// inform-and-blocklist path looks unreachable from here. Confirm whether
    /// the flag is set elsewhere or whether blocklisting is intentionally off.
    pub(crate) fn record_node_issue(&mut self, peer_id: PeerId, issue: NodeIssue) {
        info!("Peer {peer_id:?} is reported as having issue {issue:?}");
        let (issue_vec, is_bad) = self.bad_nodes.entry(peer_id).or_default();
        let mut new_bad_behaviour = None;
        let mut is_connection_issue = false;

        if !(*is_bad) {
            // Forget issues that were recorded 300 seconds ago or earlier.
            issue_vec.retain(|(_, timestamp)| timestamp.elapsed().as_secs() < 300);

            // Keep the history bounded: drop the oldest entry to make room.
            if issue_vec.len() == 10 {
                issue_vec.remove(0);
            }

            // Rate limit: a report only counts when the latest recorded issue
            // is more than 10 seconds old (or there is no history at all).
            let is_new_issue = if let Some((_issue, timestamp)) = issue_vec.last() {
                timestamp.elapsed().as_secs() > 10
            } else {
                true
            };

            if is_new_issue {
                issue_vec.push((issue, Instant::now()));
            } else {
                return;
            }

            // Look for the first kind of issue that has been recorded at
            // least 3 times within the retained history.
            for (issue, _timestamp) in issue_vec.iter() {
                let issue_counts = issue_vec
                    .iter()
                    .filter(|(i, _timestamp)| *issue == *i)
                    .count();
                if issue_counts >= 3 {
                    if matches!(issue, NodeIssue::ConnectionIssue) {
                        is_connection_issue = true;
                    }
                    new_bad_behaviour = Some(issue.clone());
                    info!("Peer {peer_id:?} accumulated {issue_counts} times of issue {issue:?}. Consider it as a bad node now.");
                    break;
                }
            }
        }

        // Repeated connection issues: clear them from the history and evict
        // the peer from the routing table, without any further action.
        if is_connection_issue {
            issue_vec.retain(|(issue, _timestamp)| !matches!(issue, NodeIssue::ConnectionIssue));
            info!("Evicting bad peer {peer_id:?} due to connection issue from RT.");
            if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
                self.update_on_peer_removal(*dead_peer.node.key.preimage());
            }
            return;
        }

        if *is_bad {
            info!("Evicting bad peer {peer_id:?} from RT.");
            // Keep the addresses of the evicted peer so that it can still be
            // reached for the notification below.
            let addrs = if let Some(dead_peer) =
                self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id)
            {
                self.update_on_peer_removal(*dead_peer.node.key.preimage());
                Addresses(dead_peer.node.value.into_vec())
            } else {
                Addresses(Vec::new())
            };

            if let Some(bad_behaviour) = new_bad_behaviour {
                self.record_metrics(Marker::PeerConsideredAsBad { bad_peer: &peer_id });

                warn!("Peer {peer_id:?} is considered as bad due to {bad_behaviour:?}. Informing the peer and adding to blocklist.");
                let (tx, rx) = oneshot::channel();
                // The spawned task waits for the result of the notification
                // request and only then asks the driver to blocklist the peer.
                let local_swarm_cmd_sender = self.local_cmd_sender.clone();
                tokio::spawn(async move {
                    match rx.await {
                        Ok(result) => {
                            debug!("Got response for Cmd::PeerConsideredAsBad from {peer_id:?} {result:?}");
                            if let Err(err) = local_swarm_cmd_sender
                                .send(LocalSwarmCmd::AddPeerToBlockList { peer_id })
                                .await
                            {
                                error!("SwarmDriver failed to send LocalSwarmCmd: {err}");
                            }
                        }
                        Err(err) => {
                            error!("Failed to get response from one shot channel for Cmd::PeerConsideredAsBad : {err:?}");
                        }
                    }
                });

                // Tell the peer that it is considered bad, and why.
                let request = Request::Cmd(Cmd::PeerConsideredAsBad {
                    detected_by: NetworkAddress::from(self.self_peer_id),
                    bad_peer: NetworkAddress::from(peer_id),
                    bad_behaviour: bad_behaviour.to_string(),
                });
                self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
                    req: request,
                    addrs: Some(addrs),
                    peer: peer_id,
                    sender: Some(tx),
                });
            }
        }
    }
991
992 fn verify_peer_quote(&mut self, peer_id: PeerId, quote: PaymentQuote) {
993 if let Some(history_quote) = self.quotes_history.get(&peer_id) {
994 if !history_quote.historical_verify("e) {
995 info!("From {peer_id:?}, detected a bad quote {quote:?} against history_quote {history_quote:?}");
996 self.record_node_issue(peer_id, NodeIssue::BadQuoting);
997 return;
998 }
999
1000 if history_quote.is_newer_than("e) {
1001 return;
1002 }
1003 }
1004
1005 let _ = self.quotes_history.insert(peer_id, quote);
1006 }
1007
1008 fn try_interval_replication(&mut self) -> Result<()> {
1009 if let Some(last_replication) = self.last_replication {
1011 if last_replication.elapsed() < MIN_REPLICATION_INTERVAL_S {
1012 info!("Skipping replication as minimum interval hasn't elapsed");
1013 return Ok(());
1014 }
1015 }
1016 self.last_replication = Some(Instant::now());
1018
1019 let self_addr = NetworkAddress::from(self.self_peer_id);
1020 let mut replicate_targets = self.get_replicate_candidates(&self_addr)?;
1021
1022 let now = Instant::now();
1023 self.replication_targets
1024 .retain(|_peer_id, timestamp| *timestamp > now);
1025 replicate_targets
1027 .retain(|(peer_id, _addresses)| !self.replication_targets.contains_key(peer_id));
1028 if replicate_targets.is_empty() {
1029 return Ok(());
1030 }
1031
1032 let all_records: Vec<_> = self
1033 .swarm
1034 .behaviour_mut()
1035 .kademlia
1036 .store_mut()
1037 .record_addresses_ref()
1038 .values()
1039 .cloned()
1040 .collect();
1041
1042 if !all_records.is_empty() {
1043 debug!(
1044 "Sending a replication list of {} keys to {replicate_targets:?} ",
1045 all_records.len()
1046 );
1047 let request = Request::Cmd(Cmd::Replicate {
1048 holder: NetworkAddress::from(self.self_peer_id),
1049 keys: all_records
1050 .into_iter()
1051 .map(|(addr, val_type, _data_type)| (addr, val_type))
1052 .collect(),
1053 });
1054 for (peer_id, _addrs) in replicate_targets {
1055 self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
1056 req: request.clone(),
1057 peer: peer_id,
1058 addrs: None,
1060 sender: None,
1061 });
1062
1063 let _ = self
1064 .replication_targets
1065 .insert(peer_id, now + REPLICATION_TIMEOUT);
1066 }
1067 }
1068
1069 Ok(())
1070 }
1071
1072 pub(crate) fn get_replicate_candidates(
1078 &mut self,
1079 target: &NetworkAddress,
1080 ) -> Result<Vec<(PeerId, Addresses)>> {
1081 let is_periodic_replicate = target.as_peer_id().is_some();
1082 let expected_candidates = if is_periodic_replicate {
1083 CLOSE_GROUP_SIZE * 2
1084 } else {
1085 CLOSE_GROUP_SIZE
1086 };
1087
1088 let closest_k_peers = self.get_closest_k_local_peers_to_target(target, false);
1090
1091 if let Some(responsible_range) = self
1092 .swarm
1093 .behaviour_mut()
1094 .kademlia
1095 .store_mut()
1096 .get_responsible_distance_range()
1097 {
1098 let peers_in_range = get_peers_in_range(&closest_k_peers, target, responsible_range);
1099
1100 if peers_in_range.len() >= expected_candidates {
1101 return Ok(peers_in_range);
1102 }
1103 }
1104
1105 Ok(closest_k_peers
1107 .iter()
1108 .take(expected_candidates)
1109 .cloned()
1110 .collect())
1111 }
1112}
1113
1114fn get_peers_in_range(
1116 peers: &[(PeerId, Addresses)],
1117 address: &NetworkAddress,
1118 range: Distance,
1119) -> Vec<(PeerId, Addresses)> {
1120 peers
1121 .iter()
1122 .filter_map(|(peer_id, addresses)| {
1123 if address.distance(&NetworkAddress::from(*peer_id)) <= range {
1124 Some((*peer_id, addresses.clone()))
1125 } else {
1126 None
1127 }
1128 })
1129 .collect()
1130}