1use crate::time::Instant;
10use crate::{
11 config::GetRecordCfg,
12 driver::{PendingGetClosestType, SwarmDriver},
13 error::{NetworkError, Result},
14 event::TerminateNodeReason,
15 log_markers::Marker,
16 Addresses, GetRecordError, MsgResponder, NetworkEvent, ResponseQuorum, CLOSE_GROUP_SIZE,
17};
18use ant_evm::{PaymentQuote, QuotingMetrics};
19use ant_protocol::messages::ConnectionInfo;
20use ant_protocol::{
21 messages::{Cmd, Request, Response},
22 storage::{DataTypes, RecordHeader, RecordKind, ValidationType},
23 NetworkAddress, PrettyPrintRecordKey,
24};
25use libp2p::{
26 kad::{
27 store::{Error as StoreError, RecordStore},
28 KBucketDistance as Distance, Record, RecordKey, K_VALUE,
29 },
30 swarm::dial_opts::{DialOpts, PeerCondition},
31 Multiaddr, PeerId,
32};
33use std::{
34 collections::{BTreeMap, HashMap},
35 fmt::Debug,
36 time::Duration,
37};
38use tokio::sync::oneshot;
39use xor_name::XorName;
40
/// Number of consecutive local-store write failures after which the node
/// emits `NetworkEvent::TerminateNode` (see `RemoveFailedLocalRecord` handling).
const MAX_CONTINUOUS_HDD_WRITE_ERROR: usize = 5;

/// Cooldown per replication target: after sending a replication list to a peer
/// we skip that peer until `now + REPLICATION_TIMEOUT` has passed.
const REPLICATION_TIMEOUT: Duration = Duration::from_secs(45);

/// Minimum time between two periodic replication rounds
/// (checked against `last_replication` in `try_interval_replication`).
const MIN_REPLICATION_INTERVAL_S: Duration = Duration::from_secs(30);
48
/// Categories of misbehaviour that can be recorded against a peer.
/// Accumulated occurrences (see `record_node_issue`) may mark the peer as bad.
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum NodeIssue {
    /// Repeated connection failures; triggers eviction from the routing table.
    ConnectionIssue,
    /// Replication to/from this peer failed.
    ReplicationFailure,
    /// Nodes close to this peer reported it as shunned.
    CloseNodesShunning,
    /// The peer produced a quote inconsistent with its quoting history.
    BadQuoting,
    /// The peer failed a chunk proof check.
    FailedChunkProofCheck,
}

impl std::fmt::Display for NodeIssue {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Keep these labels stable: they end up in log lines and in the
        // `PeerConsideredAsBad` message sent to the offending peer.
        let label = match self {
            NodeIssue::ConnectionIssue => "CriticalConnectionIssue",
            NodeIssue::ReplicationFailure => "ReplicationFailure",
            NodeIssue::CloseNodesShunning => "CloseNodesShunning",
            NodeIssue::BadQuoting => "BadQuoting",
            NodeIssue::FailedChunkProofCheck => "FailedChunkProofCheck",
        };
        f.write_str(label)
    }
}
74
/// Commands handled locally by the `SwarmDriver` event loop without any
/// network round trip (see `SwarmDriver::handle_local_cmd`).
/// Results, when any, are delivered through the embedded oneshot `sender`.
pub enum LocalSwarmCmd {
    /// Return all peers in the routing table with their known multiaddrs.
    GetPeersWithMultiaddr {
        sender: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
    },
    /// Return the routing-table k-buckets keyed by the bucket's ilog2 distance.
    GetKBuckets {
        sender: oneshot::Sender<BTreeMap<u32, Vec<PeerId>>>,
    },
    /// Return the closest K-value local peers known to this node.
    GetClosestKLocalPeers {
        sender: oneshot::Sender<Vec<(PeerId, Addresses)>>,
    },
    /// Return up to `num_of_peers` local peers closest to `key`.
    GetCloseLocalPeersToTarget {
        key: NetworkAddress,
        num_of_peers: usize,
        sender: oneshot::Sender<Vec<(PeerId, Addresses)>>,
    },
    /// Snapshot of connected peers, routing-table size and listen addresses.
    GetSwarmLocalState(oneshot::Sender<SwarmLocalState>),
    /// Check whether the local record store holds `key`.
    RecordStoreHasKey {
        key: RecordKey,
        sender: oneshot::Sender<Result<bool>>,
    },
    /// Return the addresses (and validation types) of all locally stored records.
    GetAllLocalRecordAddresses {
        sender: oneshot::Sender<Result<HashMap<NetworkAddress, ValidationType>>>,
    },
    /// Fetch a record from the local store only (no network lookup).
    GetLocalRecord {
        key: RecordKey,
        sender: oneshot::Sender<Option<Record>>,
    },
    /// Compute quoting metrics for a prospective store of `data_size` bytes;
    /// the bool in the reply indicates the record is already stored locally.
    GetLocalQuotingMetrics {
        key: RecordKey,
        data_type: u32,
        data_size: usize,
        sender: oneshot::Sender<Result<(QuotingMetrics, bool)>>,
    },
    /// Notify the record store that a payment was received.
    PaymentReceived,
    /// Write an already-validated record into the local store.
    PutLocalRecord {
        record: Record,
        is_client_put: bool,
    },
    /// Remove a record whose disk write failed; counts towards
    /// `MAX_CONTINUOUS_HDD_WRITE_ERROR`.
    RemoveFailedLocalRecord {
        key: RecordKey,
    },
    /// Mark a record as durably stored and reset the HDD-write-error counter.
    AddLocalRecordAsStored {
        key: RecordKey,
        record_type: ValidationType,
        data_type: DataTypes,
    },
    /// Add a peer to the libp2p blocklist behaviour.
    AddPeerToBlockList {
        peer_id: PeerId,
    },
    /// Record a `NodeIssue` against a peer (may lead to eviction/blocklisting).
    RecordNodeIssue {
        peer_id: PeerId,
        issue: NodeIssue,
    },
    /// Query whether `target` is currently considered a bad (shunned) peer.
    IsPeerShunned {
        target: NetworkAddress,
        sender: oneshot::Sender<bool>,
    },
    /// Verify payment quotes against each peer's quoting history.
    QuoteVerification {
        quotes: Vec<(PeerId, PaymentQuote)>,
    },
    /// Notify the replication fetcher that a fetch completed early.
    FetchCompleted((RecordKey, ValidationType)),
    /// Kick off a periodic replication round (rate-limited by
    /// `MIN_REPLICATION_INTERVAL_S`).
    TriggerIntervalReplication,
    /// Ask the record store to clean up records it is no longer responsible for.
    TriggerIrrelevantRecordCleanup,
    /// Feed a network-density (distance) sample into the estimator.
    AddNetworkDensitySample {
        distance: Distance,
    },
    /// Forward peer scores to the replication fetcher.
    NotifyPeerScores {
        peer_scores: Vec<(PeerId, bool)>,
    },
    /// Register freshly replicated keys reported by `holder`.
    AddFreshReplicateRecords {
        holder: NetworkAddress,
        keys: Vec<(NetworkAddress, ValidationType)>,
    },
    /// Record the software version string a peer reported.
    NotifyPeerVersion {
        peer: PeerId,
        version: String,
    },
    /// Return the store's farthest replication distance, if known.
    GetNetworkDensity {
        sender: oneshot::Sender<Option<Distance>>,
    },
    /// Remove a peer from the routing table.
    RemovePeer {
        peer: PeerId,
    },
}
191
/// Commands that require network interaction, handled by
/// `SwarmDriver::handle_network_cmd`.
pub enum NetworkSwarmCmd {
    /// Run a Kademlia `get_closest_peers` query for `key` on the network.
    GetClosestPeersToAddressFromNetwork {
        key: NetworkAddress,
        sender: oneshot::Sender<Vec<(PeerId, Addresses)>>,
    },

