ant_networking/lib.rs

// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

#[macro_use]
extern crate tracing;

mod bootstrap;
mod circular_vec;
mod cmd;
mod config;
mod driver;
mod error;
mod event;
mod external_address;
mod fifo_register;
mod graph;
mod log_markers;
#[cfg(feature = "open-metrics")]
mod metrics;
mod network_discovery;
mod record_store;
mod record_store_api;
mod relay_manager;
mod replication_fetcher;
pub mod time;
mod transport;

use cmd::LocalSwarmCmd;
use xor_name::XorName;

// Re-export arch-dependent deps for use in this crate, or above.
pub use self::{
    cmd::{NodeIssue, SwarmLocalState},
    config::{GetRecordCfg, PutRecordCfg, ResponseQuorum, RetryStrategy, VerificationKind},
    driver::{NetworkBuilder, SwarmDriver, MAX_PACKET_SIZE},
    error::{GetRecordError, NetworkError},
    event::{MsgResponder, NetworkEvent},
    graph::get_graph_entry_from_record,
    record_store::NodeRecordStore,
};
#[cfg(feature = "open-metrics")]
pub use metrics::service::MetricsRegistries;
pub use time::{interval, sleep, spawn, Instant, Interval};

use self::{cmd::NetworkSwarmCmd, error::Result};
use ant_evm::{PaymentQuote, QuotingMetrics};
use ant_protocol::{
    error::Error as ProtocolError,
    messages::{ChunkProof, Nonce, Query, QueryResponse, Request, Response},
    storage::{DataTypes, Pointer, Scratchpad, ValidationType},
    NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
};
use futures::future::select_all;
use libp2p::{
    identity::Keypair,
    kad::{KBucketDistance, KBucketKey, Record, RecordKey},
    multiaddr::Protocol,
    request_response::OutboundFailure,
    Multiaddr, PeerId,
};
use rand::Rng;
use std::{
    collections::{BTreeMap, HashMap},
    net::IpAddr,
    sync::Arc,
};
use tokio::sync::{
    mpsc::{self, Sender},
    oneshot,
};
use tokio::time::Duration;
use {
    ant_protocol::storage::GraphEntry,
    ant_protocol::storage::{
        try_deserialize_record, try_serialize_record, RecordHeader, RecordKind,
    },
    std::collections::HashSet,
};

/// Majority of a given group (i.e. > 1/2).
#[inline]
pub const fn close_group_majority() -> usize {
    // Calculate the majority of the close group size by dividing it by 2 and adding 1.
    // This ensures that the majority is always greater than half.
    CLOSE_GROUP_SIZE / 2 + 1
}
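// Worked example (illustrative only): if CLOSE_GROUP_SIZE were 5, close_group_majority()
// returns 5 / 2 + 1 == 3, i.e. strictly more than half of the close group.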

/// Max duration to wait before reading a PUT for verification.
const MAX_WAIT_BEFORE_READING_A_PUT: Duration = Duration::from_millis(750);
/// Min duration to wait before reading a PUT for verification.
const MIN_WAIT_BEFORE_READING_A_PUT: Duration = Duration::from_millis(300);

/// Sort the provided peers by their distance to the given `NetworkAddress`.
/// Returns the closest `expected_entries` peers, if enough peers are available.
pub fn sort_peers_by_address<'a>(
    peers: &'a Vec<PeerId>,
    address: &NetworkAddress,
    expected_entries: usize,
) -> Result<Vec<&'a PeerId>> {
    sort_peers_by_key(peers, &address.as_kbucket_key(), expected_entries)
}

/// Sort the provided peers by their distance to the given `KBucketKey`.
/// Returns the closest `expected_entries` peers, if enough peers are available.
pub fn sort_peers_by_key<'a, T>(
    peers: &'a Vec<PeerId>,
    key: &KBucketKey<T>,
    expected_entries: usize,
) -> Result<Vec<&'a PeerId>> {
    // Check if there are enough peers to satisfy the request.
    // Bail early if that's not the case.
    if CLOSE_GROUP_SIZE > peers.len() {
        warn!("Not enough peers in the k-bucket to satisfy the request");
        return Err(NetworkError::NotEnoughPeers {
            found: peers.len(),
            required: CLOSE_GROUP_SIZE,
        });
    }

    // Create a vector of tuples where each tuple is a reference to a peer and its distance to the key.
    // This avoids multiple computations of the same distance in the sorting process.
    let mut peer_distances: Vec<(&PeerId, KBucketDistance)> = Vec::with_capacity(peers.len());

    for peer_id in peers {
        let addr = NetworkAddress::from_peer(*peer_id);
        let distance = key.distance(&addr.as_kbucket_key());
        peer_distances.push((peer_id, distance));
    }

    // Sort the vector of tuples by the distance.
    peer_distances.sort_by(|a, b| a.1.cmp(&b.1));

    // Collect the sorted peers into a new vector.
    let sorted_peers: Vec<_> = peer_distances
        .into_iter()
        .take(expected_entries)
        .map(|(peer_id, _)| peer_id)
        .collect();

    Ok(sorted_peers)
}
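// Usage sketch (illustrative; `peers` and `target` are hypothetical values in scope):
//
//     let closest = sort_peers_by_address(&peers, &target, CLOSE_GROUP_SIZE)?;
//
// `closest` then holds the CLOSE_GROUP_SIZE peers nearest to `target` in XOR distance,
// or an error if fewer than CLOSE_GROUP_SIZE peers were supplied.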

#[derive(Clone, Debug)]
/// API to interact with the underlying Swarm
pub struct Network {
    inner: Arc<NetworkInner>,
}

/// The actual implementation of the Network. The other is just a wrapper around this, so that we don't expose
/// the Arc from the interface.
#[derive(Debug)]
struct NetworkInner {
    network_swarm_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
    local_swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
    peer_id: PeerId,
    keypair: Keypair,
}

impl Network {
    pub fn new(
        network_swarm_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
        local_swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
        peer_id: PeerId,
        keypair: Keypair,
    ) -> Self {
        Self {
            inner: Arc::new(NetworkInner {
                network_swarm_cmd_sender,
                local_swarm_cmd_sender,
                peer_id,
                keypair,
            }),
        }
    }

    /// Returns the `PeerId` of the instance.
    pub fn peer_id(&self) -> PeerId {
        self.inner.peer_id
    }

    /// Returns the `Keypair` of the instance.
    pub fn keypair(&self) -> &Keypair {
        &self.inner.keypair
    }

    /// Get the sender to send a `NetworkSwarmCmd` to the underlying `Swarm`.
    pub(crate) fn network_swarm_cmd_sender(&self) -> &mpsc::Sender<NetworkSwarmCmd> {
        &self.inner.network_swarm_cmd_sender
    }

    /// Get the sender to send a `LocalSwarmCmd` to the underlying `Swarm`.
    pub(crate) fn local_swarm_cmd_sender(&self) -> &mpsc::Sender<LocalSwarmCmd> {
        &self.inner.local_swarm_cmd_sender
    }

    /// Signs the given data with the node's keypair.
    pub fn sign(&self, msg: &[u8]) -> Result<Vec<u8>> {
        self.keypair().sign(msg).map_err(NetworkError::from)
    }

    /// Verifies a signature for the given data and the node's public key.
    pub fn verify(&self, msg: &[u8], sig: &[u8]) -> bool {
        self.keypair().public().verify(msg, sig)
    }

    /// Returns the protobuf-serialised PublicKey so it can be shared in messages.
    pub fn get_pub_key(&self) -> Vec<u8> {
        self.keypair().public().encode_protobuf()
    }

    /// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
    /// Excludes the client's `PeerId` while calculating the closest peers.
    pub async fn client_get_all_close_peers_in_range_or_close_group(
        &self,
        key: &NetworkAddress,
    ) -> Result<Vec<PeerId>> {
        self.get_all_close_peers_in_range_or_close_group(key, true)
            .await
    }

    /// Returns the closest peers to the given `NetworkAddress`, sorted by their distance to the key.
    ///
    /// Includes our node's `PeerId` while calculating the closest peers.
    pub async fn node_get_closest_peers(&self, key: &NetworkAddress) -> Result<Vec<PeerId>> {
        self.get_all_close_peers_in_range_or_close_group(key, false)
            .await
    }

    /// Returns a list of peers in the local routing table and their corresponding Multiaddrs.
    /// Does not include self.
    pub async fn get_local_peers_with_multiaddr(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetPeersWithMultiaddr { sender });
        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

    /// Returns a map where each key is the ilog2 distance of that Kbucket
    /// and each value is a vector of peers in that bucket.
    /// Does not include self.
    pub async fn get_kbuckets(&self) -> Result<BTreeMap<u32, Vec<PeerId>>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetKBuckets { sender });
        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

    /// Returns all the PeerIds from all the k-buckets in our local routing table.
    /// Also contains our own PeerId.
    pub async fn get_closest_k_value_local_peers(&self) -> Result<Vec<PeerId>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetClosestKLocalPeers { sender });

        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

    /// Returns the replicate candidates in range.
    pub async fn get_replicate_candidates(&self, data_addr: NetworkAddress) -> Result<Vec<PeerId>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetReplicateCandidates { data_addr, sender });

        let candidates = receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)??;

        Ok(candidates)
    }

    /// Get the Chunk existence proof from the close nodes to the provided chunk address.
    /// This is only to be used by the client to verify that an upload succeeded.
    pub async fn verify_chunk_existence(
        &self,
        chunk_address: NetworkAddress,
        nonce: Nonce,
        expected_proof: ChunkProof,
        quorum: ResponseQuorum,
        retry_strategy: RetryStrategy,
    ) -> Result<()> {
        let total_attempts = retry_strategy.attempts();

        let pretty_key = PrettyPrintRecordKey::from(&chunk_address.to_record_key()).into_owned();
        let expected_n_verified = quorum.get_value();

        let mut close_nodes = Vec::new();
        let mut retry_attempts = 0;
        while retry_attempts < total_attempts {
            // The check should happen before incrementing retry_attempts.
            if retry_attempts % 2 == 0 {
                // Do not query the closest_peers during every retry attempt.
                // The close_nodes don't change often and the previous set of close_nodes might be taking a while to write
                // the Chunk, so query them again in case of a failure.
                close_nodes = self
                    .client_get_all_close_peers_in_range_or_close_group(&chunk_address)
                    .await?;
            }
            retry_attempts += 1;
            info!(
                "Getting ChunkProof for {pretty_key:?}. Attempts: {retry_attempts:?}/{total_attempts:?}",
            );

            let request = Request::Query(Query::GetChunkExistenceProof {
                key: chunk_address.clone(),
                nonce,
                difficulty: 1,
            });
            let responses = self
                .send_and_get_responses(&close_nodes, &request, true)
                .await;
            let n_verified = responses
                .into_iter()
                .filter_map(|(peer, resp)| {
                    if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(proofs))) =
                        resp
                    {
                        if proofs.is_empty() {
                            warn!("Failed to verify the ChunkProof from {peer:?}. Returned proof is empty.");
                            None
                        } else if let Ok(ref proof) = proofs[0].1 {
                            if expected_proof.verify(proof) {
                                debug!("Got a valid ChunkProof from {peer:?}");
                                Some(())
                            } else {
                                warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered with.");
                                None
                            }
                        } else {
                            warn!("Failed to verify the ChunkProof from {peer:?}, returned with error {:?}", proofs[0].1);
                            None
                        }
                    } else {
                        debug!("Did not get a valid response for the ChunkProof from {peer:?}");
                        None
                    }
                })
                .count();
            debug!("Got {n_verified} verified chunk existence proofs for chunk_address {chunk_address:?}");

            if n_verified >= expected_n_verified {
                return Ok(());
            }
            warn!("The obtained {n_verified} verified proofs did not match the expected {expected_n_verified} verified proofs");
            // Sleep to avoid firing queries too close together and choking the nodes further.
            let waiting_time = if retry_attempts == 1 {
                MIN_WAIT_BEFORE_READING_A_PUT
            } else {
                MIN_WAIT_BEFORE_READING_A_PUT + MIN_WAIT_BEFORE_READING_A_PUT
            };
            sleep(waiting_time).await;
        }

        Err(NetworkError::FailedToVerifyChunkProof(
            chunk_address.clone(),
        ))
    }

    /// Get the store costs from the majority of the closest peers to the provided RecordKey.
    /// If the record already exists, a cost of zero will be returned.
    ///
    /// Ignore the quotes from any peers in `ignore_peers`.
    /// This is useful if we want to repay a different PeerId on failure.
    pub async fn get_store_quote_from_network(
        &self,
        record_address: NetworkAddress,
        data_type: u32,
        data_size: usize,
        ignore_peers: Vec<PeerId>,
    ) -> Result<Vec<(PeerId, PaymentQuote)>> {
        // The requirement of having at least CLOSE_GROUP_SIZE
        // close nodes will be checked internally automatically.
        let mut close_nodes = self
            .client_get_all_close_peers_in_range_or_close_group(&record_address)
            .await?;
        // Filter out results from the ignored peers.
        close_nodes.retain(|peer_id| !ignore_peers.contains(peer_id));
        info!(
            "For record {record_address:?} quoting {} nodes. ignore_peers is {ignore_peers:?}",
            close_nodes.len()
        );

        if close_nodes.is_empty() {
            error!("Can't get store_cost of {record_address:?}, as all close_nodes are ignored");
            return Err(NetworkError::NotEnoughPeersForStoreCostRequest);
        }

        // Client shall decide whether to carry out storage verification or not.
        let request = Request::Query(Query::GetStoreQuote {
            key: record_address.clone(),
            data_type,
            data_size,
            nonce: None,
            difficulty: 0,
        });
        let responses = self
            .send_and_get_responses(&close_nodes, &request, true)
            .await;

        // Consider the data to be already paid for if half of the close nodes already have it.
        let mut peer_already_have_it = 0;
        let enough_peers_already_have_it = close_nodes.len() / 2;

        let mut peers_returned_error = 0;

        // Loop over the responses.
        let mut all_quotes = vec![];
        let mut quotes_to_pay = vec![];
        for (peer, response) in responses {
            info!("StoreCostReq for {record_address:?} received response: {response:?}");
            match response {
                Ok(Response::Query(QueryResponse::GetStoreQuote {
                    quote: Ok(quote),
                    peer_address,
                    storage_proofs,
                })) => {
                    if !storage_proofs.is_empty() {
                        debug!("Storage proof handling during GetStoreQuote is yet to be implemented.");
                    }

                    // Check the quote itself is valid.
                    if !quote.check_is_signed_by_claimed_peer(peer) {
                        warn!("Received invalid quote from {peer_address:?}, {quote:?}");
                        continue;
                    }

                    // Check if the returned data type matches the request.
                    if quote.quoting_metrics.data_type != data_type {
                        warn!("Received invalid quote from {peer_address:?}, {quote:?}. Data type did not match the request.");
                        continue;
                    }

                    all_quotes.push((peer_address.clone(), quote.clone()));
                    quotes_to_pay.push((peer, quote));
                }
                Ok(Response::Query(QueryResponse::GetStoreQuote {
                    quote: Err(ProtocolError::RecordExists(_)),
                    peer_address,
                    storage_proofs,
                })) => {
                    if !storage_proofs.is_empty() {
                        debug!("Storage proof handling during GetStoreQuote is yet to be implemented.");
                    }
                    peer_already_have_it += 1;
                    info!("Address {record_address:?} was already paid for according to {peer_address:?} ({peer_already_have_it}/{enough_peers_already_have_it})");
                    if peer_already_have_it >= enough_peers_already_have_it {
                        info!("Address {record_address:?} was already paid for according to {peer_already_have_it} peers, ending quote request");
                        return Ok(vec![]);
                    }
                }
                Err(err) => {
                    error!("Got an error while requesting quote from peer {peer:?}: {err}");
                    peers_returned_error += 1;
                }
                _ => {
                    error!("Got an unexpected response while requesting quote from peer {peer:?}: {response:?}");
                    peers_returned_error += 1;
                }
            }
        }

        if quotes_to_pay.is_empty() {
            error!(
                "Could not fetch any quotes. {} peers returned an error.",
                peers_returned_error
            );
            return Err(NetworkError::NoStoreCostResponses);
        }

        Ok(quotes_to_pay)
    }

    /// Get the Record from the network.
    /// Carries out re-attempts if required.
    /// If a target_record is provided, only return once the target has been fetched.
    /// Otherwise count it as a failure once all attempts are completed.
    ///
    /// It also handles the split record error for GraphEntry.
    pub async fn get_record_from_network(
        &self,
        key: RecordKey,
        cfg: &GetRecordCfg,
    ) -> Result<Record> {
        let pretty_key = PrettyPrintRecordKey::from(&key);
        let mut backoff = cfg.retry_strategy.backoff().into_iter();

        loop {
            info!("Getting record from network of {pretty_key:?} with cfg {cfg:?}");
            let (sender, receiver) = oneshot::channel();
            self.send_network_swarm_cmd(NetworkSwarmCmd::GetNetworkRecord {
                key: key.clone(),
                sender,
                cfg: cfg.clone(),
            });
            let result = match receiver.await {
                Ok(result) => result,
                Err(err) => {
                    error!(
                        "When fetching record {pretty_key:?}, encountered a channel error {err:?}"
                    );
                    // Do not attempt retries.
                    return Err(NetworkError::InternalMsgChannelDropped);
                }
            };

            let err = match result {
                Ok(record) => {
                    info!("Record returned: {pretty_key:?}.");
                    return Ok(record);
                }
                Err(err) => err,
            };

            // Log the results.
            match &err {
                GetRecordError::RecordDoesNotMatch(_) => {
                    warn!("The returned record does not match target {pretty_key:?}.");
                }
                GetRecordError::NotEnoughCopies { expected, got, .. } => {
                    warn!("Not enough copies ({got}/{expected}) found yet for {pretty_key:?}.");
                }
                // libp2p's RecordNotFound only means that no holders answered.
                // It does not actually mean the record does not exist,
                // just that those asked did not have it.
                GetRecordError::RecordNotFound => {
                    warn!("No holder of record '{pretty_key:?}' found.");
                }
                // This is returned during SplitRecordError; we should not get this error here.
                GetRecordError::RecordKindMismatch => {
                    error!("Record kind mismatch for {pretty_key:?}. This error should not happen here.");
                }
                GetRecordError::SplitRecord { result_map } => {
                    error!("Encountered a split record for {pretty_key:?}.");
                    if let Some(record) = Self::handle_split_record_error(result_map, &key)? {
                        info!("Merged the split record for {pretty_key:?} into a single record");
                        return Ok(record);
                    }
                }
                GetRecordError::QueryTimeout => {
                    error!("Encountered query timeout for {pretty_key:?}.");
                }
            }

            match backoff.next() {
                Some(Some(duration)) => {
                    crate::time::sleep(duration).await;
                    debug!("Getting record from network of {pretty_key:?} via backoff...");
                }
                _ => break Err(err.into()),
            }
        }
    }

    /// Handle the split record error.
    fn handle_split_record_error(
        result_map: &HashMap<XorName, (Record, HashSet<PeerId>)>,
        key: &RecordKey,
    ) -> std::result::Result<Option<Record>, NetworkError> {
        let pretty_key = PrettyPrintRecordKey::from(key);

        // Attempt to deserialise and accumulate all GraphEntries.
        let results_count = result_map.len();
        let mut accumulated_graphentries = HashSet::new();
        let mut valid_scratchpad: Option<Scratchpad> = None;
        let mut valid_pointer: Option<Pointer> = None;

        if results_count > 1 {
            let mut record_kind = None;
            info!("For record {pretty_key:?}, we have more than one result returned.");
            for (record, _) in result_map.values() {
                let Ok(header) = RecordHeader::from_record(record) else {
                    continue;
                };
                let kind = record_kind.get_or_insert(header.kind);
                // FIXME: the first record dictates the kind, but we should check all records are of the same kind.
                // And somehow discard the incorrect ones.
                if *kind != header.kind {
                    error!("Encountered a split record for {pretty_key:?} with different RecordHeaders. Expected {kind:?} but got {:?}. Skipping", header.kind);
                    continue;
                }

                match kind {
                    RecordKind::DataOnly(DataTypes::Chunk) | RecordKind::DataWithPayment(_) => {
                        error!("Encountered a split record for {pretty_key:?} with unexpected RecordKind {kind:?}, skipping.");
                        continue;
                    }
                    RecordKind::DataOnly(DataTypes::GraphEntry) => {
                        match get_graph_entry_from_record(record) {
                            Ok(graphentries) => {
                                accumulated_graphentries.extend(graphentries);
                                info!("For record {pretty_key:?}, we have a split record for a GraphEntry. Accumulating GraphEntry: {}", accumulated_graphentries.len());
                            }
                            Err(_) => {
                                warn!("Failed to deserialize GraphEntry for {pretty_key:?}, skipping accumulation");
                                continue;
                            }
                        }
                    }
                    RecordKind::DataOnly(DataTypes::Pointer) => {
                        info!("For record {pretty_key:?}, we have a split record for a pointer. Selecting the one with the highest count");
                        let Ok(pointer) = try_deserialize_record::<Pointer>(record) else {
                            error!(
                                "Failed to deserialize pointer {pretty_key}. Skipping accumulation"
                            );
                            continue;
                        };

                        if !pointer.verify_signature() {
                            warn!("Rejecting Pointer for {pretty_key} PUT with invalid signature");
                            continue;
                        }

                        if let Some(old) = &valid_pointer {
                            if old.counter() >= pointer.counter() {
                                info!("Rejecting Pointer for {pretty_key} with lower count than the previous one");
                                continue;
                            }
                        }
                        valid_pointer = Some(pointer);
                    }
                    RecordKind::DataOnly(DataTypes::Scratchpad) => {
                        info!("For record {pretty_key:?}, we have a split record for a scratchpad. Selecting the one with the highest count");
                        let Ok(scratchpad) = try_deserialize_record::<Scratchpad>(record) else {
                            error!(
                                "Failed to deserialize scratchpad {pretty_key}. Skipping accumulation"
                            );
                            continue;
                        };

                        if !scratchpad.verify_signature() {
                            warn!(
                                "Rejecting Scratchpad for {pretty_key} PUT with invalid signature"
                            );
                            continue;
                        }

                        if let Some(old) = &valid_scratchpad {
                            if old.counter() >= scratchpad.counter() {
                                info!("Rejecting Scratchpad for {pretty_key} with lower count than the previous one");
                                continue;
                            }
                        }
                        valid_scratchpad = Some(scratchpad);
                    }
                }
            }
        }

        // Return the accumulated GraphEntries as a single record.
        if accumulated_graphentries.len() > 1 {
            info!("For record {pretty_key:?} task found split record for a GraphEntry, accumulated and sending them as a single record");
            let accumulated_graphentries = accumulated_graphentries
                .into_iter()
                .collect::<Vec<GraphEntry>>();
            let record = Record {
                key: key.clone(),
                value: try_serialize_record(&accumulated_graphentries, RecordKind::DataOnly(DataTypes::GraphEntry))
                    .map_err(|err| {
                        error!(
                            "Error while serializing the accumulated GraphEntries for {pretty_key:?}: {err:?}"
                        );
                        NetworkError::from(err)
                    })?
                    .to_vec(),
                publisher: None,
                expires: None,
            };
            return Ok(Some(record));
        } else if let Some(pointer) = valid_pointer {
            info!("For record {pretty_key:?} task found a valid pointer, returning it.");
            let record_value =
                try_serialize_record(&pointer, RecordKind::DataOnly(DataTypes::Pointer))
                    .map_err(|err| {
                        error!("Error while serializing the pointer for {pretty_key:?}: {err:?}");
                        NetworkError::from(err)
                    })?
                    .to_vec();

            let record = Record {
                key: key.clone(),
                value: record_value,
                publisher: None,
                expires: None,
            };
            return Ok(Some(record));
        } else if let Some(scratchpad) = valid_scratchpad {
            info!("For record {pretty_key:?} task found a valid scratchpad, returning it.");
            let record_value =
                try_serialize_record(&scratchpad, RecordKind::DataOnly(DataTypes::Scratchpad))
                    .map_err(|err| {
                        error!(
                            "Error while serializing the scratchpad for {pretty_key:?}: {err:?}"
                        );
                        NetworkError::from(err)
                    })?
                    .to_vec();

            let record = Record {
                key: key.clone(),
                value: record_value,
                publisher: None,
                expires: None,
            };
            return Ok(Some(record));
        }
        Ok(None)
    }

    /// Get the quoting metrics for storing the next record from the network.
    pub async fn get_local_quoting_metrics(
        &self,
        key: RecordKey,
        data_type: u32,
        data_size: usize,
    ) -> Result<(QuotingMetrics, bool)> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalQuotingMetrics {
            key,
            data_type,
            data_size,
            sender,
        });

        let quoting_metrics = receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)??;
        Ok(quoting_metrics)
    }

    /// Notify the node that it received a payment.
    pub fn notify_payment_received(&self) {
        self.send_local_swarm_cmd(LocalSwarmCmd::PaymentReceived);
    }

    /// Get `Record` from the local RecordStore.
    pub async fn get_local_record(&self, key: &RecordKey) -> Result<Option<Record>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalRecord {
            key: key.clone(),
            sender,
        });

        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

    /// Whether the target peer is considered blacklisted by self.
    pub async fn is_peer_shunned(&self, target: NetworkAddress) -> Result<bool> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::IsPeerShunned { target, sender });

        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

    /// Put `Record` to the network.
    /// Optionally verify the record is stored after putting it to the network.
    /// If verification is on, we retry.
    pub async fn put_record(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
        let pretty_key = PrettyPrintRecordKey::from(&record.key);
        let mut backoff = cfg.retry_strategy.backoff().into_iter();

        loop {
            info!(
                "Attempting to PUT record with key: {pretty_key:?} to network, with cfg {cfg:?}, retrying via backoff..."
            );

            let err = match self.put_record_once(record.clone(), cfg).await {
                Ok(_) => break Ok(()),
                Err(err) => err,
            };

            // FIXME: Skip if we get a permanent error during verification
            warn!("Failed to PUT record with key: {pretty_key:?} to network (retry via backoff) with error: {err:?}");

            match backoff.next() {
                Some(Some(duration)) => {
                    crate::time::sleep(duration).await;
                }
                _ => break Err(err),
            }
        }
    }

    async fn put_record_once(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
        let record_key = record.key.clone();
        let pretty_key = PrettyPrintRecordKey::from(&record_key);
        info!(
            "Putting record of {} - length {:?} to network",
            pretty_key,
            record.value.len()
        );

        // Wait for a response to avoid flushing to the network too quickly and causing congestion.
        let (sender, receiver) = oneshot::channel();
        if let Some(put_record_to_peers) = &cfg.use_put_record_to {
            self.send_network_swarm_cmd(NetworkSwarmCmd::PutRecordTo {
                peers: put_record_to_peers.clone(),
                record: record.clone(),
                sender,
                quorum: cfg.put_quorum,
            });
        } else {
            self.send_network_swarm_cmd(NetworkSwarmCmd::PutRecord {
                record: record.clone(),
                sender,
                quorum: cfg.put_quorum,
            });
        }

        let response = receiver.await?;

        if let Some((verification_kind, get_cfg)) = &cfg.verification {
            // Generate a random duration between MIN_WAIT_BEFORE_READING_A_PUT and MAX_WAIT_BEFORE_READING_A_PUT.
            let wait_duration = rand::thread_rng()
                .gen_range(MIN_WAIT_BEFORE_READING_A_PUT..MAX_WAIT_BEFORE_READING_A_PUT);
            // Small wait before we attempt to verify.
            // There will be re-attempts carried out within the later step anyway.
            sleep(wait_duration).await;
            debug!("Attempting to verify {pretty_key:?} after we've slept for {wait_duration:?}");

            // Verify the record is stored, requiring re-attempts.
            if let VerificationKind::ChunkProof {
                expected_proof,
                nonce,
            } = verification_kind
            {
                self.verify_chunk_existence(
                    NetworkAddress::from_record_key(&record_key),
                    *nonce,
                    expected_proof.clone(),
                    get_cfg.get_quorum,
                    get_cfg.retry_strategy,
                )
                .await?;
            } else {
                match self
                    .get_record_from_network(record.key.clone(), get_cfg)
                    .await
                {
                    Ok(_) => {
                        debug!("Record {pretty_key:?} verified to be stored.");
                    }
                    Err(NetworkError::GetRecordError(GetRecordError::RecordNotFound)) => {
                        warn!("Record {pretty_key:?} not found after PUT, either rejected or not yet stored by nodes when we asked");
                        return Err(NetworkError::RecordNotStoredByNodes(
                            NetworkAddress::from_record_key(&record_key),
                        ));
                    }
                    Err(NetworkError::GetRecordError(GetRecordError::SplitRecord { .. }))
                        if matches!(verification_kind, VerificationKind::Crdt) =>
                    {
                        warn!("Record {pretty_key:?} is split, which is okay since we're dealing with CRDTs");
                    }
                    Err(e) => {
                        debug!(
                            "Failed to verify record {pretty_key:?} to be stored with error: {e:?}"
                        );
                        return Err(e);
                    }
                }
            }
        }
        response
    }

    /// Notify the ReplicationFetcher that a fetch attempt is completed.
    /// (but it won't trigger any real writes to disk)
    pub fn notify_fetch_completed(&self, key: RecordKey, record_type: ValidationType) {
        self.send_local_swarm_cmd(LocalSwarmCmd::FetchCompleted((key, record_type)))
    }

    /// Put `Record` to the local RecordStore.
    /// Must be called after the validations are performed on the Record.
    pub fn put_local_record(&self, record: Record, is_client_put: bool) {
        debug!(
            "Writing Record locally, for {:?} - length {:?}",
            PrettyPrintRecordKey::from(&record.key),
            record.value.len()
        );
        self.send_local_swarm_cmd(LocalSwarmCmd::PutLocalRecord {
            record,
            is_client_put,
        })
    }

    /// Returns true if a RecordKey is present locally in the RecordStore.
    pub async fn is_record_key_present_locally(&self, key: &RecordKey) -> Result<bool> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::RecordStoreHasKey {
            key: key.clone(),
            sender,
        });

        let is_present = receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)??;

        Ok(is_present)
    }

    /// Returns the Addresses of all the locally stored Records.
    pub async fn get_all_local_record_addresses(
        &self,
    ) -> Result<HashMap<NetworkAddress, ValidationType>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetAllLocalRecordAddresses { sender });

        let addrs = receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)??;
        Ok(addrs)
    }

    /// Send `Request` to the given `PeerId` and await the response. If `self` is the recipient,
    /// then the `Request` is forwarded to itself and handled, and a corresponding `Response` is created
    /// and returned to itself. Hence the flow remains the same and there is no branching at the upper
    /// layers.
    ///
    /// If an outbound issue is raised, we retry once more to send the request before returning an error.
    pub async fn send_request(&self, req: Request, peer: PeerId) -> Result<Response> {
        let (sender, receiver) = oneshot::channel();
        self.send_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
            req: req.clone(),
            peer,
            sender: Some(sender),
        });
        let mut r = receiver.await?;

        if let Err(error) = &r {
            error!("Error in response: {:?}", error);

            match error {
                NetworkError::OutboundError(OutboundFailure::Io(_))
                | NetworkError::OutboundError(OutboundFailure::ConnectionClosed) => {
                    warn!(
                        "Outbound failed for {req:?} .. {error:?}, redialing once and reattempting"
                    );
                    let (sender, receiver) = oneshot::channel();

                    debug!("Reattempting to send_request {req:?} to {peer:?}");
                    self.send_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
                        req,
                        peer,
                        sender: Some(sender),
                    });

                    r = receiver.await?;
                }
                _ => {
                    // For any other error, just log it and return the response as-is.
                    warn!("Error in response: {:?}", error);
                }
            }
        }

        r
    }

    /// Send `Request` to the given `PeerId` and do _not_ await a response here.
    /// Instead the Response will be handled by the common `response_handler`.
    pub fn send_req_ignore_reply(&self, req: Request, peer: PeerId) {
        let swarm_cmd = NetworkSwarmCmd::SendRequest {
            req,
            peer,
            sender: None,
        };
        self.send_network_swarm_cmd(swarm_cmd)
    }

    /// Send a `Response` through the channel opened by the requester.
    pub fn send_response(&self, resp: Response, channel: MsgResponder) {
        self.send_network_swarm_cmd(NetworkSwarmCmd::SendResponse { resp, channel })
    }

    /// Return a `SwarmLocalState` with some information obtained from the swarm's local state.
    pub async fn get_swarm_local_state(&self) -> Result<SwarmLocalState> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetSwarmLocalState(sender));
        let state = receiver.await?;
        Ok(state)
    }

    pub fn trigger_interval_replication(&self) {
        self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIntervalReplication)
    }

    pub fn add_fresh_records_to_the_replication_fetcher(
        &self,
        holder: NetworkAddress,
        keys: Vec<(NetworkAddress, ValidationType)>,
    ) {
        self.send_local_swarm_cmd(LocalSwarmCmd::AddFreshReplicateRecords { holder, keys })
    }

    pub fn record_node_issues(&self, peer_id: PeerId, issue: NodeIssue) {
        self.send_local_swarm_cmd(LocalSwarmCmd::RecordNodeIssue { peer_id, issue });
    }

    pub fn historical_verify_quotes(&self, quotes: Vec<(PeerId, PaymentQuote)>) {
        self.send_local_swarm_cmd(LocalSwarmCmd::QuoteVerification { quotes });
    }

    pub fn trigger_irrelevant_record_cleanup(&self) {
        self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIrrelevantRecordCleanup)
    }

    pub fn add_network_density_sample(&self, distance: KBucketDistance) {
        self.send_local_swarm_cmd(LocalSwarmCmd::AddNetworkDensitySample { distance })
    }

    pub fn notify_peer_scores(&self, peer_scores: Vec<(PeerId, bool)>) {
        self.send_local_swarm_cmd(LocalSwarmCmd::NotifyPeerScores { peer_scores })
    }

    pub fn notify_node_version(&self, peer: PeerId, version: String) {
        self.send_local_swarm_cmd(LocalSwarmCmd::NotifyPeerVersion { peer, version })
    }

    /// Helper to send NetworkSwarmCmd.
    fn send_network_swarm_cmd(&self, cmd: NetworkSwarmCmd) {
        send_network_swarm_cmd(self.network_swarm_cmd_sender().clone(), cmd);
    }

    /// Helper to send LocalSwarmCmd.
    fn send_local_swarm_cmd(&self, cmd: LocalSwarmCmd) {
        send_local_swarm_cmd(self.local_swarm_cmd_sender().clone(), cmd);
    }

    /// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
    /// If `client` is false, then `self` is included among the `closest_peers`.
    ///
    /// If fewer than CLOSE_GROUP_SIZE peers are found, it will return all the peers.
    pub async fn get_all_close_peers_in_range_or_close_group(
        &self,
        key: &NetworkAddress,
        client: bool,
    ) -> Result<Vec<PeerId>> {
        let pretty_key = PrettyPrintKBucketKey(key.as_kbucket_key());
        debug!("Getting all the closest peers in range of {pretty_key:?}");
        let (sender, receiver) = oneshot::channel();
        self.send_network_swarm_cmd(NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork {
            key: key.clone(),
            sender,
        });

        let found_peers = receiver.await?;

        // Count self in if among the CLOSE_GROUP_SIZE closest, and sort the result.
        let result_len = found_peers.len();
        let mut closest_peers = found_peers;

        // Ensure we're not including self here.
        if client {
            // Remove our peer id from the calculations here.
            closest_peers.retain(|&x| x != self.peer_id());
            if result_len != closest_peers.len() {
                info!("Removed self (client) from the closest_peers");
            }
        }

        if tracing::level_enabled!(tracing::Level::DEBUG) {
            let close_peers_pretty_print: Vec<_> = closest_peers
                .iter()
                .map(|peer_id| {
                    format!(
                        "{peer_id:?}({:?})",
                        PrettyPrintKBucketKey(NetworkAddress::from_peer(*peer_id).as_kbucket_key())
                    )
                })
                .collect();

            debug!(
                "Network knowledge of closest peers to {pretty_key:?} are: {close_peers_pretty_print:?}"
            );
        }

        let expanded_close_group = CLOSE_GROUP_SIZE + CLOSE_GROUP_SIZE / 2;
        let closest_peers = sort_peers_by_address(&closest_peers, key, expanded_close_group)?;
        Ok(closest_peers.into_iter().cloned().collect())
    }

    /// Send a `Request` to the provided set of peers and wait for their responses concurrently.
    /// If `get_all_responses` is true, we wait for the responses from all the peers.
    /// NB TODO: Will return an error if the request times out.
    /// If `get_all_responses` is false, we return the first successful response that we get.
    pub async fn send_and_get_responses(
        &self,
        peers: &[PeerId],
        req: &Request,
        get_all_responses: bool,
    ) -> BTreeMap<PeerId, Result<Response>> {
        debug!("send_and_get_responses for {req:?}");
        let mut list_of_futures = peers
            .iter()
            .map(|peer| {
                Box::pin(async {
                    let resp = self.send_request(req.clone(), *peer).await;
                    (*peer, resp)
                })
            })
            .collect::<Vec<_>>();

        let mut responses = BTreeMap::new();
        while !list_of_futures.is_empty() {
            let ((peer, resp), _, remaining_futures) = select_all(list_of_futures).await;
            let resp_string = match &resp {
                Ok(resp) => format!("{resp}"),
                Err(err) => format!("{err:?}"),
            };
            debug!("Got response from {peer:?} for the req: {req:?}, resp: {resp_string}");
            if !get_all_responses && resp.is_ok() {
                return BTreeMap::from([(peer, resp)]);
            }
            responses.insert(peer, resp);
            list_of_futures = remaining_futures;
        }

        debug!("Received all responses for {req:?}");
        responses
    }
}

/// Returns true if the `Multiaddr` does not contain a non-global IPv4 address.
/// This is used to filter out unroutable addresses from the Kademlia routing table.
pub fn multiaddr_is_global(multiaddr: &Multiaddr) -> bool {
    !multiaddr.iter().any(|addr| match addr {
        Protocol::Ip4(ip) => {
            // Based on the nightly `is_global` method (`Ipv4Addrs::is_global`), only using what is available in stable.
            // Missing `is_shared`, `is_benchmarking` and `is_reserved`.
            ip.is_unspecified()
                | ip.is_private()
                | ip.is_loopback()
                | ip.is_link_local()
                | ip.is_documentation()
                | ip.is_broadcast()
        }
        _ => false,
    })
}
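// Illustrative behaviour: /ip4/127.0.0.1/udp/1234 and /ip4/192.168.1.10/tcp/4001 are rejected
// as non-global, while a publicly routable address such as /ip4/8.8.8.8/udp/1234 passes.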

/// Pop off the `/p2p/<peer_id>`. This mutates the `Multiaddr` and returns the `PeerId` if it exists.
pub(crate) fn multiaddr_pop_p2p(multiaddr: &mut Multiaddr) -> Option<PeerId> {
    if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() {
        // Only actually strip the last protocol if it's indeed the peer ID.
        let _ = multiaddr.pop();
        Some(peer_id)
    } else {
        None
    }
}
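// For example, /ip4/10.0.0.1/udp/1234/p2p/<peer_id> is mutated to /ip4/10.0.0.1/udp/1234 and
// Some(<peer_id>) is returned; an address with no trailing /p2p component is left untouched.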

/// Return the last `PeerId` from the `Multiaddr` if it exists.
pub(crate) fn multiaddr_get_p2p(multiaddr: &Multiaddr) -> Option<PeerId> {
    if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() {
        Some(peer_id)
    } else {
        None
    }
}

/// Build a `Multiaddr` with the p2p protocol filtered out.
/// If it is a relayed address, then the relay's P2P address is preserved.
pub(crate) fn multiaddr_strip_p2p(multiaddr: &Multiaddr) -> Multiaddr {
    let is_relayed = multiaddr.iter().any(|p| matches!(p, Protocol::P2pCircuit));

    if is_relayed {
        // Do not add any PeerId after we've found the P2PCircuit protocol. The prior one is the relay's PeerId which
        // we should preserve.
        let mut before_relay_protocol = true;
        let mut new_multi_addr = Multiaddr::empty();
        for p in multiaddr.iter() {
            if matches!(p, Protocol::P2pCircuit) {
                before_relay_protocol = false;
            }
            if matches!(p, Protocol::P2p(_)) && !before_relay_protocol {
                continue;
            }
            new_multi_addr.push(p);
        }
        new_multi_addr
    } else {
        multiaddr
            .iter()
            .filter(|p| !matches!(p, Protocol::P2p(_)))
            .collect()
    }
}
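// For instance, a relayed address of the form
//   /ip4/<relay_ip>/udp/<port>/quic-v1/p2p/<relay_id>/p2p-circuit/p2p/<our_id>
// keeps /p2p/<relay_id> (it precedes /p2p-circuit) but drops the trailing /p2p/<our_id>,
// whereas a non-relayed address has every /p2p component removed.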

/// Get the `IpAddr` from the `Multiaddr`.
pub(crate) fn multiaddr_get_ip(addr: &Multiaddr) -> Option<IpAddr> {
    addr.iter().find_map(|p| match p {
        Protocol::Ip4(addr) => Some(IpAddr::V4(addr)),
        Protocol::Ip6(addr) => Some(IpAddr::V6(addr)),
        _ => None,
    })
}

pub(crate) fn multiaddr_get_port(addr: &Multiaddr) -> Option<u16> {
    addr.iter().find_map(|p| match p {
        Protocol::Udp(port) => Some(port),
        _ => None,
    })
}

pub(crate) fn send_local_swarm_cmd(swarm_cmd_sender: Sender<LocalSwarmCmd>, cmd: LocalSwarmCmd) {
    let capacity = swarm_cmd_sender.capacity();

    if capacity == 0 {
        error!(
            "SwarmCmd channel is full. Await capacity to send: {:?}",
            cmd
        );
    }

    // Spawn a task to send the SwarmCmd and keep this fn sync.
    let _handle = spawn(async move {
        if let Err(error) = swarm_cmd_sender.send(cmd).await {
            error!("Failed to send SwarmCmd: {}", error);
        }
    });
}

pub(crate) fn send_network_swarm_cmd(
    swarm_cmd_sender: Sender<NetworkSwarmCmd>,
    cmd: NetworkSwarmCmd,
) {
    let capacity = swarm_cmd_sender.capacity();

    if capacity == 0 {
        error!(
            "SwarmCmd channel is full. Await capacity to send: {:?}",
            cmd
        );
    }

    // Spawn a task to send the SwarmCmd and keep this fn sync.
    let _handle = spawn(async move {
        if let Err(error) = swarm_cmd_sender.send(cmd).await {
            error!("Failed to send SwarmCmd: {}", error);
        }
    });
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_network_sign_verify() -> eyre::Result<()> {
        let (network, _, _) =
            NetworkBuilder::new(Keypair::generate_ed25519(), false, vec![]).build_client();
        let msg = b"test message";
        let sig = network.sign(msg)?;
        assert!(network.verify(msg, &sig));
        Ok(())
    }
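
    // A minimal sketch exercising the multiaddr helpers above. The concrete addresses are
    // arbitrary examples chosen for this test, not values used elsewhere in the crate.
    #[test]
    fn test_multiaddr_helpers() -> eyre::Result<()> {
        // Loopback (and other non-global) IPv4 addresses are rejected by multiaddr_is_global.
        let local: Multiaddr = "/ip4/127.0.0.1/udp/1234".parse()?;
        assert!(!multiaddr_is_global(&local));
        // A publicly routable IPv4 address passes the check.
        let global: Multiaddr = "/ip4/8.8.8.8/udp/1234".parse()?;
        assert!(multiaddr_is_global(&global));

        // Popping the trailing /p2p component returns the PeerId and strips it from the address.
        let peer_id = PeerId::random();
        let mut with_p2p = global.clone().with(Protocol::P2p(peer_id));
        assert_eq!(multiaddr_pop_p2p(&mut with_p2p), Some(peer_id));
        assert_eq!(with_p2p, global);
        // A second pop finds no trailing /p2p component and leaves the address untouched.
        assert_eq!(multiaddr_pop_p2p(&mut with_p2p), None);
        Ok(())
    }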
}