ant_networking/
lib.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9#[macro_use]
10extern crate tracing;
11
12mod bootstrap;
13mod circular_vec;
14mod cmd;
15mod config;
16mod driver;
17mod error;
18mod event;
19mod external_address;
20mod fifo_register;
21mod graph;
22mod log_markers;
23#[cfg(feature = "open-metrics")]
24mod metrics;
25mod network_discovery;
26mod record_store;
27mod record_store_api;
28mod relay_manager;
29mod replication_fetcher;
30pub mod time;
31mod transport;
32
33use cmd::LocalSwarmCmd;
34use xor_name::XorName;
35
36// re-export arch dependent deps for use in the crate, or above
37pub use self::{
38    cmd::{NodeIssue, SwarmLocalState},
39    config::{GetRecordCfg, PutRecordCfg, ResponseQuorum, RetryStrategy, VerificationKind},
40    driver::{NetworkBuilder, SwarmDriver, MAX_PACKET_SIZE},
41    error::{GetRecordError, NetworkError},
42    event::{MsgResponder, NetworkEvent},
43    graph::get_graph_entry_from_record,
44    record_store::NodeRecordStore,
45};
46#[cfg(feature = "open-metrics")]
47pub use metrics::service::MetricsRegistries;
48pub use time::{interval, sleep, spawn, Instant, Interval};
49
50use self::{cmd::NetworkSwarmCmd, error::Result};
51use ant_evm::{PaymentQuote, QuotingMetrics};
52use ant_protocol::{
53    error::Error as ProtocolError,
54    messages::{ChunkProof, Nonce, Query, QueryResponse, Request, Response},
55    storage::{DataTypes, Pointer, Scratchpad, ValidationType},
56    NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
57};
58use futures::future::select_all;
59use libp2p::{
60    identity::Keypair,
61    kad::{KBucketDistance, KBucketKey, Record, RecordKey},
62    multiaddr::Protocol,
63    request_response::OutboundFailure,
64    Multiaddr, PeerId,
65};
66use rand::Rng;
67use std::{
68    collections::{BTreeMap, HashMap},
69    net::IpAddr,
70    sync::Arc,
71};
72use tokio::sync::{
73    mpsc::{self, Sender},
74    oneshot,
75};
76use tokio::time::Duration;
77use {
78    ant_protocol::storage::GraphEntry,
79    ant_protocol::storage::{
80        try_deserialize_record, try_serialize_record, RecordHeader, RecordKind,
81    },
82    std::collections::HashSet,
83};
84
85/// Majority of a given group (i.e. > 1/2).
86#[inline]
87pub const fn close_group_majority() -> usize {
88    // Calculate the majority of the close group size by dividing it by 2 and adding 1.
89    // This ensures that the majority is always greater than half.
90    CLOSE_GROUP_SIZE / 2 + 1
91}
92
93/// Max duration to wait for verification.
94const MAX_WAIT_BEFORE_READING_A_PUT: Duration = Duration::from_millis(750);
95/// Min duration to wait for verification
96const MIN_WAIT_BEFORE_READING_A_PUT: Duration = Duration::from_millis(300);
97
98/// Sort the provided peers by their distance to the given `NetworkAddress`.
99/// Return with the closest expected number of entries if has.
100pub fn sort_peers_by_address(
101    peers: Vec<(PeerId, Addresses)>,
102    address: &NetworkAddress,
103    expected_entries: usize,
104) -> Result<Vec<(PeerId, Addresses)>> {
105    sort_peers_by_key(peers, &address.as_kbucket_key(), expected_entries)
106}
107
108/// Sort the provided peers by their distance to the given `KBucketKey`.
109/// Return with the closest expected number of entries if has.
110pub fn sort_peers_by_key<T>(
111    peers: Vec<(PeerId, Addresses)>,
112    key: &KBucketKey<T>,
113    expected_entries: usize,
114) -> Result<Vec<(PeerId, Addresses)>> {
115    // Check if there are enough peers to satisfy the request.
116    // bail early if that's not the case
117    if CLOSE_GROUP_SIZE > peers.len() {
118        warn!("Not enough peers in the k-bucket to satisfy the request");
119        return Err(NetworkError::NotEnoughPeers {
120            found: peers.len(),
121            required: CLOSE_GROUP_SIZE,
122        });
123    }
124
125    // Create a vector of tuples where each tuple is a reference to a peer and its distance to the key.
126    // This avoids multiple computations of the same distance in the sorting process.
127    let mut peer_distances: Vec<(PeerId, Addresses, KBucketDistance)> =
128        Vec::with_capacity(peers.len());
129
130    for (peer_id, addrs) in peers.into_iter() {
131        let addr = NetworkAddress::from_peer(peer_id);
132        let distance = key.distance(&addr.as_kbucket_key());
133        peer_distances.push((peer_id, addrs, distance));
134    }
135
136    // Sort the vector of tuples by the distance.
137    peer_distances.sort_by(|a, b| a.2.cmp(&b.2));
138
139    // Collect the sorted peers into a new vector.
140    let sorted_peers: Vec<(PeerId, Addresses)> = peer_distances
141        .into_iter()
142        .take(expected_entries)
143        .map(|(peer_id, addrs, _)| (peer_id, addrs))
144        .collect();
145
146    Ok(sorted_peers)
147}
148
149/// A list of addresses of a peer in the routing table.
150#[derive(Clone, Debug, Default)]
151pub struct Addresses(pub Vec<Multiaddr>);
152
153#[derive(Clone, Debug)]
154/// API to interact with the underlying Swarm
155pub struct Network {
156    inner: Arc<NetworkInner>,
157}
158
159/// The actual implementation of the Network. The other is just a wrapper around this, so that we don't expose
160/// the Arc from the interface.
161#[derive(Debug)]
162struct NetworkInner {
163    network_swarm_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
164    local_swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
165    peer_id: PeerId,
166    keypair: Keypair,
167}
168
169impl Network {
170    pub fn new(
171        network_swarm_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
172        local_swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
173        peer_id: PeerId,
174        keypair: Keypair,
175    ) -> Self {
176        Self {
177            inner: Arc::new(NetworkInner {
178                network_swarm_cmd_sender,
179                local_swarm_cmd_sender,
180                peer_id,
181                keypair,
182            }),
183        }
184    }
185
186    /// Returns the `PeerId` of the instance.
187    pub fn peer_id(&self) -> PeerId {
188        self.inner.peer_id
189    }
190
191    /// Returns the `Keypair` of the instance.
192    pub fn keypair(&self) -> &Keypair {
193        &self.inner.keypair
194    }
195
196    /// Get the sender to send a `NetworkSwarmCmd` to the underlying `Swarm`.
197    pub(crate) fn network_swarm_cmd_sender(&self) -> &mpsc::Sender<NetworkSwarmCmd> {
198        &self.inner.network_swarm_cmd_sender
199    }
200    /// Get the sender to send a `LocalSwarmCmd` to the underlying `Swarm`.
201    pub(crate) fn local_swarm_cmd_sender(&self) -> &mpsc::Sender<LocalSwarmCmd> {
202        &self.inner.local_swarm_cmd_sender
203    }
204
205    /// Signs the given data with the node's keypair.
206    pub fn sign(&self, msg: &[u8]) -> Result<Vec<u8>> {
207        self.keypair().sign(msg).map_err(NetworkError::from)
208    }
209
210    /// Verifies a signature for the given data and the node's public key.
211    pub fn verify(&self, msg: &[u8], sig: &[u8]) -> bool {
212        self.keypair().public().verify(msg, sig)
213    }
214
215    /// Returns the protobuf serialised PublicKey to allow messaging out for share.
216    pub fn get_pub_key(&self) -> Vec<u8> {
217        self.keypair().public().encode_protobuf()
218    }
219
220    /// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
221    /// Excludes the client's `PeerId` while calculating the closest peers.
222    pub async fn client_get_all_close_peers_in_range_or_close_group(
223        &self,
224        key: &NetworkAddress,
225    ) -> Result<Vec<(PeerId, Addresses)>> {
226        self.get_all_close_peers_in_range_or_close_group(key, true)
227            .await
228    }
229
230    /// Returns the closest peers to the given `NetworkAddress`, sorted by their distance to the key.
231    ///
232    /// Includes our node's `PeerId` while calculating the closest peers.
233    pub async fn node_get_closest_peers(
234        &self,
235        key: &NetworkAddress,
236    ) -> Result<Vec<(PeerId, Addresses)>> {
237        self.get_all_close_peers_in_range_or_close_group(key, false)
238            .await
239    }
240
241    /// Returns a list of peers in local RT and their correspondent Multiaddr.
242    /// Does not include self
243    pub async fn get_local_peers_with_multiaddr(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>> {
244        let (sender, receiver) = oneshot::channel();
245        self.send_local_swarm_cmd(LocalSwarmCmd::GetPeersWithMultiaddr { sender });
246        receiver
247            .await
248            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
249    }
250
251    /// Returns a map where each key is the ilog2 distance of that Kbucket
252    /// and each value is a vector of peers in that bucket.
253    /// Does not include self
254    pub async fn get_kbuckets(&self) -> Result<BTreeMap<u32, Vec<PeerId>>> {
255        let (sender, receiver) = oneshot::channel();
256        self.send_local_swarm_cmd(LocalSwarmCmd::GetKBuckets { sender });
257        receiver
258            .await
259            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
260    }
261
262    /// Returns all the PeerId from all the KBuckets from our local Routing Table
263    /// Also contains our own PeerId.
264    pub async fn get_closest_k_value_local_peers(&self) -> Result<Vec<(PeerId, Addresses)>> {
265        let (sender, receiver) = oneshot::channel();
266        self.send_local_swarm_cmd(LocalSwarmCmd::GetClosestKLocalPeers { sender });
267
268        receiver
269            .await
270            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
271    }
272
273    /// Returns X close peers to the target.
274    /// Note: self is not included
275    pub async fn get_close_peers_to_the_target(
276        &self,
277        key: NetworkAddress,
278        num_of_peers: usize,
279    ) -> Result<Vec<(PeerId, Addresses)>> {
280        let (sender, receiver) = oneshot::channel();
281        self.send_local_swarm_cmd(LocalSwarmCmd::GetCloseLocalPeersToTarget {
282            key,
283            num_of_peers,
284            sender,
285        });
286
287        receiver
288            .await
289            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
290    }
291
292    /// Get the Chunk existence proof from the close nodes to the provided chunk address.
293    /// This is to be used by client only to verify the success of the upload.
294    pub async fn verify_chunk_existence(
295        &self,
296        chunk_address: NetworkAddress,
297        nonce: Nonce,
298        expected_proof: ChunkProof,
299        quorum: ResponseQuorum,
300        retry_strategy: RetryStrategy,
301    ) -> Result<()> {
302        let total_attempts = retry_strategy.attempts();
303
304        let pretty_key = PrettyPrintRecordKey::from(&chunk_address.to_record_key()).into_owned();
305        let expected_n_verified = quorum.get_value();
306
307        let mut close_nodes = Vec::new();
308        let mut retry_attempts = 0;
309        while retry_attempts < total_attempts {
310            // the check should happen before incrementing retry_attempts
311            if retry_attempts % 2 == 0 {
312                // Do not query the closest_peers during every re-try attempt.
313                // The close_nodes don't change often and the previous set of close_nodes might be taking a while to write
314                // the Chunk, so query them again incase of a failure.
315                close_nodes = self
316                    .client_get_all_close_peers_in_range_or_close_group(&chunk_address)
317                    .await?;
318            }
319            retry_attempts += 1;
320            info!(
321                "Getting ChunkProof for {pretty_key:?}. Attempts: {retry_attempts:?}/{total_attempts:?}",
322            );
323
324            let request = Request::Query(Query::GetChunkExistenceProof {
325                key: chunk_address.clone(),
326                nonce,
327                difficulty: 1,
328            });
329            let responses = self
330                .send_and_get_responses(&close_nodes, &request, true)
331                .await;
332            let n_verified = responses
333                .into_iter()
334                .filter_map(|(peer, resp)| {
335                    if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(proofs))) =
336                        resp
337                    {
338                        if proofs.is_empty() {
339                            warn!("Failed to verify the ChunkProof from {peer:?}. Returned proof is empty.");
340                            None
341                        } else if let Ok(ref proof) = proofs[0].1 {
342                            if expected_proof.verify(proof) {
343                                debug!("Got a valid ChunkProof from {peer:?}");
344                                Some(())
345                            } else {
346                                warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered?");
347                                None
348                            }
349                        } else {
350                            warn!("Failed to verify the ChunkProof from {peer:?}, returned with error {:?}", proofs[0].1);
351                            None
352                        }
353                    } else {
354                        debug!("Did not get a valid response for the ChunkProof from {peer:?}");
355                        None
356                    }
357                })
358                .count();
359            debug!("Got {n_verified} verified chunk existence proofs for chunk_address {chunk_address:?}");
360
361            if n_verified >= expected_n_verified {
362                return Ok(());
363            }
364            warn!("The obtained {n_verified} verified proofs did not match the expected {expected_n_verified} verified proofs");
365            // Sleep to avoid firing queries too close to even choke the nodes further.
366            let waiting_time = if retry_attempts == 1 {
367                MIN_WAIT_BEFORE_READING_A_PUT
368            } else {
369                MIN_WAIT_BEFORE_READING_A_PUT + MIN_WAIT_BEFORE_READING_A_PUT
370            };
371            sleep(waiting_time).await;
372        }
373
374        Err(NetworkError::FailedToVerifyChunkProof(
375            chunk_address.clone(),
376        ))
377    }
378
379    /// Get the store costs from the majority of the closest peers to the provided RecordKey.
380    /// Record already exists will have a cost of zero to be returned.
381    ///
382    /// Ignore the quote from any peers from `ignore_peers`.
383    /// This is useful if we want to repay a different PeerId on failure.
384    pub async fn get_store_quote_from_network(
385        &self,
386        record_address: NetworkAddress,
387        data_type: u32,
388        data_size: usize,
389        ignore_peers: Vec<PeerId>,
390    ) -> Result<Vec<(PeerId, PaymentQuote)>> {
391        // The requirement of having at least CLOSE_GROUP_SIZE
392        // close nodes will be checked internally automatically.
393        let mut close_nodes = self
394            .client_get_all_close_peers_in_range_or_close_group(&record_address)
395            .await?;
396        // Filter out results from the ignored peers.
397        close_nodes.retain(|(peer_id, _)| !ignore_peers.contains(peer_id));
398        info!(
399            "For record {record_address:?} quoting {} nodes. ignore_peers is {ignore_peers:?}",
400            close_nodes.len()
401        );
402
403        if close_nodes.is_empty() {
404            error!("Can't get store_cost of {record_address:?}, as all close_nodes are ignored");
405            return Err(NetworkError::NotEnoughPeersForStoreCostRequest);
406        }
407
408        // Client shall decide whether to carry out storage verification or not.
409        let request = Request::Query(Query::GetStoreQuote {
410            key: record_address.clone(),
411            data_type,
412            data_size,
413            nonce: None,
414            difficulty: 0,
415        });
416        let responses = self
417            .send_and_get_responses(&close_nodes, &request, true)
418            .await;
419
420        // consider data to be already paid for if 1/2 of the close nodes already have it
421        let mut peer_already_have_it = 0;
422        let enough_peers_already_have_it = close_nodes.len() / 2;
423
424        let mut peers_returned_error = 0;
425
426        // loop over responses
427        let mut all_quotes = vec![];
428        let mut quotes_to_pay = vec![];
429        for (peer, response) in responses {
430            info!("StoreCostReq for {record_address:?} received response: {response:?}");
431            match response {
432                Ok(Response::Query(QueryResponse::GetStoreQuote {
433                    quote: Ok(quote),
434                    peer_address,
435                    storage_proofs,
436                })) => {
437                    if !storage_proofs.is_empty() {
438                        debug!("Storage proofing during GetStoreQuote to be implemented.");
439                    }
440
441                    // Check the quote itself is valid.
442                    if !quote.check_is_signed_by_claimed_peer(peer) {
443                        warn!("Received invalid quote from {peer_address:?}, {quote:?}");
444                        continue;
445                    }
446
447                    // Check if the returned data type matches the request
448                    if quote.quoting_metrics.data_type != data_type {
449                        warn!("Received invalid quote from {peer_address:?}, {quote:?}. Data type did not match the request.");
450                        continue;
451                    }
452
453                    all_quotes.push((peer_address.clone(), quote.clone()));
454                    quotes_to_pay.push((peer, quote));
455                }
456                Ok(Response::Query(QueryResponse::GetStoreQuote {
457                    quote: Err(ProtocolError::RecordExists(_)),
458                    peer_address,
459                    storage_proofs,
460                })) => {
461                    if !storage_proofs.is_empty() {
462                        debug!("Storage proofing during GetStoreQuote to be implemented.");
463                    }
464                    peer_already_have_it += 1;
465                    info!("Address {record_address:?} was already paid for according to {peer_address:?} ({peer_already_have_it}/{enough_peers_already_have_it})");
466                    if peer_already_have_it >= enough_peers_already_have_it {
467                        info!("Address {record_address:?} was already paid for according to {peer_already_have_it} peers, ending quote request");
468                        return Ok(vec![]);
469                    }
470                }
471                Err(err) => {
472                    error!("Got an error while requesting quote from peer {peer:?}: {err:?}");
473                    peers_returned_error += 1;
474                }
475                _ => {
476                    error!("Got an unexpected response while requesting quote from peer {peer:?}: {response:?}");
477                    peers_returned_error += 1;
478                }
479            }
480        }
481
482        if quotes_to_pay.is_empty() {
483            error!(
484                "Could not fetch any quotes. {} peers returned an error.",
485                peers_returned_error
486            );
487            return Err(NetworkError::NoStoreCostResponses);
488        }
489
490        Ok(quotes_to_pay)
491    }
492
493    /// Get the Record from the network
494    /// Carry out re-attempts if required
495    /// In case a target_record is provided, only return when fetched target.
496    /// Otherwise count it as a failure when all attempts completed.
497    ///
498    /// It also handles the split record error for GraphEntry.
499    pub async fn get_record_from_network(
500        &self,
501        key: RecordKey,
502        cfg: &GetRecordCfg,
503    ) -> Result<Record> {
504        let pretty_key = PrettyPrintRecordKey::from(&key);
505        let mut backoff = cfg.retry_strategy.backoff().into_iter();
506
507        loop {
508            info!("Getting record from network of {pretty_key:?}. with cfg {cfg:?}",);
509            let (sender, receiver) = oneshot::channel();
510            self.send_network_swarm_cmd(NetworkSwarmCmd::GetNetworkRecord {
511                key: key.clone(),
512                sender,
513                cfg: cfg.clone(),
514            });
515            let result = match receiver.await {
516                Ok(result) => result,
517                Err(err) => {
518                    error!(
519                        "When fetching record {pretty_key:?}, encountered a channel error {err:?}"
520                    );
521                    // Do not attempt retries.
522                    return Err(NetworkError::InternalMsgChannelDropped);
523                }
524            };
525
526            let err = match result {
527                Ok(record) => {
528                    info!("Record returned: {pretty_key:?}.");
529                    return Ok(record);
530                }
531                Err(err) => err,
532            };
533
534            // log the results
535            match &err {
536                GetRecordError::RecordDoesNotMatch(_) => {
537                    warn!("The returned record does not match target {pretty_key:?}.");
538                }
539                GetRecordError::NotEnoughCopies { expected, got, .. } => {
540                    warn!("Not enough copies ({got}/{expected}) found yet for {pretty_key:?}.");
541                }
542                // libp2p RecordNotFound does mean no holders answered.
543                // it does not actually mean the record does not exist.
544                // just that those asked did not have it
545                GetRecordError::RecordNotFound => {
546                    warn!("No holder of record '{pretty_key:?}' found.");
547                }
548                // This is returned during SplitRecordError, we should not get this error here.
549                GetRecordError::RecordKindMismatch => {
550                    error!("Record kind mismatch for {pretty_key:?}. This error should not happen here.");
551                }
552                GetRecordError::SplitRecord { result_map } => {
553                    error!("Encountered a split record for {pretty_key:?}.");
554                    if let Some(record) = Self::handle_split_record_error(result_map, &key)? {
555                        info!("Merged the split record for {pretty_key:?}, into a single record");
556                        return Ok(record);
557                    }
558                }
559                GetRecordError::QueryTimeout => {
560                    error!("Encountered query timeout for {pretty_key:?}.");
561                }
562            }
563
564            match backoff.next() {
565                Some(Some(duration)) => {
566                    crate::time::sleep(duration).await;
567                    debug!("Getting record from network of {pretty_key:?} via backoff...");
568                }
569                _ => break Err(err.into()),
570            }
571        }
572    }
573
574    /// Handle the split record error.
575    fn handle_split_record_error(
576        result_map: &HashMap<XorName, (Record, HashSet<PeerId>)>,
577        key: &RecordKey,
578    ) -> std::result::Result<Option<Record>, NetworkError> {
579        let pretty_key = PrettyPrintRecordKey::from(key);
580
581        // attempt to deserialise and accumulate all GraphEntries
582        let results_count = result_map.len();
583        let mut accumulated_graphentries = HashSet::new();
584        let mut valid_scratchpad: Option<Scratchpad> = None;
585        let mut valid_pointer: Option<Pointer> = None;
586
587        if results_count > 1 {
588            let mut record_kind = None;
589            info!("For record {pretty_key:?}, we have more than one result returned.");
590            for (record, _) in result_map.values() {
591                let Ok(header) = RecordHeader::from_record(record) else {
592                    continue;
593                };
594                let kind = record_kind.get_or_insert(header.kind);
595                // FIXME: the first record dictates the kind, but we should check all records are of the same kind.
596                // And somehow discard the incorrect ones.
597                if *kind != header.kind {
598                    error!("Encountered a split record for {pretty_key:?} with different RecordHeaders. Expected {kind:?} but got {:?}. Skipping",header.kind);
599                    continue;
600                }
601
602                match kind {
603                    RecordKind::DataOnly(DataTypes::Chunk) | RecordKind::DataWithPayment(_) => {
604                        error!("Encountered a split record for {pretty_key:?} with unexpected RecordKind {kind:?}, skipping.");
605                        continue;
606                    }
607                    RecordKind::DataOnly(DataTypes::GraphEntry) => {
608                        match get_graph_entry_from_record(record) {
609                            Ok(graphentries) => {
610                                accumulated_graphentries.extend(graphentries);
611                                info!("For record {pretty_key:?}, we have a split record for a GraphEntry. Accumulating GraphEntry: {}", accumulated_graphentries.len());
612                            }
613                            Err(_) => {
614                                warn!("Failed to deserialize GraphEntry for {pretty_key:?}, skipping accumulation");
615                                continue;
616                            }
617                        }
618                    }
619                    RecordKind::DataOnly(DataTypes::Pointer) => {
620                        info!("For record {pretty_key:?}, we have a split record for a pointer. Selecting the one with the highest count");
621                        let Ok(pointer) = try_deserialize_record::<Pointer>(record) else {
622                            error!(
623                                "Failed to deserialize pointer {pretty_key}. Skipping accumulation"
624                            );
625                            continue;
626                        };
627
628                        if !pointer.verify_signature() {
629                            warn!("Rejecting Pointer for {pretty_key} PUT with invalid signature");
630                            continue;
631                        }
632
633                        if let Some(old) = &valid_pointer {
634                            if old.counter() >= pointer.counter() {
635                                info!("Rejecting Pointer for {pretty_key} with lower count than the previous one");
636                                continue;
637                            }
638                        }
639                        valid_pointer = Some(pointer);
640                    }
641                    RecordKind::DataOnly(DataTypes::Scratchpad) => {
642                        info!("For record {pretty_key:?}, we have a split record for a scratchpad. Selecting the one with the highest count");
643                        let Ok(scratchpad) = try_deserialize_record::<Scratchpad>(record) else {
644                            error!(
645                                "Failed to deserialize scratchpad {pretty_key}. Skipping accumulation"
646                            );
647                            continue;
648                        };
649
650                        if !scratchpad.verify_signature() {
651                            warn!(
652                                "Rejecting Scratchpad for {pretty_key} PUT with invalid signature"
653                            );
654                            continue;
655                        }
656
657                        if let Some(old) = &valid_scratchpad {
658                            if old.counter() >= scratchpad.counter() {
659                                info!("Rejecting Scratchpad for {pretty_key} with lower count than the previous one");
660                                continue;
661                            }
662                        }
663                        valid_scratchpad = Some(scratchpad);
664                    }
665                }
666            }
667        }
668
669        // Return the accumulated GraphEntries as a single record
670        if accumulated_graphentries.len() > 1 {
671            info!("For record {pretty_key:?} task found split record for a GraphEntry, accumulated and sending them as a single record");
672            let accumulated_graphentries = accumulated_graphentries
673                .into_iter()
674                .collect::<Vec<GraphEntry>>();
675            let record = Record {
676                key: key.clone(),
677                value: try_serialize_record(&accumulated_graphentries, RecordKind::DataOnly(DataTypes::GraphEntry))
678                    .map_err(|err| {
679                        error!(
680                            "Error while serializing the accumulated GraphEntries for {pretty_key:?}: {err:?}"
681                        );
682                        NetworkError::from(err)
683                    })?
684                    .to_vec(),
685                publisher: None,
686                expires: None,
687            };
688            return Ok(Some(record));
689        } else if let Some(pointer) = valid_pointer {
690            info!("For record {pretty_key:?} task found a valid pointer, returning it.");
691            let record_value =
692                try_serialize_record(&pointer, RecordKind::DataOnly(DataTypes::Pointer))
693                    .map_err(|err| {
694                        error!("Error while serializing the pointer for {pretty_key:?}: {err:?}");
695                        NetworkError::from(err)
696                    })?
697                    .to_vec();
698
699            let record = Record {
700                key: key.clone(),
701                value: record_value,
702                publisher: None,
703                expires: None,
704            };
705            return Ok(Some(record));
706        } else if let Some(scratchpad) = valid_scratchpad {
707            info!("For record {pretty_key:?} task found a valid scratchpad, returning it.");
708            let record_value =
709                try_serialize_record(&scratchpad, RecordKind::DataOnly(DataTypes::Scratchpad))
710                    .map_err(|err| {
711                        error!(
712                            "Error while serializing the scratchpad for {pretty_key:?}: {err:?}"
713                        );
714                        NetworkError::from(err)
715                    })?
716                    .to_vec();
717
718            let record = Record {
719                key: key.clone(),
720                value: record_value,
721                publisher: None,
722                expires: None,
723            };
724            return Ok(Some(record));
725        }
726        Ok(None)
727    }
728
729    /// Get the quoting metrics for storing the next record from the network
730    pub async fn get_local_quoting_metrics(
731        &self,
732        key: RecordKey,
733        data_type: u32,
734        data_size: usize,
735    ) -> Result<(QuotingMetrics, bool)> {
736        let (sender, receiver) = oneshot::channel();
737        self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalQuotingMetrics {
738            key,
739            data_type,
740            data_size,
741            sender,
742        });
743
744        let quoting_metrics = receiver
745            .await
746            .map_err(|_e| NetworkError::InternalMsgChannelDropped)??;
747        Ok(quoting_metrics)
748    }
749
750    /// Notify the node receicced a payment.
751    pub fn notify_payment_received(&self) {
752        self.send_local_swarm_cmd(LocalSwarmCmd::PaymentReceived);
753    }
754
755    /// Get `Record` from the local RecordStore
756    pub async fn get_local_record(&self, key: &RecordKey) -> Result<Option<Record>> {
757        let (sender, receiver) = oneshot::channel();
758        self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalRecord {
759            key: key.clone(),
760            sender,
761        });
762
763        receiver
764            .await
765            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
766    }
767
768    /// Whether the target peer is considered blacklisted by self
769    pub async fn is_peer_shunned(&self, target: NetworkAddress) -> Result<bool> {
770        let (sender, receiver) = oneshot::channel();
771        self.send_local_swarm_cmd(LocalSwarmCmd::IsPeerShunned { target, sender });
772
773        receiver
774            .await
775            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
776    }
777
778    /// Put `Record` to network
779    /// Optionally verify the record is stored after putting it to network
780    /// If verify is on, we retry.
781    pub async fn put_record(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
782        let pretty_key = PrettyPrintRecordKey::from(&record.key);
783        let mut backoff = cfg.retry_strategy.backoff().into_iter();
784
785        loop {
786            info!(
787                "Attempting to PUT record with key: {pretty_key:?} to network, with cfg {cfg:?}, retrying via backoff..."
788            );
789
790            let err = match self.put_record_once(record.clone(), cfg).await {
791                Ok(_) => break Ok(()),
792                Err(err) => err,
793            };
794
795            // FIXME: Skip if we get a permanent error during verification
796            warn!("Failed to PUT record with key: {pretty_key:?} to network (retry via backoff) with error: {err:?}");
797
798            match backoff.next() {
799                Some(Some(duration)) => {
800                    crate::time::sleep(duration).await;
801                }
802                _ => break Err(err),
803            }
804        }
805    }
806
807    async fn put_record_once(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
808        let record_key = record.key.clone();
809        let pretty_key = PrettyPrintRecordKey::from(&record_key);
810        info!(
811            "Putting record of {} - length {:?} to network",
812            pretty_key,
813            record.value.len()
814        );
815
816        // Waiting for a response to avoid flushing to network too quick that causing choke
817        let (sender, receiver) = oneshot::channel();
818        if let Some(put_record_to_peers) = &cfg.use_put_record_to {
819            self.send_network_swarm_cmd(NetworkSwarmCmd::PutRecordTo {
820                peers: put_record_to_peers.clone(),
821                record: record.clone(),
822                sender,
823                quorum: cfg.put_quorum,
824            });
825        } else {
826            self.send_network_swarm_cmd(NetworkSwarmCmd::PutRecord {
827                record: record.clone(),
828                sender,
829                quorum: cfg.put_quorum,
830            });
831        }
832
833        let response = receiver.await?;
834
835        if let Some((verification_kind, get_cfg)) = &cfg.verification {
836            // Generate a random duration between MAX_WAIT_BEFORE_READING_A_PUT and MIN_WAIT_BEFORE_READING_A_PUT
837            let wait_duration = rand::thread_rng()
838                .gen_range(MIN_WAIT_BEFORE_READING_A_PUT..MAX_WAIT_BEFORE_READING_A_PUT);
839            // Small wait before we attempt to verify.
840            // There will be `re-attempts` to be carried out within the later step anyway.
841            sleep(wait_duration).await;
842            debug!("Attempting to verify {pretty_key:?} after we've slept for {wait_duration:?}");
843
844            // Verify the record is stored, requiring re-attempts
845            if let VerificationKind::ChunkProof {
846                expected_proof,
847                nonce,
848            } = verification_kind
849            {
850                self.verify_chunk_existence(
851                    NetworkAddress::from_record_key(&record_key),
852                    *nonce,
853                    expected_proof.clone(),
854                    get_cfg.get_quorum,
855                    get_cfg.retry_strategy,
856                )
857                .await?;
858            } else {
859                match self
860                    .get_record_from_network(record.key.clone(), get_cfg)
861                    .await
862                {
863                    Ok(_) => {
864                        debug!("Record {pretty_key:?} verified to be stored.");
865                    }
866                    Err(NetworkError::GetRecordError(GetRecordError::RecordNotFound)) => {
867                        warn!("Record {pretty_key:?} not found after PUT, either rejected or not yet stored by nodes when we asked");
868                        return Err(NetworkError::RecordNotStoredByNodes(
869                            NetworkAddress::from_record_key(&record_key),
870                        ));
871                    }
872                    Err(NetworkError::GetRecordError(GetRecordError::SplitRecord { .. }))
873                        if matches!(verification_kind, VerificationKind::Crdt) =>
874                    {
875                        warn!("Record {pretty_key:?} is split, which is okay since we're dealing with CRDTs");
876                    }
877                    Err(e) => {
878                        debug!(
879                            "Failed to verify record {pretty_key:?} to be stored with error: {e:?}"
880                        );
881                        return Err(e);
882                    }
883                }
884            }
885        }
886        response
887    }
888
889    /// Notify ReplicationFetch a fetch attempt is completed.
890    /// (but it won't trigger any real writes to disk)
891    pub fn notify_fetch_completed(&self, key: RecordKey, record_type: ValidationType) {
892        self.send_local_swarm_cmd(LocalSwarmCmd::FetchCompleted((key, record_type)))
893    }
894
895    /// Put `Record` to the local RecordStore
896    /// Must be called after the validations are performed on the Record
897    pub fn put_local_record(&self, record: Record, is_client_put: bool) {
898        debug!(
899            "Writing Record locally, for {:?} - length {:?}",
900            PrettyPrintRecordKey::from(&record.key),
901            record.value.len()
902        );
903        self.send_local_swarm_cmd(LocalSwarmCmd::PutLocalRecord {
904            record,
905            is_client_put,
906        })
907    }
908
909    /// Returns true if a RecordKey is present locally in the RecordStore
910    pub async fn is_record_key_present_locally(&self, key: &RecordKey) -> Result<bool> {
911        let (sender, receiver) = oneshot::channel();
912        self.send_local_swarm_cmd(LocalSwarmCmd::RecordStoreHasKey {
913            key: key.clone(),
914            sender,
915        });
916
917        let is_present = receiver
918            .await
919            .map_err(|_e| NetworkError::InternalMsgChannelDropped)??;
920
921        Ok(is_present)
922    }
923
924    /// Returns the Addresses of all the locally stored Records
925    pub async fn get_all_local_record_addresses(
926        &self,
927    ) -> Result<HashMap<NetworkAddress, ValidationType>> {
928        let (sender, receiver) = oneshot::channel();
929        self.send_local_swarm_cmd(LocalSwarmCmd::GetAllLocalRecordAddresses { sender });
930
931        let addrs = receiver
932            .await
933            .map_err(|_e| NetworkError::InternalMsgChannelDropped)??;
934        Ok(addrs)
935    }
936
937    /// Send `Request` to the given `PeerId` and await for the response. If `self` is the recipient,
938    /// then the `Request` is forwarded to itself and handled, and a corresponding `Response` is created
939    /// and returned to itself. Hence the flow remains the same and there is no branching at the upper
940    /// layers.
941    ///
942    /// If an outbound issue is raised, we retry once more to send the request before returning an error.
943    pub async fn send_request(
944        &self,
945        req: Request,
946        peer: PeerId,
947        addrs: Addresses,
948    ) -> Result<Response> {
949        let (sender, receiver) = oneshot::channel();
950        let req_str = format!("{req:?}");
951        // try to send the request without dialing the peer
952        self.send_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
953            req: req.clone(),
954            peer,
955            addrs: None,
956            sender: Some(sender),
957        });
958        let mut r = receiver.await?;
959
960        if let Err(error) = &r {
961            error!("Error in response: {:?}", error);
962
963            match error {
964                NetworkError::OutboundError(OutboundFailure::Io(_))
965                | NetworkError::OutboundError(OutboundFailure::ConnectionClosed)
966                | NetworkError::OutboundError(OutboundFailure::DialFailure) => {
967                    warn!(
968                        "Outbound failed for {req_str} .. {error:?}, dialing it then re-attempt."
969                    );
970
971                    self.send_network_swarm_cmd(NetworkSwarmCmd::DialPeer {
972                        peer,
973                        addrs: addrs.clone(),
974                    });
975
976                    // Short wait to allow connection re-established.
977                    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
978
979                    let (sender, receiver) = oneshot::channel();
980                    debug!("Reattempting to send_request {req_str} to {peer:?} by dialing the addrs manually.");
981                    self.send_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
982                        req,
983                        peer,
984                        addrs: Some(addrs),
985                        sender: Some(sender),
986                    });
987
988                    r = receiver.await?;
989                    if let Err(error) = &r {
990                        error!("Reattempt of {req_str} led to an error again (even after dialing). {error:?}");
991                    }
992                }
993                _ => {
994                    // If the record is found, we should log the error and continue
995                    warn!("Error in response for {req_str}: {error:?}",);
996                }
997            }
998        }
999
1000        r
1001    }
1002
1003    /// Send a `Response` through the channel opened by the requester.
1004    pub fn send_response(&self, resp: Response, channel: MsgResponder) {
1005        self.send_network_swarm_cmd(NetworkSwarmCmd::SendResponse { resp, channel })
1006    }
1007
1008    /// Return a `SwarmLocalState` with some information obtained from swarm's local state.
1009    pub async fn get_swarm_local_state(&self) -> Result<SwarmLocalState> {
1010        let (sender, receiver) = oneshot::channel();
1011        self.send_local_swarm_cmd(LocalSwarmCmd::GetSwarmLocalState(sender));
1012        let state = receiver.await?;
1013        Ok(state)
1014    }
1015
1016    pub fn trigger_interval_replication(&self) {
1017        self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIntervalReplication)
1018    }
1019
1020    pub fn add_fresh_records_to_the_replication_fetcher(
1021        &self,
1022        holder: NetworkAddress,
1023        keys: Vec<(NetworkAddress, ValidationType)>,
1024    ) {
1025        self.send_local_swarm_cmd(LocalSwarmCmd::AddFreshReplicateRecords { holder, keys })
1026    }
1027
1028    pub fn record_node_issues(&self, peer_id: PeerId, issue: NodeIssue) {
1029        self.send_local_swarm_cmd(LocalSwarmCmd::RecordNodeIssue { peer_id, issue });
1030    }
1031
1032    pub fn historical_verify_quotes(&self, quotes: Vec<(PeerId, PaymentQuote)>) {
1033        self.send_local_swarm_cmd(LocalSwarmCmd::QuoteVerification { quotes });
1034    }
1035
1036    pub fn trigger_irrelevant_record_cleanup(&self) {
1037        self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIrrelevantRecordCleanup)
1038    }
1039
1040    pub fn add_network_density_sample(&self, distance: KBucketDistance) {
1041        self.send_local_swarm_cmd(LocalSwarmCmd::AddNetworkDensitySample { distance })
1042    }
1043
1044    pub fn notify_peer_scores(&self, peer_scores: Vec<(PeerId, bool)>) {
1045        self.send_local_swarm_cmd(LocalSwarmCmd::NotifyPeerScores { peer_scores })
1046    }
1047
1048    pub fn notify_node_version(&self, peer: PeerId, version: String) {
1049        self.send_local_swarm_cmd(LocalSwarmCmd::NotifyPeerVersion { peer, version })
1050    }
1051
1052    /// Helper to send NetworkSwarmCmd
1053    fn send_network_swarm_cmd(&self, cmd: NetworkSwarmCmd) {
1054        send_network_swarm_cmd(self.network_swarm_cmd_sender().clone(), cmd);
1055    }
1056
1057    /// Helper to send LocalSwarmCmd
1058    fn send_local_swarm_cmd(&self, cmd: LocalSwarmCmd) {
1059        send_local_swarm_cmd(self.local_swarm_cmd_sender().clone(), cmd);
1060    }
1061
1062    /// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
1063    /// If `client` is false, then include `self` among the `closest_peers`
1064    ///
1065    /// If less than CLOSE_GROUP_SIZE peers are found, it will return all the peers.
1066    pub async fn get_all_close_peers_in_range_or_close_group(
1067        &self,
1068        key: &NetworkAddress,
1069        client: bool,
1070    ) -> Result<Vec<(PeerId, Addresses)>> {
1071        let pretty_key = PrettyPrintKBucketKey(key.as_kbucket_key());
1072        debug!("Getting the all closest peers in range of {pretty_key:?}");
1073        let (sender, receiver) = oneshot::channel();
1074        self.send_network_swarm_cmd(NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork {
1075            key: key.clone(),
1076            sender,
1077        });
1078
1079        let found_peers = receiver.await?;
1080
1081        // Count self in if among the CLOSE_GROUP_SIZE closest and sort the result
1082        let result_len = found_peers.len();
1083        let mut closest_peers = found_peers;
1084
1085        // ensure we're not including self here
1086        if client {
1087            // remove our peer id from the calculations here:
1088            closest_peers.retain(|&(x, _)| x != self.peer_id());
1089            if result_len != closest_peers.len() {
1090                info!("Remove self client from the closest_peers");
1091            }
1092        }
1093
1094        if tracing::level_enabled!(tracing::Level::DEBUG) {
1095            let close_peers_pretty_print: Vec<_> = closest_peers
1096                .iter()
1097                .map(|(peer_id, _)| {
1098                    format!(
1099                        "{peer_id:?}({:?})",
1100                        PrettyPrintKBucketKey(NetworkAddress::from_peer(*peer_id).as_kbucket_key())
1101                    )
1102                })
1103                .collect();
1104
1105            debug!(
1106                "Network knowledge of closest peers to {pretty_key:?} are: {close_peers_pretty_print:?}"
1107            );
1108        }
1109
1110        let expanded_close_group = CLOSE_GROUP_SIZE + CLOSE_GROUP_SIZE / 2;
1111        let closest_peers = sort_peers_by_address(closest_peers, key, expanded_close_group)?;
1112        Ok(closest_peers)
1113    }
1114
1115    /// Send a `Request` to the provided set of peers and wait for their responses concurrently.
1116    /// If `get_all_responses` is true, we wait for the responses from all the peers.
1117    /// NB TODO: Will return an error if the request timeouts.
1118    /// If `get_all_responses` is false, we return the first successful response that we get
1119    pub async fn send_and_get_responses(
1120        &self,
1121        peers: &[(PeerId, Addresses)],
1122        req: &Request,
1123        get_all_responses: bool,
1124    ) -> BTreeMap<PeerId, Result<Response>> {
1125        debug!("send_and_get_responses for {req:?}");
1126        let mut list_of_futures = peers
1127            .iter()
1128            .map(|(peer, addrs)| {
1129                Box::pin(async {
1130                    let resp = self.send_request(req.clone(), *peer, addrs.clone()).await;
1131                    (*peer, resp)
1132                })
1133            })
1134            .collect::<Vec<_>>();
1135
1136        let mut responses = BTreeMap::new();
1137        while !list_of_futures.is_empty() {
1138            let ((peer, resp), _, remaining_futures) = select_all(list_of_futures).await;
1139            let resp_string = match &resp {
1140                Ok(resp) => format!("{resp}"),
1141                Err(err) => format!("{err:?}"),
1142            };
1143            debug!("Got response from {peer:?} for the req: {req:?}, resp: {resp_string}");
1144            if !get_all_responses && resp.is_ok() {
1145                return BTreeMap::from([(peer, resp)]);
1146            }
1147            responses.insert(peer, resp);
1148            list_of_futures = remaining_futures;
1149        }
1150
1151        debug!("Received all responses for {req:?}");
1152        responses
1153    }
1154}
1155
1156/// Verifies if `Multiaddr` contains IPv4 address that is not global.
1157/// This is used to filter out unroutable addresses from the Kademlia routing table.
1158pub fn multiaddr_is_global(multiaddr: &Multiaddr) -> bool {
1159    !multiaddr.iter().any(|addr| match addr {
1160        Protocol::Ip4(ip) => {
1161            // Based on the nightly `is_global` method (`Ipv4Addrs::is_global`), only using what is available in stable.
1162            // Missing `is_shared`, `is_benchmarking` and `is_reserved`.
1163            ip.is_unspecified()
1164                | ip.is_private()
1165                | ip.is_loopback()
1166                | ip.is_link_local()
1167                | ip.is_documentation()
1168                | ip.is_broadcast()
1169        }
1170        _ => false,
1171    })
1172}
1173
1174/// Pop off the `/p2p/<peer_id>`. This mutates the `Multiaddr` and returns the `PeerId` if it exists.
1175pub(crate) fn multiaddr_pop_p2p(multiaddr: &mut Multiaddr) -> Option<PeerId> {
1176    if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() {
1177        // Only actually strip the last protocol if it's indeed the peer ID.
1178        let _ = multiaddr.pop();
1179        Some(peer_id)
1180    } else {
1181        None
1182    }
1183}
1184
1185/// Return the last `PeerId` from the `Multiaddr` if it exists.
1186pub(crate) fn multiaddr_get_p2p(multiaddr: &Multiaddr) -> Option<PeerId> {
1187    if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() {
1188        Some(peer_id)
1189    } else {
1190        None
1191    }
1192}
1193
1194/// Build a `Multiaddr` with the p2p protocol filtered out.
1195/// If it is a relayed address, then the relay's P2P address is preserved.
1196pub(crate) fn multiaddr_strip_p2p(multiaddr: &Multiaddr) -> Multiaddr {
1197    let is_relayed = multiaddr.iter().any(|p| matches!(p, Protocol::P2pCircuit));
1198
1199    if is_relayed {
1200        // Do not add any PeerId after we've found the P2PCircuit protocol. The prior one is the relay's PeerId which
1201        // we should preserve.
1202        let mut before_relay_protocol = true;
1203        let mut new_multi_addr = Multiaddr::empty();
1204        for p in multiaddr.iter() {
1205            if matches!(p, Protocol::P2pCircuit) {
1206                before_relay_protocol = false;
1207            }
1208            if matches!(p, Protocol::P2p(_)) && !before_relay_protocol {
1209                continue;
1210            }
1211            new_multi_addr.push(p);
1212        }
1213        new_multi_addr
1214    } else {
1215        multiaddr
1216            .iter()
1217            .filter(|p| !matches!(p, Protocol::P2p(_)))
1218            .collect()
1219    }
1220}
1221
1222/// Get the `IpAddr` from the `Multiaddr`
1223pub(crate) fn multiaddr_get_ip(addr: &Multiaddr) -> Option<IpAddr> {
1224    addr.iter().find_map(|p| match p {
1225        Protocol::Ip4(addr) => Some(IpAddr::V4(addr)),
1226        Protocol::Ip6(addr) => Some(IpAddr::V6(addr)),
1227        _ => None,
1228    })
1229}
1230
1231pub(crate) fn multiaddr_get_port(addr: &Multiaddr) -> Option<u16> {
1232    addr.iter().find_map(|p| match p {
1233        Protocol::Udp(port) => Some(port),
1234        _ => None,
1235    })
1236}
1237
1238pub(crate) fn send_local_swarm_cmd(swarm_cmd_sender: Sender<LocalSwarmCmd>, cmd: LocalSwarmCmd) {
1239    let capacity = swarm_cmd_sender.capacity();
1240
1241    if capacity == 0 {
1242        error!(
1243            "SwarmCmd channel is full. Await capacity to send: {:?}",
1244            cmd
1245        );
1246    }
1247
1248    // Spawn a task to send the SwarmCmd and keep this fn sync
1249    let _handle = spawn(async move {
1250        if let Err(error) = swarm_cmd_sender.send(cmd).await {
1251            error!("Failed to send SwarmCmd: {}", error);
1252        }
1253    });
1254}
1255
1256pub(crate) fn send_network_swarm_cmd(
1257    swarm_cmd_sender: Sender<NetworkSwarmCmd>,
1258    cmd: NetworkSwarmCmd,
1259) {
1260    let capacity = swarm_cmd_sender.capacity();
1261
1262    if capacity == 0 {
1263        error!(
1264            "SwarmCmd channel is full. Await capacity to send: {:?}",
1265            cmd
1266        );
1267    }
1268
1269    // Spawn a task to send the SwarmCmd and keep this fn sync
1270    let _handle = spawn(async move {
1271        if let Err(error) = swarm_cmd_sender.send(cmd).await {
1272            error!("Failed to send SwarmCmd: {}", error);
1273        }
1274    });
1275}
1276
1277#[cfg(test)]
1278mod tests {
1279    use super::*;
1280
1281    #[tokio::test]
1282    async fn test_network_sign_verify() -> eyre::Result<()> {
1283        let (network, _, _) =
1284            NetworkBuilder::new(Keypair::generate_ed25519(), false, vec![]).build_client();
1285        let msg = b"test message";
1286        let sig = network.sign(msg)?;
1287        assert!(network.verify(msg, &sig));
1288        Ok(())
1289    }
1290}