use crate::{
    config::GetRecordCfg,
    driver::{PendingGetClosestType, SwarmDriver},
    error::{NetworkError, Result},
    event::TerminateNodeReason,
    log_markers::Marker,
    Addresses, GetRecordError, MsgResponder, NetworkEvent, ResponseQuorum, CLOSE_GROUP_SIZE,
};
use ant_evm::{PaymentQuote, QuotingMetrics};
use ant_protocol::{
    messages::{Cmd, Request, Response},
    storage::{DataTypes, RecordHeader, RecordKind, ValidationType},
    NetworkAddress, PrettyPrintRecordKey,
};
use libp2p::{
    kad::{
        store::{Error as StoreError, RecordStore},
        KBucketDistance as Distance, Record, RecordKey, K_VALUE,
    },
    swarm::dial_opts::{DialOpts, PeerCondition},
    Multiaddr, PeerId,
};
use std::{
    collections::{BTreeMap, HashMap},
    fmt::Debug,
    time::Duration,
};
use tokio::sync::oneshot;
use xor_name::XorName;

use crate::time::Instant;

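/// Maximum number of consecutive local-storage write failures tolerated before
/// the node reports itself for termination.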
const MAX_CONTINUOUS_HDD_WRITE_ERROR: usize = 5;

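/// Cooldown for a replication target: once sent a replication list, a peer is
/// skipped until this period has elapsed.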
const REPLICATION_TIMEOUT: Duration = Duration::from_secs(45);

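/// Minimum interval between two consecutive rounds of interval replication.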
const MIN_REPLICATION_INTERVAL_S: Duration = Duration::from_secs(30);

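/// Issues that can be reported against a peer; repeated reports of the same
/// issue may get the peer treated as a bad node.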
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum NodeIssue {
    ConnectionIssue,
    ReplicationFailure,
    CloseNodesShunning,
    BadQuoting,
    FailedChunkProofCheck,
}

impl std::fmt::Display for NodeIssue {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            NodeIssue::ConnectionIssue => write!(f, "CriticalConnectionIssue"),
            NodeIssue::ReplicationFailure => write!(f, "ReplicationFailure"),
            NodeIssue::CloseNodesShunning => write!(f, "CloseNodesShunning"),
            NodeIssue::BadQuoting => write!(f, "BadQuoting"),
            NodeIssue::FailedChunkProofCheck => write!(f, "FailedChunkProofCheck"),
        }
    }
}

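/// Commands the `SwarmDriver` can handle locally, without going out to the network.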
pub enum LocalSwarmCmd {
    GetPeersWithMultiaddr {
        sender: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
    },
    GetKBuckets {
        sender: oneshot::Sender<BTreeMap<u32, Vec<PeerId>>>,
    },
    GetClosestKLocalPeers {
        sender: oneshot::Sender<Vec<(PeerId, Addresses)>>,
    },
    GetCloseLocalPeersToTarget {
        key: NetworkAddress,
        num_of_peers: usize,
        sender: oneshot::Sender<Vec<(PeerId, Addresses)>>,
    },
    GetSwarmLocalState(oneshot::Sender<SwarmLocalState>),
    RecordStoreHasKey {
        key: RecordKey,
        sender: oneshot::Sender<Result<bool>>,
    },
    GetAllLocalRecordAddresses {
        sender: oneshot::Sender<Result<HashMap<NetworkAddress, ValidationType>>>,
    },
    GetLocalRecord {
        key: RecordKey,
        sender: oneshot::Sender<Option<Record>>,
    },
    GetLocalQuotingMetrics {
        key: RecordKey,
        data_type: u32,
        data_size: usize,
        sender: oneshot::Sender<Result<(QuotingMetrics, bool)>>,
    },
    PaymentReceived,
    PutLocalRecord {
        record: Record,
        is_client_put: bool,
    },
    RemoveFailedLocalRecord {
        key: RecordKey,
    },
    AddLocalRecordAsStored {
        key: RecordKey,
        record_type: ValidationType,
        data_type: DataTypes,
    },
    AddPeerToBlockList {
        peer_id: PeerId,
    },
    RecordNodeIssue {
        peer_id: PeerId,
        issue: NodeIssue,
    },
    IsPeerShunned {
        target: NetworkAddress,
        sender: oneshot::Sender<bool>,
    },
    QuoteVerification {
        quotes: Vec<(PeerId, PaymentQuote)>,
    },
    FetchCompleted((RecordKey, ValidationType)),
    TriggerIntervalReplication,
    TriggerIrrelevantRecordCleanup,
    AddNetworkDensitySample {
        distance: Distance,
    },
    NotifyPeerScores {
        peer_scores: Vec<(PeerId, bool)>,
    },
    AddFreshReplicateRecords {
        holder: NetworkAddress,
        keys: Vec<(NetworkAddress, ValidationType)>,
    },
    NotifyPeerVersion {
        peer: PeerId,
        version: String,
    },
}

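/// Commands that drive operations on the network through the swarm.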
pub enum NetworkSwarmCmd {
    GetClosestPeersToAddressFromNetwork {
        key: NetworkAddress,
        sender: oneshot::Sender<Vec<(PeerId, Addresses)>>,
    },

    SendRequest {
        req: Request,
        peer: PeerId,
        addrs: Option<Addresses>,

        sender: Option<oneshot::Sender<Result<Response>>>,
    },
    SendResponse {
        resp: Response,
        channel: MsgResponder,
    },

    GetNetworkRecord {
        key: RecordKey,
        sender: oneshot::Sender<std::result::Result<Record, GetRecordError>>,
        cfg: GetRecordCfg,
    },

    PutRecord {
        record: Record,
        sender: oneshot::Sender<Result<()>>,
        quorum: ResponseQuorum,
    },
    PutRecordTo {
        peers: Vec<PeerId>,
        record: Record,
        sender: oneshot::Sender<Result<()>>,
        quorum: ResponseQuorum,
    },

    DialPeer {
        peer: PeerId,
        addrs: Addresses,
    },
}

impl Debug for LocalSwarmCmd {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            LocalSwarmCmd::PutLocalRecord {
                record,
                is_client_put,
            } => {
                write!(
                    f,
                    "LocalSwarmCmd::PutLocalRecord {{ key: {:?}, is_client_put: {is_client_put:?} }}",
                    PrettyPrintRecordKey::from(&record.key)
                )
            }
            LocalSwarmCmd::RemoveFailedLocalRecord { key } => {
                write!(
                    f,
                    "LocalSwarmCmd::RemoveFailedLocalRecord {{ key: {:?} }}",
                    PrettyPrintRecordKey::from(key)
                )
            }
            LocalSwarmCmd::AddLocalRecordAsStored {
                key,
                record_type,
                data_type,
            } => {
                write!(
                    f,
                    "LocalSwarmCmd::AddLocalRecordAsStored {{ key: {:?}, record_type: {record_type:?}, data_type: {data_type:?} }}",
                    PrettyPrintRecordKey::from(key)
                )
            }
            LocalSwarmCmd::GetClosestKLocalPeers { .. } => {
                write!(f, "LocalSwarmCmd::GetClosestKLocalPeers")
            }
            LocalSwarmCmd::GetCloseLocalPeersToTarget {
                key, num_of_peers, ..
            } => {
                write!(
                    f,
                    "LocalSwarmCmd::GetCloseLocalPeersToTarget {{ key: {key:?}, num_of_peers: {num_of_peers} }}"
                )
            }
            LocalSwarmCmd::GetLocalQuotingMetrics { .. } => {
                write!(f, "LocalSwarmCmd::GetLocalQuotingMetrics")
            }
            LocalSwarmCmd::PaymentReceived => {
                write!(f, "LocalSwarmCmd::PaymentReceived")
            }
            LocalSwarmCmd::GetLocalRecord { key, .. } => {
                write!(
                    f,
                    "LocalSwarmCmd::GetLocalRecord {{ key: {:?} }}",
                    PrettyPrintRecordKey::from(key)
                )
            }
            LocalSwarmCmd::GetAllLocalRecordAddresses { .. } => {
                write!(f, "LocalSwarmCmd::GetAllLocalRecordAddresses")
            }
            LocalSwarmCmd::GetPeersWithMultiaddr { .. } => {
                write!(f, "LocalSwarmCmd::GetPeersWithMultiaddr")
            }
            LocalSwarmCmd::GetKBuckets { .. } => {
                write!(f, "LocalSwarmCmd::GetKBuckets")
            }
            LocalSwarmCmd::GetSwarmLocalState { .. } => {
                write!(f, "LocalSwarmCmd::GetSwarmLocalState")
            }
            LocalSwarmCmd::RecordStoreHasKey { key, .. } => {
                write!(
                    f,
                    "LocalSwarmCmd::RecordStoreHasKey {:?}",
                    PrettyPrintRecordKey::from(key)
                )
            }
            LocalSwarmCmd::AddPeerToBlockList { peer_id } => {
                write!(f, "LocalSwarmCmd::AddPeerToBlockList {peer_id:?}")
            }
            LocalSwarmCmd::RecordNodeIssue { peer_id, issue } => {
                write!(
                    f,
                    "LocalSwarmCmd::RecordNodeIssue peer {peer_id:?}, issue: {issue:?}"
                )
            }
            LocalSwarmCmd::IsPeerShunned { target, .. } => {
                write!(f, "LocalSwarmCmd::IsPeerShunned target: {target:?}")
            }
            LocalSwarmCmd::QuoteVerification { quotes } => {
                write!(
                    f,
                    "LocalSwarmCmd::QuoteVerification of {} quotes",
                    quotes.len()
                )
            }
            LocalSwarmCmd::FetchCompleted((key, record_type)) => {
                write!(
                    f,
                    "LocalSwarmCmd::FetchCompleted({record_type:?} : {:?})",
                    PrettyPrintRecordKey::from(key)
                )
            }
            LocalSwarmCmd::TriggerIntervalReplication => {
                write!(f, "LocalSwarmCmd::TriggerIntervalReplication")
            }
            LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
                write!(f, "LocalSwarmCmd::TriggerIrrelevantRecordCleanup")
            }
            LocalSwarmCmd::AddNetworkDensitySample { distance } => {
                write!(f, "LocalSwarmCmd::AddNetworkDensitySample({distance:?})")
            }
            LocalSwarmCmd::NotifyPeerScores { peer_scores } => {
                write!(f, "LocalSwarmCmd::NotifyPeerScores({peer_scores:?})")
            }
            LocalSwarmCmd::AddFreshReplicateRecords { holder, keys } => {
                write!(
                    f,
                    "LocalSwarmCmd::AddFreshReplicateRecords({holder:?}, {keys:?})"
                )
            }
            LocalSwarmCmd::NotifyPeerVersion { peer, version } => {
                write!(f, "LocalSwarmCmd::NotifyPeerVersion({peer:?}, {version:?})")
            }
        }
    }
}

impl Debug for NetworkSwarmCmd {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            NetworkSwarmCmd::GetNetworkRecord { key, cfg, .. } => {
                write!(
                    f,
                    "NetworkSwarmCmd::GetNetworkRecord {{ key: {:?}, cfg: {cfg:?} }}",
                    PrettyPrintRecordKey::from(key)
                )
            }
            NetworkSwarmCmd::PutRecord { record, .. } => {
                write!(
                    f,
                    "NetworkSwarmCmd::PutRecord {{ key: {:?} }}",
                    PrettyPrintRecordKey::from(&record.key)
                )
            }
            NetworkSwarmCmd::PutRecordTo { peers, record, .. } => {
                write!(
                    f,
                    "NetworkSwarmCmd::PutRecordTo {{ peers: {peers:?}, key: {:?} }}",
                    PrettyPrintRecordKey::from(&record.key)
                )
            }
            NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { key, .. } => {
                write!(f, "NetworkSwarmCmd::GetClosestPeers {{ key: {key:?} }}")
            }
            NetworkSwarmCmd::SendResponse { resp, .. } => {
                write!(f, "NetworkSwarmCmd::SendResponse resp: {resp:?}")
            }
            NetworkSwarmCmd::SendRequest { req, peer, .. } => {
                write!(
                    f,
                    "NetworkSwarmCmd::SendRequest req: {req:?}, peer: {peer:?}"
                )
            }
            NetworkSwarmCmd::DialPeer { peer, .. } => {
                write!(f, "NetworkSwarmCmd::DialPeer peer: {peer:?}")
            }
        }
    }
}
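
/// A snapshot of the swarm's local state.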
#[derive(Debug, Clone)]
pub struct SwarmLocalState {
    /// Currently connected peers
    pub connected_peers: Vec<PeerId>,
    /// The multiaddresses we are listening on
    pub listeners: Vec<Multiaddr>,
}

impl SwarmDriver {
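    /// Handle a `NetworkSwarmCmd`, issuing the corresponding operation on the swarm.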
    pub(crate) fn handle_network_cmd(&mut self, cmd: NetworkSwarmCmd) -> Result<(), NetworkError> {
        let start = Instant::now();
        let cmd_string;
        match cmd {
            NetworkSwarmCmd::GetNetworkRecord { key, sender, cfg } => {
                cmd_string = "GetNetworkRecord";

                for (pending_query, (inflight_record_query_key, senders, _, _)) in
                    self.pending_get_record.iter_mut()
                {
                    if *inflight_record_query_key == key {
                        debug!(
                            "GetNetworkRecord for {:?} is already in progress. Adding sender to {pending_query:?}",
                            PrettyPrintRecordKey::from(&key)
                        );
                        senders.push(sender);

                        return Ok(());
                    }
                }

                let query_id = self.swarm.behaviour_mut().kademlia.get_record(key.clone());

                debug!(
                    "Record {:?} with task {query_id:?} expected to be held by {:?}",
                    PrettyPrintRecordKey::from(&key),
                    cfg.expected_holders
                );

                if self
                    .pending_get_record
                    .insert(query_id, (key, vec![sender], Default::default(), cfg))
                    .is_some()
                {
                    warn!("An existing get_record task {query_id:?} got replaced");
                }
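                // Report how many get_record queries are in flight, and how many
                // record copies they have fetched so far.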
                let total_records: usize = self
                    .pending_get_record
                    .iter()
                    .map(|(_, (_, _, result_map, _))| result_map.len())
                    .sum();
                info!(
                    "We now have {} pending get record attempts and cached {total_records} fetched copies",
                    self.pending_get_record.len()
                );
            }
            NetworkSwarmCmd::PutRecord {
                record,
                sender,
                quorum,
            } => {
                cmd_string = "PutRecord";
                let record_key = PrettyPrintRecordKey::from(&record.key).into_owned();
                debug!(
                    "Putting record sized: {:?} to network {:?}",
                    record.value.len(),
                    record_key
                );
                let res = match self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .put_record(record, quorum.get_kad_quorum())
                {
                    Ok(request_id) => {
                        debug!("Sent record {record_key:?} to network. Request id: {request_id:?}");
                        Ok(())
                    }
                    Err(error) => {
                        error!("Error sending record {record_key:?} to network");
                        Err(NetworkError::from(error))
                    }
                };

                if let Err(err) = sender.send(res) {
                    error!("Could not send response to PutRecord cmd: {:?}", err);
                }
            }
            NetworkSwarmCmd::PutRecordTo {
                peers,
                record,
                sender,
                quorum,
            } => {
                cmd_string = "PutRecordTo";
                let record_key = PrettyPrintRecordKey::from(&record.key).into_owned();
                debug!(
                    "Putting record {record_key:?} sized: {:?} to {peers:?}",
                    record.value.len(),
                );
                let peers_count = peers.len();
                let request_id = self.swarm.behaviour_mut().kademlia.put_record_to(
                    record,
                    peers.into_iter(),
                    quorum.get_kad_quorum(),
                );
                debug!("Sent record {record_key:?} to {peers_count:?} peers. Request id: {request_id:?}");

                if let Err(err) = sender.send(Ok(())) {
                    error!("Could not send response to PutRecordTo cmd: {:?}", err);
                }
            }
            NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { key, sender } => {
                cmd_string = "GetClosestPeersToAddressFromNetwork";
                let query_id = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .get_closest_peers(key.as_bytes());
                let _ = self.pending_get_closest_peers.insert(
                    query_id,
                    (
                        PendingGetClosestType::FunctionCall(sender),
                        Default::default(),
                    ),
                );
            }

            NetworkSwarmCmd::SendRequest {
                req,
                peer,
                addrs,
                sender,
            } => {
                cmd_string = "SendRequest";
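                // If we are the recipient, hand the request straight to the upper
                // layer instead of dialing ourselves.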
                if peer == *self.swarm.local_peer_id() {
                    trace!("Sending query request to self");
                    if let Request::Query(query) = req {
                        self.send_event(NetworkEvent::QueryRequestReceived {
                            query,
                            channel: MsgResponder::FromSelf(sender),
                        });
                    } else {
                        trace!("Replicate cmd to self received, ignoring");
                    }
                } else {
                    if let Some(addrs) = addrs {
                        if addrs.0.is_empty() {
                            info!("No addresses known for peer {peer:?}; the dial may fail if the swarm cannot discover the peer's addresses.");
                        } else {
                            let opts = DialOpts::peer_id(peer)
                                .condition(PeerCondition::NotDialing)
                                .addresses(addrs.0.clone())
                                .build();

                            match self.swarm.dial(opts) {
                                Ok(()) => {
                                    info!("Dialing peer {peer:?} for req_resp with address: {addrs:?}");
                                }
                                Err(err) => {
                                    error!("Failed to dial peer {peer:?} for req_resp with address: {addrs:?} error: {err}");
                                }
                            }
                        }
                    }

                    let request_id = self
                        .swarm
                        .behaviour_mut()
                        .request_response
                        .send_request(&peer, req);
                    trace!("Sending request {request_id:?} to peer {peer:?}");
                    let _ = self.pending_requests.insert(request_id, sender);

                    trace!("Pending Requests now: {:?}", self.pending_requests.len());
                }
            }
            NetworkSwarmCmd::SendResponse { resp, channel } => {
                cmd_string = "SendResponse";
                match channel {
                    MsgResponder::FromSelf(channel) => {
                        trace!("Sending response to self");
                        match channel {
                            Some(channel) => {
                                channel
                                    .send(Ok(resp))
                                    .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
                            }
                            None => {
                                self.send_event(NetworkEvent::ResponseReceived { res: resp });
                            }
                        }
                    }
                    MsgResponder::FromPeer(channel) => {
                        self.swarm
                            .behaviour_mut()
                            .request_response
                            .send_response(channel, resp)
                            .map_err(NetworkError::OutgoingResponseDropped)?;
                    }
                }
            }
            NetworkSwarmCmd::DialPeer { peer, addrs } => {
                cmd_string = "DialPeer";
                let opts = DialOpts::peer_id(peer)
                    .condition(PeerCondition::NotDialing)
                    .addresses(addrs.0.clone())
                    .build();

                match self.swarm.dial(opts) {
                    Ok(()) => {
                        info!("Manual dialing peer {peer:?} with address: {addrs:?}");
                    }
                    Err(err) => {
                        error!("Failed to manually dial peer {peer:?} with address: {addrs:?} error: {err}");
                    }
                }
            }
        }

        self.log_handling(cmd_string.to_string(), start.elapsed());

        Ok(())
    }
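
    /// Handle a `LocalSwarmCmd`, operating on local state and the record store.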
    pub(crate) fn handle_local_cmd(&mut self, cmd: LocalSwarmCmd) -> Result<(), NetworkError> {
        let start = Instant::now();
        let mut cmd_string;
        match cmd {
            LocalSwarmCmd::TriggerIntervalReplication => {
                cmd_string = "TriggerIntervalReplication";
                self.try_interval_replication()?;
            }
            LocalSwarmCmd::GetLocalQuotingMetrics {
                key,
                data_type,
                data_size,
                sender,
            } => {
                cmd_string = "GetLocalQuotingMetrics";
                let kbucket_status = self.get_kbuckets_status();
                self.update_on_kbucket_status(&kbucket_status);
                let (quoting_metrics, is_already_stored) = match self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .quoting_metrics(
                        &key,
                        data_type,
                        data_size,
                        Some(kbucket_status.estimated_network_size as u64),
                    ) {
                    Ok(res) => res,
                    Err(err) => {
                        let _res = sender.send(Err(err));
                        return Ok(());
                    }
                };
                self.record_metrics(Marker::QuotingMetrics {
                    quoting_metrics: &quoting_metrics,
                });

                let mut bad_nodes: Vec<_> = self
                    .bad_nodes
                    .iter()
                    .filter_map(|(peer_id, (_issue_list, is_bad))| {
                        if *is_bad {
                            Some(NetworkAddress::from_peer(*peer_id))
                        } else {
                            None
                        }
                    })
                    .collect();

                let kbucket_key = NetworkAddress::from_record_key(&key).as_kbucket_key();
                let closest_peers: Vec<_> = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .get_closest_local_peers(&kbucket_key)
                    .map(|peer| peer.into_preimage())
                    .take(CLOSE_GROUP_SIZE)
                    .collect();
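                // Only count bad nodes that fall within the close group of the key;
                // peers farther than the boundary peer are irrelevant here.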
709 if closest_peers.len() >= CLOSE_GROUP_SIZE {
711 let boundary_peer = closest_peers[CLOSE_GROUP_SIZE - 1];
712 let key_address = NetworkAddress::from_record_key(&key);
713 let boundary_distance =
714 key_address.distance(&NetworkAddress::from_peer(boundary_peer));
715 bad_nodes
716 .retain(|peer_addr| key_address.distance(peer_addr) < boundary_distance);
717 }
718
719 let _res = sender.send(Ok((quoting_metrics, is_already_stored)));
720 }
            LocalSwarmCmd::PaymentReceived => {
                cmd_string = "PaymentReceived";
                self.swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .payment_received();
            }
            LocalSwarmCmd::GetLocalRecord { key, sender } => {
                cmd_string = "GetLocalRecord";
                let record = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .get(&key)
                    .map(|rec| rec.into_owned());
                let _ = sender.send(record);
            }

            LocalSwarmCmd::PutLocalRecord {
                record,
                is_client_put,
            } => {
                cmd_string = "PutLocalRecord";
                let key = record.key.clone();
                let record_key = PrettyPrintRecordKey::from(&key);

                let record_type = match RecordHeader::from_record(&record) {
                    Ok(record_header) => {
                        match record_header.kind {
                            RecordKind::DataOnly(DataTypes::Chunk) => ValidationType::Chunk,
                            RecordKind::DataOnly(_) => {
                                let content_hash = XorName::from_content(&record.value);
                                ValidationType::NonChunk(content_hash)
                            }
                            RecordKind::DataWithPayment(_) => {
                                error!("Record {record_key:?} with payment shall not be stored locally.");
                                return Err(NetworkError::InCorrectRecordHeader);
                            }
                        }
                    }
                    Err(err) => {
                        error!("For record {record_key:?}, failed to parse record_header {err:?}");
                        return Err(NetworkError::InCorrectRecordHeader);
                    }
                };

                let result = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .put_verified(record, record_type.clone(), is_client_put);

                match result {
                    Ok(_) => {}
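                    // The store is full: report our farthest record to the
                    // replication fetcher so it can narrow the range it fetches for.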
                    Err(StoreError::MaxRecords) => {
                        let farthest = self
                            .swarm
                            .behaviour_mut()
                            .kademlia
                            .store_mut()
                            .get_farthest()?;
                        self.replication_fetcher.set_farthest_on_full(farthest);
                    }
                    Err(_) => {}
                }

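                // Notify the replication fetcher about the put (successful or not)
                // so any pending fetch for this key is resolved.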
                let new_keys_to_fetch = self
                    .replication_fetcher
                    .notify_about_new_put(key.clone(), record_type);

                if !new_keys_to_fetch.is_empty() {
                    self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
                }

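                // Keep the replication fetcher's distance range in sync with the
                // store's farthest replication distance.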
                if let Some(distance) = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .get_farthest_replication_distance()?
                {
                    self.replication_fetcher
                        .set_replication_distance_range(distance);
                }

                if let Err(err) = result {
                    error!("Can't store verified record {record_key:?} locally: {err:?}");
                    cmd_string = "PutLocalRecord error";
                    self.log_handling(cmd_string.to_string(), start.elapsed());
                    return Err(err.into());
                };
            }
            LocalSwarmCmd::AddLocalRecordAsStored {
                key,
                record_type,
                data_type,
            } => {
                cmd_string = "AddLocalRecordAsStored";
                self.swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .mark_as_stored(key, record_type, data_type);
                // A successful write resets the consecutive write-error counter.
                self.hard_disk_write_error = 0;
            }
            LocalSwarmCmd::RemoveFailedLocalRecord { key } => {
                info!("Removing Record locally, for {key:?}");
                cmd_string = "RemoveFailedLocalRecord";
                self.swarm.behaviour_mut().kademlia.store_mut().remove(&key);
                self.hard_disk_write_error = self.hard_disk_write_error.saturating_add(1);
                // Too many consecutive write failures: the disk is considered
                // unusable and the node shall terminate.
                if self.hard_disk_write_error > MAX_CONTINUOUS_HDD_WRITE_ERROR {
                    self.send_event(NetworkEvent::TerminateNode {
                        reason: TerminateNodeReason::HardDiskWriteError,
                    });
                }
            }
            LocalSwarmCmd::RecordStoreHasKey { key, sender } => {
                cmd_string = "RecordStoreHasKey";
                let has_key = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .contains(&key);
                let _ = sender.send(has_key);
            }
            LocalSwarmCmd::GetAllLocalRecordAddresses { sender } => {
                cmd_string = "GetAllLocalRecordAddresses";
                #[allow(clippy::mutable_key_type)]
                let addresses = self
                    .swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .record_addresses();
                let _ = sender.send(addresses);
            }
            LocalSwarmCmd::GetKBuckets { sender } => {
                cmd_string = "GetKBuckets";
                let mut ilog2_kbuckets = BTreeMap::new();
                for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
                    let range = kbucket.range();
                    if let Some(distance) = range.0.ilog2() {
                        let peers_in_kbucket = kbucket
                            .iter()
                            .map(|peer_entry| peer_entry.node.key.into_preimage())
                            .collect::<Vec<PeerId>>();
                        let _ = ilog2_kbuckets.insert(distance, peers_in_kbucket);
                    } else {
                        // A bucket whose range has no ilog2 lower bound is the one
                        // containing our own key.
                        error!("bucket is ourself ???!!!");
                    }
                }
                let _ = sender.send(ilog2_kbuckets);
            }
            LocalSwarmCmd::GetPeersWithMultiaddr { sender } => {
                cmd_string = "GetPeersWithMultiAddr";
                let mut result: Vec<(PeerId, Vec<Multiaddr>)> = vec![];
                for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
                    let peers_in_kbucket = kbucket
                        .iter()
                        .map(|peer_entry| {
                            (
                                peer_entry.node.key.into_preimage(),
                                peer_entry.node.value.clone().into_vec(),
                            )
                        })
                        .collect::<Vec<(PeerId, Vec<Multiaddr>)>>();
                    result.extend(peers_in_kbucket);
                }
                let _ = sender.send(result);
            }
            LocalSwarmCmd::GetCloseLocalPeersToTarget {
                key,
                num_of_peers,
                sender,
            } => {
                cmd_string = "GetCloseLocalPeersToTarget";
                let closest_peers = self.get_closest_local_peers_to_target(&key, num_of_peers);

                let _ = sender.send(closest_peers);
            }
            LocalSwarmCmd::GetClosestKLocalPeers { sender } => {
                cmd_string = "GetClosestKLocalPeers";
                let _ = sender.send(self.get_closest_k_value_local_peers());
            }
            LocalSwarmCmd::GetSwarmLocalState(sender) => {
                cmd_string = "GetSwarmLocalState";
                let current_state = SwarmLocalState {
                    connected_peers: self.swarm.connected_peers().cloned().collect(),
                    listeners: self.swarm.listeners().cloned().collect(),
                };

                sender
                    .send(current_state)
                    .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
            }
            LocalSwarmCmd::AddPeerToBlockList { peer_id } => {
                cmd_string = "AddPeerToBlockList";
                self.swarm.behaviour_mut().blocklist.block_peer(peer_id);
            }
            LocalSwarmCmd::RecordNodeIssue { peer_id, issue } => {
                cmd_string = "RecordNodeIssues";
                self.record_node_issue(peer_id, issue);
            }
            LocalSwarmCmd::IsPeerShunned { target, sender } => {
                cmd_string = "IsPeerInTrouble";
                let is_bad = if let Some(peer_id) = target.as_peer_id() {
                    if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
                        *is_bad
                    } else {
                        false
                    }
                } else {
                    false
                };
                let _ = sender.send(is_bad);
            }
            LocalSwarmCmd::QuoteVerification { quotes } => {
                cmd_string = "QuoteVerification";
                for (peer_id, quote) in quotes {
                    // Do not bother verifying quotes from a peer that is already
                    // marked as bad.
                    if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
                        if *is_bad {
                            continue;
                        }
                    }
                    self.verify_peer_quote(peer_id, quote);
                }
            }
            LocalSwarmCmd::FetchCompleted((key, record_type)) => {
                info!(
                    "Fetch of {record_type:?} {:?} completed early; the fetched copy may be an old version of the record.",
                    PrettyPrintRecordKey::from(&key)
                );
                cmd_string = "FetchCompleted";
                let new_keys_to_fetch = self
                    .replication_fetcher
                    .notify_fetch_early_completed(key, record_type);
                if !new_keys_to_fetch.is_empty() {
                    self.send_event(NetworkEvent::KeysToFetchForReplication(new_keys_to_fetch));
                }
            }
            LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
                cmd_string = "TriggerIrrelevantRecordCleanup";
                self.swarm
                    .behaviour_mut()
                    .kademlia
                    .store_mut()
                    .cleanup_irrelevant_records();
            }
            LocalSwarmCmd::AddNetworkDensitySample { distance } => {
                cmd_string = "AddNetworkDensitySample";
                self.network_density_samples.add(distance);
            }
            LocalSwarmCmd::NotifyPeerScores { peer_scores } => {
                cmd_string = "NotifyPeerScores";
                self.replication_fetcher.add_peer_scores(peer_scores);
            }
            LocalSwarmCmd::AddFreshReplicateRecords { holder, keys } => {
                cmd_string = "AddFreshReplicateRecords";
                let _ = self.add_keys_to_replication_fetcher(holder, keys, true);
            }
            LocalSwarmCmd::NotifyPeerVersion { peer, version } => {
                cmd_string = "NotifyPeerVersion";
                self.record_node_version(peer, version);
            }
        }

        self.log_handling(cmd_string.to_string(), start.elapsed());

        Ok(())
    }

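    /// Record the version reported by a peer, then prune the version map so it
    /// only tracks peers sitting in non-full kbuckets.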
    fn record_node_version(&mut self, peer_id: PeerId, version: String) {
        let _ = self.peers_version.insert(peer_id, version);

        let mut peers_in_non_full_buckets = vec![];
        for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
            let num_entries = kbucket.num_entries();
            if num_entries >= K_VALUE.get() {
                continue;
            } else {
                let peers_in_kbucket = kbucket
                    .iter()
                    .map(|peer_entry| peer_entry.node.key.into_preimage())
                    .collect::<Vec<PeerId>>();
                peers_in_non_full_buckets.extend(peers_in_kbucket);
            }
        }

        self.peers_version
            .retain(|peer_id, _version| peers_in_non_full_buckets.contains(peer_id));
    }

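    /// Accumulate an issue report against a peer. Recent reports are kept in a
    /// small sliding window; three occurrences of the same issue flag bad
    /// behaviour. Connection issues evict the peer from the routing table, and a
    /// peer already marked as bad is evicted and, when fresh bad behaviour is
    /// found, informed and blocklisted.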
    pub(crate) fn record_node_issue(&mut self, peer_id: PeerId, issue: NodeIssue) {
        info!("Peer {peer_id:?} is reported as having issue {issue:?}");
        let (issue_vec, is_bad) = self.bad_nodes.entry(peer_id).or_default();
        let mut new_bad_behaviour = None;
        let mut is_connection_issue = false;

        if !(*is_bad) {
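            // Only issues reported within the last five minutes are kept, and the
            // list is capped at ten entries.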
            issue_vec.retain(|(_, timestamp)| timestamp.elapsed().as_secs() < 300);

            if issue_vec.len() == 10 {
                issue_vec.remove(0);
            }

            // Treat the report as a new issue only if the last recorded one is
            // older than ten seconds; this de-duplicates bursts of reports.
            let is_new_issue = if let Some((_issue, timestamp)) = issue_vec.last() {
                timestamp.elapsed().as_secs() > 10
            } else {
                true
            };

            if is_new_issue {
                issue_vec.push((issue, Instant::now()));
            } else {
                return;
            }

            // Check whether the peer has accumulated enough reports of the same
            // issue to be considered a bad node.
            for (issue, _timestamp) in issue_vec.iter() {
                let issue_counts = issue_vec
                    .iter()
                    .filter(|(i, _timestamp)| *issue == *i)
                    .count();
                if issue_counts >= 3 {
                    if matches!(issue, NodeIssue::ConnectionIssue) {
                        is_connection_issue = true;
                    }
                    new_bad_behaviour = Some(issue.clone());
                    info!("Peer {peer_id:?} accumulated {issue_counts} times of issue {issue:?}. Consider it as a bad node now.");
                    // Once bad behaviour is detected there is no point continuing.
                    break;
                }
            }
        }

        if is_connection_issue {
            issue_vec.retain(|(issue, _timestamp)| !matches!(issue, NodeIssue::ConnectionIssue));
            info!("Evicting bad peer {peer_id:?} due to connection issue from RT.");
            if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
                self.update_on_peer_removal(*dead_peer.node.key.preimage());
            }
            return;
        }

        if *is_bad {
            info!("Evicting bad peer {peer_id:?} from RT.");
            let addrs = if let Some(dead_peer) =
                self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id)
            {
                self.update_on_peer_removal(*dead_peer.node.key.preimage());
                Addresses(dead_peer.node.value.into_vec())
            } else {
                Addresses(Vec::new())
            };

            if let Some(bad_behaviour) = new_bad_behaviour {
                self.record_metrics(Marker::PeerConsideredAsBad { bad_peer: &peer_id });

                warn!("Peer {peer_id:?} is considered as bad due to {bad_behaviour:?}. Informing the peer and adding to blocklist.");
                // Once the peer has acknowledged the notification, add it to the
                // blocklist via a LocalSwarmCmd.
                let (tx, rx) = oneshot::channel();
                let local_swarm_cmd_sender = self.local_cmd_sender.clone();
                tokio::spawn(async move {
                    match rx.await {
                        Ok(result) => {
                            debug!("Got response for Cmd::PeerConsideredAsBad from {peer_id:?} {result:?}");
                            if let Err(err) = local_swarm_cmd_sender
                                .send(LocalSwarmCmd::AddPeerToBlockList { peer_id })
                                .await
                            {
                                error!("SwarmDriver failed to send LocalSwarmCmd: {err}");
                            }
                        }
                        Err(err) => {
                            error!("Failed to get response from one shot channel for Cmd::PeerConsideredAsBad : {err:?}");
                        }
                    }
                });

                let request = Request::Cmd(Cmd::PeerConsideredAsBad {
                    detected_by: NetworkAddress::from_peer(self.self_peer_id),
                    bad_peer: NetworkAddress::from_peer(peer_id),
                    bad_behaviour: bad_behaviour.to_string(),
                });
                self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
                    req: request,
                    addrs: Some(addrs),
                    peer: peer_id,
                    sender: Some(tx),
                });
            }
        }
    }

    fn verify_peer_quote(&mut self, peer_id: PeerId, quote: PaymentQuote) {
        if let Some(history_quote) = self.quotes_history.get(&peer_id) {
            if !history_quote.historical_verify(&quote) {
                info!("From {peer_id:?}, detected a bad quote {quote:?} against history_quote {history_quote:?}");
                self.record_node_issue(peer_id, NodeIssue::BadQuoting);
                return;
            }

            if history_quote.is_newer_than(&quote) {
                return;
            }
        }

        let _ = self.quotes_history.insert(peer_id, quote);
    }

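    /// Send our replication list to close peers, at most once every
    /// `MIN_REPLICATION_INTERVAL_S`, skipping targets contacted recently.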
    fn try_interval_replication(&mut self) -> Result<()> {
        // Enforce a minimum gap between two consecutive replication rounds.
        if let Some(last_replication) = self.last_replication {
            if last_replication.elapsed() < MIN_REPLICATION_INTERVAL_S {
                info!("Skipping replication as minimum interval hasn't elapsed");
                return Ok(());
            }
        }
        self.last_replication = Some(Instant::now());

        let self_addr = NetworkAddress::from_peer(self.self_peer_id);
        let mut replicate_targets = self.get_replicate_candidates(&self_addr)?;

        let now = Instant::now();
        // Drop expired cooldown entries, then skip targets still in cooldown.
        self.replication_targets
            .retain(|_peer_id, timestamp| *timestamp > now);
        replicate_targets
            .retain(|(peer_id, _addresses)| !self.replication_targets.contains_key(peer_id));
        if replicate_targets.is_empty() {
            return Ok(());
        }

        let all_records: Vec<_> = self
            .swarm
            .behaviour_mut()
            .kademlia
            .store_mut()
            .record_addresses_ref()?
            .values()
            .cloned()
            .collect();

        if !all_records.is_empty() {
            debug!(
                "Sending a replication list of {} keys to {replicate_targets:?}",
                all_records.len()
            );
            let request = Request::Cmd(Cmd::Replicate {
                holder: NetworkAddress::from_peer(self.self_peer_id),
                keys: all_records
                    .into_iter()
                    .map(|(addr, val_type, _data_type)| (addr, val_type))
                    .collect(),
            });
            for (peer_id, _addrs) in replicate_targets {
                self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
                    req: request.clone(),
                    peer: peer_id,
                    addrs: None,
                    sender: None,
                });

                let _ = self
                    .replication_targets
                    .insert(peer_id, now + REPLICATION_TIMEOUT);
            }
        }

        Ok(())
    }

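    /// Work out the candidates that should hold a copy of data at `target`: the
    /// peers within our responsible replication range when there are enough of
    /// them, otherwise simply the closest `expected_candidates` peers.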
    pub(crate) fn get_replicate_candidates(
        &mut self,
        target: &NetworkAddress,
    ) -> Result<Vec<(PeerId, Addresses)>> {
        // Periodic replication (target is our own peer id) aims at a wider group
        // than replication towards a specific data address.
        let is_periodic_replicate = target.as_peer_id().is_some();
        let expected_candidates = if is_periodic_replicate {
            CLOSE_GROUP_SIZE * 2
        } else {
            CLOSE_GROUP_SIZE
        };

        let closest_k_peers = self.get_closest_local_peers_to_target(target, K_VALUE.get());

        if let Some(responsible_range) = self
            .swarm
            .behaviour_mut()
            .kademlia
            .store_mut()
            .get_farthest_replication_distance()?
        {
            let peers_in_range = get_peers_in_range(&closest_k_peers, target, responsible_range);

            if peers_in_range.len() >= expected_candidates {
                return Ok(peers_in_range);
            }
        }

        // Not enough peers within the responsible range; fall back to the closest.
        Ok(closest_k_peers
            .iter()
            .take(expected_candidates)
            .cloned()
            .collect())
    }
}

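/// Returns the subset of `peers` whose distance to `address` is within `range`.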
fn get_peers_in_range(
    peers: &[(PeerId, Addresses)],
    address: &NetworkAddress,
    range: Distance,
) -> Vec<(PeerId, Addresses)> {
    peers
        .iter()
        .filter_map(|(peer_id, addresses)| {
            if address.distance(&NetworkAddress::from_peer(*peer_id)) <= range {
                Some((*peer_id, addresses.clone()))
            } else {
                None
            }
        })
        .collect()
}