    /// Send a request to a peer (or to self, which is short-circuited);
    /// `addrs`, when provided and non-empty, are dialed first.
    SendRequest {
        req: Request,
        peer: PeerId,
        addrs: Option<Addresses>,

        // `None` means fire-and-forget (e.g. replication lists).
        #[allow(clippy::type_complexity)]
        sender: Option<oneshot::Sender<Result<(Response, Option<ConnectionInfo>)>>>,
    },
    /// Send a response back over the channel a request arrived on.
    SendResponse {
        resp: Response,
        channel: MsgResponder,
    },

    /// Fetch a record from the network via Kademlia; concurrent queries for the
    /// same key are merged onto one in-flight query.
    GetNetworkRecord {
        key: RecordKey,
        sender: oneshot::Sender<std::result::Result<Record, GetRecordError>>,
        cfg: GetRecordCfg,
    },

    /// Put a record to the network with the given quorum.
    PutRecord {
        record: Record,
        sender: oneshot::Sender<Result<()>>,
        quorum: ResponseQuorum,
    },
    /// Put a record directly to an explicit set of peers.
    /// NOTE(review): the reply is sent as `Ok(())` as soon as the put is
    /// dispatched, not when the quorum is reached.
    PutRecordTo {
        peers: Vec<PeerId>,
        record: Record,
        sender: oneshot::Sender<Result<()>>,
        quorum: ResponseQuorum,
    },

