ant_networking/lib.rs

// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

#![allow(clippy::large_enum_variant)]
#![allow(clippy::result_large_err)]

#[macro_use]
extern crate tracing;

mod behaviour;
mod bootstrap;
mod circular_vec;
mod cmd;
mod config;
mod driver;
mod error;
mod event;
mod external_address;
mod log_markers;
#[cfg(feature = "open-metrics")]
mod metrics;
mod network_builder;
mod network_discovery;
mod record_store;
mod relay_manager;
mod replication_fetcher;
pub mod time;
mod transport;

use cmd::LocalSwarmCmd;

// re-export arch dependent deps for use in the crate, or above
pub use self::{
    cmd::{NodeIssue, SwarmLocalState},
    config::ResponseQuorum,
    driver::SwarmDriver,
    error::NetworkError,
    event::{MsgResponder, NetworkEvent},
    network_builder::{NetworkBuilder, MAX_PACKET_SIZE},
    record_store::NodeRecordStore,
};
#[cfg(feature = "open-metrics")]
pub use metrics::service::MetricsRegistries;
pub use time::{interval, sleep, spawn, Instant, Interval};

use self::{cmd::NetworkSwarmCmd, error::Result};
use ant_evm::{PaymentQuote, QuotingMetrics};
use ant_protocol::{
    messages::{ConnectionInfo, Request, Response},
    storage::ValidationType,
    NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
};
use futures::future::select_all;
use libp2p::{
    identity::Keypair,
    kad::{KBucketDistance, KBucketKey, Record, RecordKey, K_VALUE},
    multiaddr::Protocol,
    request_response::OutboundFailure,
    Multiaddr, PeerId,
};
use std::{
    collections::{BTreeMap, HashMap},
    net::IpAddr,
    sync::Arc,
};
use tokio::sync::{
    mpsc::{self, Sender},
    oneshot,
};

/// Majority of a given group (i.e. > 1/2).
#[inline]
pub const fn close_group_majority() -> usize {
    // Calculate the majority of the close group size by dividing it by 2 and adding 1.
    // This ensures that the majority is always greater than half.
    CLOSE_GROUP_SIZE / 2 + 1
}

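// Worked example (illustrative only; the actual value of CLOSE_GROUP_SIZE is
// defined in ant_protocol): if CLOSE_GROUP_SIZE were 5, then
// close_group_majority() == 5 / 2 + 1 == 3, and any 3 of the 5 peers form a
// strict majority.
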
/// Sort the provided peers by their distance to the given `KBucketKey`.
/// Returns at most `expected_entries` of the closest peers.
pub fn sort_peers_by_key<T>(
    peers: Vec<(PeerId, Addresses)>,
    key: &KBucketKey<T>,
    expected_entries: usize,
) -> Result<Vec<(PeerId, Addresses)>> {
    // Check if there are enough peers to satisfy the request.
    // Bail early if that's not the case.
    if CLOSE_GROUP_SIZE > peers.len() {
        warn!("Not enough peers in the k-bucket to satisfy the request");
        return Err(NetworkError::NotEnoughPeers {
            found: peers.len(),
            required: CLOSE_GROUP_SIZE,
        });
    }

    // Create a vector of tuples where each tuple is a peer and its distance to the key.
    // This avoids recomputing the same distance during the sorting process.
    let mut peer_distances: Vec<(PeerId, Addresses, KBucketDistance)> =
        Vec::with_capacity(peers.len());

    for (peer_id, addrs) in peers.into_iter() {
        let addr = NetworkAddress::from(peer_id);
        let distance = key.distance(&addr.as_kbucket_key());
        peer_distances.push((peer_id, addrs, distance));
    }

    // Sort the vector of tuples by the distance.
    peer_distances.sort_by(|a, b| a.2.cmp(&b.2));

    // Collect the sorted peers into a new vector.
    let sorted_peers: Vec<(PeerId, Addresses)> = peer_distances
        .into_iter()
        .take(expected_entries)
        .map(|(peer_id, addrs, _)| (peer_id, addrs))
        .collect();

    Ok(sorted_peers)
}

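// A minimal call-site sketch (hypothetical; `peers` would come from the local
// routing table or a network query):
//
//     let target = NetworkAddress::from(peer_id);
//     let closest = sort_peers_by_key(peers, &target.as_kbucket_key(), CLOSE_GROUP_SIZE)?;
//     // `closest[0]` is now the peer XOR-closest to `target`.
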
/// A list of addresses of a peer in the routing table.
#[derive(Clone, Debug, Default)]
pub struct Addresses(pub Vec<Multiaddr>);

#[derive(Clone, Debug)]
/// API to interact with the underlying Swarm
pub struct Network {
    inner: Arc<NetworkInner>,
}

/// The actual implementation of the Network. The other is just a wrapper around this, so that we don't expose
/// the Arc from the interface.
#[derive(Debug)]
struct NetworkInner {
    network_swarm_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
    local_swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
    peer_id: PeerId,
    keypair: Keypair,
}

impl Network {
    pub fn new(
        network_swarm_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
        local_swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
        peer_id: PeerId,
        keypair: Keypair,
    ) -> Self {
        Self {
            inner: Arc::new(NetworkInner {
                network_swarm_cmd_sender,
                local_swarm_cmd_sender,
                peer_id,
                keypair,
            }),
        }
    }

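    // Because the state lives behind an `Arc`, cloning a `Network` is cheap and
    // every clone talks to the same underlying swarm. A sketch of the intended
    // usage (hypothetical call site):
    //
    //     let net = network.clone(); // shares the same command channels
    //     spawn(async move { let _ = net.get_swarm_local_state().await; });
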
    /// Returns the `PeerId` of the instance.
    pub fn peer_id(&self) -> PeerId {
        self.inner.peer_id
    }

    /// Returns the `Keypair` of the instance.
    pub fn keypair(&self) -> &Keypair {
        &self.inner.keypair
    }

    /// Get the sender to send a `NetworkSwarmCmd` to the underlying `Swarm`.
    pub(crate) fn network_swarm_cmd_sender(&self) -> &mpsc::Sender<NetworkSwarmCmd> {
        &self.inner.network_swarm_cmd_sender
    }
    /// Get the sender to send a `LocalSwarmCmd` to the underlying `Swarm`.
    pub(crate) fn local_swarm_cmd_sender(&self) -> &mpsc::Sender<LocalSwarmCmd> {
        &self.inner.local_swarm_cmd_sender
    }

    /// Signs the given data with the node's keypair.
    pub fn sign(&self, msg: &[u8]) -> Result<Vec<u8>> {
        self.keypair().sign(msg).map_err(NetworkError::from)
    }

    /// Verifies a signature for the given data and the node's public key.
    pub fn verify(&self, msg: &[u8], sig: &[u8]) -> bool {
        self.keypair().public().verify(msg, sig)
    }

    /// Returns the protobuf-serialised `PublicKey`, ready to be shared in messages.
    pub fn get_pub_key(&self) -> Vec<u8> {
        self.keypair().public().encode_protobuf()
    }

    /// Returns a list of peers in the local RT and their corresponding Multiaddrs.
    /// Does not include self.
    pub async fn get_local_peers_with_multiaddr(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetPeersWithMultiaddr { sender });
        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

    /// Returns a map where each key is the ilog2 distance of that Kbucket
    /// and each value is a vector of peers in that bucket.
    /// Does not include self.
    pub async fn get_kbuckets(&self) -> Result<BTreeMap<u32, Vec<PeerId>>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetKBuckets { sender });
        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

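    // Illustration: the map key is `ilog2` of the XOR distance between our
    // kbucket key and the peer's, i.e. the (0-based) position of the highest
    // set bit of that distance. A peer whose distance from us lies in
    // [2^200, 2^201) would therefore appear under key 200 (hypothetical
    // example, for illustration only).
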
    /// Returns the K closest local peers to the target.
    /// The target defaults to self, if not provided.
    /// Self is always included as the first entry.
    pub async fn get_k_closest_local_peers_to_the_target(
        &self,
        key: Option<NetworkAddress>,
    ) -> Result<Vec<(PeerId, Addresses)>> {
        let target = if let Some(target) = key {
            target
        } else {
            NetworkAddress::from(self.peer_id())
        };

        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetKCloseLocalPeersToTarget {
            sender,
            key: target,
        });

        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

    /// Get the quoting metrics for storing the next record from the network.
    pub async fn get_local_quoting_metrics(
        &self,
        key: RecordKey,
        data_type: u32,
        data_size: usize,
    ) -> Result<(QuotingMetrics, bool)> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalQuotingMetrics {
            key,
            data_type,
            data_size,
            sender,
        });

        let quoting_metrics = receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)?;
        Ok(quoting_metrics)
    }

    /// Notify the node that it has received a payment.
    pub fn notify_payment_received(&self) {
        self.send_local_swarm_cmd(LocalSwarmCmd::PaymentReceived);
    }

    /// Get `Record` from the local RecordStore.
    pub async fn get_local_record(&self, key: &RecordKey) -> Result<Option<Record>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalRecord {
            key: key.clone(),
            sender,
        });

        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

    /// Whether the target peer is considered blacklisted by self.
    pub async fn is_peer_shunned(&self, target: NetworkAddress) -> Result<bool> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::IsPeerShunned { target, sender });

        receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)
    }

    /// Notify the ReplicationFetcher that a fetch attempt is completed.
    /// (but it won't trigger any real writes to disk)
    pub fn notify_fetch_completed(&self, key: RecordKey, record_type: ValidationType) {
        self.send_local_swarm_cmd(LocalSwarmCmd::FetchCompleted((key, record_type)))
    }

    /// Put `Record` to the local RecordStore.
    /// Must be called after the validations are performed on the Record.
    pub fn put_local_record(&self, record: Record, is_client_put: bool) {
        debug!(
            "Writing Record locally, for {:?} - length {:?}",
            PrettyPrintRecordKey::from(&record.key),
            record.value.len()
        );
        self.send_local_swarm_cmd(LocalSwarmCmd::PutLocalRecord {
            record,
            is_client_put,
        })
    }

    /// Returns true if a RecordKey is present locally in the RecordStore.
    pub async fn is_record_key_present_locally(&self, key: &RecordKey) -> Result<bool> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::RecordStoreHasKey {
            key: key.clone(),
            sender,
        });

        let is_present = receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)?;

        Ok(is_present)
    }

    /// Returns the addresses of all the locally stored Records.
    pub async fn get_all_local_record_addresses(
        &self,
    ) -> Result<HashMap<NetworkAddress, ValidationType>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetAllLocalRecordAddresses { sender });

        let addrs = receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)?;
        Ok(addrs)
    }

    /// Send `Request` to the given `PeerId` and await the response. If `self` is the recipient,
    /// then the `Request` is forwarded to itself and handled, and a corresponding `Response` is created
    /// and returned to itself. Hence the flow remains the same and there is no branching at the upper
    /// layers.
    ///
    /// If an outbound issue is raised, we retry once more to send the request before returning an error.
    pub async fn send_request(
        &self,
        req: Request,
        peer: PeerId,
        addrs: Addresses,
    ) -> Result<(Response, Option<ConnectionInfo>)> {
        let (sender, receiver) = oneshot::channel();
        let req_str = format!("{req:?}");
        // Try to send the request without dialing the peer.
        self.send_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
            req: req.clone(),
            peer,
            addrs: None,
            sender: Some(sender),
        });
        let mut r = receiver.await?;

        if let Err(error) = &r {
            error!("Error in response: {:?}", error);

            match error {
                NetworkError::OutboundError(OutboundFailure::Io(_))
                | NetworkError::OutboundError(OutboundFailure::ConnectionClosed)
                | NetworkError::OutboundError(OutboundFailure::DialFailure) => {
                    warn!(
                        "Outbound failed for {req_str} .. {error:?}, dialing the peer and re-attempting."
                    );

                    // Default (empty) Addresses are used for requests sent to close-range peers,
                    // for example replication requests.
                    // In that case, we fetch the proper addrs from the local routing table and re-dial.
                    let dial_addrs = if addrs.0.is_empty() {
                        debug!("Input addrs of {peer:?} are empty, looking them up locally");
                        let (sender, receiver) = oneshot::channel();

                        self.send_local_swarm_cmd(LocalSwarmCmd::GetPeersWithMultiaddr { sender });
                        let peers = receiver.await?;

                        let Some(new_addrs) = peers
                            .iter()
                            .find(|(id, _addrs)| *id == peer)
                            .map(|(_id, addrs)| addrs.clone())
                        else {
                            error!("Can't find the addrs of peer {peer:?} locally, during the request re-attempt of {req:?}.");
                            return r;
                        };
                        Addresses(new_addrs)
                    } else {
                        addrs.clone()
                    };

                    self.send_network_swarm_cmd(NetworkSwarmCmd::DialPeer {
                        peer,
                        addrs: dial_addrs.clone(),
                    });

                    // Short wait to allow the connection to be re-established.
                    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

                    let (sender, receiver) = oneshot::channel();
                    debug!("Re-attempting to send_request {req_str} to {peer:?} by dialing the addrs manually.");
                    self.send_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
                        req,
                        peer,
                        addrs: Some(dial_addrs),
                        sender: Some(sender),
                    });

                    r = receiver.await?;
                    if let Err(error) = &r {
                        error!("Re-attempt of {req_str} led to an error again (even after dialing). {error:?}");
                    }
                }
                _ => {
                    // For any other error, just log it and return the response as-is.
                    warn!("Error in response for {req_str}: {error:?}");
                }
            }
        }

        r
    }

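    // A minimal call-site sketch (hypothetical request; `peer` and `addrs`
    // typically come from `get_closest_peers` or the local routing table):
    //
    //     let (resp, _conn_info) = network
    //         .send_request(req.clone(), peer, addrs.clone())
    //         .await?;
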
    /// Send a `Response` through the channel opened by the requester.
    pub fn send_response(&self, resp: Response, channel: MsgResponder) {
        self.send_network_swarm_cmd(NetworkSwarmCmd::SendResponse { resp, channel })
    }

    /// Return a `SwarmLocalState` with some information obtained from swarm's local state.
    pub async fn get_swarm_local_state(&self) -> Result<SwarmLocalState> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetSwarmLocalState(sender));
        let state = receiver.await?;
        Ok(state)
    }

    pub fn trigger_interval_replication(&self) {
        self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIntervalReplication)
    }

    /// To be called when a fresh record has been received from a client upload.
    pub fn add_fresh_records_to_the_replication_fetcher(
        &self,
        holder: NetworkAddress,
        keys: Vec<(NetworkAddress, ValidationType)>,
    ) {
        self.send_local_swarm_cmd(LocalSwarmCmd::AddFreshReplicateRecords { holder, keys })
    }

    pub fn record_node_issues(&self, peer_id: PeerId, issue: NodeIssue) {
        self.send_local_swarm_cmd(LocalSwarmCmd::RecordNodeIssue { peer_id, issue });
    }

    pub fn historical_verify_quotes(&self, quotes: Vec<(PeerId, PaymentQuote)>) {
        self.send_local_swarm_cmd(LocalSwarmCmd::QuoteVerification { quotes });
    }

    pub fn trigger_irrelevant_record_cleanup(&self) {
        self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIrrelevantRecordCleanup)
    }

    pub fn notify_peer_scores(&self, peer_scores: Vec<(PeerId, bool)>) {
        self.send_local_swarm_cmd(LocalSwarmCmd::NotifyPeerScores { peer_scores })
    }

    pub fn notify_node_version(&self, peer: PeerId, version: String) {
        self.send_local_swarm_cmd(LocalSwarmCmd::NotifyPeerVersion { peer, version })
    }

    pub fn remove_peer(&self, peer: PeerId) {
        self.send_local_swarm_cmd(LocalSwarmCmd::RemovePeer { peer })
    }

    /// Helper to send a NetworkSwarmCmd.
    fn send_network_swarm_cmd(&self, cmd: NetworkSwarmCmd) {
        send_network_swarm_cmd(self.network_swarm_cmd_sender().clone(), cmd);
    }

    /// Helper to send a LocalSwarmCmd.
    fn send_local_swarm_cmd(&self, cmd: LocalSwarmCmd) {
        send_local_swarm_cmd(self.local_swarm_cmd_sender().clone(), cmd);
    }

    /// Returns the closest peers to the given `NetworkAddress`, sorted by their distance to it.
    pub async fn get_closest_peers(
        &self,
        key: &NetworkAddress,
    ) -> Result<Vec<(PeerId, Addresses)>> {
        let pretty_key = PrettyPrintKBucketKey(key.as_kbucket_key());
        debug!("Getting all the closest peers in range of {pretty_key:?}");
        let (sender, receiver) = oneshot::channel();
        self.send_network_swarm_cmd(NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork {
            key: key.clone(),
            sender,
        });

        let closest_peers = receiver.await?;

        // Error out when the fetched result is empty, indicating a timed out network query.
        if closest_peers.is_empty() {
            return Err(NetworkError::GetClosestTimedOut);
        }

        if tracing::level_enabled!(tracing::Level::DEBUG) {
            let close_peers_pretty_print: Vec<_> = closest_peers
                .iter()
                .map(|(peer_id, _)| {
                    format!(
                        "{peer_id:?}({:?})",
                        PrettyPrintKBucketKey(NetworkAddress::from(*peer_id).as_kbucket_key())
                    )
                })
                .collect();

            debug!(
                "Network knowledge of closest peers to {pretty_key:?} are: {close_peers_pretty_print:?}"
            );
        }

        Ok(closest_peers)
    }

    /// Returns the `n` closest peers to the given `NetworkAddress`, sorted by their distance to it.
    pub async fn get_n_closest_peers(
        &self,
        key: &NetworkAddress,
        n: usize,
    ) -> Result<Vec<(PeerId, Addresses)>> {
        assert!(n <= K_VALUE.get());

        let mut closest_peers = self.get_closest_peers(key).await?;

        // Check if we have enough results.
        if closest_peers.len() < n {
            return Err(NetworkError::NotEnoughPeers {
                found: closest_peers.len(),
                required: n,
            });
        }

        // Only need the `n` closest peers.
        closest_peers.truncate(n);

        Ok(closest_peers)
    }

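    // Sketch: fetching a close group for a target address (hypothetical call
    // site; `n` must not exceed Kademlia's K_VALUE, as asserted above):
    //
    //     let close_group = network.get_n_closest_peers(&addr, CLOSE_GROUP_SIZE).await?;
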
    /// Send a `Request` to the provided set of peers and wait for their responses concurrently.
    /// If `get_all_responses` is true, we wait for the responses from all the peers.
    /// If `get_all_responses` is false, we return the first successful response that we get;
    /// if none succeed, all the failed responses are returned.
    pub async fn send_and_get_responses(
        &self,
        peers: &[(PeerId, Addresses)],
        req: &Request,
        get_all_responses: bool,
    ) -> BTreeMap<PeerId, Result<(Response, Option<ConnectionInfo>)>> {
        debug!("send_and_get_responses for {req:?}");
        let mut list_of_futures = peers
            .iter()
            .map(|(peer, addrs)| {
                Box::pin(async {
                    let resp = self.send_request(req.clone(), *peer, addrs.clone()).await;
                    (*peer, resp)
                })
            })
            .collect::<Vec<_>>();

        let mut responses = BTreeMap::new();
        while !list_of_futures.is_empty() {
            let ((peer, resp), _, remaining_futures) = select_all(list_of_futures).await;
            let resp_string = match &resp {
                Ok(resp) => format!("{resp:?}"),
                Err(err) => format!("{err:?}"),
            };
            debug!("Got response from {peer:?} for the req: {req:?}, resp: {resp_string}");
            if !get_all_responses && resp.is_ok() {
                return BTreeMap::from([(peer, resp)]);
            }
            responses.insert(peer, resp);
            list_of_futures = remaining_futures;
        }

        debug!("Received all responses for {req:?}");
        responses
    }

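    // Sketch: querying a close group and counting the successful replies
    // (hypothetical call site):
    //
    //     let results = network.send_and_get_responses(&close_group, &req, true).await;
    //     let ok_count = results.values().filter(|r| r.is_ok()).count();
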
    /// Get the estimated network density (i.e. the responsible_distance_range).
    pub async fn get_network_density(&self) -> Result<Option<KBucketDistance>> {
        let (sender, receiver) = oneshot::channel();
        self.send_local_swarm_cmd(LocalSwarmCmd::GetNetworkDensity { sender });

        let density = receiver
            .await
            .map_err(|_e| NetworkError::InternalMsgChannelDropped)?;
        Ok(density)
    }
}

/// Verifies that the `Multiaddr` does not contain any IPv4 address that is not globally routable.
/// This is used to filter out unroutable addresses from the Kademlia routing table.
pub fn multiaddr_is_global(multiaddr: &Multiaddr) -> bool {
    !multiaddr.iter().any(|addr| match addr {
        Protocol::Ip4(ip) => {
            // Based on the nightly `is_global` method (`Ipv4Addr::is_global`), only using what is available in stable.
            // Missing `is_shared`, `is_benchmarking` and `is_reserved`.
            ip.is_unspecified()
                || ip.is_private()
                || ip.is_loopback()
                || ip.is_link_local()
                || ip.is_documentation()
                || ip.is_broadcast()
        }
        _ => false,
    })
}

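// For example (addresses parsed ad hoc, for illustration only):
//
//     let global: Multiaddr = "/ip4/68.183.39.80/udp/31055/quic-v1".parse().unwrap();
//     let local: Multiaddr = "/ip4/192.168.1.10/udp/31055/quic-v1".parse().unwrap();
//     assert!(multiaddr_is_global(&global));
//     assert!(!multiaddr_is_global(&local)); // 192.168.0.0/16 is private
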
/// Pop off the `/p2p/<peer_id>`. This mutates the `Multiaddr` and returns the `PeerId` if it exists.
pub(crate) fn multiaddr_pop_p2p(multiaddr: &mut Multiaddr) -> Option<PeerId> {
    if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() {
        // Only actually strip the last protocol if it's indeed the peer ID.
        let _ = multiaddr.pop();
        Some(peer_id)
    } else {
        None
    }
}

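// For example (with `<peer-id>` standing in for a real base58 peer id):
//
//     let mut addr: Multiaddr = "/ip4/1.2.3.4/udp/9000/quic-v1/p2p/<peer-id>".parse().unwrap();
//     let peer = multiaddr_pop_p2p(&mut addr); // Some(<peer-id>)
//     // `addr` is now "/ip4/1.2.3.4/udp/9000/quic-v1"
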
/// Return the last `PeerId` from the `Multiaddr` if it exists.
pub(crate) fn multiaddr_get_p2p(multiaddr: &Multiaddr) -> Option<PeerId> {
    if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() {
        Some(peer_id)
    } else {
        None
    }
}

/// Build a `Multiaddr` with the p2p protocol filtered out.
/// If it is a relayed address, then the relay's P2P address is preserved.
pub(crate) fn multiaddr_strip_p2p(multiaddr: &Multiaddr) -> Multiaddr {
    let is_relayed = multiaddr.iter().any(|p| matches!(p, Protocol::P2pCircuit));

    if is_relayed {
        // Do not add any PeerId after we've found the P2PCircuit protocol. The prior one is the relay's PeerId which
        // we should preserve.
        let mut before_relay_protocol = true;
        let mut new_multi_addr = Multiaddr::empty();
        for p in multiaddr.iter() {
            if matches!(p, Protocol::P2pCircuit) {
                before_relay_protocol = false;
            }
            if matches!(p, Protocol::P2p(_)) && !before_relay_protocol {
                continue;
            }
            new_multi_addr.push(p);
        }
        new_multi_addr
    } else {
        multiaddr
            .iter()
            .filter(|p| !matches!(p, Protocol::P2p(_)))
            .collect()
    }
}

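// Illustration with a relayed address: given
//
//     /ip4/1.2.3.4/udp/9000/quic-v1/p2p/<relay-id>/p2p-circuit/p2p/<our-id>
//
// the relay's `/p2p/<relay-id>` (before `/p2p-circuit`) is preserved, while the
// trailing `/p2p/<our-id>` is stripped.
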
/// Craft a valid multiaddr such as `/ip4/68.183.39.80/udp/31055/quic-v1`.
/// See `RelayManager::craft_relay_address` for relayed addrs; this is for non-relayed addrs.
pub(crate) fn craft_valid_multiaddr_without_p2p(addr: &Multiaddr) -> Option<Multiaddr> {
    let mut new_multiaddr = Multiaddr::empty();
    let ip = addr.iter().find_map(|p| match p {
        Protocol::Ip4(addr) => Some(addr),
        _ => None,
    })?;
    let port = multiaddr_get_port(addr)?;

    new_multiaddr.push(Protocol::Ip4(ip));
    new_multiaddr.push(Protocol::Udp(port));
    new_multiaddr.push(Protocol::QuicV1);

    Some(new_multiaddr)
}

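// For example, an input of `/ip4/68.183.39.80/udp/31055/quic-v1/p2p/<peer-id>`
// yields `/ip4/68.183.39.80/udp/31055/quic-v1`; inputs lacking an IPv4 or UDP
// component return `None`.
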
/// Get the `IpAddr` from the `Multiaddr`
pub(crate) fn multiaddr_get_ip(addr: &Multiaddr) -> Option<IpAddr> {
    addr.iter().find_map(|p| match p {
        Protocol::Ip4(addr) => Some(IpAddr::V4(addr)),
        Protocol::Ip6(addr) => Some(IpAddr::V6(addr)),
        _ => None,
    })
}

pub(crate) fn multiaddr_get_port(addr: &Multiaddr) -> Option<u16> {
    addr.iter().find_map(|p| match p {
        Protocol::Udp(port) => Some(port),
        _ => None,
    })
}

pub(crate) fn send_local_swarm_cmd(swarm_cmd_sender: Sender<LocalSwarmCmd>, cmd: LocalSwarmCmd) {
    let capacity = swarm_cmd_sender.capacity();

    if capacity == 0 {
        error!(
            "SwarmCmd channel is full. Awaiting capacity to send: {:?}",
            cmd
        );
    }

    // Spawn a task to send the SwarmCmd and keep this fn sync.
    let _handle = spawn(async move {
        if let Err(error) = swarm_cmd_sender.send(cmd).await {
            error!("Failed to send SwarmCmd: {}", error);
        }
    });
}

pub(crate) fn send_network_swarm_cmd(
    swarm_cmd_sender: Sender<NetworkSwarmCmd>,
    cmd: NetworkSwarmCmd,
) {
    let capacity = swarm_cmd_sender.capacity();

    if capacity == 0 {
        error!(
            "SwarmCmd channel is full. Awaiting capacity to send: {:?}",
            cmd
        );
    }

    // Spawn a task to send the SwarmCmd and keep this fn sync.
    let _handle = spawn(async move {
        if let Err(error) = swarm_cmd_sender.send(cmd).await {
            error!("Failed to send SwarmCmd: {}", error);
        }
    });
}