// ave_network/utils.rs
1use crate::{Error, routing::RoutingNode};
2use bytes::Bytes;
3use futures::{StreamExt, stream};
4use ip_network::IpNetwork;
5use libp2p::{
6    Multiaddr, PeerId,
7    identity::{self},
8    multiaddr::Protocol,
9    multihash::Multihash,
10    swarm::ConnectionId,
11};
12use serde::{Deserialize, Deserializer, Serialize};
13use tokio::time::Instant;
14use tokio_util::sync::CancellationToken;
15use tracing::warn;
16
17use std::{
18    cmp::Ordering,
19    collections::{HashMap, HashSet, VecDeque},
20    str::FromStr,
21    time::Duration,
22};
23
/// Tracing target used by the log lines in this module.
const TARGET: &str = "ave::network::utils";
/// Noise handshake protocol name.
pub const NOISE_PROTOCOL: &str = "ave-p2p-v1";
/// Request/response protocol id.
pub const REQRES_PROTOCOL: &str = "/ave/reqres/1.0.0";
/// Routing protocol id.
pub const ROUTING_PROTOCOL: &str = "/ave/routing/1.0.0";
/// Identify protocol id.
pub const IDENTIFY_PROTOCOL: &str = "/ave/1.0.0";
/// User-agent string for this node version.
pub const USER_AGENT: &str = "ave/0.8.0";
/// Hard cap on a single application message: 1 MiB.
pub const MAX_APP_MESSAGE_BYTES: usize = 1024 * 1024;
/// Per-peer pending outbound byte budget: 8 MiB.
pub const DEFAULT_MAX_PENDING_OUTBOUND_BYTES_PER_PEER: usize = 8 * 1024 * 1024;
/// Per-peer pending inbound byte budget: 8 MiB.
pub const DEFAULT_MAX_PENDING_INBOUND_BYTES_PER_PEER: usize = 8 * 1024 * 1024;
/// Global pending outbound byte budget: 0 means disabled.
pub const DEFAULT_MAX_PENDING_OUTBOUND_BYTES_TOTAL: usize = 0;
/// Global pending inbound byte budget: 0 means disabled.
pub const DEFAULT_MAX_PENDING_INBOUND_BYTES_TOTAL: usize = 0;
35
36#[derive(Debug, thiserror::Error)]
37pub enum PeerIdToEd25519Error {
38    #[error(
39        "peer id is not an identity multihash (public key is not recoverable)"
40    )]
41    NotIdentityMultihash,
42    #[error("multihash digest is empty or invalid")]
43    InvalidDigest,
44    #[error("failed to decode protobuf-encoded public key: {0}")]
45    Protobuf(#[from] identity::DecodingError),
46    #[error("public key is not ed25519: {0}")]
47    NotEd25519(#[from] identity::OtherVariantError),
48}
49
50pub fn peer_id_to_ed25519_pubkey_bytes(
51    peer_id: &PeerId,
52) -> Result<[u8; 32], PeerIdToEd25519Error> {
53    // PeerId: AsRef<Multihash<64>>
54    let mh: &Multihash<64> = peer_id.as_ref();
55
56    // multihash identity = 0x00
57    if mh.code() != 0x00 {
58        return Err(PeerIdToEd25519Error::NotIdentityMultihash);
59    }
60
61    let digest = mh.digest();
62    if digest.is_empty() {
63        return Err(PeerIdToEd25519Error::InvalidDigest);
64    }
65
66    // digest == protobuf-encoded public key
67    let pk = identity::PublicKey::try_decode_protobuf(digest)?;
68    let ed_pk = pk.try_into_ed25519()?;
69    Ok(ed_pk.to_bytes())
70}
71
/// Tunable limits for the libp2p stack: connection counts, stream
/// concurrency, backlogs and caches.
// NOTE(review): the `limmits` spelling in the field names is a typo, but the
// fields are public API — renaming would break callers, so it is kept.
#[derive(Clone)]
pub struct LimitsConfig {
    pub yamux_max_num_streams: usize,
    pub tcp_listen_backlog: u32,
    pub tcp_nodelay: bool,
    pub reqres_max_concurrent_streams: usize,
    pub reqres_request_timeout: u64,
    pub identify_cache: usize,
    pub kademlia_query_timeout: u64,
    pub conn_limmits_max_pending_incoming: Option<u32>,
    pub conn_limmits_max_pending_outgoing: Option<u32>,
    pub conn_limmits_max_established_incoming: Option<u32>,
    pub conn_limmits_max_established_outgoing: Option<u32>,
    pub conn_limmits_max_established_per_peer: Option<u32>,
    pub conn_limmits_max_established_total: Option<u32>,
}

impl LimitsConfig {
    /// Build network limits from the total machine RAM and CPU count.
    ///
    /// `ram_mb` and `cpu_cores` are **total machine specs** shared by all
    /// components (DB backends, actor runtime, libp2p, OS).
    ///
    /// ## Resource split
    ///
    /// - **RAM-driven**: connection counts. Each established connection ≈ 50 KB
    ///   (TCP state ~20 KB + yamux ~4 KB + Noise ~3 KB + bookkeeping ~3 KB).
    ///   Budget: 10 % of total RAM, split 80 % inbound / 20 % outbound.
    /// - **CPU-driven**: stream concurrency and pending handshakes. Noise
    ///   handshakes are asymmetric-crypto intensive; ReqRes concurrent
    ///   streams are tokio tasks, so more cores allow more parallelism.
    pub fn build(ram_mb: u64, cpu_cores: usize) -> Self {
        // Guard against a zero core count from the caller.
        let cores = cpu_cores.max(1);

        // ── Connection limits (RAM) ──────────────────────────────────────
        // 10 % of total RAM, ~50 KB per established connection.
        let ram_budget_bytes = ram_mb * 1024 * 1024 * 10 / 100;
        let per_conn_bytes: u64 = 50 * 1024;

        // Total connections: floor 50, cap 9 000 (fd & kernel limits).
        let max_total =
            ((ram_budget_bytes / per_conn_bytes) as u32).clamp(50, 9_000);

        // Nodes are mostly servers: 80 % incoming, 20 % outgoing.
        let max_incoming = (max_total * 80 / 100).clamp(30, 8_000);
        let max_outgoing = (max_total * 20 / 100).clamp(20, 1_000);

        // ── Pending connections (CPU) ────────────────────────────────────
        // Each pending connection runs a Noise handshake (X25519 +
        // ChaCha20); ~64 parallel handshakes per core, hard cap 512.
        let pending_incoming = (max_incoming / 10)
            .clamp(10, ((cores as u32) * 64).min(512));
        let pending_outgoing = (max_outgoing / 4).clamp(20, 128);

        // ── Stream concurrency (CPU) ─────────────────────────────────────
        // Each concurrent ReqRes stream is a tokio task; ~512 per core.
        let reqres_streams = (cores * 512).clamp(64, 4_096);

        // Yamux per-connection limit must cover a single peer saturating
        // the whole ReqRes budget plus routing/kad overhead.
        let yamux_streams = (reqres_streams + 64).clamp(256, 8_192);

        // ── TCP listen backlog (kernel-managed) ──────────────────────────
        // Sized for SYN bursts: 1/8 of max_incoming, floor 128, cap 8 192.
        let tcp_backlog = (max_incoming / 8).clamp(128, 8_192);

        // ── Identify cache (RAM) ─────────────────────────────────────────
        // Metadata for frequently-contacted peers: 1/4 of total, cap 1 024.
        let identify_cache = ((max_total / 4) as usize).min(1_024);

        Self {
            yamux_max_num_streams: yamux_streams,
            tcp_listen_backlog: tcp_backlog,
            tcp_nodelay: true,
            reqres_max_concurrent_streams: reqres_streams,
            reqres_request_timeout: 30,
            identify_cache,
            kademlia_query_timeout: 25,
            conn_limmits_max_pending_incoming: Some(pending_incoming),
            conn_limmits_max_pending_outgoing: Some(pending_outgoing),
            conn_limmits_max_established_incoming: Some(max_incoming),
            conn_limmits_max_established_outgoing: Some(max_outgoing),
            conn_limmits_max_established_per_peer: Some(2),
            conn_limmits_max_established_total: Some(max_total),
        }
    }
}
162
163pub enum ScheduleType {
164    Discover,
165    Dial(Vec<Multiaddr>),
166}
167
168#[derive(Copy, Clone, Debug)]
169pub enum Action {
170    Discover,
171    Dial,
172    Identified(ConnectionId),
173}
174
175impl From<RetryKind> for Action {
176    fn from(value: RetryKind) -> Self {
177        match value {
178            RetryKind::Discover => Self::Discover,
179            RetryKind::Dial => Self::Dial,
180        }
181    }
182}
183
/// Which kind of failed action a retry entry refers to.
#[derive(Copy, Clone, Debug)]
pub enum RetryKind {
    /// Retry peer discovery.
    Discover,
    /// Retry dialing.
    Dial,
}
189
190#[derive(Clone, Debug)]
191pub struct RetryState {
192    pub attempts: u8,
193    pub when: Instant,
194    pub kind: RetryKind,
195    pub addrs: Vec<Multiaddr>,
196}
197
198#[derive(Eq, Clone, Debug)]
199pub struct Due(pub PeerId, pub Instant);
200impl PartialEq for Due {
201    fn eq(&self, o: &Self) -> bool {
202        self.1.eq(&o.1)
203    }
204}
205impl Ord for Due {
206    fn cmp(&self, o: &Self) -> Ordering {
207        o.1.cmp(&self.1)
208    }
209}
210impl PartialOrd for Due {
211    fn partial_cmp(&self, o: &Self) -> Option<Ordering> {
212        Some(self.cmp(o))
213    }
214}
215
216/// Network state.
217#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
218pub enum NetworkState {
219    /// Start.
220    Start,
221    /// Dial.
222    Dial,
223    /// Dialing boot node.
224    Dialing,
225    /// Running.
226    Running,
227    /// Disconnected.
228    Disconnected,
229}
230
231pub enum MessagesHelper {
232    Single(Bytes),
233    Vec(VecDeque<Bytes>),
234}
235
236/// Method that update allow and block lists
237async fn request_peer_list(
238    client: reqwest::Client,
239    service: String,
240    request_timeout: Duration,
241    graceful_token: CancellationToken,
242    crash_token: CancellationToken,
243    list_kind: &'static str,
244) -> Option<Vec<String>> {
245    let response = tokio::select! {
246        _ = graceful_token.clone().cancelled_owned() => return None,
247        _ = crash_token.clone().cancelled_owned() => return None,
248        response = client.get(&service).timeout(request_timeout).send() => response,
249    };
250
251    match response {
252        Ok(res) => {
253            if !res.status().is_success() {
254                warn!(
255                    target: TARGET,
256                    list_kind = list_kind,
257                    url = service,
258                    status = %res.status(),
259                    "control-list service returned error status"
260                );
261                return None;
262            }
263
264            let peers = tokio::select! {
265                _ = graceful_token.clone().cancelled_owned() => return None,
266                _ = crash_token.clone().cancelled_owned() => return None,
267                peers = res.json::<Vec<String>>() => peers,
268            };
269
270            match peers {
271                Ok(peers) => Some(peers),
272                Err(e) => {
273                    warn!(
274                        target: TARGET,
275                        list_kind = list_kind,
276                        url = service,
277                        error = %e,
278                        "control-list service returned unexpected body"
279                    );
280                    None
281                }
282            }
283        }
284        Err(e) => {
285            if e.is_timeout() {
286                warn!(
287                    target: TARGET,
288                    list_kind = list_kind,
289                    url = service,
290                    timeout_secs = request_timeout.as_secs_f64(),
291                    "control-list service timed out"
292                );
293            } else {
294                warn!(
295                    target: TARGET,
296                    list_kind = list_kind,
297                    url = service,
298                    error = %e,
299                    "control-list service unreachable"
300                );
301            }
302            None
303        }
304    }
305}
306
307async fn request_peer_lists(
308    client: reqwest::Client,
309    services: Vec<String>,
310    request_timeout: Duration,
311    max_concurrent_requests: usize,
312    graceful_token: CancellationToken,
313    crash_token: CancellationToken,
314    list_kind: &'static str,
315) -> (Vec<String>, u16) {
316    if services.is_empty()
317        || graceful_token.is_cancelled()
318        || crash_token.is_cancelled()
319    {
320        return (vec![], 0);
321    }
322
323    let responses = stream::iter(services.into_iter().map(|service| {
324        let client = client.clone();
325        let graceful_token = graceful_token.clone();
326        let crash_token = crash_token.clone();
327
328        async move {
329            request_peer_list(
330                client.clone(),
331                service,
332                request_timeout,
333                graceful_token,
334                crash_token,
335                list_kind,
336            )
337            .await
338        }
339    }))
340    .buffer_unordered(max_concurrent_requests.max(1))
341    .collect::<Vec<Option<Vec<String>>>>()
342    .await;
343
344    let mut peers = Vec::new();
345    let mut successful = 0u16;
346
347    for item in responses.into_iter().flatten() {
348        peers.extend(item);
349        successful = successful.saturating_add(1);
350    }
351
352    (peers, successful)
353}
354
355pub async fn request_update_lists(
356    client: reqwest::Client,
357    service_allow: Vec<String>,
358    service_block: Vec<String>,
359    request_timeout: Duration,
360    max_concurrent_requests: usize,
361    graceful_token: CancellationToken,
362    crash_token: CancellationToken,
363) -> ((Vec<String>, Vec<String>), (u16, u16)) {
364    let (
365        (vec_allow_peers, successful_allow),
366        (vec_block_peers, successful_block),
367    ) = tokio::join!(
368        request_peer_lists(
369            client.clone(),
370            service_allow,
371            request_timeout,
372            max_concurrent_requests,
373            graceful_token.clone(),
374            crash_token.clone(),
375            "allow"
376        ),
377        request_peer_lists(
378            client,
379            service_block,
380            request_timeout,
381            max_concurrent_requests,
382            graceful_token.clone(),
383            crash_token.clone(),
384            "block"
385        )
386    );
387
388    (
389        (vec_allow_peers, vec_block_peers),
390        (successful_allow, successful_block),
391    )
392}
393
394/// Convert boot nodes to `PeerId` and `Multiaddr`.
395pub fn convert_boot_nodes(
396    boot_nodes: &[RoutingNode],
397) -> HashMap<PeerId, Vec<Multiaddr>> {
398    let mut boot_nodes_aux = HashMap::new();
399
400    for node in boot_nodes {
401        let Ok(peer) = bs58::decode(node.peer_id.clone()).into_vec() else {
402            continue;
403        };
404
405        let Ok(peer) = PeerId::from_bytes(peer.as_slice()) else {
406            continue;
407        };
408
409        let mut aux_addrs = vec![];
410        for addr in node.address.iter() {
411            let Ok(addr) = Multiaddr::from_str(addr) else {
412                continue;
413            };
414
415            aux_addrs.push(addr);
416        }
417
418        if !aux_addrs.is_empty() {
419            boot_nodes_aux.insert(peer, aux_addrs);
420        }
421    }
422
423    boot_nodes_aux
424}
425
426/// Gets the list of external (public) addresses for the node from string array.
427pub fn convert_addresses(
428    addresses: &[String],
429) -> Result<HashSet<Multiaddr>, Error> {
430    let mut addrs = HashSet::new();
431    for address in addresses {
432        if let Some(value) = multiaddr(address) {
433            addrs.insert(value);
434        } else {
435            return Err(Error::InvalidAddress(address.clone()));
436        }
437    }
438    Ok(addrs)
439}
440
441/// Parses a string into a `Multiaddr` if possible.
442fn multiaddr(addr: &str) -> Option<Multiaddr> {
443    addr.parse::<Multiaddr>().ok()
444}
445
446/// Check if the given `Multiaddr` is reachable.
447///
448/// This test is successful only for global IP addresses and DNS names.
449// NB: Currently all DNS names are allowed and no check for TLD suffixes is done
450// because the set of valid domains is highly dynamic and would require frequent
451// updates, for example by utilising publicsuffix.org or IANA.
452#[cfg(not(any(test, feature = "test")))]
453pub fn is_global(addr: &Multiaddr) -> bool {
454    addr.iter().any(|p| match p {
455        Protocol::Ip4(ip) => IpNetwork::from(ip).is_global(),
456        Protocol::Ip6(ip) => IpNetwork::from(ip).is_global(),
457        _ => false,
458    })
459}
460
461#[cfg(not(any(test, feature = "test")))]
462pub fn is_private(addr: &Multiaddr) -> bool {
463    addr.iter().any(|p| match p {
464        Protocol::Ip4(ip) => ip.is_private(),
465        Protocol::Ip6(ip) => ip.is_unique_local(),
466        _ => false,
467    })
468}
469
470#[cfg(not(any(test, feature = "test")))]
471pub fn is_loop_back(addr: &Multiaddr) -> bool {
472    addr.iter().any(|p| match p {
473        Protocol::Ip4(ip) => ip.is_loopback(),
474        Protocol::Ip6(ip) => ip.is_loopback(),
475        _ => false,
476    })
477}
478
479#[cfg(not(any(test, feature = "test")))]
480pub fn is_dns(addr: &Multiaddr) -> bool {
481    addr.iter().any(|p| {
482        matches!(p, Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_))
483    })
484}
485
486/// Chech if the given `Multiaddr` is a memory address.
487#[cfg(not(any(test, feature = "test")))]
488pub fn is_tcp(addr: &Multiaddr) -> bool {
489    addr.iter().any(|p| matches!(p, Protocol::Tcp(_)))
490}
491
492/// The configuration for a `Behaviour` protocol.
493#[derive(Debug, Clone, Deserialize)]
494#[serde(default)]
495pub struct ReqResConfig {
496    /// message timeout
497    #[serde(deserialize_with = "deserialize_duration_secs")]
498    pub message_timeout: Duration,
499    /// max concurrent streams
500    pub max_concurrent_streams: usize,
501}
502
503fn deserialize_duration_secs<'de, D>(
504    deserializer: D,
505) -> Result<Duration, D::Error>
506where
507    D: Deserializer<'de>,
508{
509    let u: u64 = u64::deserialize(deserializer)?;
510    Ok(Duration::from_secs(u))
511}
512
513impl ReqResConfig {
514    /// Create a ReqRes Confing
515    pub const fn new(
516        message_timeout: Duration,
517        max_concurrent_streams: usize,
518    ) -> Self {
519        Self {
520            message_timeout,
521            max_concurrent_streams,
522        }
523    }
524}
525
526impl Default for ReqResConfig {
527    fn default() -> Self {
528        Self {
529            message_timeout: Duration::from_secs(10),
530            max_concurrent_streams: 100,
531        }
532    }
533}
534
535impl ReqResConfig {
536    /// Sets the timeout for inbound and outbound requests.
537    pub const fn with_message_timeout(mut self, timeout: Duration) -> Self {
538        self.message_timeout = timeout;
539        self
540    }
541
542    /// Sets the upper bound for the number of concurrent inbound + outbound streams.
543    pub const fn with_max_concurrent_streams(
544        mut self,
545        num_streams: usize,
546    ) -> Self {
547        self.max_concurrent_streams = num_streams;
548        self
549    }
550
551    /// Get message timeout
552    pub const fn get_message_timeout(&self) -> Duration {
553        self.message_timeout
554    }
555
556    /// Get max concurrent streams
557    pub const fn get_max_concurrent_streams(&self) -> usize {
558        self.max_concurrent_streams
559    }
560}