    /// Manually dial a peer at the given addresses.
    DialPeer {
        peer: PeerId,
        addrs: Addresses,
    },
}
248
249impl Debug for LocalSwarmCmd {
252 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253 match self {
254 LocalSwarmCmd::PutLocalRecord {
255 record,
256 is_client_put,
257 } => {
258 write!(
259 f,
260 "LocalSwarmCmd::PutLocalRecord {{ key: {:?}, is_client_put: {is_client_put:?} }}",
261 PrettyPrintRecordKey::from(&record.key)
262 )
263 }
264 LocalSwarmCmd::RemoveFailedLocalRecord { key } => {
265 write!(
266 f,
267 "LocalSwarmCmd::RemoveFailedLocalRecord {{ key: {:?} }}",
268 PrettyPrintRecordKey::from(key)
269 )
270 }
271 LocalSwarmCmd::AddLocalRecordAsStored {
272 key,
273 record_type,
274 data_type,
275 } => {
276 write!(
277 f,
278 "LocalSwarmCmd::AddLocalRecordAsStored {{ key: {:?}, record_type: {record_type:?}, data_type: {data_type:?} }}",
279 PrettyPrintRecordKey::from(key)
280 )
281 }
282 LocalSwarmCmd::GetClosestKLocalPeers { .. } => {
283 write!(f, "LocalSwarmCmd::GetClosestKLocalPeers")
284 }
285 LocalSwarmCmd::GetCloseLocalPeersToTarget {
286 key, num_of_peers, ..
287 } => {
288 write!(
289 f,
290 "LocalSwarmCmd::GetCloseLocalPeersToTarget {{ key: {key:?}, num_of_peers: {num_of_peers} }}"
291 )
292 }
293 LocalSwarmCmd::GetLocalQuotingMetrics { .. } => {
294 write!(f, "LocalSwarmCmd::GetLocalQuotingMetrics")
295 }
296 LocalSwarmCmd::PaymentReceived => {
297 write!(f, "LocalSwarmCmd::PaymentReceived")
298 }
299 LocalSwarmCmd::GetLocalRecord { key, .. } => {
300 write!(
301 f,
302 "LocalSwarmCmd::GetLocalRecord {{ key: {:?} }}",
303 PrettyPrintRecordKey::from(key)
304 )
305 }
306 LocalSwarmCmd::GetAllLocalRecordAddresses { .. } => {
307 write!(f, "LocalSwarmCmd::GetAllLocalRecordAddresses")
308 }
309 LocalSwarmCmd::GetPeersWithMultiaddr { .. } => {
310 write!(f, "LocalSwarmCmd::GetPeersWithMultiaddr")
311 }
312 LocalSwarmCmd::GetKBuckets { .. } => {
313 write!(f, "LocalSwarmCmd::GetKBuckets")
314 }
315 LocalSwarmCmd::GetSwarmLocalState { .. } => {
316 write!(f, "LocalSwarmCmd::GetSwarmLocalState")
317 }
318 LocalSwarmCmd::RecordStoreHasKey { key, .. } => {
319 write!(
320 f,
321 "LocalSwarmCmd::RecordStoreHasKey {:?}",
322 PrettyPrintRecordKey::from(key)
323 )
324 }
325 LocalSwarmCmd::AddPeerToBlockList { peer_id } => {
326 write!(f, "LocalSwarmCmd::AddPeerToBlockList {peer_id:?}")
327 }
328 LocalSwarmCmd::RecordNodeIssue { peer_id, issue } => {
329 write!(
330 f,
331 "LocalSwarmCmd::SendNodeStatus peer {peer_id:?}, issue: {issue:?}"
332 )
333 }
334 LocalSwarmCmd::IsPeerShunned { target, .. } => {
335 write!(f, "LocalSwarmCmd::IsPeerInTrouble target: {target:?}")
336 }
337 LocalSwarmCmd::QuoteVerification { quotes } => {
338 write!(
339 f,
340 "LocalSwarmCmd::QuoteVerification of {} quotes",
341 quotes.len()
342 )
343 }
344 LocalSwarmCmd::FetchCompleted((key, record_type)) => {
345 write!(
346 f,
347 "LocalSwarmCmd::FetchCompleted({record_type:?} : {:?})",
348 PrettyPrintRecordKey::from(key)
349 )
350 }
351 LocalSwarmCmd::TriggerIntervalReplication => {
352 write!(f, "LocalSwarmCmd::TriggerIntervalReplication")
353 }
354 LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
355 write!(f, "LocalSwarmCmd::TriggerUnrelevantRecordCleanup")
356 }
357 LocalSwarmCmd::AddNetworkDensitySample { distance } => {
358 write!(f, "LocalSwarmCmd::AddNetworkDensitySample({distance:?})")
359 }
360 LocalSwarmCmd::NotifyPeerScores { peer_scores } => {
361 write!(f, "LocalSwarmCmd::NotifyPeerScores({peer_scores:?})")
362 }
363 LocalSwarmCmd::AddFreshReplicateRecords { holder, keys } => {
364 write!(
365 f,
366 "LocalSwarmCmd::AddFreshReplicateRecords({holder:?}, {keys:?})"
367 )
368 }
369 LocalSwarmCmd::NotifyPeerVersion { peer, version } => {
370 write!(f, "LocalSwarmCmd::NotifyPeerVersion({peer:?}, {version:?})")
371 }
372 LocalSwarmCmd::GetNetworkDensity { .. } => {
373 write!(f, "LocalSwarmCmd::GetNetworkDensity")
374 }
375 LocalSwarmCmd::RemovePeer { peer } => {
376 write!(f, "LocalSwarmCmd::RemovePeer({peer:?})")
377 }
378 }
379 }
380}
381
382impl Debug for NetworkSwarmCmd {
385 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386 match self {
387 NetworkSwarmCmd::GetNetworkRecord { key, cfg, .. } => {
388 write!(
389 f,
390 "NetworkSwarmCmd::GetNetworkRecord {{ key: {:?}, cfg: {cfg:?}",
391 PrettyPrintRecordKey::from(key)
392 )
393 }
394 NetworkSwarmCmd::PutRecord { record, .. } => {
395 write!(
396 f,
397 "NetworkSwarmCmd::PutRecord {{ key: {:?} }}",
398 PrettyPrintRecordKey::from(&record.key)
399 )
400 }
401 NetworkSwarmCmd::PutRecordTo { peers, record, .. } => {
402 write!(
403 f,
404 "NetworkSwarmCmd::PutRecordTo {{ peers: {peers:?}, key: {:?} }}",
405 PrettyPrintRecordKey::from(&record.key)
406 )
407 }
408 NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { key, .. } => {
409 write!(f, "NetworkSwarmCmd::GetClosestPeers {{ key: {key:?} }}")
410 }
411 NetworkSwarmCmd::SendResponse { resp, .. } => {
412 write!(f, "NetworkSwarmCmd::SendResponse resp: {resp:?}")
413 }
414 NetworkSwarmCmd::SendRequest { req, peer, .. } => {
415 write!(
416 f,
417 "NetworkSwarmCmd::SendRequest req: {req:?}, peer: {peer:?}"
418 )
419 }
420 NetworkSwarmCmd::DialPeer { peer, .. } => {
421 write!(f, "NetworkSwarmCmd::DialPeer peer: {peer:?}")
422 }
423 }
424 }
425}
/// Point-in-time snapshot of the local swarm, returned for
/// `LocalSwarmCmd::GetSwarmLocalState`.
#[derive(Debug, Clone)]
pub struct SwarmLocalState {
    // Peers with a currently open connection.
    pub connected_peers: Vec<PeerId>,
    // Number of peers in the Kademlia routing table (`peers_in_rt`).
    pub peers_in_routing_table: usize,
    // Addresses the swarm is listening on.
    pub listeners: Vec<Multiaddr>,
}
436
437impl SwarmDriver {
    /// Executes a [`NetworkSwarmCmd`] against the swarm.
    ///
    /// Each arm sets `cmd_string` so the handling duration can be logged via
    /// `log_handling` at the end. Errors from channel sends are logged rather
    /// than propagated, except where noted.
    pub(crate) fn handle_network_cmd(&mut self, cmd: NetworkSwarmCmd) -> Result<(), NetworkError> {
        let start = Instant::now();
        let cmd_string;
        match cmd {
            NetworkSwarmCmd::GetNetworkRecord { key, sender, cfg } => {
                cmd_string = "GetNetworkRecord";

                // De-duplicate: if a query for this key is already in flight,
                // attach this sender to it instead of starting a new query.
                for (pending_query, (inflight_record_query_key, senders, _, _)) in
                    self.pending_get_record.iter_mut()
                {
                    if *inflight_record_query_key == key {
                        debug!(
                            "GetNetworkRecord for {:?} is already in progress. Adding sender to {pending_query:?}",
                            PrettyPrintRecordKey::from(&key)
                        );
                        senders.push(sender);

                        return Ok(());
                    }
                }

                let query_id = self.swarm.behaviour_mut().kademlia.get_record(key.clone());

                debug!(
                    "Record {:?} with task {query_id:?} expected to be held by {:?}",
                    PrettyPrintRecordKey::from(&key),
                    cfg.expected_holders
                );

                // QueryIds should be unique per query; a replacement here would
                // drop the previous query's senders.
                if self
                    .pending_get_record
                    .insert(query_id, (key, vec![sender], Default::default(), cfg))
                    .is_some()
                {
                    warn!("An existing get_record task {query_id:?} got replaced");
                }
                // Sum of cached copies across all in-flight get_record queries.
                let total_records: usize = self
                    .pending_get_record
                    .iter()
                    .map(|(_, (_, _, result_map, _))| result_map.len())
                    .sum();
                info!("We now have {} pending get record attempts and cached {total_records} fetched copies",
                    self.pending_get_record.len());
            }
            NetworkSwarmCmd::PutRecord {
                record,
                sender,
                quorum,
            } => {
                cmd_string = "PutRecord";
                let record_key = PrettyPrintRecordKey::from(&record.key).into_owned();
                debug!(
                    "Putting record sized: {:?} to network {:?}",
                    record.value.len(),
                    record_key
                );
                // Success here only means the put was dispatched; quorum results
                // arrive later as Kademlia events.
                let res = match self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .put_record(record, quorum.get_kad_quorum())
                {
                    Ok(request_id) => {
                        debug!("Sent record {record_key:?} to network. Request id: {request_id:?} to network");
                        Ok(())
                    }
                    Err(error) => {
                        error!("Error sending record {record_key:?} to network");
                        Err(NetworkError::from(error))
                    }
                };

                if let Err(err) = sender.send(res) {
                    error!("Could not send response to PutRecord cmd: {:?}", err);
                }
            }
            NetworkSwarmCmd::PutRecordTo {
                peers,
                record,
                sender,
                quorum,
            } => {
                cmd_string = "PutRecordTo";
                let record_key = PrettyPrintRecordKey::from(&record.key).into_owned();
                debug!(
                    "Putting record {record_key:?} sized: {:?} to {peers:?}",
                    record.value.len(),
                );
                let peers_count = peers.len();
                let request_id = self.swarm.behaviour_mut().kademlia.put_record_to(
                    record,
                    peers.into_iter(),
                    quorum.get_kad_quorum(),
                );
                info!("Sent record {record_key:?} to {peers_count:?} peers. Request id: {request_id:?}");

                // NOTE(review): Ok(()) is reported on dispatch, before any
                // quorum confirmation arrives.
                if let Err(err) = sender.send(Ok(())) {
                    error!("Could not send response to PutRecordTo cmd: {:?}", err);
                }
            }
            NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { key, sender } => {
                cmd_string = "GetClosestPeersToAddressFromNetwork";
                let query_id = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .get_closest_peers(key.as_bytes());
                // The sender is resolved when the query's final event arrives.
                let _ = self.pending_get_closest_peers.insert(
                    query_id,
                    (
                        PendingGetClosestType::FunctionCall(sender),
                        Default::default(),
                    ),
                );
            }

            NetworkSwarmCmd::SendRequest {
                req,
                peer,
                addrs,
                sender,
            } => {
                cmd_string = "SendRequest";
                // Requests addressed to ourselves are short-circuited: queries
                // become local events; anything else (e.g. Replicate) is dropped.
                if peer == *self.swarm.local_peer_id() {
                    trace!("Sending query request to self");
                    if let Request::Query(query) = req {
                        self.send_event(NetworkEvent::QueryRequestReceived {
                            query,
                            channel: MsgResponder::FromSelf(sender),
                        });
                    } else {
                        trace!("Replicate cmd to self received, ignoring");
                    }
                } else {
                    // Pre-dial with the supplied addresses (if any) to improve
                    // the chance the request-response send succeeds.
                    if let Some(addrs) = addrs {
                        if addrs.0.is_empty() {
                            info!("No addresses for peer {peer:?} to send request. This could cause dial failure if swarm could not find the peer's addrs.");
                        } else {
                            let opts = DialOpts::peer_id(peer)
                                .condition(PeerCondition::NotDialing)
                                .addresses(addrs.0.clone())
                                .build();

                            match self.swarm.dial(opts) {
                                Ok(()) => {
                                    info!("Dialing peer {peer:?} for req_resp with address: {addrs:?}",);
                                }
                                Err(err) => {
                                    error!("Failed to dial peer {peer:?} for req_resp with address: {addrs:?} error: {err}",);
                                }
                            }
                        }
                    }

                    let request_id = self
                        .swarm
                        .behaviour_mut()
                        .request_response
                        .send_request(&peer, req);
                    trace!("Sending request {request_id:?} to peer {peer:?}");
                    let _ = self.pending_requests.insert(request_id, sender);

                    trace!("Pending Requests now: {:?}", self.pending_requests.len());
                }
            }
            NetworkSwarmCmd::SendResponse { resp, channel } => {
                cmd_string = "SendResponse";
                match channel {
                    // Request originated from this node (self-send path above).
                    MsgResponder::FromSelf(channel) => {
                        trace!("Sending response to self");
                        match channel {
                            Some(channel) => {
                                channel
                                    .send(Ok((resp, None)))
                                    .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
                            }
                            None => {
                                // No waiting sender: surface the response as an event.
                                self.send_event(NetworkEvent::ResponseReceived { res: resp });
                            }
                        }
                    }
                    MsgResponder::FromPeer(channel) => {
                        self.swarm
                            .behaviour_mut()
                            .request_response
                            .send_response(channel, resp)
                            .map_err(NetworkError::OutgoingResponseDropped)?;
                    }
                }
            }
            NetworkSwarmCmd::DialPeer { peer, addrs } => {
                cmd_string = "DialPeer";
                let opts = DialOpts::peer_id(peer)
                    .condition(PeerCondition::NotDialing)
                    .addresses(addrs.0.clone())
                    .build();

                match self.swarm.dial(opts) {
                    Ok(()) => {
                        info!("Manual dialing peer {peer:?} with address: {addrs:?}",);
                    }
                    Err(err) => {
                        error!("Failed to manual dial peer {peer:?} with address: {addrs:?} error: {err}",);
                    }
                }
            }
        }

        self.log_handling(cmd_string.to_string(), start.elapsed());

        Ok(())
    }
664 pub(crate) fn handle_local_cmd(&mut self, cmd: LocalSwarmCmd) -> Result<(), NetworkError> {
665 let start = Instant::now();
666 let mut cmd_string;
667 match cmd {
668 LocalSwarmCmd::TriggerIntervalReplication => {
669 cmd_string = "TriggerIntervalReplication";
670 self.try_interval_replication()?;
671 }
672 LocalSwarmCmd::GetLocalQuotingMetrics {
673 key,
674 data_type,
675 data_size,
676 sender,
677 } => {
678 cmd_string = "GetLocalQuotingMetrics";
679 let kbucket_status = self.get_kbuckets_status();
680 self.update_on_kbucket_status(&kbucket_status);
681 let (quoting_metrics, is_already_stored) = match self
682 .swarm
683 .behaviour_mut()
684 .kademlia
685 .store_mut()
686 .quoting_metrics(
687 &key,
688 data_type,
689 data_size,
690 Some(kbucket_status.estimated_network_size as u64),
691 ) {
692 Ok(res) => res,
693 Err(err) => {
694 let _res = sender.send(Err(err));
695 return Ok(());
696 }
697 };
698 self.record_metrics(Marker::QuotingMetrics {
699 quoting_metrics: "ing_metrics,
700 });
701
702 let mut bad_nodes: Vec<_> = self
705 .bad_nodes
706 .iter()
707 .filter_map(|(peer_id, (_issue_list, is_bad))| {
708 if *is_bad {
709 Some(NetworkAddress::from(*peer_id))
710 } else {
711 None
712 }
713 })
714 .collect();
715
716 let kbucket_key = NetworkAddress::from(&key).as_kbucket_key();
718 let closest_peers: Vec<_> = self
719 .swarm
720 .behaviour_mut()
721 .kademlia
722 .get_closest_local_peers(&kbucket_key)
723 .map(|peer| peer.into_preimage())
724 .take(CLOSE_GROUP_SIZE)
725 .collect();
726 if closest_peers.len() >= CLOSE_GROUP_SIZE {
728 let boundary_peer = closest_peers[CLOSE_GROUP_SIZE - 1];
729 let key_address = NetworkAddress::from(&key);
730 let boundary_distance =
731 key_address.distance(&NetworkAddress::from(boundary_peer));
732 bad_nodes
733 .retain(|peer_addr| key_address.distance(peer_addr) < boundary_distance);
734 }
735
736 let _res = sender.send(Ok((quoting_metrics, is_already_stored)));
737 }
738 LocalSwarmCmd::PaymentReceived => {
739 cmd_string = "PaymentReceived";
740 self.swarm
741 .behaviour_mut()
742 .kademlia
743 .store_mut()
744 .payment_received();
745 }
746 LocalSwarmCmd::GetLocalRecord { key, sender } => {
747 cmd_string = "GetLocalRecord";
748 let record = self
749 .swarm
750 .behaviour_mut()
751 .kademlia
752 .store_mut()
753 .get(&key)
754 .map(|rec| rec.into_owned());
755 let _ = sender.send(record);
756 }
757
758 LocalSwarmCmd::PutLocalRecord {
759 record,
760 is_client_put,
761 } => {
762 cmd_string = "PutLocalRecord";
763 let key = record.key.clone();
764 let record_key = PrettyPrintRecordKey::from(&key);
765
766 let record_type = match RecordHeader::from_record(&record) {
767 Ok(record_header) => {
768 match record_header.kind {
769 RecordKind::DataOnly(DataTypes::Chunk) => ValidationType::Chunk,
770 RecordKind::DataOnly(_) => {
771 let content_hash = XorName::from_content(&record.value);
772 ValidationType::NonChunk(content_hash)
773 }
774 RecordKind::DataWithPayment(_) => {
775 error!("Record {record_key:?} with payment shall not be stored locally.");
776 return Err(NetworkError::InCorrectRecordHeader);
777 }
778 }
779 }
780 Err(err) => {
781 error!("For record {record_key:?}, failed to parse record_header {err:?}");
782 return Err(NetworkError::InCorrectRecordHeader);
783 }
784 };
785
786 let result = self
787 .swarm
788 .behaviour_mut()
789 .kademlia
790 .store_mut()
791 .put_verified(record, record_type.clone(), is_client_put);
792
793 match result {
794 Ok(_) => {
795 }
802 Err(StoreError::MaxRecords) => {
803 let farthest = self
806 .swarm
807 .behaviour_mut()
808 .kademlia
809 .store_mut()
810 .get_farthest()?;
811 self.replication_fetcher.set_farthest_on_full(farthest);
812 }
813 Err(_) => {
814 }
817 }
818
819 let new_keys_to_fetch = self
824 .replication_fetcher
825 .notify_about_new_put(key.clone(), record_type);
826
827 if !new_keys_to_fetch.is_empty() {
828 self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
829 }
830
831 if let Some(distance) = self
834 .swarm
835 .behaviour_mut()
836 .kademlia
837 .store_mut()
838 .get_farthest_replication_distance()?
839 {
840 self.replication_fetcher
841 .set_replication_distance_range(distance);
842 }
843
844 if let Err(err) = result {
845 error!("Can't store verified record {record_key:?} locally: {err:?}");
846 cmd_string = "PutLocalRecord error";
847 self.log_handling(cmd_string.to_string(), start.elapsed());
848 return Err(err.into());
849 };
850 }
851 LocalSwarmCmd::AddLocalRecordAsStored {
852 key,
853 record_type,
854 data_type,
855 } => {
856 cmd_string = "AddLocalRecordAsStored";
857 self.swarm
858 .behaviour_mut()
859 .kademlia
860 .store_mut()
861 .mark_as_stored(key, record_type, data_type);
862 self.hard_disk_write_error = 0;
864 }
865 LocalSwarmCmd::RemoveFailedLocalRecord { key } => {
866 info!("Removing Record locally, for {key:?}");
867 cmd_string = "RemoveFailedLocalRecord";
868 self.swarm.behaviour_mut().kademlia.store_mut().remove(&key);
869 self.hard_disk_write_error = self.hard_disk_write_error.saturating_add(1);
870 if self.hard_disk_write_error > MAX_CONTINUOUS_HDD_WRITE_ERROR {
873 self.send_event(NetworkEvent::TerminateNode {
874 reason: TerminateNodeReason::HardDiskWriteError,
875 });
876 }
877 }
878 LocalSwarmCmd::RecordStoreHasKey { key, sender } => {
879 cmd_string = "RecordStoreHasKey";
880 let has_key = self
881 .swarm
882 .behaviour_mut()
883 .kademlia
884 .store_mut()
885 .contains(&key);
886 let _ = sender.send(has_key);
887 }
888 LocalSwarmCmd::GetAllLocalRecordAddresses { sender } => {
889 cmd_string = "GetAllLocalRecordAddresses";
890 #[allow(clippy::mutable_key_type)] let addresses = self
892 .swarm
893 .behaviour_mut()
894 .kademlia
895 .store_mut()
896 .record_addresses();
897 let _ = sender.send(addresses);
898 }
899 LocalSwarmCmd::GetKBuckets { sender } => {
900 cmd_string = "GetKBuckets";
901 let mut ilog2_kbuckets = BTreeMap::new();
902 for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
903 let range = kbucket.range();
904 if let Some(distance) = range.0.ilog2() {
905 let peers_in_kbucket = kbucket
906 .iter()
907 .map(|peer_entry| peer_entry.node.key.into_preimage())
908 .collect::<Vec<PeerId>>();
909 let _ = ilog2_kbuckets.insert(distance, peers_in_kbucket);
910 } else {
911 error!("bucket is ourself ???!!!");
913 }
914 }
915 let _ = sender.send(ilog2_kbuckets);
916 }
917 LocalSwarmCmd::GetPeersWithMultiaddr { sender } => {
918 cmd_string = "GetPeersWithMultiAddr";
919 let mut result: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
920 for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
921 let peers_in_kbucket = kbucket
922 .iter()
923 .map(|peer_entry| {
924 (
925 peer_entry.node.key.into_preimage(),
926 peer_entry.node.value.clone().into_vec(),
927 )
928 })
929 .collect::<Vec<(PeerId, Vec<Multiaddr>)>>();
930 result.extend(peers_in_kbucket);
931 }
932 let _ = sender.send(result);
933 }
934 LocalSwarmCmd::GetCloseLocalPeersToTarget {
935 key,
936 num_of_peers,
937 sender,
938 } => {
939 cmd_string = "GetCloseLocalPeersToTarget";
940 let closest_peers = self.get_closest_local_peers_to_target(&key, num_of_peers);
941
942 let _ = sender.send(closest_peers);
943 }
944 LocalSwarmCmd::GetClosestKLocalPeers { sender } => {
945 cmd_string = "GetClosestKLocalPeers";
946 let _ = sender.send(self.get_closest_k_value_local_peers());
947 }
948 LocalSwarmCmd::GetSwarmLocalState(sender) => {
949 cmd_string = "GetSwarmLocalState";
950 let current_state = SwarmLocalState {
951 connected_peers: self.swarm.connected_peers().cloned().collect(),
952 peers_in_routing_table: self.peers_in_rt,
953 listeners: self.swarm.listeners().cloned().collect(),
954 };
955
956 sender
957 .send(current_state)
958 .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
959 }
960 LocalSwarmCmd::AddPeerToBlockList { peer_id } => {
961 cmd_string = "AddPeerToBlockList";
962 self.swarm.behaviour_mut().blocklist.block_peer(peer_id);
963 }
964 LocalSwarmCmd::RecordNodeIssue { peer_id, issue } => {
965 cmd_string = "RecordNodeIssues";
966 self.record_node_issue(peer_id, issue);
967 }
968 LocalSwarmCmd::IsPeerShunned { target, sender } => {
969 cmd_string = "IsPeerInTrouble";
970 let is_bad = if let Some(peer_id) = target.as_peer_id() {
971 if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
972 *is_bad
973 } else {
974 false
975 }
976 } else {
977 false
978 };
979 let _ = sender.send(is_bad);
980 }
981 LocalSwarmCmd::QuoteVerification { quotes } => {
982 cmd_string = "QuoteVerification";
983 for (peer_id, quote) in quotes {
984 if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
986 if *is_bad {
987 continue;
988 }
989 }
990 self.verify_peer_quote(peer_id, quote);
991 }
992 }
993 LocalSwarmCmd::FetchCompleted((key, record_type)) => {
994 info!(
995 "Fetch of {record_type:?} {:?} early completed, may have fetched an old version of the record.",
996 PrettyPrintRecordKey::from(&key)
997 );
998 cmd_string = "FetchCompleted";
999 let new_keys_to_fetch = self
1000 .replication_fetcher
1001 .notify_fetch_early_completed(key, record_type);
1002 if !new_keys_to_fetch.is_empty() {
1003 self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
1004 }
1005 }
1006 LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
1007 cmd_string = "TriggerIrrelevantRecordCleanup";
1008 self.swarm
1009 .behaviour_mut()
1010 .kademlia
1011 .store_mut()
1012 .cleanup_irrelevant_records();
1013 }
1014 LocalSwarmCmd::AddNetworkDensitySample { distance } => {
1015 cmd_string = "AddNetworkDensitySample";
1016 self.network_density_samples.add(distance);
1017 }
1018 LocalSwarmCmd::NotifyPeerScores { peer_scores } => {
1019 cmd_string = "NotifyPeerScores";
1020 self.replication_fetcher.add_peer_scores(peer_scores);
1021 }
1022 LocalSwarmCmd::AddFreshReplicateRecords { holder, keys } => {
1023 cmd_string = "AddFreshReplicateRecords";
1024 let _ = self.add_keys_to_replication_fetcher(holder, keys, true);
1025 }
1026 LocalSwarmCmd::NotifyPeerVersion { peer, version } => {
1027 cmd_string = "NotifyPeerVersion";
1028 self.record_node_version(peer, version);
1029 }
1030 LocalSwarmCmd::GetNetworkDensity { sender } => {
1031 cmd_string = "GetNetworkDensity";
1032 let density = self
1033 .swarm
1034 .behaviour_mut()
1035 .kademlia
1036 .store_mut()
1037 .get_farthest_replication_distance()
1038 .unwrap_or_default();
1039 let _ = sender.send(density);
1040 }
1041 LocalSwarmCmd::RemovePeer { peer } => {
1042 cmd_string = "RemovePeer";
1043 if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer) {
1044 self.update_on_peer_removal(*dead_peer.node.key.preimage());
1045 }
1046 }
1047 }
1048
1049 self.log_handling(cmd_string.to_string(), start.elapsed());
1050
1051 Ok(())
1052 }
1053
1054 fn record_node_version(&mut self, peer_id: PeerId, version: String) {
1055 let _ = self.peers_version.insert(peer_id, version);
1056 }
1057
    /// Records `issue` against `peer_id` and escalates when issues accumulate.
    ///
    /// Behaviour visible in this code:
    /// - Issues older than 5 minutes are dropped; at most 10 are kept per peer.
    /// - An issue within 10s of the previous one is treated as a duplicate and ignored.
    /// - 3+ occurrences of the same issue kind flags the peer as bad behaviour;
    ///   3+ `ConnectionIssue`s instead evict the peer from the routing table
    ///   (and clear those entries) without blocklisting.
    /// - A peer already marked bad is evicted, informed via `PeerConsideredAsBad`,
    ///   and blocklisted once the peer acknowledges (or the channel resolves).
    pub(crate) fn record_node_issue(&mut self, peer_id: PeerId, issue: NodeIssue) {
        info!("Peer {peer_id:?} is reported as having issue {issue:?}");
        let (issue_vec, is_bad) = self.bad_nodes.entry(peer_id).or_default();
        let mut new_bad_behaviour = None;
        let mut is_connection_issue = false;

        // Only accumulate further issues while the peer is not yet marked bad.
        if !(*is_bad) {
            // Drop stale issues (older than 5 minutes).
            issue_vec.retain(|(_, timestamp)| timestamp.elapsed().as_secs() < 300);

            // Cap the history at 10 entries, discarding the oldest.
            if issue_vec.len() == 10 {
                issue_vec.remove(0);
            }

            // Debounce: anything within 10s of the last entry is a duplicate report.
            let is_new_issue = if let Some((_issue, timestamp)) = issue_vec.last() {
                timestamp.elapsed().as_secs() > 10
            } else {
                true
            };

            if is_new_issue {
                issue_vec.push((issue, Instant::now()));
            } else {
                return;
            }

            // Escalate if any single issue kind occurs 3+ times.
            for (issue, _timestamp) in issue_vec.iter() {
                let issue_counts = issue_vec
                    .iter()
                    .filter(|(i, _timestamp)| *issue == *i)
                    .count();
                if issue_counts >= 3 {
                    if matches!(issue, NodeIssue::ConnectionIssue) {
                        is_connection_issue = true;
                    }
                    // NOTE(review): `*is_bad` is never set to true in the visible
                    // code; SOURCE elides lines here (original numbering jumps),
                    // so the flag is presumably set in the elided section — confirm.
                    new_bad_behaviour = Some(issue.clone());
                    info!("Peer {peer_id:?} accumulated {issue_counts} times of issue {issue:?}. Consider it as a bad node now.");
                    break;
                }
            }
        }

        // Connection issues cause eviction only — no blocklist, and the
        // connection-issue history is cleared so the peer can redeem itself.
        if is_connection_issue {
            issue_vec.retain(|(issue, _timestamp)| !matches!(issue, NodeIssue::ConnectionIssue));
            info!("Evicting bad peer {peer_id:?} due to connection issue from RT.");
            if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
                self.update_on_peer_removal(*dead_peer.node.key.preimage());
            }
            return;
        }

        if *is_bad {
            info!("Evicting bad peer {peer_id:?} from RT.");
            // Keep the peer's addresses so the PeerConsideredAsBad message can
            // still be dialed/delivered after removal from the routing table.
            let addrs = if let Some(dead_peer) =
                self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id)
            {
                self.update_on_peer_removal(*dead_peer.node.key.preimage());
                Addresses(dead_peer.node.value.into_vec())
            } else {
                Addresses(Vec::new())
            };

            if let Some(bad_behaviour) = new_bad_behaviour {
                self.record_metrics(Marker::PeerConsideredAsBad { bad_peer: &peer_id });

                warn!("Peer {peer_id:?} is considered as bad due to {bad_behaviour:?}. Informing the peer and adding to blocklist.");
                // Blocklist only after the notification round-trip resolves, so
                // the outgoing message is not blocked by our own blocklist.
                let (tx, rx) = oneshot::channel();
                let local_swarm_cmd_sender = self.local_cmd_sender.clone();
                tokio::spawn(async move {
                    match rx.await {
                        Ok(result) => {
                            debug!("Got response for Cmd::PeerConsideredAsBad from {peer_id:?} {result:?}");
                            if let Err(err) = local_swarm_cmd_sender
                                .send(LocalSwarmCmd::AddPeerToBlockList { peer_id })
                                .await
                            {
                                error!("SwarmDriver failed to send LocalSwarmCmd: {err}");
                            }
                        }
                        Err(err) => {
                            error!("Failed to get response from one shot channel for Cmd::PeerConsideredAsBad : {err:?}");
                        }
                    }
                });

                let request = Request::Cmd(Cmd::PeerConsideredAsBad {
                    detected_by: NetworkAddress::from(self.self_peer_id),
                    bad_peer: NetworkAddress::from(peer_id),
                    bad_behaviour: bad_behaviour.to_string(),
                });
                self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
                    req: request,
                    addrs: Some(addrs),
                    peer: peer_id,
                    sender: Some(tx),
                });
            }
        }
    }
