use crate::{
    config::GetRecordCfg,
    driver::{PendingGetClosestType, SwarmDriver},
    error::{NetworkError, Result},
    event::TerminateNodeReason,
    log_markers::Marker,
    GetRecordError, MsgResponder, NetworkEvent, ResponseQuorum, CLOSE_GROUP_SIZE,
};
use ant_evm::{PaymentQuote, QuotingMetrics};
use ant_protocol::{
    messages::{Cmd, Request, Response},
    storage::{DataTypes, RecordHeader, RecordKind, ValidationType},
    NetworkAddress, PrettyPrintRecordKey,
};
use libp2p::{
    kad::{
        store::{Error as StoreError, RecordStore},
        KBucketDistance as Distance, Record, RecordKey, K_VALUE,
    },
    Multiaddr, PeerId,
};
use std::{
    collections::{BTreeMap, HashMap},
    fmt::Debug,
    time::Duration,
};
use tokio::sync::oneshot;
use xor_name::XorName;

use crate::time::Instant;

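/// Max number of consecutive local disk-write failures tolerated before the
/// node asks to be terminated (see the `RemoveFailedLocalRecord` handling).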
const MAX_CONTINUOUS_HDD_WRITE_ERROR: usize = 5;

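/// How long a peer stays excluded from further replication after being sent a
/// replication list (see `try_interval_replication`).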
const REPLICATION_TIMEOUT: Duration = Duration::from_secs(45);

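/// Minimum interval between two consecutive interval-replication rounds.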
const MIN_REPLICATION_INTERVAL_S: Duration = Duration::from_secs(30);

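/// Issues that can be recorded against a peer. Repeated issues of the same
/// kind may get the peer considered as bad (see `record_node_issue`).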
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum NodeIssue {
    /// Connections to the peer failed or were dropped.
    ConnectionIssue,
    /// Data replication involving the peer failed.
    ReplicationFailure,
    /// Nodes close to the peer report it as bad.
    CloseNodesShunning,
    /// The peer provided a quote that failed verification.
    BadQuoting,
    /// The peer failed a chunk proof check.
    FailedChunkProofCheck,
}

impl std::fmt::Display for NodeIssue {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            NodeIssue::ConnectionIssue => write!(f, "CriticalConnectionIssue"),
            NodeIssue::ReplicationFailure => write!(f, "ReplicationFailure"),
            NodeIssue::CloseNodesShunning => write!(f, "CloseNodesShunning"),
            NodeIssue::BadQuoting => write!(f, "BadQuoting"),
            NodeIssue::FailedChunkProofCheck => write!(f, "FailedChunkProofCheck"),
        }
    }
}

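/// Commands handled entirely by the `SwarmDriver` against local state (routing
/// table, record store, replication fetcher); they do not go out to the network.
///
/// A minimal usage sketch, assuming a `local_cmd_sender:
/// mpsc::Sender<LocalSwarmCmd>` handle such as the one this module clones in
/// `record_node_issue`:
///
/// ```ignore
/// let (tx, rx) = tokio::sync::oneshot::channel();
/// local_cmd_sender
///     .send(LocalSwarmCmd::RecordStoreHasKey { key, sender: tx })
///     .await?;
/// let has_key = rx.await??;
/// ```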
pub enum LocalSwarmCmd {
    /// Get peers from the local routing table, together with their multiaddresses.
    GetPeersWithMultiaddr {
        sender: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
    },
    /// Get the kbuckets of the local routing table, keyed by ilog2 distance.
    GetKBuckets {
        sender: oneshot::Sender<BTreeMap<u32, Vec<PeerId>>>,
    },
    /// Get the candidate peers to replicate the given data address to.
    GetReplicateCandidates {
        data_addr: NetworkAddress,
        sender: oneshot::Sender<Result<Vec<PeerId>>>,
    },
    /// Get the K closest peers to self from the local routing table.
    GetClosestKLocalPeers {
        sender: oneshot::Sender<Vec<PeerId>>,
    },
    /// Get the close group of peers to the given key from the local routing table.
    GetCloseGroupLocalPeers {
        key: NetworkAddress,
        sender: oneshot::Sender<Vec<PeerId>>,
    },
    /// Get a snapshot of the swarm's local state.
    GetSwarmLocalState(oneshot::Sender<SwarmLocalState>),
    /// Check whether the local record store holds the given key.
    RecordStoreHasKey {
        key: RecordKey,
        sender: oneshot::Sender<Result<bool>>,
    },
    /// Get the addresses of all records held locally.
    GetAllLocalRecordAddresses {
        sender: oneshot::Sender<Result<HashMap<NetworkAddress, ValidationType>>>,
    },
    /// Get a record from the local store, if present.
    GetLocalRecord {
        key: RecordKey,
        sender: oneshot::Sender<Option<Record>>,
    },
    /// Get the quoting metrics for storing the next record, and whether the
    /// record is already stored locally.
    GetLocalQuotingMetrics {
        key: RecordKey,
        data_type: u32,
        data_size: usize,
        sender: oneshot::Sender<Result<(QuotingMetrics, bool)>>,
    },
    /// Notify the record store that a payment was received.
    PaymentReceived,
    /// Put a verified record into the local store.
    PutLocalRecord {
        record: Record,
        is_client_put: bool,
    },
    /// Remove a record that failed to be written to disk.
    RemoveFailedLocalRecord {
        key: RecordKey,
    },
    /// Mark a record as persisted in the local store.
    AddLocalRecordAsStored {
        key: RecordKey,
        record_type: ValidationType,
        data_type: DataTypes,
    },
    /// Add a peer to the blocklist.
    AddPeerToBlockList {
        peer_id: PeerId,
    },
    /// Record an issue observed for a peer; repeated issues may mark it as bad.
    RecordNodeIssue {
        peer_id: PeerId,
        issue: NodeIssue,
    },
    /// Check whether the target peer is considered bad (shunned) locally.
    IsPeerShunned {
        target: NetworkAddress,
        sender: oneshot::Sender<bool>,
    },
    /// Verify the given payment quotes against quoting history.
    QuoteVerification {
        quotes: Vec<(PeerId, PaymentQuote)>,
    },
    /// Notify the replication fetcher that a fetch completed early.
    FetchCompleted((RecordKey, ValidationType)),
    /// Trigger a round of interval replication.
    TriggerIntervalReplication,
    /// Trigger cleanup of records no longer relevant to this node.
    TriggerIrrelevantRecordCleanup,
    /// Add a network density sample (distance to close peers).
    AddNetworkDensitySample {
        distance: Distance,
    },
    /// Pass peer scores on to the replication fetcher.
    NotifyPeerScores {
        peer_scores: Vec<(PeerId, bool)>,
    },
    /// Add freshly-put replicate records reported by a holder.
    AddFreshReplicateRecords {
        holder: NetworkAddress,
        keys: Vec<(NetworkAddress, ValidationType)>,
    },
    /// Record the reported node version of a peer.
    NotifyPeerVersion {
        peer: PeerId,
        version: String,
    },
}
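/// Commands that require sending requests or queries out to the network,
/// handled by the `SwarmDriver`.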
pub enum NetworkSwarmCmd {
    /// Get the closest peers to the given address from the network.
    GetClosestPeersToAddressFromNetwork {
        key: NetworkAddress,
        sender: oneshot::Sender<Vec<PeerId>>,
    },

    /// Send a `Request` to the given peer.
    SendRequest {
        req: Request,
        peer: PeerId,

        // If a sender is provided, the caller is awaiting the peer's response.
        sender: Option<oneshot::Sender<Result<Response>>>,
    },
    /// Send a `Response` back over the given responder channel.
    SendResponse {
        resp: Response,
        channel: MsgResponder,
    },

    /// Get a record from the network, following the given `GetRecordCfg`.
    GetNetworkRecord {
        key: RecordKey,
        sender: oneshot::Sender<std::result::Result<Record, GetRecordError>>,
        cfg: GetRecordCfg,
    },

    /// Put a record to the network, succeeding once the quorum is reached.
    PutRecord {
        record: Record,
        sender: oneshot::Sender<Result<()>>,
        quorum: ResponseQuorum,
    },
    /// Put a record directly to the given peers.
    PutRecordTo {
        peers: Vec<PeerId>,
        record: Record,
        sender: oneshot::Sender<Result<()>>,
        quorum: ResponseQuorum,
    },
}

// Manual Debug implementation, so that large record values are not printed.
impl Debug for LocalSwarmCmd {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            LocalSwarmCmd::PutLocalRecord {
                record,
                is_client_put,
            } => {
                write!(
                    f,
                    "LocalSwarmCmd::PutLocalRecord {{ key: {:?}, is_client_put: {is_client_put:?} }}",
                    PrettyPrintRecordKey::from(&record.key)
                )
            }
            LocalSwarmCmd::RemoveFailedLocalRecord { key } => {
                write!(
                    f,
                    "LocalSwarmCmd::RemoveFailedLocalRecord {{ key: {:?} }}",
                    PrettyPrintRecordKey::from(key)
                )
            }
            LocalSwarmCmd::AddLocalRecordAsStored {
                key,
                record_type,
                data_type,
            } => {
                write!(
                    f,
                    "LocalSwarmCmd::AddLocalRecordAsStored {{ key: {:?}, record_type: {record_type:?}, data_type: {data_type:?} }}",
                    PrettyPrintRecordKey::from(key)
                )
            }
            LocalSwarmCmd::GetReplicateCandidates { .. } => {
                write!(f, "LocalSwarmCmd::GetReplicateCandidates")
            }
            LocalSwarmCmd::GetClosestKLocalPeers { .. } => {
                write!(f, "LocalSwarmCmd::GetClosestKLocalPeers")
            }
            LocalSwarmCmd::GetCloseGroupLocalPeers { key, .. } => {
                write!(
                    f,
                    "LocalSwarmCmd::GetCloseGroupLocalPeers {{ key: {key:?} }}"
                )
            }
            LocalSwarmCmd::GetLocalQuotingMetrics { .. } => {
                write!(f, "LocalSwarmCmd::GetLocalQuotingMetrics")
            }
            LocalSwarmCmd::PaymentReceived => {
                write!(f, "LocalSwarmCmd::PaymentReceived")
            }
            LocalSwarmCmd::GetLocalRecord { key, .. } => {
                write!(
                    f,
                    "LocalSwarmCmd::GetLocalRecord {{ key: {:?} }}",
                    PrettyPrintRecordKey::from(key)
                )
            }
            LocalSwarmCmd::GetAllLocalRecordAddresses { .. } => {
                write!(f, "LocalSwarmCmd::GetAllLocalRecordAddresses")
            }
            LocalSwarmCmd::GetPeersWithMultiaddr { .. } => {
                write!(f, "LocalSwarmCmd::GetPeersWithMultiaddr")
            }
            LocalSwarmCmd::GetKBuckets { .. } => {
                write!(f, "LocalSwarmCmd::GetKBuckets")
            }
            LocalSwarmCmd::GetSwarmLocalState { .. } => {
                write!(f, "LocalSwarmCmd::GetSwarmLocalState")
            }
            LocalSwarmCmd::RecordStoreHasKey { key, .. } => {
                write!(
                    f,
                    "LocalSwarmCmd::RecordStoreHasKey {:?}",
                    PrettyPrintRecordKey::from(key)
                )
            }
            LocalSwarmCmd::AddPeerToBlockList { peer_id } => {
                write!(f, "LocalSwarmCmd::AddPeerToBlockList {peer_id:?}")
            }
            LocalSwarmCmd::RecordNodeIssue { peer_id, issue } => {
                write!(
                    f,
                    "LocalSwarmCmd::RecordNodeIssue {{ peer: {peer_id:?}, issue: {issue:?} }}"
                )
            }
            LocalSwarmCmd::IsPeerShunned { target, .. } => {
                write!(f, "LocalSwarmCmd::IsPeerShunned target: {target:?}")
            }
            LocalSwarmCmd::QuoteVerification { quotes } => {
                write!(
                    f,
                    "LocalSwarmCmd::QuoteVerification of {} quotes",
                    quotes.len()
                )
            }
            LocalSwarmCmd::FetchCompleted((key, record_type)) => {
                write!(
                    f,
                    "LocalSwarmCmd::FetchCompleted({record_type:?} : {:?})",
                    PrettyPrintRecordKey::from(key)
                )
            }
            LocalSwarmCmd::TriggerIntervalReplication => {
                write!(f, "LocalSwarmCmd::TriggerIntervalReplication")
            }
            LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
                write!(f, "LocalSwarmCmd::TriggerIrrelevantRecordCleanup")
            }
            LocalSwarmCmd::AddNetworkDensitySample { distance } => {
                write!(f, "LocalSwarmCmd::AddNetworkDensitySample({distance:?})")
            }
            LocalSwarmCmd::NotifyPeerScores { peer_scores } => {
                write!(f, "LocalSwarmCmd::NotifyPeerScores({peer_scores:?})")
            }
            LocalSwarmCmd::AddFreshReplicateRecords { holder, keys } => {
                write!(
                    f,
                    "LocalSwarmCmd::AddFreshReplicateRecords({holder:?}, {keys:?})"
                )
            }
            LocalSwarmCmd::NotifyPeerVersion { peer, version } => {
                write!(f, "LocalSwarmCmd::NotifyPeerVersion({peer:?}, {version:?})")
            }
        }
    }
}

// Manual Debug implementation, so that record values are not printed in full.
impl Debug for NetworkSwarmCmd {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            NetworkSwarmCmd::GetNetworkRecord { key, cfg, .. } => {
                write!(
                    f,
                    "NetworkSwarmCmd::GetNetworkRecord {{ key: {:?}, cfg: {cfg:?} }}",
                    PrettyPrintRecordKey::from(key)
                )
            }
            NetworkSwarmCmd::PutRecord { record, .. } => {
                write!(
                    f,
                    "NetworkSwarmCmd::PutRecord {{ key: {:?} }}",
                    PrettyPrintRecordKey::from(&record.key)
                )
            }
            NetworkSwarmCmd::PutRecordTo { peers, record, .. } => {
                write!(
                    f,
                    "NetworkSwarmCmd::PutRecordTo {{ peers: {peers:?}, key: {:?} }}",
                    PrettyPrintRecordKey::from(&record.key)
                )
            }
            NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { key, .. } => {
                write!(
                    f,
                    "NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork {{ key: {key:?} }}"
                )
            }
            NetworkSwarmCmd::SendResponse { resp, .. } => {
                write!(f, "NetworkSwarmCmd::SendResponse resp: {resp:?}")
            }
            NetworkSwarmCmd::SendRequest { req, peer, .. } => {
                write!(
                    f,
                    "NetworkSwarmCmd::SendRequest req: {req:?}, peer: {peer:?}"
                )
            }
        }
    }
}

#[derive(Debug, Clone)]
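/// Snapshot of the swarm's local state.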
pub struct SwarmLocalState {
    /// List of currently connected peers.
    pub connected_peers: Vec<PeerId>,
    /// List of addresses the node is currently listening on.
    pub listeners: Vec<Multiaddr>,
}

impl SwarmDriver {
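    /// Handle a `NetworkSwarmCmd`, issuing the corresponding swarm queries,
    /// requests, or responses.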
    pub(crate) fn handle_network_cmd(&mut self, cmd: NetworkSwarmCmd) -> Result<(), NetworkError> {
        let start = Instant::now();
        let cmd_string;
        match cmd {
            NetworkSwarmCmd::GetNetworkRecord { key, sender, cfg } => {
                cmd_string = "GetNetworkRecord";

                // If a fetch for the same key is already in flight, register
                // the sender with it instead of issuing a duplicate query.
                for (pending_query, (inflight_record_query_key, senders, _, _)) in
                    self.pending_get_record.iter_mut()
                {
                    if *inflight_record_query_key == key {
                        debug!(
                            "GetNetworkRecord for {:?} is already in progress. Adding sender to {pending_query:?}",
                            PrettyPrintRecordKey::from(&key)
                        );
                        senders.push(sender);

                        return Ok(());
                    }
                }

                let query_id = self.swarm.behaviour_mut().kademlia.get_record(key.clone());

                debug!(
                    "Record {:?} with task {query_id:?} expected to be held by {:?}",
                    PrettyPrintRecordKey::from(&key),
                    cfg.expected_holders
                );

                if self
                    .pending_get_record
                    .insert(query_id, (key, vec![sender], Default::default(), cfg))
                    .is_some()
                {
                    warn!("An existing get_record task {query_id:?} got replaced");
                }
                let total_records: usize = self
                    .pending_get_record
                    .iter()
                    .map(|(_, (_, _, result_map, _))| result_map.len())
                    .sum();
                info!(
                    "We now have {} pending get record attempts and cached {total_records} fetched copies",
                    self.pending_get_record.len()
                );
            }
            NetworkSwarmCmd::PutRecord {
                record,
                sender,
                quorum,
            } => {
                cmd_string = "PutRecord";
                let record_key = PrettyPrintRecordKey::from(&record.key).into_owned();
                debug!(
                    "Putting record sized {:?} to network {record_key:?}",
                    record.value.len(),
                );
                let res = match self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .put_record(record, quorum.get_kad_quorum())
                {
                    Ok(request_id) => {
                        debug!("Sent record {record_key:?} to network. Request id: {request_id:?}");
                        Ok(())
                    }
                    Err(error) => {
                        error!("Error sending record {record_key:?} to network: {error:?}");
                        Err(NetworkError::from(error))
                    }
                };

                if let Err(err) = sender.send(res) {
                    error!("Could not send response to PutRecord cmd: {:?}", err);
                }
            }
            NetworkSwarmCmd::PutRecordTo {
                peers,
                record,
                sender,
                quorum,
            } => {
                cmd_string = "PutRecordTo";
                let record_key = PrettyPrintRecordKey::from(&record.key).into_owned();
                debug!(
                    "Putting record {record_key:?} sized {:?} to {peers:?}",
                    record.value.len(),
                );
                let peers_count = peers.len();
                let request_id = self.swarm.behaviour_mut().kademlia.put_record_to(
                    record,
                    peers.into_iter(),
                    quorum.get_kad_quorum(),
                );
                debug!("Sent record {record_key:?} to {peers_count:?} peers. Request id: {request_id:?}");

                if let Err(err) = sender.send(Ok(())) {
                    error!("Could not send response to PutRecordTo cmd: {:?}", err);
                }
            }
            NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { key, sender } => {
                cmd_string = "GetClosestPeersToAddressFromNetwork";
                let query_id = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .get_closest_peers(key.as_bytes());
                let _ = self.pending_get_closest_peers.insert(
                    query_id,
                    (
                        PendingGetClosestType::FunctionCall(sender),
                        Default::default(),
                    ),
                );
            }

            NetworkSwarmCmd::SendRequest { req, peer, sender } => {
                cmd_string = "SendRequest";
                // If the request targets `self`, do not dial out; forward it
                // directly to the upper layer as an incoming query.
                if peer == *self.swarm.local_peer_id() {
                    trace!("Sending query request to self");
                    if let Request::Query(query) = req {
                        self.send_event(NetworkEvent::QueryRequestReceived {
                            query,
                            channel: MsgResponder::FromSelf(sender),
                        });
                    } else {
                        // Replicate requests to self are not applicable; ignore them.
                        trace!("Replicate cmd to self received, ignoring");
                    }
                } else {
                    let request_id = self
                        .swarm
                        .behaviour_mut()
                        .request_response
                        .send_request(&peer, req);
                    trace!("Sending request {request_id:?} to peer {peer:?}");
                    let _ = self.pending_requests.insert(request_id, sender);

                    trace!("Pending Requests now: {:?}", self.pending_requests.len());
                }
            }
            NetworkSwarmCmd::SendResponse { resp, channel } => {
                cmd_string = "SendResponse";
                match channel {
                    // If the response is for `self`, route it back internally.
                    MsgResponder::FromSelf(channel) => {
                        trace!("Sending response to self");
                        match channel {
                            Some(channel) => {
                                channel
                                    .send(Ok(resp))
                                    .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
                            }
                            None => {
                                // No channel is awaiting; surface the response as an event.
                                self.send_event(NetworkEvent::ResponseReceived { res: resp });
                            }
                        }
                    }
                    MsgResponder::FromPeer(channel) => {
                        self.swarm
                            .behaviour_mut()
                            .request_response
                            .send_response(channel, resp)
                            .map_err(NetworkError::OutgoingResponseDropped)?;
                    }
                }
            }
        }

        self.log_handling(cmd_string.to_string(), start.elapsed());

        Ok(())
    }
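
    /// Handle a `LocalSwarmCmd`, which only touches local state (routing
    /// table, record store, replication fetcher) and does not go out to the
    /// network.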
    pub(crate) fn handle_local_cmd(&mut self, cmd: LocalSwarmCmd) -> Result<(), NetworkError> {
        let start = Instant::now();
        let mut cmd_string;
        match cmd {
            LocalSwarmCmd::TriggerIntervalReplication => {
                cmd_string = "TriggerIntervalReplication";
                self.try_interval_replication()?;
            }
            LocalSwarmCmd::GetLocalQuotingMetrics {
                key,
                data_type,
                data_size,
                sender,
            } => {
                cmd_string = "GetLocalQuotingMetrics";
                let kbucket_status = self.get_kbuckets_status();
                self.update_on_kbucket_status(&kbucket_status);
                let (quoting_metrics, is_already_stored) = match self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .quoting_metrics(
                        &key,
                        data_type,
                        data_size,
                        Some(kbucket_status.estimated_network_size as u64),
                    ) {
                    Ok(res) => res,
                    Err(err) => {
                        let _res = sender.send(Err(err));
                        return Ok(());
                    }
                };
                self.record_metrics(Marker::QuotingMetrics {
                    quoting_metrics: &quoting_metrics,
                });

                // Collect the peers currently flagged as bad.
                let mut bad_nodes: Vec<_> = self
                    .bad_nodes
                    .iter()
                    .filter_map(|(peer_id, (_issue_list, is_bad))| {
                        if *is_bad {
                            Some(NetworkAddress::from_peer(*peer_id))
                        } else {
                            None
                        }
                    })
                    .collect();

                // Only consider bad nodes that fall within the close group of the key.
                let kbucket_key = NetworkAddress::from_record_key(&key).as_kbucket_key();
                let closest_peers: Vec<_> = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .get_closest_local_peers(&kbucket_key)
                    .map(|peer| peer.into_preimage())
                    .take(CLOSE_GROUP_SIZE)
                    .collect();
                if closest_peers.len() >= CLOSE_GROUP_SIZE {
                    let boundary_peer = closest_peers[CLOSE_GROUP_SIZE - 1];
                    let key_address = NetworkAddress::from_record_key(&key);
                    let boundary_distance =
                        key_address.distance(&NetworkAddress::from_peer(boundary_peer));
                    bad_nodes
                        .retain(|peer_addr| key_address.distance(peer_addr) < boundary_distance);
                }

                let _res = sender.send(Ok((quoting_metrics, is_already_stored)));
            }
            LocalSwarmCmd::PaymentReceived => {
                cmd_string = "PaymentReceived";
                self.swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .payment_received();
            }
            LocalSwarmCmd::GetLocalRecord { key, sender } => {
                cmd_string = "GetLocalRecord";
                let record = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .get(&key)
                    .map(|rec| rec.into_owned());
                let _ = sender.send(record);
            }

            LocalSwarmCmd::PutLocalRecord {
                record,
                is_client_put,
            } => {
                cmd_string = "PutLocalRecord";
                let key = record.key.clone();
                let record_key = PrettyPrintRecordKey::from(&key);

                let record_type = match RecordHeader::from_record(&record) {
                    Ok(record_header) => {
                        match record_header.kind {
                            RecordKind::DataOnly(DataTypes::Chunk) => ValidationType::Chunk,
                            RecordKind::DataOnly(_) => {
                                let content_hash = XorName::from_content(&record.value);
                                ValidationType::NonChunk(content_hash)
                            }
                            RecordKind::DataWithPayment(_) => {
                                error!("Record {record_key:?} with payment shall not be stored locally.");
                                return Err(NetworkError::InCorrectRecordHeader);
                            }
                        }
                    }
                    Err(err) => {
                        error!("For record {record_key:?}, failed to parse record_header {err:?}");
                        return Err(NetworkError::InCorrectRecordHeader);
                    }
                };

                let result = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .put_verified(record, record_type.clone(), is_client_put);

                match result {
                    Ok(_) => {}
                    Err(StoreError::MaxRecords) => {
                        // The local store is full: tell the replication fetcher
                        // the farthest record held, so that farther keys are
                        // no longer fetched.
                        let farthest = self
                            .swarm
                            .behaviour_mut()
                            .kademlia
                            .store_mut()
                            .get_farthest()?;
                        self.replication_fetcher.set_farthest_on_full(farthest);
                    }
                    // Other errors are surfaced below, once the fetcher has
                    // been notified of the put.
                    Err(_) => {}
                }

                // Notify the fetcher about the new put regardless of the
                // result, so that a failed key can be fetched again from peers.
                let new_keys_to_fetch = self
                    .replication_fetcher
                    .notify_about_new_put(key.clone(), record_type);

                if !new_keys_to_fetch.is_empty() {
                    self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
                }

                // Keep the fetcher's replication distance range in sync with
                // the record store.
                if let Some(distance) = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .get_farthest_replication_distance()?
                {
                    self.replication_fetcher
                        .set_replication_distance_range(distance);
                }

                if let Err(err) = result {
                    error!("Can't store verified record {record_key:?} locally: {err:?}");
                    cmd_string = "PutLocalRecord error";
                    self.log_handling(cmd_string.to_string(), start.elapsed());
                    return Err(err.into());
                };
            }
            LocalSwarmCmd::AddLocalRecordAsStored {
                key,
                record_type,
                data_type,
            } => {
                cmd_string = "AddLocalRecordAsStored";
                self.swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .mark_as_stored(key, record_type, data_type);
                // A successful write resets the continuous disk-write error counter.
                self.hard_disk_write_error = 0;
            }
            LocalSwarmCmd::RemoveFailedLocalRecord { key } => {
                info!("Removing record locally, for {key:?}");
                cmd_string = "RemoveFailedLocalRecord";
                self.swarm.behaviour_mut().kademlia.store_mut().remove(&key);
                self.hard_disk_write_error = self.hard_disk_write_error.saturating_add(1);
                // Once the continuous run of disk-write errors exceeds the
                // threshold, ask for the node to be terminated.
                if self.hard_disk_write_error > MAX_CONTINUOUS_HDD_WRITE_ERROR {
                    self.send_event(NetworkEvent::TerminateNode {
                        reason: TerminateNodeReason::HardDiskWriteError,
                    });
                }
            }
            LocalSwarmCmd::RecordStoreHasKey { key, sender } => {
                cmd_string = "RecordStoreHasKey";
                let has_key = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .contains(&key);
                let _ = sender.send(has_key);
            }
            LocalSwarmCmd::GetAllLocalRecordAddresses { sender } => {
                cmd_string = "GetAllLocalRecordAddresses";
                #[allow(clippy::mutable_key_type)]
                let addresses = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .record_addresses();
                let _ = sender.send(addresses);
            }
            LocalSwarmCmd::GetKBuckets { sender } => {
                cmd_string = "GetKBuckets";
                let mut ilog2_kbuckets = BTreeMap::new();
                for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
                    let range = kbucket.range();
                    if let Some(distance) = range.0.ilog2() {
                        let peers_in_kbucket = kbucket
                            .iter()
                            .map(|peer_entry| peer_entry.node.key.into_preimage())
                            .collect::<Vec<PeerId>>();
                        let _ = ilog2_kbuckets.insert(distance, peers_in_kbucket);
                    } else {
                        // The only bucket without an ilog2 distance should be
                        // the one covering our own key.
                        error!("Got kbucket with no ilog2 distance; it should only be the bucket covering our own key");
                    }
                }
                let _ = sender.send(ilog2_kbuckets);
            }
            LocalSwarmCmd::GetPeersWithMultiaddr { sender } => {
                cmd_string = "GetPeersWithMultiaddr";
                let mut result: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
                for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
                    let peers_in_kbucket = kbucket
                        .iter()
                        .map(|peer_entry| {
                            (
                                peer_entry.node.key.into_preimage(),
                                peer_entry.node.value.clone().into_vec(),
                            )
                        })
                        .collect::<Vec<(PeerId, Vec<Multiaddr>)>>();
                    result.extend(peers_in_kbucket);
                }
                let _ = sender.send(result);
            }
            LocalSwarmCmd::GetCloseGroupLocalPeers { key, sender } => {
                cmd_string = "GetCloseGroupLocalPeers";
                let key = key.as_kbucket_key();
                // `get_closest_local_peers` returns peers sorted by distance
                // to the key; keep only the close group.
                let closest_peers = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .get_closest_local_peers(&key)
                    .map(|peer| peer.into_preimage())
                    .take(CLOSE_GROUP_SIZE)
                    .collect();

                let _ = sender.send(closest_peers);
            }
            LocalSwarmCmd::GetClosestKLocalPeers { sender } => {
                cmd_string = "GetClosestKLocalPeers";
                let _ = sender.send(self.get_closest_k_value_local_peers());
            }
            LocalSwarmCmd::GetReplicateCandidates { data_addr, sender } => {
                cmd_string = "GetReplicateCandidates";
                let _ = sender.send(self.get_replicate_candidates(&data_addr));
            }
            LocalSwarmCmd::GetSwarmLocalState(sender) => {
                cmd_string = "GetSwarmLocalState";
                let current_state = SwarmLocalState {
                    connected_peers: self.swarm.connected_peers().cloned().collect(),
                    listeners: self.swarm.listeners().cloned().collect(),
                };

                sender
                    .send(current_state)
                    .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
            }
            LocalSwarmCmd::AddPeerToBlockList { peer_id } => {
                cmd_string = "AddPeerToBlockList";
                self.swarm.behaviour_mut().blocklist.block_peer(peer_id);
            }
            LocalSwarmCmd::RecordNodeIssue { peer_id, issue } => {
                cmd_string = "RecordNodeIssue";
                self.record_node_issue(peer_id, issue);
            }
            LocalSwarmCmd::IsPeerShunned { target, sender } => {
                cmd_string = "IsPeerShunned";
                let is_bad = if let Some(peer_id) = target.as_peer_id() {
                    if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
                        *is_bad
                    } else {
                        false
                    }
                } else {
                    false
                };
                let _ = sender.send(is_bad);
            }
            LocalSwarmCmd::QuoteVerification { quotes } => {
                cmd_string = "QuoteVerification";
                for (peer_id, quote) in quotes {
                    // Do not bother verifying quotes from peers already considered bad.
                    if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
                        if *is_bad {
                            continue;
                        }
                    }
                    self.verify_peer_quote(peer_id, quote);
                }
            }
            LocalSwarmCmd::FetchCompleted((key, record_type)) => {
                info!(
                    "Fetch of {record_type:?} {:?} completed early; an old version of the record may have been fetched.",
                    PrettyPrintRecordKey::from(&key)
                );
                cmd_string = "FetchCompleted";
                let new_keys_to_fetch = self
                    .replication_fetcher
                    .notify_fetch_early_completed(key, record_type);
                if !new_keys_to_fetch.is_empty() {
                    self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
                }
            }
            LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
                cmd_string = "TriggerIrrelevantRecordCleanup";
                self.swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .cleanup_irrelevant_records();
            }
            LocalSwarmCmd::AddNetworkDensitySample { distance } => {
                cmd_string = "AddNetworkDensitySample";
                self.network_density_samples.add(distance);
            }
            LocalSwarmCmd::NotifyPeerScores { peer_scores } => {
                cmd_string = "NotifyPeerScores";
                self.replication_fetcher.add_peer_scores(peer_scores);
            }
            LocalSwarmCmd::AddFreshReplicateRecords { holder, keys } => {
                cmd_string = "AddFreshReplicateRecords";
                let _ = self.add_keys_to_replication_fetcher(holder, keys, true);
            }
            LocalSwarmCmd::NotifyPeerVersion { peer, version } => {
                cmd_string = "NotifyPeerVersion";
                self.record_node_version(peer, version);
            }
        }

        self.log_handling(cmd_string.to_string(), start.elapsed());

        Ok(())
    }

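    /// Record the reported version of a peer, keeping version info only for
    /// peers that sit in kbuckets that still have room.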
    fn record_node_version(&mut self, peer_id: PeerId, version: String) {
        let _ = self.peers_version.insert(peer_id, version);

        // Collect all peers that sit in non-full kbuckets.
        let mut peers_in_non_full_buckets = vec![];
        for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
            let num_entries = kbucket.num_entries();
            if num_entries >= K_VALUE.get() {
                continue;
            } else {
                let peers_in_kbucket = kbucket
                    .iter()
                    .map(|peer_entry| peer_entry.node.key.into_preimage())
                    .collect::<Vec<PeerId>>();
                peers_in_non_full_buckets.extend(peers_in_kbucket);
            }
        }

        // Prune the version info down to those peers.
        self.peers_version
            .retain(|peer_id, _version| peers_in_non_full_buckets.contains(peer_id));
    }

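    /// Record an issue reported against a peer. Repeated connection issues get
    /// the peer evicted from the routing table; a peer already marked as bad
    /// is evicted and, when a new bad behaviour has just been detected,
    /// informed of the verdict and added to the blocklist.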
    pub(crate) fn record_node_issue(&mut self, peer_id: PeerId, issue: NodeIssue) {
        info!("Peer {peer_id:?} is reported as having issue {issue:?}");
        let (issue_vec, is_bad) = self.bad_nodes.entry(peer_id).or_default();
        let mut new_bad_behaviour = None;
        let mut is_connection_issue = false;

        // If the peer is not already considered bad, track the new issue.
        if !(*is_bad) {
            // Only consider issues reported within the last five minutes.
            issue_vec.retain(|(_, timestamp)| timestamp.elapsed().as_secs() < 300);

            // Cap the number of tracked issues per peer at ten.
            if issue_vec.len() == 10 {
                issue_vec.remove(0);
            }

            // Collapse reports arriving within ten seconds of the previous
            // one, so a burst of reports is not counted as separate issues.
            let is_new_issue = if let Some((_issue, timestamp)) = issue_vec.last() {
                timestamp.elapsed().as_secs() > 10
            } else {
                true
            };

            if is_new_issue {
                issue_vec.push((issue, Instant::now()));
            } else {
                return;
            }

            // Three or more reports of the same issue make the peer a bad node.
            for (issue, _timestamp) in issue_vec.iter() {
                let issue_counts = issue_vec
                    .iter()
                    .filter(|(i, _timestamp)| *issue == *i)
                    .count();
                if issue_counts >= 3 {
                    if matches!(issue, NodeIssue::ConnectionIssue) {
                        is_connection_issue = true;
                    }
                    new_bad_behaviour = Some(issue.clone());
                    info!("Peer {peer_id:?} accumulated {issue_counts} reports of issue {issue:?}. Consider it as a bad node now.");
                    break;
                }
            }
        }

        // Connection issues only get the peer evicted from the routing table;
        // it is not marked as bad, and its connection issues are cleared.
        if is_connection_issue {
            issue_vec.retain(|(issue, _timestamp)| !matches!(issue, NodeIssue::ConnectionIssue));
            info!("Evicting bad peer {peer_id:?} due to connection issue from RT.");
            if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
                self.update_on_peer_removal(*dead_peer.node.key.preimage());
            }
            return;
        }

        if *is_bad {
            info!("Evicting bad peer {peer_id:?} from RT.");
            if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
                self.update_on_peer_removal(*dead_peer.node.key.preimage());
            }

            if let Some(bad_behaviour) = new_bad_behaviour {
                self.record_metrics(Marker::PeerConsideredAsBad { bad_peer: &peer_id });

                warn!("Peer {peer_id:?} is considered as bad due to {bad_behaviour:?}. Informing the peer and adding to blocklist.");
                // Only blocklist the peer once its response (or failure) to the
                // notification has come back, so the message can still reach it.
                let (tx, rx) = oneshot::channel();
                let local_swarm_cmd_sender = self.local_cmd_sender.clone();
                tokio::spawn(async move {
                    match rx.await {
                        Ok(result) => {
                            debug!("Got response for Cmd::PeerConsideredAsBad from {peer_id:?} {result:?}");
                            if let Err(err) = local_swarm_cmd_sender
                                .send(LocalSwarmCmd::AddPeerToBlockList { peer_id })
                                .await
                            {
                                error!("SwarmDriver failed to send LocalSwarmCmd: {err}");
                            }
                        }
                        Err(err) => {
                            error!("Failed to get response from one shot channel for Cmd::PeerConsideredAsBad : {err:?}");
                        }
                    }
                });

                // Inform the peer that it is considered bad, and by whom.
                let request = Request::Cmd(Cmd::PeerConsideredAsBad {
                    detected_by: NetworkAddress::from_peer(self.self_peer_id),
                    bad_peer: NetworkAddress::from_peer(peer_id),
                    bad_behaviour: bad_behaviour.to_string(),
                });
                self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
                    req: request,
                    peer: peer_id,
                    sender: Some(tx),
                });
            }
        }
    }

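    /// Verify a received quote against the peer's quoting history; a quote
    /// that contradicts the history flags the peer with `NodeIssue::BadQuoting`.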
    fn verify_peer_quote(&mut self, peer_id: PeerId, quote: PaymentQuote) {
        if let Some(history_quote) = self.quotes_history.get(&peer_id) {
            if !history_quote.historical_verify(&quote) {
                info!("From {peer_id:?}, detected a bad quote {quote:?} against history_quote {history_quote:?}");
                self.record_node_issue(peer_id, NodeIssue::BadQuoting);
                return;
            }

            // Only keep the latest quote in history.
            if history_quote.is_newer_than(&quote) {
                return;
            }
        }

        let _ = self.quotes_history.insert(peer_id, quote);
    }

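    /// Carry out an interval-replication round: send the list of locally held
    /// record addresses to the current replication candidates.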
    fn try_interval_replication(&mut self) -> Result<()> {
        // Skip the round if the minimum replication interval hasn't elapsed yet.
        if let Some(last_replication) = self.last_replication {
            if last_replication.elapsed() < MIN_REPLICATION_INTERVAL_S {
                info!("Skipping replication as minimum interval hasn't elapsed");
                return Ok(());
            }
        }
        self.last_replication = Some(Instant::now());

        let self_addr = NetworkAddress::from_peer(self.self_peer_id);
        let mut replicate_targets = self.get_replicate_candidates(&self_addr)?;

        let now = Instant::now();
        self.replication_targets
            .retain(|_peer_id, timestamp| *timestamp > now);
        // Avoid replicating to peers that were already sent a replication
        // list within the REPLICATION_TIMEOUT window.
        replicate_targets.retain(|peer_id| !self.replication_targets.contains_key(peer_id));
        if replicate_targets.is_empty() {
            return Ok(());
        }

        let all_records: Vec<_> = self
            .swarm
            .behaviour_mut()
            .kademlia
            .store_mut()
            .record_addresses_ref()?
            .values()
            .cloned()
            .collect();

        if !all_records.is_empty() {
            debug!(
                "Sending a replication list of {} keys to {replicate_targets:?} ",
                all_records.len()
            );
            let request = Request::Cmd(Cmd::Replicate {
                holder: NetworkAddress::from_peer(self.self_peer_id),
                keys: all_records
                    .into_iter()
                    .map(|(addr, val_type, _data_type)| (addr, val_type))
                    .collect(),
            });
            for peer_id in replicate_targets {
                self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
                    req: request.clone(),
                    peer: peer_id,
                    sender: None,
                });

                let _ = self
                    .replication_targets
                    .insert(peer_id, now + REPLICATION_TIMEOUT);
            }
        }

        Ok(())
    }

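    /// Returns the candidate peers for replication of the given target.
    ///
    /// For periodic replication (the target is a peer address, i.e. ourself)
    /// up to `CLOSE_GROUP_SIZE * 2` candidates are returned; for data-centric
    /// replication only `CLOSE_GROUP_SIZE`. Peers within the record store's
    /// replication distance range are preferred when there are enough of them.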
    pub(crate) fn get_replicate_candidates(
        &mut self,
        target: &NetworkAddress,
    ) -> Result<Vec<PeerId>> {
        // Periodic replication is triggered with our own address as the target.
        let is_periodic_replicate = target.as_peer_id().is_some();
        let expected_candidates = if is_periodic_replicate {
            CLOSE_GROUP_SIZE * 2
        } else {
            CLOSE_GROUP_SIZE
        };

        // Get the closest K peers to the target from the local routing table.
        let kbucket_key = target.as_kbucket_key();
        let closest_k_peers: Vec<PeerId> = self
            .swarm
            .behaviour_mut()
            .kademlia
            .get_closest_local_peers(&kbucket_key)
            .map(|key| key.into_preimage())
            .take(K_VALUE.get())
            .collect();

        if let Some(responsible_range) = self
            .swarm
            .behaviour_mut()
            .kademlia
            .store_mut()
            .get_farthest_replication_distance()?
        {
            let peers_in_range = get_peers_in_range(&closest_k_peers, target, responsible_range);

            if peers_in_range.len() >= expected_candidates {
                return Ok(peers_in_range);
            }
        }

        // Not enough peers within the replication range: fall back to the
        // closest peers overall.
        Ok(closest_k_peers
            .iter()
            .take(expected_candidates)
            .cloned()
            .collect())
    }
}

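/// Returns the peers among `peers` whose distance to `address` is within `range`.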
fn get_peers_in_range(peers: &[PeerId], address: &NetworkAddress, range: Distance) -> Vec<PeerId> {
    peers
        .iter()
        .filter_map(|peer_id| {
            if address.distance(&NetworkAddress::from_peer(*peer_id)) <= range {
                Some(*peer_id)
            } else {
                None
            }
        })
        .collect()
}