#[macro_use]
extern crate tracing;

mod bootstrap;
mod circular_vec;
mod cmd;
mod config;
mod driver;
mod error;
mod event;
mod external_address;
mod fifo_register;
mod graph;
mod log_markers;
#[cfg(feature = "open-metrics")]
mod metrics;
mod network_discovery;
mod record_store;
mod record_store_api;
mod relay_manager;
mod replication_fetcher;
pub mod time;
mod transport;

use cmd::LocalSwarmCmd;
use xor_name::XorName;

pub use self::{
    cmd::{NodeIssue, SwarmLocalState},
    config::{GetRecordCfg, PutRecordCfg, ResponseQuorum, RetryStrategy, VerificationKind},
    driver::{NetworkBuilder, SwarmDriver, MAX_PACKET_SIZE},
    error::{GetRecordError, NetworkError},
    event::{MsgResponder, NetworkEvent},
    graph::get_graph_entry_from_record,
    record_store::NodeRecordStore,
};
#[cfg(feature = "open-metrics")]
pub use metrics::service::MetricsRegistries;
pub use time::{interval, sleep, spawn, Instant, Interval};

use self::{cmd::NetworkSwarmCmd, error::Result};
use ant_evm::{PaymentQuote, QuotingMetrics};
use ant_protocol::{
    error::Error as ProtocolError,
    messages::{ChunkProof, Nonce, Query, QueryResponse, Request, Response},
    storage::{DataTypes, Pointer, Scratchpad, ValidationType},
    NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
};
use ant_protocol::storage::{
    try_deserialize_record, try_serialize_record, GraphEntry, RecordHeader, RecordKind,
};
use futures::future::select_all;
use libp2p::{
    identity::Keypair,
    kad::{KBucketDistance, KBucketKey, Record, RecordKey},
    multiaddr::Protocol,
    request_response::OutboundFailure,
    Multiaddr, PeerId,
};
use rand::Rng;
use std::{
    collections::{BTreeMap, HashMap, HashSet},
    net::IpAddr,
    sync::Arc,
};
use tokio::sync::{
    mpsc::{self, Sender},
    oneshot,
};
use tokio::time::Duration;

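/// Majority of a given group (i.e. > 1/2).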
#[inline]
pub const fn close_group_majority() -> usize {
    CLOSE_GROUP_SIZE / 2 + 1
}

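/// Max random delay between a PUT and the read-back that verifies it.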
const MAX_WAIT_BEFORE_READING_A_PUT: Duration = Duration::from_millis(750);
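/// Min random delay between a PUT and the read-back that verifies it.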
const MIN_WAIT_BEFORE_READING_A_PUT: Duration = Duration::from_millis(300);

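/// Sorts the provided peers by their distance to the given `NetworkAddress`,
/// returning at most `expected_entries` of the closest.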
pub fn sort_peers_by_address(
    peers: Vec<(PeerId, Addresses)>,
    address: &NetworkAddress,
    expected_entries: usize,
) -> Result<Vec<(PeerId, Addresses)>> {
    sort_peers_by_key(peers, &address.as_kbucket_key(), expected_entries)
}

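/// Sorts the provided peers by their distance to the given `KBucketKey`,
/// returning at most `expected_entries` of the closest. Errors if fewer than
/// `CLOSE_GROUP_SIZE` peers are provided.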
pub fn sort_peers_by_key<T>(
    peers: Vec<(PeerId, Addresses)>,
    key: &KBucketKey<T>,
    expected_entries: usize,
) -> Result<Vec<(PeerId, Addresses)>> {
    if CLOSE_GROUP_SIZE > peers.len() {
        warn!("Not enough peers in the k-bucket to satisfy the request");
        return Err(NetworkError::NotEnoughPeers {
            found: peers.len(),
            required: CLOSE_GROUP_SIZE,
        });
    }

    let mut peer_distances: Vec<(PeerId, Addresses, KBucketDistance)> =
        Vec::with_capacity(peers.len());

    for (peer_id, addrs) in peers {
        let addr = NetworkAddress::from_peer(peer_id);
        let distance = key.distance(&addr.as_kbucket_key());
        peer_distances.push((peer_id, addrs, distance));
    }

    peer_distances.sort_by(|a, b| a.2.cmp(&b.2));

    let sorted_peers: Vec<(PeerId, Addresses)> = peer_distances
        .into_iter()
        .take(expected_entries)
        .map(|(peer_id, addrs, _)| (peer_id, addrs))
        .collect();

    Ok(sorted_peers)
}

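/// The known multiaddresses of a peer.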
#[derive(Clone, Debug, Default)]
pub struct Addresses(pub Vec<Multiaddr>);

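/// A handle for interacting with the swarm. Cloning is cheap: all clones
/// share the same command channels and keypair via the inner `Arc`.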
#[derive(Clone, Debug)]
pub struct Network {
    inner: Arc<NetworkInner>,
}

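/// The actual state behind `Network`, shared between its clones via an `Arc`.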
#[derive(Debug)]
struct NetworkInner {
    network_swarm_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
    local_swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
    peer_id: PeerId,
    keypair: Keypair,
}

impl Network {
    pub fn new(
        network_swarm_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
        local_swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
        peer_id: PeerId,
        keypair: Keypair,
    ) -> Self {
        Self {
            inner: Arc::new(NetworkInner {
                network_swarm_cmd_sender,
                local_swarm_cmd_sender,
                peer_id,
                keypair,
            }),
        }
    }

    pub fn peer_id(&self) -> PeerId {
        self.inner.peer_id
    }

    pub fn keypair(&self) -> &Keypair {
        &self.inner.keypair
    }

    pub(crate) fn network_swarm_cmd_sender(&self) -> &mpsc::Sender<NetworkSwarmCmd> {
        &self.inner.network_swarm_cmd_sender
    }

    pub(crate) fn local_swarm_cmd_sender(&self) -> &mpsc::Sender<LocalSwarmCmd> {
        &self.inner.local_swarm_cmd_sender
    }

    pub fn sign(&self, msg: &[u8]) -> Result<Vec<u8>> {
        self.keypair().sign(msg).map_err(NetworkError::from)
    }

    pub fn verify(&self, msg: &[u8], sig: &[u8]) -> bool {
        self.keypair().public().verify(msg, sig)
    }

    pub fn get_pub_key(&self) -> Vec<u8> {
        self.keypair().public().encode_protobuf()
    }

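    /// Client variant of [`Self::get_all_close_peers_in_range_or_close_group`]:
    /// our own `PeerId` is filtered out of the result.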
    pub async fn client_get_all_close_peers_in_range_or_close_group(
        &self,
        key: &NetworkAddress,
    ) -> Result<Vec<(PeerId, Addresses)>> {
        self.get_all_close_peers_in_range_or_close_group(key, true)
            .await
    }

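    /// Node variant of [`Self::get_all_close_peers_in_range_or_close_group`]:
    /// our own `PeerId` is kept in the result.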
    pub async fn node_get_closest_peers(
        &self,
        key: &NetworkAddress,
    ) -> Result<Vec<(PeerId, Addresses)>> {
        self.get_all_close_peers_in_range_or_close_group(key, false)
            .await
    }

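    /// Returns the peers in the local routing table, with their multiaddresses.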
    pub async fn get_local_peers_with_multiaddr(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetPeersWithMultiaddr { sender });
        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

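    /// Returns a map from k-bucket index to the peers it currently holds.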
    pub async fn get_kbuckets(&self) -> Result<BTreeMap<u32, Vec<PeerId>>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetKBuckets { sender });
        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

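    /// Returns the closest K peers to our own address, from the local routing table.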
    pub async fn get_closest_k_value_local_peers(&self) -> Result<Vec<(PeerId, Addresses)>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetClosestKLocalPeers { sender });

        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

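    /// Returns up to `num_of_peers` peers from the local routing table that
    /// are closest to the target key.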
    pub async fn get_close_peers_to_the_target(
        &self,
        key: NetworkAddress,
        num_of_peers: usize,
    ) -> Result<Vec<(PeerId, Addresses)>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetCloseLocalPeersToTarget {
            key,
            num_of_peers,
            sender,
        });

        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

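    /// Verifies that a chunk exists on the network by requesting a
    /// `ChunkProof` from the close nodes, retrying until `quorum` proofs have
    /// been verified or the retry strategy is exhausted.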
    pub async fn verify_chunk_existence(
        &self,
        chunk_address: NetworkAddress,
        nonce: Nonce,
        expected_proof: ChunkProof,
        quorum: ResponseQuorum,
        retry_strategy: RetryStrategy,
    ) -> Result<()> {
        let total_attempts = retry_strategy.attempts();

        let pretty_key = PrettyPrintRecordKey::from(&chunk_address.to_record_key()).into_owned();
        let expected_n_verified = quorum.get_value();

        let mut close_nodes = Vec::new();
        let mut retry_attempts = 0;
        while retry_attempts < total_attempts {
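            // Every other attempt, refresh the list of close nodes from the network.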
            if retry_attempts % 2 == 0 {
                close_nodes = self
                    .client_get_all_close_peers_in_range_or_close_group(&chunk_address)
                    .await?;
            }
            retry_attempts += 1;
            info!(
                "Getting ChunkProof for {pretty_key:?}. Attempts: {retry_attempts:?}/{total_attempts:?}"
            );

            let request = Request::Query(Query::GetChunkExistenceProof {
                key: chunk_address.clone(),
                nonce,
                difficulty: 1,
            });
            let responses = self
                .send_and_get_responses(&close_nodes, &request, true)
                .await;
            let n_verified = responses
                .into_iter()
                .filter_map(|(peer, resp)| {
                    if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(proofs))) =
                        resp
                    {
                        if proofs.is_empty() {
                            warn!("Failed to verify the ChunkProof from {peer:?}. Returned proof is empty.");
                            None
                        } else if let Ok(ref proof) = proofs[0].1 {
                            if expected_proof.verify(proof) {
                                debug!("Got a valid ChunkProof from {peer:?}");
                                Some(())
                            } else {
                                warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered with.");
                                None
                            }
                        } else {
                            warn!("Failed to verify the ChunkProof from {peer:?}, returned with error {:?}", proofs[0].1);
                            None
                        }
                    } else {
                        debug!("Did not get a valid response for the ChunkProof from {peer:?}");
                        None
                    }
                })
                .count();
            debug!("Got {n_verified} verified chunk existence proofs for chunk_address {chunk_address:?}");

            if n_verified >= expected_n_verified {
                return Ok(());
            }
            warn!("Only obtained {n_verified} verified proofs, below the expected {expected_n_verified}");
            let waiting_time = if retry_attempts == 1 {
                MIN_WAIT_BEFORE_READING_A_PUT
            } else {
                MIN_WAIT_BEFORE_READING_A_PUT * 2
            };
            sleep(waiting_time).await;
        }

        Err(NetworkError::FailedToVerifyChunkProof(chunk_address))
    }

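    /// Gets `PaymentQuote`s from the close nodes of the given record address,
    /// excluding `ignore_peers`. Returns an empty list if enough of the close
    /// nodes report the record as already paid for.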
    pub async fn get_store_quote_from_network(
        &self,
        record_address: NetworkAddress,
        data_type: u32,
        data_size: usize,
        ignore_peers: Vec<PeerId>,
    ) -> Result<Vec<(PeerId, PaymentQuote)>> {
        let mut close_nodes = self
            .client_get_all_close_peers_in_range_or_close_group(&record_address)
            .await?;
        close_nodes.retain(|(peer_id, _)| !ignore_peers.contains(peer_id));
        info!(
            "For record {record_address:?} quoting {} nodes. ignore_peers is {ignore_peers:?}",
            close_nodes.len()
        );

        if close_nodes.is_empty() {
            error!("Can't get store_cost of {record_address:?}, as all close_nodes are ignored");
            return Err(NetworkError::NotEnoughPeersForStoreCostRequest);
        }

        let request = Request::Query(Query::GetStoreQuote {
            key: record_address.clone(),
            data_type,
            data_size,
            nonce: None,
            difficulty: 0,
        });
        let responses = self
            .send_and_get_responses(&close_nodes, &request, true)
            .await;

        let mut peer_already_have_it = 0;
        let enough_peers_already_have_it = close_nodes.len() / 2;

        let mut peers_returned_error = 0;

        let mut all_quotes = vec![];
        let mut quotes_to_pay = vec![];
        for (peer, response) in responses {
            info!("StoreCostReq for {record_address:?} received response: {response:?}");
            match response {
                Ok(Response::Query(QueryResponse::GetStoreQuote {
                    quote: Ok(quote),
                    peer_address,
                    storage_proofs,
                })) => {
                    if !storage_proofs.is_empty() {
                        debug!("Storage proof verification during GetStoreQuote is not yet implemented.");
                    }

                    if !quote.check_is_signed_by_claimed_peer(peer) {
                        warn!("Received invalid quote from {peer_address:?}, {quote:?}");
                        continue;
                    }

                    if quote.quoting_metrics.data_type != data_type {
                        warn!("Received invalid quote from {peer_address:?}, {quote:?}. Data type did not match the request.");
                        continue;
                    }

                    all_quotes.push((peer_address.clone(), quote.clone()));
                    quotes_to_pay.push((peer, quote));
                }
                Ok(Response::Query(QueryResponse::GetStoreQuote {
                    quote: Err(ProtocolError::RecordExists(_)),
                    peer_address,
                    storage_proofs,
                })) => {
                    if !storage_proofs.is_empty() {
                        debug!("Storage proof verification during GetStoreQuote is not yet implemented.");
                    }
                    peer_already_have_it += 1;
                    info!("Address {record_address:?} was already paid for according to {peer_address:?} ({peer_already_have_it}/{enough_peers_already_have_it})");
                    if peer_already_have_it >= enough_peers_already_have_it {
                        info!("Address {record_address:?} was already paid for according to {peer_already_have_it} peers, ending quote request");
                        return Ok(vec![]);
                    }
                }
                Err(err) => {
                    error!("Got an error while requesting quote from peer {peer:?}: {err:?}");
                    peers_returned_error += 1;
                }
                _ => {
                    error!("Got an unexpected response while requesting quote from peer {peer:?}: {response:?}");
                    peers_returned_error += 1;
                }
            }
        }

        if quotes_to_pay.is_empty() {
            error!(
                "Could not fetch any quotes. {} peers returned an error.",
                peers_returned_error
            );
            return Err(NetworkError::NoStoreCostResponses);
        }

        Ok(quotes_to_pay)
    }

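    /// Gets a `Record` from the network. Quorum, verification and retries are
    /// controlled by the given `GetRecordCfg`; retries are spaced out using
    /// its backoff strategy.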
    pub async fn get_record_from_network(
        &self,
        key: RecordKey,
        cfg: &GetRecordCfg,
    ) -> Result<Record> {
        let pretty_key = PrettyPrintRecordKey::from(&key);
        let mut backoff = cfg.retry_strategy.backoff().into_iter();

        loop {
            info!("Getting record from network of {pretty_key:?}, with cfg {cfg:?}");
            let (sender, receiver) = oneshot::channel();
            self.send_network_swarm_cmd(NetworkSwarmCmd::GetNetworkRecord {
                key: key.clone(),
                sender,
                cfg: cfg.clone(),
            });
            let result = match receiver.await {
                Ok(result) => result,
                Err(err) => {
                    error!(
                        "When fetching record {pretty_key:?}, encountered a channel error {err:?}"
                    );
                    return Err(NetworkError::InternalMsgChannelDropped);
                }
            };

            let err = match result {
                Ok(record) => {
                    info!("Record returned: {pretty_key:?}.");
                    return Ok(record);
                }
                Err(err) => err,
            };

            match &err {
                GetRecordError::RecordDoesNotMatch(_) => {
                    warn!("The returned record does not match target {pretty_key:?}.");
                }
                GetRecordError::NotEnoughCopies { expected, got, .. } => {
                    warn!("Not enough copies ({got}/{expected}) found yet for {pretty_key:?}.");
                }
                GetRecordError::RecordNotFound => {
                    warn!("No holder of record '{pretty_key:?}' found.");
                }
                GetRecordError::RecordKindMismatch => {
                    error!("Record kind mismatch for {pretty_key:?}. This error should not happen here.");
                }
                GetRecordError::SplitRecord { result_map } => {
                    error!("Encountered a split record for {pretty_key:?}.");
                    if let Some(record) = Self::handle_split_record_error(result_map, &key)? {
                        info!("Merged the split record for {pretty_key:?} into a single record");
                        return Ok(record);
                    }
                }
                GetRecordError::QueryTimeout => {
                    error!("Encountered query timeout for {pretty_key:?}.");
                }
            }

            match backoff.next() {
                Some(Some(duration)) => {
                    crate::time::sleep(duration).await;
                    debug!("Retrying the GET of {pretty_key:?} after a backoff delay...");
                }
                _ => break Err(err.into()),
            }
        }
    }

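    /// Attempts to reconcile a `SplitRecord` error: GraphEntries from all
    /// copies are merged, while for Pointers and Scratchpads the valid
    /// candidate with the highest counter wins. Returns `None` if the split
    /// cannot be resolved.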
    fn handle_split_record_error(
        result_map: &HashMap<XorName, (Record, HashSet<PeerId>)>,
        key: &RecordKey,
    ) -> std::result::Result<Option<Record>, NetworkError> {
        let pretty_key = PrettyPrintRecordKey::from(key);

        let results_count = result_map.len();
        let mut accumulated_graphentries = HashSet::new();
        let mut valid_scratchpad: Option<Scratchpad> = None;
        let mut valid_pointer: Option<Pointer> = None;

        if results_count > 1 {
            let mut record_kind = None;
            info!("For record {pretty_key:?}, we have more than one result returned.");
            for (record, _) in result_map.values() {
                let Ok(header) = RecordHeader::from_record(record) else {
                    continue;
                };
                let kind = record_kind.get_or_insert(header.kind);
                if *kind != header.kind {
                    error!("Encountered a split record for {pretty_key:?} with different RecordHeaders. Expected {kind:?} but got {:?}. Skipping", header.kind);
                    continue;
                }

                match kind {
                    RecordKind::DataOnly(DataTypes::Chunk) | RecordKind::DataWithPayment(_) => {
                        error!("Encountered a split record for {pretty_key:?} with unexpected RecordKind {kind:?}, skipping.");
                        continue;
                    }
                    RecordKind::DataOnly(DataTypes::GraphEntry) => {
                        match get_graph_entry_from_record(record) {
                            Ok(graphentries) => {
                                accumulated_graphentries.extend(graphentries);
                                info!("For record {pretty_key:?}, we have a split record for a GraphEntry. Accumulating GraphEntries: {}", accumulated_graphentries.len());
                            }
                            Err(_) => {
                                warn!("Failed to deserialize GraphEntry for {pretty_key:?}, skipping accumulation");
                                continue;
                            }
                        }
                    }
                    RecordKind::DataOnly(DataTypes::Pointer) => {
                        info!("For record {pretty_key:?}, we have a split record for a Pointer. Selecting the one with the highest counter");
                        let Ok(pointer) = try_deserialize_record::<Pointer>(record) else {
                            error!(
                                "Failed to deserialize pointer {pretty_key}. Skipping accumulation"
                            );
                            continue;
                        };

                        if !pointer.verify_signature() {
                            warn!("Rejecting Pointer for {pretty_key} PUT with invalid signature");
                            continue;
                        }

                        if let Some(old) = &valid_pointer {
                            if old.counter() >= pointer.counter() {
                                info!("Rejecting Pointer for {pretty_key} with lower counter than the previous one");
                                continue;
                            }
                        }
                        valid_pointer = Some(pointer);
                    }
                    RecordKind::DataOnly(DataTypes::Scratchpad) => {
                        info!("For record {pretty_key:?}, we have a split record for a Scratchpad. Selecting the one with the highest counter");
                        let Ok(scratchpad) = try_deserialize_record::<Scratchpad>(record) else {
                            error!(
                                "Failed to deserialize scratchpad {pretty_key}. Skipping accumulation"
                            );
                            continue;
                        };

                        if !scratchpad.verify_signature() {
                            warn!(
                                "Rejecting Scratchpad for {pretty_key} PUT with invalid signature"
                            );
                            continue;
                        }

                        if let Some(old) = &valid_scratchpad {
                            if old.counter() >= scratchpad.counter() {
                                info!("Rejecting Scratchpad for {pretty_key} with lower counter than the previous one");
                                continue;
                            }
                        }
                        valid_scratchpad = Some(scratchpad);
                    }
                }
            }
        }

        if accumulated_graphentries.len() > 1 {
            info!("For record {pretty_key:?} task found split record for a GraphEntry, accumulated and sending them as a single record");
            let accumulated_graphentries = accumulated_graphentries
                .into_iter()
                .collect::<Vec<GraphEntry>>();
            let record = Record {
                key: key.clone(),
                value: try_serialize_record(
                    &accumulated_graphentries,
                    RecordKind::DataOnly(DataTypes::GraphEntry),
                )
                .map_err(|err| {
                    error!(
                        "Error while serializing the accumulated GraphEntries for {pretty_key:?}: {err:?}"
                    );
                    NetworkError::from(err)
                })?
                .to_vec(),
                publisher: None,
                expires: None,
            };
            return Ok(Some(record));
        } else if let Some(pointer) = valid_pointer {
            info!("For record {pretty_key:?} task found a valid pointer, returning it.");
            let record_value =
                try_serialize_record(&pointer, RecordKind::DataOnly(DataTypes::Pointer))
                    .map_err(|err| {
                        error!("Error while serializing the pointer for {pretty_key:?}: {err:?}");
                        NetworkError::from(err)
                    })?
                    .to_vec();

            let record = Record {
                key: key.clone(),
                value: record_value,
                publisher: None,
                expires: None,
            };
            return Ok(Some(record));
        } else if let Some(scratchpad) = valid_scratchpad {
            info!("For record {pretty_key:?} task found a valid scratchpad, returning it.");
            let record_value =
                try_serialize_record(&scratchpad, RecordKind::DataOnly(DataTypes::Scratchpad))
                    .map_err(|err| {
                        error!(
                            "Error while serializing the scratchpad for {pretty_key:?}: {err:?}"
                        );
                        NetworkError::from(err)
                    })?
                    .to_vec();

            let record = Record {
                key: key.clone(),
                value: record_value,
                publisher: None,
                expires: None,
            };
            return Ok(Some(record));
        }
        Ok(None)
    }

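    /// Fetches the quoting metrics for the given key from the local record store.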
    pub async fn get_local_quoting_metrics(
        &self,
        key: RecordKey,
        data_type: u32,
        data_size: usize,
    ) -> Result<(QuotingMetrics, bool)> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalQuotingMetrics {
            key,
            data_type,
            data_size,
            sender,
        });

        let quoting_metrics = receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)??;
        Ok(quoting_metrics)
    }

    pub fn notify_payment_received(&self) {
        self.send_local_swarm_cmd(LocalSwarmCmd::PaymentReceived);
    }

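    /// Returns the copy of the record held in the local record store, if any.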
    pub async fn get_local_record(&self, key: &RecordKey) -> Result<Option<Record>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalRecord {
            key: key.clone(),
            sender,
        });

        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

    pub async fn is_peer_shunned(&self, target: NetworkAddress) -> Result<bool> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::IsPeerShunned { target, sender });

        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

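    /// Puts a `Record` to the network, retrying on failure according to the
    /// retry strategy in `cfg`, and verifying the PUT afterwards if
    /// `cfg.verification` is set.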
    pub async fn put_record(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
        let pretty_key = PrettyPrintRecordKey::from(&record.key);
        let mut backoff = cfg.retry_strategy.backoff().into_iter();

        loop {
            info!(
                "Attempting to PUT record with key: {pretty_key:?} to network, with cfg {cfg:?}, retrying via backoff..."
            );

            let err = match self.put_record_once(record.clone(), cfg).await {
                Ok(_) => break Ok(()),
                Err(err) => err,
            };

            warn!("Failed to PUT record with key: {pretty_key:?} to network (retry via backoff) with error: {err:?}");

            match backoff.next() {
                Some(Some(duration)) => {
                    crate::time::sleep(duration).await;
                }
                _ => break Err(err),
            }
        }
    }

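    /// A single PUT attempt: sends the record (optionally to a fixed set of
    /// peers) and runs the configured verification.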
    async fn put_record_once(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
        let record_key = record.key.clone();
        let pretty_key = PrettyPrintRecordKey::from(&record_key);
        info!(
            "Putting record of {} - length {:?} to network",
            pretty_key,
            record.value.len()
        );

        let (sender, receiver) = oneshot::channel();
        if let Some(put_record_to_peers) = &cfg.use_put_record_to {
            self.send_network_swarm_cmd(NetworkSwarmCmd::PutRecordTo {
                peers: put_record_to_peers.clone(),
                record: record.clone(),
                sender,
                quorum: cfg.put_quorum,
            });
        } else {
            self.send_network_swarm_cmd(NetworkSwarmCmd::PutRecord {
                record: record.clone(),
                sender,
                quorum: cfg.put_quorum,
            });
        }

        let response = receiver.await?;

        if let Some((verification_kind, get_cfg)) = &cfg.verification {
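            // Sleep for a random duration before verifying, to give nodes time
            // to store the record.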
            let wait_duration = rand::thread_rng()
                .gen_range(MIN_WAIT_BEFORE_READING_A_PUT..MAX_WAIT_BEFORE_READING_A_PUT);
            sleep(wait_duration).await;
            debug!("Attempting to verify {pretty_key:?} after sleeping for {wait_duration:?}");

            if let VerificationKind::ChunkProof {
                expected_proof,
                nonce,
            } = verification_kind
            {
                self.verify_chunk_existence(
                    NetworkAddress::from_record_key(&record_key),
                    *nonce,
                    expected_proof.clone(),
                    get_cfg.get_quorum,
                    get_cfg.retry_strategy,
                )
                .await?;
            } else {
                match self
                    .get_record_from_network(record.key.clone(), get_cfg)
                    .await
                {
                    Ok(_) => {
                        debug!("Record {pretty_key:?} verified to be stored.");
                    }
                    Err(NetworkError::GetRecordError(GetRecordError::RecordNotFound)) => {
                        warn!("Record {pretty_key:?} not found after PUT; it was either rejected or not yet stored by the nodes when we asked");
                        return Err(NetworkError::RecordNotStoredByNodes(
                            NetworkAddress::from_record_key(&record_key),
                        ));
                    }
                    Err(NetworkError::GetRecordError(GetRecordError::SplitRecord { .. }))
                        if matches!(verification_kind, VerificationKind::Crdt) =>
                    {
                        warn!("Record {pretty_key:?} is split, which is okay since we're dealing with CRDTs");
                    }
                    Err(e) => {
                        debug!(
                            "Failed to verify record {pretty_key:?} to be stored with error: {e:?}"
                        );
                        return Err(e);
                    }
                }
            }
        }
        response
    }

    pub fn notify_fetch_completed(&self, key: RecordKey, record_type: ValidationType) {
        self.send_local_swarm_cmd(LocalSwarmCmd::FetchCompleted((key, record_type)))
    }

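    /// Writes a `Record` to the local record store.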
    pub fn put_local_record(&self, record: Record, is_client_put: bool) {
        debug!(
            "Writing Record locally, for {:?} - length {:?}",
            PrettyPrintRecordKey::from(&record.key),
            record.value.len()
        );
        self.send_local_swarm_cmd(LocalSwarmCmd::PutLocalRecord {
            record,
            is_client_put,
        })
    }

    pub async fn is_record_key_present_locally(&self, key: &RecordKey) -> Result<bool> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::RecordStoreHasKey {
            key: key.clone(),
            sender,
        });

        let is_present = receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)??;

        Ok(is_present)
    }

    pub async fn get_all_local_record_addresses(
        &self,
    ) -> Result<HashMap<NetworkAddress, ValidationType>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetAllLocalRecordAddresses { sender });

        let addrs = receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)??;
        Ok(addrs)
    }

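    /// Sends a `Request` to the given peer and awaits its `Response`. On
    /// connection-related outbound failures, the peer is dialed via the
    /// provided addresses and the request is retried once.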
    pub async fn send_request(
        &self,
        req: Request,
        peer: PeerId,
        addrs: Addresses,
    ) -> Result<Response> {
        let (sender, receiver) = oneshot::channel();
        let req_str = format!("{req:?}");
        self.send_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
            req: req.clone(),
            peer,
            addrs: None,
            sender: Some(sender),
        });
        let mut r = receiver.await?;

        if let Err(error) = &r {
            error!("Error in response: {:?}", error);

            match error {
                NetworkError::OutboundError(OutboundFailure::Io(_))
                | NetworkError::OutboundError(OutboundFailure::ConnectionClosed)
                | NetworkError::OutboundError(OutboundFailure::DialFailure) => {
                    warn!(
                        "Outbound request {req_str} failed with {error:?}; dialing the peer and re-attempting."
                    );

                    self.send_network_swarm_cmd(NetworkSwarmCmd::DialPeer {
                        peer,
                        addrs: addrs.clone(),
                    });

                    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

                    let (sender, receiver) = oneshot::channel();
                    debug!("Reattempting to send_request {req_str} to {peer:?} by dialing the addrs manually.");
                    self.send_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
                        req,
                        peer,
                        addrs: Some(addrs),
                        sender: Some(sender),
                    });

                    r = receiver.await?;
                    if let Err(error) = &r {
                        error!("Reattempt of {req_str} led to an error again (even after dialing). {error:?}");
                    }
                }
                _ => {
                    warn!("Error in response for {req_str}: {error:?}");
                }
            }
        }

        r
    }

    pub fn send_response(&self, resp: Response, channel: MsgResponder) {
        self.send_network_swarm_cmd(NetworkSwarmCmd::SendResponse { resp, channel })
    }

    pub async fn get_swarm_local_state(&self) -> Result<SwarmLocalState> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetSwarmLocalState(sender));
        let state = receiver.await?;
        Ok(state)
    }

    pub fn trigger_interval_replication(&self) {
        self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIntervalReplication)
    }

    pub fn add_fresh_records_to_the_replication_fetcher(
        &self,
        holder: NetworkAddress,
        keys: Vec<(NetworkAddress, ValidationType)>,
    ) {
        self.send_local_swarm_cmd(LocalSwarmCmd::AddFreshReplicateRecords { holder, keys })
    }

    pub fn record_node_issues(&self, peer_id: PeerId, issue: NodeIssue) {
        self.send_local_swarm_cmd(LocalSwarmCmd::RecordNodeIssue { peer_id, issue });
    }

    pub fn historical_verify_quotes(&self, quotes: Vec<(PeerId, PaymentQuote)>) {
        self.send_local_swarm_cmd(LocalSwarmCmd::QuoteVerification { quotes });
    }

    pub fn trigger_irrelevant_record_cleanup(&self) {
        self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIrrelevantRecordCleanup)
    }

    pub fn add_network_density_sample(&self, distance: KBucketDistance) {
        self.send_local_swarm_cmd(LocalSwarmCmd::AddNetworkDensitySample { distance })
    }

    pub fn notify_peer_scores(&self, peer_scores: Vec<(PeerId, bool)>) {
        self.send_local_swarm_cmd(LocalSwarmCmd::NotifyPeerScores { peer_scores })
    }

    pub fn notify_node_version(&self, peer: PeerId, version: String) {
        self.send_local_swarm_cmd(LocalSwarmCmd::NotifyPeerVersion { peer, version })
    }

    fn send_network_swarm_cmd(&self, cmd: NetworkSwarmCmd) {
        send_network_swarm_cmd(self.network_swarm_cmd_sender().clone(), cmd);
    }

    fn send_local_swarm_cmd(&self, cmd: LocalSwarmCmd) {
        send_local_swarm_cmd(self.local_swarm_cmd_sender().clone(), cmd);
    }

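    /// Queries the network for the peers closest to the given key, returning
    /// an expanded close group (`CLOSE_GROUP_SIZE` plus half again). If
    /// `client` is true, our own `PeerId` is filtered out of the result.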
    pub async fn get_all_close_peers_in_range_or_close_group(
        &self,
        key: &NetworkAddress,
        client: bool,
    ) -> Result<Vec<(PeerId, Addresses)>> {
        let pretty_key = PrettyPrintKBucketKey(key.as_kbucket_key());
        debug!("Getting all the closest peers in range of {pretty_key:?}");
        let (sender, receiver) = oneshot::channel();
        self.send_network_swarm_cmd(NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork {
            key: key.clone(),
            sender,
        });

        let found_peers = receiver.await?;

        let result_len = found_peers.len();
        let mut closest_peers = found_peers;

        if client {
            closest_peers.retain(|&(x, _)| x != self.peer_id());
            if result_len != closest_peers.len() {
                info!("Removed our own peer id from the closest_peers, as we are a client");
            }
        }

        if tracing::level_enabled!(tracing::Level::DEBUG) {
            let close_peers_pretty_print: Vec<_> = closest_peers
                .iter()
                .map(|(peer_id, _)| {
                    format!(
                        "{peer_id:?}({:?})",
                        PrettyPrintKBucketKey(NetworkAddress::from_peer(*peer_id).as_kbucket_key())
                    )
                })
                .collect();

            debug!(
                "Network knowledge of closest peers to {pretty_key:?} are: {close_peers_pretty_print:?}"
            );
        }

        let expanded_close_group = CLOSE_GROUP_SIZE + CLOSE_GROUP_SIZE / 2;
        let closest_peers = sort_peers_by_address(closest_peers, key, expanded_close_group)?;
        Ok(closest_peers)
    }

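    /// Sends the given `Request` to every peer in `peers` concurrently. If
    /// `get_all_responses` is true, waits for a response from each peer;
    /// otherwise returns as soon as the first `Ok` response arrives.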
    pub async fn send_and_get_responses(
        &self,
        peers: &[(PeerId, Addresses)],
        req: &Request,
        get_all_responses: bool,
    ) -> BTreeMap<PeerId, Result<Response>> {
        debug!("send_and_get_responses for {req:?}");
        let mut list_of_futures = peers
            .iter()
            .map(|(peer, addrs)| {
                Box::pin(async {
                    let resp = self.send_request(req.clone(), *peer, addrs.clone()).await;
                    (*peer, resp)
                })
            })
            .collect::<Vec<_>>();

        let mut responses = BTreeMap::new();
        while !list_of_futures.is_empty() {
            let ((peer, resp), _, remaining_futures) = select_all(list_of_futures).await;
            let resp_string = match &resp {
                Ok(resp) => format!("{resp}"),
                Err(err) => format!("{err:?}"),
            };
            debug!("Got response from {peer:?} for the req: {req:?}, resp: {resp_string}");
            if !get_all_responses && resp.is_ok() {
                return BTreeMap::from([(peer, resp)]);
            }
            responses.insert(peer, resp);
            list_of_futures = remaining_futures;
        }

        debug!("Received all responses for {req:?}");
        responses
    }
}

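/// Returns true if the `Multiaddr` contains no non-routable IPv4 component
/// (unspecified, private, loopback, link-local, documentation or broadcast).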
pub fn multiaddr_is_global(multiaddr: &Multiaddr) -> bool {
    !multiaddr.iter().any(|addr| match addr {
        Protocol::Ip4(ip) => {
            ip.is_unspecified()
                || ip.is_private()
                || ip.is_loopback()
                || ip.is_link_local()
                || ip.is_documentation()
                || ip.is_broadcast()
        }
        _ => false,
    })
}

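/// Pops a trailing `/p2p/<peer_id>` component off the `Multiaddr`, returning
/// the `PeerId` if one was present.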
pub(crate) fn multiaddr_pop_p2p(multiaddr: &mut Multiaddr) -> Option<PeerId> {
    if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() {
        let _ = multiaddr.pop();
        Some(peer_id)
    } else {
        None
    }
}

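/// Returns the `PeerId` of a trailing `/p2p/<peer_id>` component, leaving the
/// `Multiaddr` unchanged.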
pub(crate) fn multiaddr_get_p2p(multiaddr: &Multiaddr) -> Option<PeerId> {
    if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() {
        Some(peer_id)
    } else {
        None
    }
}

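/// Returns a copy of the `Multiaddr` with `/p2p/<peer_id>` components
/// stripped. For relayed addresses, only the components after `P2pCircuit`
/// are stripped, so the relay server's own `/p2p/` component is kept.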
pub(crate) fn multiaddr_strip_p2p(multiaddr: &Multiaddr) -> Multiaddr {
    let is_relayed = multiaddr.iter().any(|p| matches!(p, Protocol::P2pCircuit));

    if is_relayed {
        let mut before_relay_protocol = true;
        let mut new_multi_addr = Multiaddr::empty();
        for p in multiaddr.iter() {
            if matches!(p, Protocol::P2pCircuit) {
                before_relay_protocol = false;
            }
            if matches!(p, Protocol::P2p(_)) && !before_relay_protocol {
                continue;
            }
            new_multi_addr.push(p);
        }
        new_multi_addr
    } else {
        multiaddr
            .iter()
            .filter(|p| !matches!(p, Protocol::P2p(_)))
            .collect()
    }
}

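/// Returns the first IP component (v4 or v6) of the `Multiaddr`, if any.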
pub(crate) fn multiaddr_get_ip(addr: &Multiaddr) -> Option<IpAddr> {
    addr.iter().find_map(|p| match p {
        Protocol::Ip4(addr) => Some(IpAddr::V4(addr)),
        Protocol::Ip6(addr) => Some(IpAddr::V6(addr)),
        _ => None,
    })
}

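/// Returns the first UDP port component of the `Multiaddr`, if any.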
pub(crate) fn multiaddr_get_port(addr: &Multiaddr) -> Option<u16> {
    addr.iter().find_map(|p| match p {
        Protocol::Udp(port) => Some(port),
        _ => None,
    })
}

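/// Spawns a task that forwards the command to the local swarm, logging if the
/// channel is at capacity.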
pub(crate) fn send_local_swarm_cmd(swarm_cmd_sender: Sender<LocalSwarmCmd>, cmd: LocalSwarmCmd) {
    let capacity = swarm_cmd_sender.capacity();

    if capacity == 0 {
        error!(
            "SwarmCmd channel is full. Awaiting capacity to send: {:?}",
            cmd
        );
    }

    let _handle = spawn(async move {
        if let Err(error) = swarm_cmd_sender.send(cmd).await {
            error!("Failed to send SwarmCmd: {}", error);
        }
    });
}

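/// Spawns a task that forwards the command to the network swarm, logging if
/// the channel is at capacity.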
pub(crate) fn send_network_swarm_cmd(
    swarm_cmd_sender: Sender<NetworkSwarmCmd>,
    cmd: NetworkSwarmCmd,
) {
    let capacity = swarm_cmd_sender.capacity();

    if capacity == 0 {
        error!(
            "SwarmCmd channel is full. Awaiting capacity to send: {:?}",
            cmd
        );
    }

    let _handle = spawn(async move {
        if let Err(error) = swarm_cmd_sender.send(cmd).await {
            error!("Failed to send SwarmCmd: {}", error);
        }
    });
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_network_sign_verify() -> eyre::Result<()> {
        let (network, _, _) =
            NetworkBuilder::new(Keypair::generate_ed25519(), false, vec![]).build_client();
        let msg = b"test message";
        let sig = network.sign(msg)?;
        assert!(network.verify(msg, &sig));
        Ok(())
    }
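
    // A small additional sanity check, added as an illustrative sketch: it
    // exercises the multiaddr helper functions defined above.
    #[test]
    fn test_multiaddr_p2p_helpers() -> eyre::Result<()> {
        let peer_id = PeerId::random();
        let base: Multiaddr = "/ip4/127.0.0.1/udp/5000".parse()?;
        let mut addr = base.clone();
        addr.push(Protocol::P2p(peer_id));

        // The trailing p2p component is reported, then popped off the end.
        assert_eq!(multiaddr_get_p2p(&addr), Some(peer_id));
        assert_eq!(multiaddr_pop_p2p(&mut addr), Some(peer_id));
        assert_eq!(addr, base);

        // An address without a trailing p2p component is left untouched.
        assert_eq!(multiaddr_pop_p2p(&mut addr), None);
        assert_eq!(multiaddr_get_port(&base), Some(5000));
        Ok(())
    }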
1290}