#[macro_use]
extern crate tracing;

mod bootstrap;
mod circular_vec;
mod cmd;
mod config;
mod driver;
mod error;
mod event;
mod external_address;
mod fifo_register;
mod graph;
mod log_markers;
#[cfg(feature = "open-metrics")]
mod metrics;
mod network_discovery;
mod record_store;
mod record_store_api;
mod relay_manager;
mod replication_fetcher;
pub mod time;
mod transport;

use cmd::LocalSwarmCmd;
use xor_name::XorName;

pub use self::{
    cmd::{NodeIssue, SwarmLocalState},
    config::{GetRecordCfg, PutRecordCfg, ResponseQuorum, RetryStrategy, VerificationKind},
    driver::{NetworkBuilder, SwarmDriver, MAX_PACKET_SIZE},
    error::{GetRecordError, NetworkError},
    event::{MsgResponder, NetworkEvent},
    graph::get_graph_entry_from_record,
    record_store::NodeRecordStore,
};
#[cfg(feature = "open-metrics")]
pub use metrics::service::MetricsRegistries;
pub use time::{interval, sleep, spawn, Instant, Interval};

use self::{cmd::NetworkSwarmCmd, error::Result};
use ant_evm::{PaymentQuote, QuotingMetrics};
use ant_protocol::{
    error::Error as ProtocolError,
    messages::{ChunkProof, Nonce, Query, QueryResponse, Request, Response},
    storage::{
        try_deserialize_record, try_serialize_record, DataTypes, GraphEntry, Pointer,
        RecordHeader, RecordKind, Scratchpad, ValidationType,
    },
    NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
};
use futures::future::select_all;
use libp2p::{
    identity::Keypair,
    kad::{KBucketDistance, KBucketKey, Record, RecordKey},
    multiaddr::Protocol,
    request_response::OutboundFailure,
    Multiaddr, PeerId,
};
use rand::Rng;
use std::{
    collections::{BTreeMap, HashMap, HashSet},
    net::IpAddr,
    sync::Arc,
};
use tokio::sync::{
    mpsc::{self, Sender},
    oneshot,
};
use tokio::time::Duration;

/// Majority of a given group (i.e. strictly more than 50%).
#[inline]
pub const fn close_group_majority() -> usize {
    CLOSE_GROUP_SIZE / 2 + 1
}

/// Upper bound for the random wait before reading back a freshly PUT record.
const MAX_WAIT_BEFORE_READING_A_PUT: Duration = Duration::from_millis(750);
/// Lower bound for the random wait before reading back a freshly PUT record.
const MIN_WAIT_BEFORE_READING_A_PUT: Duration = Duration::from_millis(300);

/// Sorts the provided peers by their distance to the given address,
/// returning at most `expected_entries` of the closest peers.
pub fn sort_peers_by_address<'a>(
    peers: &'a [PeerId],
    address: &NetworkAddress,
    expected_entries: usize,
) -> Result<Vec<&'a PeerId>> {
    sort_peers_by_key(peers, &address.as_kbucket_key(), expected_entries)
}

/// Sorts the provided peers by their distance to the given `KBucketKey`,
/// returning at most `expected_entries` of the closest peers.
pub fn sort_peers_by_key<'a, T>(
    peers: &'a [PeerId],
    key: &KBucketKey<T>,
    expected_entries: usize,
) -> Result<Vec<&'a PeerId>> {
    // Bail out early if there aren't enough peers to satisfy the request.
    if CLOSE_GROUP_SIZE > peers.len() {
        warn!("Not enough peers in the k-bucket to satisfy the request");
        return Err(NetworkError::NotEnoughPeers {
            found: peers.len(),
            required: CLOSE_GROUP_SIZE,
        });
    }

    // Precompute each peer's distance to the key so it is calculated only once.
    let mut peer_distances: Vec<(&PeerId, KBucketDistance)> = Vec::with_capacity(peers.len());

    for peer_id in peers {
        let addr = NetworkAddress::from_peer(*peer_id);
        let distance = key.distance(&addr.as_kbucket_key());
        peer_distances.push((peer_id, distance));
    }

    peer_distances.sort_by(|a, b| a.1.cmp(&b.1));

    let sorted_peers: Vec<_> = peer_distances
        .into_iter()
        .take(expected_entries)
        .map(|(peer_id, _)| peer_id)
        .collect();

    Ok(sorted_peers)
}

/// API to interact with the underlying swarm. Cheap to clone; all clones
/// share the same command channels and identity.
#[derive(Clone, Debug)]
pub struct Network {
    inner: Arc<NetworkInner>,
}

/// The actual state behind `Network`, kept behind an `Arc` so handles can
/// be cloned and shared across tasks.
#[derive(Debug)]
struct NetworkInner {
    network_swarm_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
    local_swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
    peer_id: PeerId,
    keypair: Keypair,
}

impl Network {
    pub fn new(
        network_swarm_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
        local_swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
        peer_id: PeerId,
        keypair: Keypair,
    ) -> Self {
        Self {
            inner: Arc::new(NetworkInner {
                network_swarm_cmd_sender,
                local_swarm_cmd_sender,
                peer_id,
                keypair,
            }),
        }
    }

    /// Returns the `PeerId` of the instance.
    pub fn peer_id(&self) -> PeerId {
        self.inner.peer_id
    }

    /// Returns the `Keypair` of the instance.
    pub fn keypair(&self) -> &Keypair {
        &self.inner.keypair
    }

    /// Returns the sender for network swarm commands.
    pub(crate) fn network_swarm_cmd_sender(&self) -> &mpsc::Sender<NetworkSwarmCmd> {
        &self.inner.network_swarm_cmd_sender
    }

    /// Returns the sender for local swarm commands.
    pub(crate) fn local_swarm_cmd_sender(&self) -> &mpsc::Sender<LocalSwarmCmd> {
        &self.inner.local_swarm_cmd_sender
    }

    /// Signs the given data with the node's keypair.
    pub fn sign(&self, msg: &[u8]) -> Result<Vec<u8>> {
        self.keypair().sign(msg).map_err(NetworkError::from)
    }

    /// Verifies a signature over the given data against the node's public key.
    pub fn verify(&self, msg: &[u8], sig: &[u8]) -> bool {
        self.keypair().public().verify(msg, sig)
    }

    /// Returns the node's public key, protobuf-encoded.
    pub fn get_pub_key(&self) -> Vec<u8> {
        self.keypair().public().encode_protobuf()
    }

    /// Returns all the peers in range of the key, or the close group if
    /// there are not enough in range. Self is excluded from the result,
    /// since a client is not part of the network.
    pub async fn client_get_all_close_peers_in_range_or_close_group(
        &self,
        key: &NetworkAddress,
    ) -> Result<Vec<PeerId>> {
        self.get_all_close_peers_in_range_or_close_group(key, true)
            .await
    }

    /// Returns the closest peers to the given key from the network.
    /// Self is included if it is among the closest.
    pub async fn node_get_closest_peers(&self, key: &NetworkAddress) -> Result<Vec<PeerId>> {
        self.get_all_close_peers_in_range_or_close_group(key, false)
            .await
    }

    /// Returns the locally known peers together with their multiaddresses.
    pub async fn get_local_peers_with_multiaddr(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetPeersWithMultiaddr { sender });
        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

    /// Returns a map from k-bucket index to the peers it contains.
    pub async fn get_kbuckets(&self) -> Result<BTreeMap<u32, Vec<PeerId>>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetKBuckets { sender });
        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

    /// Returns the closest peers to self from the local routing table.
    pub async fn get_closest_k_value_local_peers(&self) -> Result<Vec<PeerId>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetClosestKLocalPeers { sender });

        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

    /// Returns the replication candidates for the given data address.
    pub async fn get_replicate_candidates(&self, data_addr: NetworkAddress) -> Result<Vec<PeerId>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetReplicateCandidates { data_addr, sender });

        let candidates = receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)??;

        Ok(candidates)
    }

    /// Verifies that a chunk exists on the network by requesting a
    /// `ChunkProof` from the closest nodes, until the quorum is met or the
    /// retry attempts are exhausted.
    pub async fn verify_chunk_existence(
        &self,
        chunk_address: NetworkAddress,
        nonce: Nonce,
        expected_proof: ChunkProof,
        quorum: ResponseQuorum,
        retry_strategy: RetryStrategy,
    ) -> Result<()> {
        let total_attempts = retry_strategy.attempts();

        let pretty_key = PrettyPrintRecordKey::from(&chunk_address.to_record_key()).into_owned();
        let expected_n_verified = quorum.get_value();

        let mut close_nodes = Vec::new();
        let mut retry_attempts = 0;
        while retry_attempts < total_attempts {
            // Refresh the set of close nodes on every other attempt.
            if retry_attempts % 2 == 0 {
                close_nodes = self
                    .client_get_all_close_peers_in_range_or_close_group(&chunk_address)
                    .await?;
            }
            retry_attempts += 1;
            info!(
                "Getting ChunkProof for {pretty_key:?}. Attempts: {retry_attempts:?}/{total_attempts:?}",
            );

            let request = Request::Query(Query::GetChunkExistenceProof {
                key: chunk_address.clone(),
                nonce,
                difficulty: 1,
            });
            let responses = self
                .send_and_get_responses(&close_nodes, &request, true)
                .await;
            let n_verified = responses
                .into_iter()
                .filter_map(|(peer, resp)| {
                    if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(proofs))) =
                        resp
                    {
                        if proofs.is_empty() {
                            warn!("Failed to verify the ChunkProof from {peer:?}. Returned proof is empty.");
                            None
                        } else if let Ok(ref proof) = proofs[0].1 {
                            if expected_proof.verify(proof) {
                                debug!("Got a valid ChunkProof from {peer:?}");
                                Some(())
                            } else {
                                warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered with.");
                                None
                            }
                        } else {
                            warn!("Failed to verify the ChunkProof from {peer:?}, returned with error {:?}", proofs[0].1);
                            None
                        }
                    } else {
                        debug!("Did not get a valid response for the ChunkProof from {peer:?}");
                        None
                    }
                })
                .count();
            debug!("Got {n_verified} verified chunk existence proofs for chunk_address {chunk_address:?}");

            if n_verified >= expected_n_verified {
                return Ok(());
            }
            warn!("The {n_verified} verified proofs obtained did not match the expected {expected_n_verified} verified proofs");
            // Wait before the next attempt; back off after the first retry.
            let waiting_time = if retry_attempts == 1 {
                MIN_WAIT_BEFORE_READING_A_PUT
            } else {
                MIN_WAIT_BEFORE_READING_A_PUT * 2
            };
            sleep(waiting_time).await;
        }

        Err(NetworkError::FailedToVerifyChunkProof(
            chunk_address.clone(),
        ))
    }

    /// Gets store quotes for the given record address from the closest
    /// peers. Returns an empty list if enough peers report the record as
    /// already paid for.
    pub async fn get_store_quote_from_network(
        &self,
        record_address: NetworkAddress,
        data_type: u32,
        data_size: usize,
        ignore_peers: Vec<PeerId>,
    ) -> Result<Vec<(PeerId, PaymentQuote)>> {
        // Fetch the close nodes and drop any the caller asked us to ignore.
        let mut close_nodes = self
            .client_get_all_close_peers_in_range_or_close_group(&record_address)
            .await?;
        close_nodes.retain(|peer_id| !ignore_peers.contains(peer_id));
        info!(
            "For record {record_address:?} quoting {} nodes. ignore_peers is {ignore_peers:?}",
            close_nodes.len()
        );

        if close_nodes.is_empty() {
            error!("Can't get store_cost of {record_address:?}, as all close_nodes are ignored");
            return Err(NetworkError::NotEnoughPeersForStoreCostRequest);
        }

        let request = Request::Query(Query::GetStoreQuote {
            key: record_address.clone(),
            data_type,
            data_size,
            nonce: None,
            difficulty: 0,
        });
        let responses = self
            .send_and_get_responses(&close_nodes, &request, true)
            .await;

        // Treat the record as already paid for once a majority of the
        // queried nodes report that they already hold it.
        let mut peer_already_have_it = 0;
        let enough_peers_already_have_it = close_nodes.len() / 2;

        let mut peers_returned_error = 0;

        // Loop over the responses and check the quotes.
        let mut all_quotes = vec![];
        let mut quotes_to_pay = vec![];
        for (peer, response) in responses {
            info!("StoreCostReq for {record_address:?} received response: {response:?}");
            match response {
                Ok(Response::Query(QueryResponse::GetStoreQuote {
                    quote: Ok(quote),
                    peer_address,
                    storage_proofs,
                })) => {
                    if !storage_proofs.is_empty() {
                        debug!("Storage proof validation during GetStoreQuote is not implemented yet.");
                    }

                    // Check the quote is signed by the claimed peer.
                    if !quote.check_is_signed_by_claimed_peer(peer) {
                        warn!("Received invalid quote from {peer_address:?}, {quote:?}");
                        continue;
                    }

                    // Check the quote is for the correct data type.
                    if quote.quoting_metrics.data_type != data_type {
                        warn!("Received invalid quote from {peer_address:?}, {quote:?}. Data type did not match the request.");
                        continue;
                    }

                    all_quotes.push((peer_address.clone(), quote.clone()));
                    quotes_to_pay.push((peer, quote));
                }
                Ok(Response::Query(QueryResponse::GetStoreQuote {
                    quote: Err(ProtocolError::RecordExists(_)),
                    peer_address,
                    storage_proofs,
                })) => {
                    if !storage_proofs.is_empty() {
                        debug!("Storage proof validation during GetStoreQuote is not implemented yet.");
                    }
                    peer_already_have_it += 1;
                    info!("Address {record_address:?} was already paid for according to {peer_address:?} ({peer_already_have_it}/{enough_peers_already_have_it})");
                    if peer_already_have_it >= enough_peers_already_have_it {
                        info!("Address {record_address:?} was already paid for according to {peer_already_have_it} peers, ending quote request");
                        return Ok(vec![]);
                    }
                }
                Err(err) => {
                    error!("Got an error while requesting quote from peer {peer:?}: {err}");
                    peers_returned_error += 1;
                }
                _ => {
                    error!("Got an unexpected response while requesting quote from peer {peer:?}: {response:?}");
                    peers_returned_error += 1;
                }
            }
        }

        if quotes_to_pay.is_empty() {
            error!(
                "Could not fetch any quotes. {} peers returned an error.",
                peers_returned_error
            );
            return Err(NetworkError::NoStoreCostResponses);
        }

        Ok(quotes_to_pay)
    }

    /// Gets a record from the network. The `GetRecordCfg` controls the
    /// quorum, retry strategy, and verification of the returned record.
    pub async fn get_record_from_network(
        &self,
        key: RecordKey,
        cfg: &GetRecordCfg,
    ) -> Result<Record> {
        let pretty_key = PrettyPrintRecordKey::from(&key);
        let mut backoff = cfg.retry_strategy.backoff().into_iter();

        loop {
            info!("Getting record {pretty_key:?} from network, with cfg {cfg:?}");
            let (sender, receiver) = oneshot::channel();
            self.send_network_swarm_cmd(NetworkSwarmCmd::GetNetworkRecord {
                key: key.clone(),
                sender,
                cfg: cfg.clone(),
            });
            let result = match receiver.await {
                Ok(result) => result,
                Err(err) => {
                    error!(
                        "When fetching record {pretty_key:?}, encountered a channel error {err:?}"
                    );
                    return Err(NetworkError::InternalMsgChannelDropped);
                }
            };

            let err = match result {
                Ok(record) => {
                    info!("Record returned: {pretty_key:?}.");
                    return Ok(record);
                }
                Err(err) => err,
            };

            // Log the failure; split records may still be merged into a result.
            match &err {
                GetRecordError::RecordDoesNotMatch(_) => {
                    warn!("The returned record does not match target {pretty_key:?}.");
                }
                GetRecordError::NotEnoughCopies { expected, got, .. } => {
                    warn!("Not enough copies ({got}/{expected}) found yet for {pretty_key:?}.");
                }
                GetRecordError::RecordNotFound => {
                    warn!("No holder of record '{pretty_key:?}' found.");
                }
                GetRecordError::RecordKindMismatch => {
                    error!("Record kind mismatch for {pretty_key:?}. This error should not happen here.");
                }
                GetRecordError::SplitRecord { result_map } => {
                    error!("Encountered a split record for {pretty_key:?}.");
                    if let Some(record) = Self::handle_split_record_error(result_map, &key)? {
                        info!("Merged the split record for {pretty_key:?} into a single record");
                        return Ok(record);
                    }
                }
                GetRecordError::QueryTimeout => {
                    error!("Encountered query timeout for {pretty_key:?}.");
                }
            }

            match backoff.next() {
                Some(Some(duration)) => {
                    crate::time::sleep(duration).await;
                    debug!("Retrying to get record {pretty_key:?} from network after backoff...");
                }
                _ => break Err(err.into()),
            }
        }
    }

    /// Handles a split record error by accumulating GraphEntries, or by
    /// selecting the highest-counter Pointer or Scratchpad among the
    /// results. Returns `Ok(None)` if the split cannot be resolved.
    fn handle_split_record_error(
        result_map: &HashMap<XorName, (Record, HashSet<PeerId>)>,
        key: &RecordKey,
    ) -> std::result::Result<Option<Record>, NetworkError> {
        let pretty_key = PrettyPrintRecordKey::from(key);

        // Attempt to merge the split results.
        let results_count = result_map.len();
        let mut accumulated_graphentries = HashSet::new();
        let mut valid_scratchpad: Option<Scratchpad> = None;
        let mut valid_pointer: Option<Pointer> = None;

        if results_count > 1 {
            let mut record_kind = None;
            info!("For record {pretty_key:?}, we have more than one result returned.");
            for (record, _) in result_map.values() {
                let Ok(header) = RecordHeader::from_record(record) else {
                    continue;
                };
                let kind = record_kind.get_or_insert(header.kind);
                // A mismatch between RecordKinds cannot be resolved here.
                if *kind != header.kind {
                    error!("Encountered a split record for {pretty_key:?} with different RecordHeaders. Expected {kind:?} but got {:?}. Skipping.", header.kind);
                    continue;
                }

                match kind {
                    RecordKind::DataOnly(DataTypes::Chunk) | RecordKind::DataWithPayment(_) => {
                        error!("Encountered a split record for {pretty_key:?} with unexpected RecordKind {kind:?}, skipping.");
                        continue;
                    }
                    RecordKind::DataOnly(DataTypes::GraphEntry) => {
                        match get_graph_entry_from_record(record) {
                            Ok(graphentries) => {
                                accumulated_graphentries.extend(graphentries);
                                info!("For record {pretty_key:?}, we have a split record for a GraphEntry. Accumulating GraphEntry: {}", accumulated_graphentries.len());
                            }
                            Err(_) => {
                                warn!("Failed to deserialize GraphEntry for {pretty_key:?}, skipping accumulation");
                                continue;
                            }
                        }
                    }
                    RecordKind::DataOnly(DataTypes::Pointer) => {
                        info!("For record {pretty_key:?}, we have a split record for a Pointer. Selecting the one with the highest counter");
                        let Ok(pointer) = try_deserialize_record::<Pointer>(record) else {
                            error!(
                                "Failed to deserialize Pointer {pretty_key}. Skipping accumulation"
                            );
                            continue;
                        };

                        if !pointer.verify_signature() {
                            warn!("Rejecting Pointer for {pretty_key} PUT with invalid signature");
                            continue;
                        }

                        if let Some(old) = &valid_pointer {
                            if old.counter() >= pointer.counter() {
                                info!("Rejecting Pointer for {pretty_key} with lower counter than the previous one");
                                continue;
                            }
                        }
                        valid_pointer = Some(pointer);
                    }
                    RecordKind::DataOnly(DataTypes::Scratchpad) => {
                        info!("For record {pretty_key:?}, we have a split record for a Scratchpad. Selecting the one with the highest counter");
                        let Ok(scratchpad) = try_deserialize_record::<Scratchpad>(record) else {
                            error!(
                                "Failed to deserialize Scratchpad {pretty_key}. Skipping accumulation"
                            );
                            continue;
                        };

                        if !scratchpad.verify_signature() {
                            warn!(
                                "Rejecting Scratchpad for {pretty_key} PUT with invalid signature"
                            );
                            continue;
                        }

                        if let Some(old) = &valid_scratchpad {
                            if old.counter() >= scratchpad.counter() {
                                info!("Rejecting Scratchpad for {pretty_key} with lower counter than the previous one");
                                continue;
                            }
                        }
                        valid_scratchpad = Some(scratchpad);
                    }
                }
            }
        }

        // Merge accumulated GraphEntries into a single record.
        if accumulated_graphentries.len() > 1 {
            info!("For record {pretty_key:?} task found split record for a GraphEntry, accumulated and sending them as a single record");
            let accumulated_graphentries = accumulated_graphentries
                .into_iter()
                .collect::<Vec<GraphEntry>>();
            let record = Record {
                key: key.clone(),
                value: try_serialize_record(
                    &accumulated_graphentries,
                    RecordKind::DataOnly(DataTypes::GraphEntry),
                )
                .map_err(|err| {
                    error!(
                        "Error while serializing the accumulated GraphEntries for {pretty_key:?}: {err:?}"
                    );
                    NetworkError::from(err)
                })?
                .to_vec(),
                publisher: None,
                expires: None,
            };
            return Ok(Some(record));
        } else if let Some(pointer) = valid_pointer {
            info!("For record {pretty_key:?} task found a valid Pointer, returning it.");
            let record_value =
                try_serialize_record(&pointer, RecordKind::DataOnly(DataTypes::Pointer))
                    .map_err(|err| {
                        error!("Error while serializing the Pointer for {pretty_key:?}: {err:?}");
                        NetworkError::from(err)
                    })?
                    .to_vec();

            let record = Record {
                key: key.clone(),
                value: record_value,
                publisher: None,
                expires: None,
            };
            return Ok(Some(record));
        } else if let Some(scratchpad) = valid_scratchpad {
            info!("For record {pretty_key:?} task found a valid Scratchpad, returning it.");
            let record_value =
                try_serialize_record(&scratchpad, RecordKind::DataOnly(DataTypes::Scratchpad))
                    .map_err(|err| {
                        error!(
                            "Error while serializing the Scratchpad for {pretty_key:?}: {err:?}"
                        );
                        NetworkError::from(err)
                    })?
                    .to_vec();

            let record = Record {
                key: key.clone(),
                value: record_value,
                publisher: None,
                expires: None,
            };
            return Ok(Some(record));
        }
        Ok(None)
    }

    /// Gets the quoting metrics for storing the next record from the local
    /// record store.
    pub async fn get_local_quoting_metrics(
        &self,
        key: RecordKey,
        data_type: u32,
        data_size: usize,
    ) -> Result<(QuotingMetrics, bool)> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalQuotingMetrics {
            key,
            data_type,
            data_size,
            sender,
        });

        let quoting_metrics = receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)??;
        Ok(quoting_metrics)
    }

    /// Notifies the node that a payment for a record was received.
    pub fn notify_payment_received(&self) {
        self.send_local_swarm_cmd(LocalSwarmCmd::PaymentReceived);
    }

    /// Gets a record from the local record store, or `None` if it is not
    /// held locally.
    pub async fn get_local_record(&self, key: &RecordKey) -> Result<Option<Record>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalRecord {
            key: key.clone(),
            sender,
        });

        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

    /// Returns whether the target peer is considered shunned by us.
    pub async fn is_peer_shunned(&self, target: NetworkAddress) -> Result<bool> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::IsPeerShunned { target, sender });

        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

    /// Puts a `Record` to the network, retrying with a backoff according
    /// to the `PutRecordCfg` retry strategy if the PUT fails.
    pub async fn put_record(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
        let pretty_key = PrettyPrintRecordKey::from(&record.key);
        let mut backoff = cfg.retry_strategy.backoff().into_iter();

        loop {
            info!(
                "Attempting to PUT record with key: {pretty_key:?} to network, with cfg {cfg:?}, retrying via backoff..."
            );

            let err = match self.put_record_once(record.clone(), cfg).await {
                Ok(_) => break Ok(()),
                Err(err) => err,
            };

            // The PUT failed; retry after the backoff duration, if any remains.
            warn!("Failed to PUT record with key: {pretty_key:?} to network (retry via backoff) with error: {err:?}");

            match backoff.next() {
                Some(Some(duration)) => {
                    crate::time::sleep(duration).await;
                }
                _ => break Err(err),
            }
        }
    }

    async fn put_record_once(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
        let record_key = record.key.clone();
        let pretty_key = PrettyPrintRecordKey::from(&record_key);
        info!(
            "Putting record of {} - length {:?} to network",
            pretty_key,
            record.value.len()
        );

        // Send the record either to the specified peers or, by default, to
        // the closest peers via a Kademlia PUT.
        let (sender, receiver) = oneshot::channel();
        if let Some(put_record_to_peers) = &cfg.use_put_record_to {
            self.send_network_swarm_cmd(NetworkSwarmCmd::PutRecordTo {
                peers: put_record_to_peers.clone(),
                record: record.clone(),
                sender,
                quorum: cfg.put_quorum,
            });
        } else {
            self.send_network_swarm_cmd(NetworkSwarmCmd::PutRecord {
                record: record.clone(),
                sender,
                quorum: cfg.put_quorum,
            });
        }

        let response = receiver.await?;

        if let Some((verification_kind, get_cfg)) = &cfg.verification {
            // Give the network a moment to settle before verifying the PUT.
            let wait_duration = rand::thread_rng()
                .gen_range(MIN_WAIT_BEFORE_READING_A_PUT..MAX_WAIT_BEFORE_READING_A_PUT);
            sleep(wait_duration).await;
            debug!("Attempting to verify {pretty_key:?} after we've slept for {wait_duration:?}");

            // Verify the record is stored: chunks via a ChunkProof, all
            // other kinds by fetching the record back from the network.
            if let VerificationKind::ChunkProof {
                expected_proof,
                nonce,
            } = verification_kind
            {
                self.verify_chunk_existence(
                    NetworkAddress::from_record_key(&record_key),
                    *nonce,
                    expected_proof.clone(),
                    get_cfg.get_quorum,
                    get_cfg.retry_strategy,
                )
                .await?;
            } else {
                match self
                    .get_record_from_network(record.key.clone(), get_cfg)
                    .await
                {
                    Ok(_) => {
                        debug!("Record {pretty_key:?} verified to be stored.");
                    }
                    Err(NetworkError::GetRecordError(GetRecordError::RecordNotFound)) => {
                        warn!("Record {pretty_key:?} not found after PUT; it was either rejected or not yet stored by the nodes when we asked");
                        return Err(NetworkError::RecordNotStoredByNodes(
                            NetworkAddress::from_record_key(&record_key),
                        ));
                    }
                    Err(NetworkError::GetRecordError(GetRecordError::SplitRecord { .. }))
                        if matches!(verification_kind, VerificationKind::Crdt) =>
                    {
                        warn!("Record {pretty_key:?} is split, which is okay since we're dealing with CRDTs");
                    }
                    Err(e) => {
                        debug!(
                            "Failed to verify record {pretty_key:?} to be stored with error: {e:?}"
                        );
                        return Err(e);
                    }
                }
            }
        }
        response
    }

    /// Notifies that a replication fetch for the given key has completed.
    pub fn notify_fetch_completed(&self, key: RecordKey, record_type: ValidationType) {
        self.send_local_swarm_cmd(LocalSwarmCmd::FetchCompleted((key, record_type)))
    }

    /// Writes a `Record` to the local RecordStore.
    pub fn put_local_record(&self, record: Record, is_client_put: bool) {
        debug!(
            "Writing Record locally, for {:?} - length {:?}",
            PrettyPrintRecordKey::from(&record.key),
            record.value.len()
        );
        self.send_local_swarm_cmd(LocalSwarmCmd::PutLocalRecord {
            record,
            is_client_put,
        })
    }

    /// Returns whether the record with the given key is stored locally.
    pub async fn is_record_key_present_locally(&self, key: &RecordKey) -> Result<bool> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::RecordStoreHasKey {
            key: key.clone(),
            sender,
        });

        let is_present = receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)??;

        Ok(is_present)
    }

    /// Returns the addresses of all the locally stored records.
    pub async fn get_all_local_record_addresses(
        &self,
    ) -> Result<HashMap<NetworkAddress, ValidationType>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetAllLocalRecordAddresses { sender });

        let addrs = receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)??;
        Ok(addrs)
    }

    /// Sends a `Request` to the given `PeerId` and awaits the `Response`.
    /// On an outbound I/O or connection-closed failure, the request is
    /// retried once.
    pub async fn send_request(&self, req: Request, peer: PeerId) -> Result<Response> {
        let (sender, receiver) = oneshot::channel();
        self.send_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
            req: req.clone(),
            peer,
            sender: Some(sender),
        });
        let mut r = receiver.await?;

        if let Err(error) = &r {
            error!("Error in response: {:?}", error);

            match error {
                NetworkError::OutboundError(OutboundFailure::Io(_))
                | NetworkError::OutboundError(OutboundFailure::ConnectionClosed) => {
                    warn!(
                        "Outbound failed for {req:?} .. {error:?}, redialing once and reattempting"
                    );
                    let (sender, receiver) = oneshot::channel();

                    debug!("Reattempting to send_request {req:?} to {peer:?}");
                    self.send_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
                        req,
                        peer,
                        sender: Some(sender),
                    });

                    r = receiver.await?;
                }
                _ => {
                    // Other errors are not retried.
                    warn!("Error in response: {:?}", error);
                }
            }
        }

        r
    }

    /// Sends a `Request` to the given `PeerId` without awaiting a response.
    pub fn send_req_ignore_reply(&self, req: Request, peer: PeerId) {
        let swarm_cmd = NetworkSwarmCmd::SendRequest {
            req,
            peer,
            sender: None,
        };
        self.send_network_swarm_cmd(swarm_cmd)
    }

    /// Sends a `Response` through the channel opened by the requester.
    pub fn send_response(&self, resp: Response, channel: MsgResponder) {
        self.send_network_swarm_cmd(NetworkSwarmCmd::SendResponse { resp, channel })
    }

    /// Returns a snapshot of the local swarm state.
    pub async fn get_swarm_local_state(&self) -> Result<SwarmLocalState> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetSwarmLocalState(sender));
        let state = receiver.await?;
        Ok(state)
    }

    pub fn trigger_interval_replication(&self) {
        self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIntervalReplication)
    }

    pub fn add_fresh_records_to_the_replication_fetcher(
        &self,
        holder: NetworkAddress,
        keys: Vec<(NetworkAddress, ValidationType)>,
    ) {
        self.send_local_swarm_cmd(LocalSwarmCmd::AddFreshReplicateRecords { holder, keys })
    }

    pub fn record_node_issues(&self, peer_id: PeerId, issue: NodeIssue) {
        self.send_local_swarm_cmd(LocalSwarmCmd::RecordNodeIssue { peer_id, issue });
    }

    pub fn historical_verify_quotes(&self, quotes: Vec<(PeerId, PaymentQuote)>) {
        self.send_local_swarm_cmd(LocalSwarmCmd::QuoteVerification { quotes });
    }

    pub fn trigger_irrelevant_record_cleanup(&self) {
        self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIrrelevantRecordCleanup)
    }

    pub fn add_network_density_sample(&self, distance: KBucketDistance) {
        self.send_local_swarm_cmd(LocalSwarmCmd::AddNetworkDensitySample { distance })
    }

    pub fn notify_peer_scores(&self, peer_scores: Vec<(PeerId, bool)>) {
        self.send_local_swarm_cmd(LocalSwarmCmd::NotifyPeerScores { peer_scores })
    }

    pub fn notify_node_version(&self, peer: PeerId, version: String) {
        self.send_local_swarm_cmd(LocalSwarmCmd::NotifyPeerVersion { peer, version })
    }

    /// Helper to send a `NetworkSwarmCmd`.
    fn send_network_swarm_cmd(&self, cmd: NetworkSwarmCmd) {
        send_network_swarm_cmd(self.network_swarm_cmd_sender().clone(), cmd);
    }

    /// Helper to send a `LocalSwarmCmd`.
    fn send_local_swarm_cmd(&self, cmd: LocalSwarmCmd) {
        send_local_swarm_cmd(self.local_swarm_cmd_sender().clone(), cmd);
    }

    /// Returns the closest peers to the given `NetworkAddress`, up to an
    /// expanded close group (CLOSE_GROUP_SIZE plus half). If `client` is
    /// true, self is removed from the result.
    pub async fn get_all_close_peers_in_range_or_close_group(
        &self,
        key: &NetworkAddress,
        client: bool,
    ) -> Result<Vec<PeerId>> {
        let pretty_key = PrettyPrintKBucketKey(key.as_kbucket_key());
        debug!("Getting all the closest peers in range of {pretty_key:?}");
        let (sender, receiver) = oneshot::channel();
        self.send_network_swarm_cmd(NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork {
            key: key.clone(),
            sender,
        });

        let found_peers = receiver.await?;

        // Filter out self when the caller is a client.
        let result_len = found_peers.len();
        let mut closest_peers = found_peers;

        if client {
            closest_peers.retain(|&x| x != self.peer_id());
            if result_len != closest_peers.len() {
                info!("Removed self from the closest_peers, as this is a client");
            }
        }

        if tracing::level_enabled!(tracing::Level::DEBUG) {
            let close_peers_pretty_print: Vec<_> = closest_peers
                .iter()
                .map(|peer_id| {
                    format!(
                        "{peer_id:?}({:?})",
                        PrettyPrintKBucketKey(NetworkAddress::from_peer(*peer_id).as_kbucket_key())
                    )
                })
                .collect();

            debug!(
                "Network knowledge of closest peers to {pretty_key:?} are: {close_peers_pretty_print:?}"
            );
        }

        let expanded_close_group = CLOSE_GROUP_SIZE + CLOSE_GROUP_SIZE / 2;
        let closest_peers = sort_peers_by_address(&closest_peers, key, expanded_close_group)?;
        Ok(closest_peers.into_iter().cloned().collect())
    }

    /// Sends a `Request` to the given peers concurrently and collects the
    /// responses. If `get_all_responses` is false, returns as soon as the
    /// first successful response arrives.
    pub async fn send_and_get_responses(
        &self,
        peers: &[PeerId],
        req: &Request,
        get_all_responses: bool,
    ) -> BTreeMap<PeerId, Result<Response>> {
        debug!("send_and_get_responses for {req:?}");
        let mut list_of_futures = peers
            .iter()
            .map(|peer| {
                Box::pin(async {
                    let resp = self.send_request(req.clone(), *peer).await;
                    (*peer, resp)
                })
            })
            .collect::<Vec<_>>();

        let mut responses = BTreeMap::new();
        while !list_of_futures.is_empty() {
            let ((peer, resp), _, remaining_futures) = select_all(list_of_futures).await;
            let resp_string = match &resp {
                Ok(resp) => format!("{resp}"),
                Err(err) => format!("{err:?}"),
            };
            debug!("Got response from {peer:?} for the req: {req:?}, resp: {resp_string}");
            if !get_all_responses && resp.is_ok() {
                return BTreeMap::from([(peer, resp)]);
            }
            responses.insert(peer, resp);
            list_of_futures = remaining_futures;
        }

        debug!("Received all responses for {req:?}");
        responses
    }
}

/// Returns true if none of the address's IPv4 components is a non-global
/// address (unspecified, private, loopback, link-local, documentation, or
/// broadcast).
pub fn multiaddr_is_global(multiaddr: &Multiaddr) -> bool {
    !multiaddr.iter().any(|addr| match addr {
        Protocol::Ip4(ip) => {
            ip.is_unspecified()
                || ip.is_private()
                || ip.is_loopback()
                || ip.is_link_local()
                || ip.is_documentation()
                || ip.is_broadcast()
        }
        _ => false,
    })
}

/// Pops the `/p2p/<peer_id>` component off the `Multiaddr`, if it is the
/// last protocol, and returns the `PeerId`.
pub(crate) fn multiaddr_pop_p2p(multiaddr: &mut Multiaddr) -> Option<PeerId> {
    if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() {
        // Only strip the last protocol if it is indeed the peer ID.
        let _ = multiaddr.pop();
        Some(peer_id)
    } else {
        None
    }
}

/// Returns the `PeerId` from the `/p2p/<peer_id>` component of the
/// `Multiaddr`, if it is the last protocol, without modifying the address.
pub(crate) fn multiaddr_get_p2p(multiaddr: &Multiaddr) -> Option<PeerId> {
    if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() {
        Some(peer_id)
    } else {
        None
    }
}

/// Builds a `Multiaddr` with the `/p2p/<peer_id>` component filtered out.
/// For relayed addresses (`/p2p-circuit`), the relay's own `/p2p/` component
/// is kept and only the trailing target peer ID is removed.
pub(crate) fn multiaddr_strip_p2p(multiaddr: &Multiaddr) -> Multiaddr {
    let is_relayed = multiaddr.iter().any(|p| matches!(p, Protocol::P2pCircuit));

    if is_relayed {
        // Skip any `/p2p/` component that comes after `/p2p-circuit`, but
        // keep the relay's `/p2p/<relay_peer_id>` that precedes it.
        let mut before_relay_protocol = true;
        let mut new_multi_addr = Multiaddr::empty();
        for p in multiaddr.iter() {
            if matches!(p, Protocol::P2pCircuit) {
                before_relay_protocol = false;
            }
            if matches!(p, Protocol::P2p(_)) && !before_relay_protocol {
                continue;
            }
            new_multi_addr.push(p);
        }
        new_multi_addr
    } else {
        multiaddr
            .iter()
            .filter(|p| !matches!(p, Protocol::P2p(_)))
            .collect()
    }
}

/// Gets the IP address (IPv4 or IPv6) from the `Multiaddr`, if present.
pub(crate) fn multiaddr_get_ip(addr: &Multiaddr) -> Option<IpAddr> {
    addr.iter().find_map(|p| match p {
        Protocol::Ip4(addr) => Some(IpAddr::V4(addr)),
        Protocol::Ip6(addr) => Some(IpAddr::V6(addr)),
        _ => None,
    })
}

/// Gets the UDP port from the `Multiaddr`, if present.
pub(crate) fn multiaddr_get_port(addr: &Multiaddr) -> Option<u16> {
    addr.iter().find_map(|p| match p {
        Protocol::Udp(port) => Some(port),
        _ => None,
    })
}

pub(crate) fn send_local_swarm_cmd(swarm_cmd_sender: Sender<LocalSwarmCmd>, cmd: LocalSwarmCmd) {
    let capacity = swarm_cmd_sender.capacity();

    if capacity == 0 {
        error!(
            "SwarmCmd channel is full. Awaiting capacity to send: {:?}",
            cmd
        );
    }

    // Spawn a task to send the SwarmCmd, keeping this function synchronous.
    let _handle = spawn(async move {
        if let Err(error) = swarm_cmd_sender.send(cmd).await {
            error!("Failed to send SwarmCmd: {}", error);
        }
    });
}

pub(crate) fn send_network_swarm_cmd(
    swarm_cmd_sender: Sender<NetworkSwarmCmd>,
    cmd: NetworkSwarmCmd,
) {
    let capacity = swarm_cmd_sender.capacity();

    if capacity == 0 {
        error!(
            "SwarmCmd channel is full. Awaiting capacity to send: {:?}",
            cmd
        );
    }

    // Spawn a task to send the SwarmCmd, keeping this function synchronous.
    let _handle = spawn(async move {
        if let Err(error) = swarm_cmd_sender.send(cmd).await {
            error!("Failed to send SwarmCmd: {}", error);
        }
    });
}

#[cfg(test)]
mod tests {
    use super::*;

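    // A minimal sketch of the close-group arithmetic: whatever value
    // CLOSE_GROUP_SIZE takes, `close_group_majority` must be a strict
    // majority of it and never exceed the group size.
    #[test]
    fn close_group_majority_is_a_strict_majority() {
        assert!(2 * close_group_majority() > CLOSE_GROUP_SIZE);
        assert!(close_group_majority() <= CLOSE_GROUP_SIZE);
    }

    // A sketch exercising `sort_peers_by_address` with randomly generated
    // peers: the result is truncated to `expected_entries` and ordered by
    // non-decreasing XOR distance to the target address.
    #[test]
    fn sort_peers_by_address_orders_by_distance() -> eyre::Result<()> {
        let peers: Vec<PeerId> = (0..CLOSE_GROUP_SIZE + 3)
            .map(|_| PeerId::random())
            .collect();
        let target = NetworkAddress::from_peer(PeerId::random());

        let sorted = sort_peers_by_address(&peers, &target, CLOSE_GROUP_SIZE)?;
        assert_eq!(sorted.len(), CLOSE_GROUP_SIZE);

        // Distances to the target must be non-decreasing over the result.
        let target_key = target.as_kbucket_key();
        let distances: Vec<_> = sorted
            .iter()
            .map(|peer| target_key.distance(&NetworkAddress::from_peer(**peer).as_kbucket_key()))
            .collect();
        assert!(distances.windows(2).all(|pair| pair[0] <= pair[1]));
        Ok(())
    }
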
    #[tokio::test]
    async fn test_network_sign_verify() -> eyre::Result<()> {
        let (network, _, _) =
            NetworkBuilder::new(Keypair::generate_ed25519(), false, vec![]).build_client();
        let msg = b"test message";
        let sig = network.sign(msg)?;
        assert!(network.verify(msg, &sig));
        Ok(())
    }
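
    // A sketch of the multiaddr helpers: appending `/p2p/<peer_id>` and
    // popping it again should round-trip, leaving the bare transport address.
    #[test]
    fn multiaddr_p2p_helpers_round_trip() -> eyre::Result<()> {
        let peer_id = PeerId::random();
        let base: Multiaddr = "/ip4/192.0.2.1/udp/4001/quic-v1".parse()?;

        let mut addr = base.clone();
        addr.push(Protocol::P2p(peer_id));
        assert_eq!(multiaddr_get_p2p(&addr), Some(peer_id));
        assert_eq!(multiaddr_pop_p2p(&mut addr), Some(peer_id));
        assert_eq!(addr, base);
        // With no trailing `/p2p/` component left, popping yields nothing.
        assert_eq!(multiaddr_pop_p2p(&mut addr), None);
        Ok(())
    }

    // For a relayed address, `multiaddr_strip_p2p` keeps the relay's own
    // `/p2p/` component (before `/p2p-circuit`) and removes only the
    // trailing target peer ID.
    #[test]
    fn multiaddr_strip_p2p_keeps_relay_peer_id() -> eyre::Result<()> {
        let relay_id = PeerId::random();
        let target_id = PeerId::random();
        let mut relayed: Multiaddr = "/ip4/198.51.100.7/udp/4001/quic-v1".parse()?;
        relayed.push(Protocol::P2p(relay_id));
        relayed.push(Protocol::P2pCircuit);
        relayed.push(Protocol::P2p(target_id));

        let stripped = multiaddr_strip_p2p(&relayed);
        assert!(stripped.iter().any(|p| p == Protocol::P2p(relay_id)));
        assert!(!stripped.iter().any(|p| p == Protocol::P2p(target_id)));
        Ok(())
    }

    // Construction sketch: a `Network` handle is just the command senders
    // plus the node identity, so it can be built without a running
    // `SwarmDriver`. The channel capacity of 10 is an arbitrary choice for
    // this sketch; the receiving halves would normally be owned by the driver.
    #[tokio::test]
    async fn network_handle_clones_share_identity() {
        let (network_tx, _network_rx) = mpsc::channel(10);
        let (local_tx, _local_rx) = mpsc::channel(10);
        let keypair = Keypair::generate_ed25519();
        let peer_id = PeerId::from(keypair.public());

        let network = Network::new(network_tx, local_tx, peer_id, keypair);
        let clone = network.clone();
        assert_eq!(network.peer_id(), clone.peer_id());
    }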
}