1176
1177 fn verify_peer_quote(&mut self, peer_id: PeerId, quote: PaymentQuote) {
1178 if let Some(history_quote) = self.quotes_history.get(&peer_id) {
1179 if !history_quote.historical_verify("e) {
1180 info!("From {peer_id:?}, detected a bad quote {quote:?} against history_quote {history_quote:?}");
1181 self.record_node_issue(peer_id, NodeIssue::BadQuoting);
1182 return;
1183 }
1184
1185 if history_quote.is_newer_than("e) {
1186 return;
1187 }
1188 }
1189
1190 let _ = self.quotes_history.insert(peer_id, quote);
1191 }
1192
    /// Runs one periodic replication round: sends the full list of locally held
    /// record addresses to close-group candidates.
    ///
    /// Rate limiting:
    /// - Skipped entirely if the last round was under `MIN_REPLICATION_INTERVAL_S` ago.
    /// - Peers contacted within the last `REPLICATION_TIMEOUT` are skipped.
    fn try_interval_replication(&mut self) -> Result<()> {
        if let Some(last_replication) = self.last_replication {
            if last_replication.elapsed() < MIN_REPLICATION_INTERVAL_S {
                info!("Skipping replication as minimum interval hasn't elapsed");
                return Ok(());
            }
        }
        // Mark the round as started even if no targets end up being contacted.
        self.last_replication = Some(Instant::now());

        let self_addr = NetworkAddress::from(self.self_peer_id);
        let mut replicate_targets = self.get_replicate_candidates(&self_addr)?;

        let now = Instant::now();
        // Drop expired cooldown entries, then skip targets still on cooldown.
        self.replication_targets
            .retain(|_peer_id, timestamp| *timestamp > now);
        replicate_targets
            .retain(|(peer_id, _addresses)| !self.replication_targets.contains_key(peer_id));
        if replicate_targets.is_empty() {
            return Ok(());
        }

        let all_records: Vec<_> = self
            .swarm
            .behaviour_mut()
            .kademlia
            .store_mut()
            .record_addresses_ref()?
            .values()
            .cloned()
            .collect();

        if !all_records.is_empty() {
            debug!(
                "Sending a replication list of {} keys to {replicate_targets:?} ",
                all_records.len()
            );
            // One shared Replicate request; only (address, validation type)
            // pairs are sent — the data type is dropped from the tuple.
            let request = Request::Cmd(Cmd::Replicate {
                holder: NetworkAddress::from(self.self_peer_id),
                keys: all_records
                    .into_iter()
                    .map(|(addr, val_type, _data_type)| (addr, val_type))
                    .collect(),
            });
            for (peer_id, _addrs) in replicate_targets {
                // Fire-and-forget: no response sender, no explicit dial addresses.
                self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
                    req: request.clone(),
                    peer: peer_id,
                    addrs: None,
                    sender: None,
                });

                // Start this peer's cooldown window.
                let _ = self
                    .replication_targets
                    .insert(peer_id, now + REPLICATION_TIMEOUT);
            }
        }

        Ok(())
    }
1256
    /// Selects the peers that should receive replication for `target`.
    ///
    /// When `target` resolves to a PeerId (the periodic self-replication case)
    /// twice the close-group size is expected; otherwise just the close group.
    /// Prefers all peers within the store's responsible replication distance,
    /// falling back to the closest `expected_candidates` peers when too few
    /// are in range (or no range is known).
    pub(crate) fn get_replicate_candidates(
        &mut self,
        target: &NetworkAddress,
    ) -> Result<Vec<(PeerId, Addresses)>> {
        // Periodic replication targets our own address (a PeerId); ad-hoc
        // replication targets a record address (no PeerId).
        let is_periodic_replicate = target.as_peer_id().is_some();
        let expected_candidates = if is_periodic_replicate {
            CLOSE_GROUP_SIZE * 2
        } else {
            CLOSE_GROUP_SIZE
        };

        let closest_k_peers = self.get_closest_local_peers_to_target(target, K_VALUE.get());

        if let Some(responsible_range) = self
            .swarm
            .behaviour_mut()
            .kademlia
            .store_mut()
            .get_farthest_replication_distance()?
        {
            let peers_in_range = get_peers_in_range(&closest_k_peers, target, responsible_range);

            // Note: this may return MORE than expected_candidates — every peer
            // within the responsible range is included.
            if peers_in_range.len() >= expected_candidates {
                return Ok(peers_in_range);
            }
        }

        // Fallback: not enough peers in range, take the closest ones.
        Ok(closest_k_peers
            .iter()
            .take(expected_candidates)
            .cloned()
            .collect())
    }
1297}
1298
1299fn get_peers_in_range(
1301 peers: &[(PeerId, Addresses)],
1302 address: &NetworkAddress,
1303 range: Distance,
1304) -> Vec<(PeerId, Addresses)> {
1305 peers
1306 .iter()
1307 .filter_map(|(peer_id, addresses)| {
1308 if address.distance(&NetworkAddress::from(*peer_id)) <= range {
1309 Some((*peer_id, addresses.clone()))
1310 } else {
1311 None
1312 }
1313 })
1314 .collect()
1315}