//! Utility types, constants, and helpers for the `ave` network layer
//! (`ave_network/utils.rs`).

1use crate::{Error, routing::RoutingNode};
2use bytes::Bytes;
3use futures::{StreamExt, stream};
4use libp2p::{
5    Multiaddr, PeerId,
6    identity::{self},
7    multihash::Multihash,
8    swarm::ConnectionId,
9};
10use serde::{Deserialize, Deserializer, Serialize};
11use tokio::time::Instant;
12use tokio_util::sync::CancellationToken;
13use tracing::warn;
14
15use std::{
16    cmp::Ordering,
17    collections::{HashMap, HashSet, VecDeque},
18    str::FromStr,
19    time::Duration,
20};
21
22#[cfg(not(any(test, feature = "test")))]
23use ip_network::IpNetwork;
24#[cfg(not(any(test, feature = "test")))]
25use libp2p::multiaddr::Protocol;
26
/// Tracing target used by log records emitted from this module.
const TARGET: &str = "ave::network::utils";
/// Protocol name string for the Noise setup — TODO confirm at transport build site.
pub const NOISE_PROTOCOL: &str = "ave-p2p-v1";
/// Protocol id for the request-response behaviour.
pub const REQRES_PROTOCOL: &str = "/ave/reqres/1.0.0";
/// Protocol id for the routing behaviour.
pub const ROUTING_PROTOCOL: &str = "/ave/routing/1.0.0";
/// Protocol id reported by the identify behaviour.
pub const IDENTIFY_PROTOCOL: &str = "/ave/1.0.0";
/// User-agent string sent to peers.
pub const USER_AGENT: &str = "ave/0.8.0";
/// Hard cap for a single application message.
pub const MAX_APP_MESSAGE_BYTES: usize = 1024 * 1024; // 1 MiB
/// Default per-peer cap on buffered outbound bytes.
pub const DEFAULT_MAX_PENDING_OUTBOUND_BYTES_PER_PEER: usize = 8 * 1024 * 1024; // 8 MiB
/// Default per-peer cap on buffered inbound bytes.
pub const DEFAULT_MAX_PENDING_INBOUND_BYTES_PER_PEER: usize = 8 * 1024 * 1024; // 8 MiB
/// Default node-wide cap on buffered outbound bytes (0 = disabled).
pub const DEFAULT_MAX_PENDING_OUTBOUND_BYTES_TOTAL: usize = 0; // disabled
/// Default node-wide cap on buffered inbound bytes (0 = disabled).
pub const DEFAULT_MAX_PENDING_INBOUND_BYTES_TOTAL: usize = 0; // disabled
38
/// Errors returned by [`peer_id_to_ed25519_pubkey_bytes`].
#[derive(Debug, thiserror::Error)]
pub enum PeerIdToEd25519Error {
    /// The peer id's multihash is not the identity hash (code 0x00), so the
    /// public key cannot be read back out of it.
    #[error(
        "peer id is not an identity multihash (public key is not recoverable)"
    )]
    NotIdentityMultihash,
    /// The multihash digest was empty.
    #[error("multihash digest is empty or invalid")]
    InvalidDigest,
    /// The digest failed to decode as a protobuf-encoded public key.
    #[error("failed to decode protobuf-encoded public key: {0}")]
    Protobuf(#[from] identity::DecodingError),
    /// The decoded public key is of a variant other than ed25519.
    #[error("public key is not ed25519: {0}")]
    NotEd25519(#[from] identity::OtherVariantError),
}
52
53pub fn peer_id_to_ed25519_pubkey_bytes(
54    peer_id: &PeerId,
55) -> Result<[u8; 32], PeerIdToEd25519Error> {
56    // PeerId: AsRef<Multihash<64>>
57    let mh: &Multihash<64> = peer_id.as_ref();
58
59    // multihash identity = 0x00
60    if mh.code() != 0x00 {
61        return Err(PeerIdToEd25519Error::NotIdentityMultihash);
62    }
63
64    let digest = mh.digest();
65    if digest.is_empty() {
66        return Err(PeerIdToEd25519Error::InvalidDigest);
67    }
68
69    // digest == protobuf-encoded public key
70    let pk = identity::PublicKey::try_decode_protobuf(digest)?;
71    let ed_pk = pk.try_into_ed25519()?;
72    Ok(ed_pk.to_bytes())
73}
74
/// Tunable limits for the libp2p stack, derived from machine resources by
/// [`LimitsConfig::build`].
#[derive(Clone)]
pub struct LimitsConfig {
    /// Yamux per-connection maximum number of substreams.
    pub yamux_max_num_streams: usize,
    /// TCP listen backlog (unused in test builds, hence the allow).
    #[cfg_attr(any(test, feature = "test"), allow(dead_code))]
    pub tcp_listen_backlog: u32,
    /// Whether to enable TCP_NODELAY (unused in test builds, hence the allow).
    #[cfg_attr(any(test, feature = "test"), allow(dead_code))]
    pub tcp_nodelay: bool,
    /// Maximum concurrent ReqRes streams for the node.
    pub reqres_max_concurrent_streams: usize,
    /// ReqRes request timeout — presumably seconds (`build` sets 30); confirm at use site.
    pub reqres_request_timeout: u64,
    /// Capacity of the identify metadata cache.
    pub identify_cache: usize,
    /// Kademlia query timeout — presumably seconds (`build` sets 25); confirm at use site.
    pub kademlia_query_timeout: u64,
    // NOTE(review): "limmits" is a typo for "limits" in the field names below;
    // renaming them would break callers, so the spelling is kept as-is.
    /// Maximum pending (handshaking) incoming connections.
    pub conn_limmits_max_pending_incoming: Option<u32>,
    /// Maximum pending (handshaking) outgoing connections.
    pub conn_limmits_max_pending_outgoing: Option<u32>,
    /// Maximum established incoming connections.
    pub conn_limmits_max_established_incoming: Option<u32>,
    /// Maximum established outgoing connections.
    pub conn_limmits_max_established_outgoing: Option<u32>,
    /// Maximum established connections per peer.
    pub conn_limmits_max_established_per_peer: Option<u32>,
    /// Maximum established connections in total.
    pub conn_limmits_max_established_total: Option<u32>,
}
93
94impl LimitsConfig {
95    /// Build network limits from the total machine RAM and CPU count.
96    ///
97    /// `ram_mb` and `cpu_cores` are **total machine specs** shared by all components
98    /// (DB backends, actor runtime, libp2p, OS).
99    ///
100    /// ## Resource split
101    ///
102    /// - **RAM-driven**: connection counts. Each established connection ≈ 50 KB
103    ///   (TCP state ~20 KB + yamux ~4 KB + Noise ~3 KB + bookkeeping ~3 KB).
104    ///   Budget: 10 % of total RAM. Split: 80 % inbound / 20 % outbound.
105    ///
106    /// - **CPU-driven**: stream concurrency and pending handshakes.
107    ///   Noise handshakes (pending connections) are asymmetric-crypto intensive.
108    ///   ReqRes concurrent streams are tokio tasks — more cores = more parallelism.
109    pub fn build(ram_mb: u64, cpu_cores: usize) -> Self {
110        let cores = cpu_cores.max(1);
111
112        // ── Connection limits (RAM) ──────────────────────────────────────────────
113        let budget_bytes = ram_mb * 1024 * 1024 * 10 / 100;
114        let bytes_per_conn: u64 = 50 * 1024; // ~50 KB per established connection
115
116        // Total connections: floor 50, cap 9 000 (file-descriptor & kernel limits)
117        let max_total =
118            ((budget_bytes / bytes_per_conn) as u32).clamp(50, 9_000);
119
120        // 80 % incoming (nodes are mostly servers), 20 % outgoing
121        let max_incoming = (max_total * 80 / 100).clamp(30, 8_000);
122        let max_outgoing = (max_total * 20 / 100).clamp(20, 1_000);
123
124        // ── Pending connections (CPU) ────────────────────────────────────────────
125        // Each pending connection performs a Noise handshake (X25519 + ChaCha20).
126        // ~64 parallel handshakes per core is a practical bound.
127        let pending_incoming = (max_incoming / 10)
128            .max(10)
129            .min((cores as u32) * 64)
130            .min(512);
131        let pending_outgoing = (max_outgoing / 4).clamp(20, 128);
132
133        // ── Stream concurrency (CPU) ─────────────────────────────────────────────
134        // Each concurrent ReqRes stream is a tokio task. More cores → more tasks
135        // that run in true parallel. ~512 concurrent tasks per core is sensible.
136        let reqres_streams = (cores * 512).clamp(64, 4_096);
137
138        // Yamux per-connection stream limit: must cover the worst case where a
139        // single peer saturates the full ReqRes budget, plus routing/kad overhead.
140        let yamux_streams = (reqres_streams + 64).clamp(256, 8_192);
141
142        // ── TCP listen backlog (kernel-managed) ──────────────────────────────────
143        // Sized for SYN bursts: 1/8 of max_incoming, floor 128, cap 8 192.
144        let tcp_backlog = (max_incoming / 8).clamp(128, 8_192);
145
146        // ── Identify cache (RAM) ─────────────────────────────────────────────────
147        // Metadata for frequently-contacted peers: 1/4 of total, cap 1 024.
148        let identify_cache = ((max_total / 4) as usize).min(1_024);
149
150        Self {
151            yamux_max_num_streams: yamux_streams,
152            tcp_listen_backlog: tcp_backlog,
153            tcp_nodelay: true,
154            reqres_max_concurrent_streams: reqres_streams,
155            reqres_request_timeout: 30,
156            identify_cache,
157            kademlia_query_timeout: 25,
158            conn_limmits_max_pending_incoming: Some(pending_incoming),
159            conn_limmits_max_pending_outgoing: Some(pending_outgoing),
160            conn_limmits_max_established_incoming: Some(max_incoming),
161            conn_limmits_max_established_outgoing: Some(max_outgoing),
162            conn_limmits_max_established_per_peer: Some(2),
163            conn_limmits_max_established_total: Some(max_total),
164        }
165    }
166}
167
/// What to schedule for a peer: discovery, or dialing a set of addresses.
pub enum ScheduleType {
    /// Run discovery.
    Discover,
    /// Dial the given addresses.
    Dial(Vec<Multiaddr>),
}
172
/// Actions that can be performed or retried (see [`RetryKind`]).
#[derive(Copy, Clone, Debug)]
pub enum Action {
    /// Run peer discovery.
    Discover,
    /// Dial a peer.
    Dial,
    /// A peer was identified on the given connection.
    Identified(ConnectionId),
}
179
180impl From<RetryKind> for Action {
181    fn from(value: RetryKind) -> Self {
182        match value {
183            RetryKind::Discover => Self::Discover,
184            RetryKind::Dial => Self::Dial,
185        }
186    }
187}
188
/// The kind of operation a retry applies to.
#[derive(Copy, Clone, Debug)]
pub enum RetryKind {
    /// Retry discovery.
    Discover,
    /// Retry dialing.
    Dial,
}
194
/// State tracked for a pending retry of a peer operation.
#[derive(Clone, Debug)]
pub struct RetryState {
    /// Number of attempts made so far.
    pub attempts: u8,
    /// When the next attempt is due.
    pub when: Instant,
    /// Whether this retry is a discover or a dial.
    pub kind: RetryKind,
    /// Addresses to dial — presumably empty for discover retries; confirm at call sites.
    pub addrs: Vec<Multiaddr>,
}
202
/// A peer and the instant its action is due.
///
/// Equality and ordering compare ONLY the `Instant` (field `.1`); the
/// `PeerId` is ignored. `Ord` is deliberately reversed (`o.1.cmp(&self.1)`)
/// so the earliest deadline compares as the greatest element — presumably so
/// a max-heap such as `BinaryHeap` pops the soonest entry first; confirm
/// against the call sites.
#[derive(Eq, Clone, Debug)]
pub struct Due(pub PeerId, pub Instant);
impl PartialEq for Due {
    fn eq(&self, o: &Self) -> bool {
        // Deadlines only; the peer id does not affect equality.
        self.1.eq(&o.1)
    }
}
impl Ord for Due {
    fn cmp(&self, o: &Self) -> Ordering {
        // Reversed: an earlier instant orders as "greater".
        o.1.cmp(&self.1)
    }
}
impl PartialOrd for Due {
    fn partial_cmp(&self, o: &Self) -> Option<Ordering> {
        Some(self.cmp(o))
    }
}
220
/// Network state.
///
/// High-level connection lifecycle of the node; transitions are driven
/// elsewhere (NOTE(review): variant semantics inferred from names — confirm
/// against the state machine that uses this enum).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum NetworkState {
    /// Start.
    Start,
    /// Dial.
    Dial,
    /// Dialing boot node.
    Dialing,
    /// Running.
    Running,
    /// Disconnected.
    Disconnected,
}
235
/// Helper carrying either a single message payload or a queue of payloads.
pub enum MessagesHelper {
    /// One message payload.
    Single(Bytes),
    /// A queue of message payloads.
    Vec(VecDeque<Bytes>),
}
240
241/// Method that update allow and block lists
242async fn request_peer_list(
243    client: reqwest::Client,
244    service: String,
245    request_timeout: Duration,
246    graceful_token: CancellationToken,
247    crash_token: CancellationToken,
248    list_kind: &'static str,
249) -> Option<Vec<String>> {
250    let response = tokio::select! {
251        _ = graceful_token.clone().cancelled_owned() => return None,
252        _ = crash_token.clone().cancelled_owned() => return None,
253        response = client.get(&service).timeout(request_timeout).send() => response,
254    };
255
256    match response {
257        Ok(res) => {
258            if !res.status().is_success() {
259                warn!(
260                    target: TARGET,
261                    list_kind = list_kind,
262                    url = service,
263                    status = %res.status(),
264                    "control-list service returned error status"
265                );
266                return None;
267            }
268
269            let peers = tokio::select! {
270                _ = graceful_token.clone().cancelled_owned() => return None,
271                _ = crash_token.clone().cancelled_owned() => return None,
272                peers = res.json::<Vec<String>>() => peers,
273            };
274
275            match peers {
276                Ok(peers) => Some(peers),
277                Err(e) => {
278                    warn!(
279                        target: TARGET,
280                        list_kind = list_kind,
281                        url = service,
282                        error = %e,
283                        "control-list service returned unexpected body"
284                    );
285                    None
286                }
287            }
288        }
289        Err(e) => {
290            if e.is_timeout() {
291                warn!(
292                    target: TARGET,
293                    list_kind = list_kind,
294                    url = service,
295                    timeout_secs = request_timeout.as_secs_f64(),
296                    "control-list service timed out"
297                );
298            } else {
299                warn!(
300                    target: TARGET,
301                    list_kind = list_kind,
302                    url = service,
303                    error = %e,
304                    "control-list service unreachable"
305                );
306            }
307            None
308        }
309    }
310}
311
312async fn request_peer_lists(
313    client: reqwest::Client,
314    services: Vec<String>,
315    request_timeout: Duration,
316    max_concurrent_requests: usize,
317    graceful_token: CancellationToken,
318    crash_token: CancellationToken,
319    list_kind: &'static str,
320) -> (Vec<String>, u16) {
321    if services.is_empty()
322        || graceful_token.is_cancelled()
323        || crash_token.is_cancelled()
324    {
325        return (vec![], 0);
326    }
327
328    let responses = stream::iter(services.into_iter().map(|service| {
329        let client = client.clone();
330        let graceful_token = graceful_token.clone();
331        let crash_token = crash_token.clone();
332
333        async move {
334            request_peer_list(
335                client.clone(),
336                service,
337                request_timeout,
338                graceful_token,
339                crash_token,
340                list_kind,
341            )
342            .await
343        }
344    }))
345    .buffer_unordered(max_concurrent_requests.max(1))
346    .collect::<Vec<Option<Vec<String>>>>()
347    .await;
348
349    let mut peers = Vec::new();
350    let mut successful = 0u16;
351
352    for item in responses.into_iter().flatten() {
353        peers.extend(item);
354        successful = successful.saturating_add(1);
355    }
356
357    (peers, successful)
358}
359
360pub async fn request_update_lists(
361    client: reqwest::Client,
362    service_allow: Vec<String>,
363    service_block: Vec<String>,
364    request_timeout: Duration,
365    max_concurrent_requests: usize,
366    graceful_token: CancellationToken,
367    crash_token: CancellationToken,
368) -> ((Vec<String>, Vec<String>), (u16, u16)) {
369    let (
370        (vec_allow_peers, successful_allow),
371        (vec_block_peers, successful_block),
372    ) = tokio::join!(
373        request_peer_lists(
374            client.clone(),
375            service_allow,
376            request_timeout,
377            max_concurrent_requests,
378            graceful_token.clone(),
379            crash_token.clone(),
380            "allow"
381        ),
382        request_peer_lists(
383            client,
384            service_block,
385            request_timeout,
386            max_concurrent_requests,
387            graceful_token.clone(),
388            crash_token.clone(),
389            "block"
390        )
391    );
392
393    (
394        (vec_allow_peers, vec_block_peers),
395        (successful_allow, successful_block),
396    )
397}
398
399/// Convert boot nodes to `PeerId` and `Multiaddr`.
400pub fn convert_boot_nodes(
401    boot_nodes: &[RoutingNode],
402) -> HashMap<PeerId, Vec<Multiaddr>> {
403    let mut boot_nodes_aux = HashMap::new();
404
405    for node in boot_nodes {
406        let Ok(peer) = bs58::decode(node.peer_id.clone()).into_vec() else {
407            continue;
408        };
409
410        let Ok(peer) = PeerId::from_bytes(peer.as_slice()) else {
411            continue;
412        };
413
414        let mut aux_addrs = vec![];
415        for addr in node.address.iter() {
416            let Ok(addr) = Multiaddr::from_str(addr) else {
417                continue;
418            };
419
420            aux_addrs.push(addr);
421        }
422
423        if !aux_addrs.is_empty() {
424            boot_nodes_aux.insert(peer, aux_addrs);
425        }
426    }
427
428    boot_nodes_aux
429}
430
431/// Gets the list of external (public) addresses for the node from string array.
432pub fn convert_addresses(
433    addresses: &[String],
434) -> Result<HashSet<Multiaddr>, Error> {
435    let mut addrs = HashSet::new();
436    for address in addresses {
437        if let Some(value) = multiaddr(address) {
438            addrs.insert(value);
439        } else {
440            return Err(Error::InvalidAddress(address.clone()));
441        }
442    }
443    Ok(addrs)
444}
445
446/// Parses a string into a `Multiaddr` if possible.
447fn multiaddr(addr: &str) -> Option<Multiaddr> {
448    addr.parse::<Multiaddr>().ok()
449}
450
/// Check if the given `Multiaddr` is reachable.
///
/// This test is successful only for global IP addresses.
// NOTE(review): the original doc also claimed DNS names pass this check, but
// the match arms below only accept Ip4/Ip6 components — DNS protocols fall
// through to `false`. DNS acceptance is handled separately via `is_dns`.
// No TLD-suffix validation is done there either, because the set of valid
// domains is highly dynamic and would require frequent updates, for example
// by utilising publicsuffix.org or IANA.
#[cfg(not(any(test, feature = "test")))]
pub fn is_global(addr: &Multiaddr) -> bool {
    addr.iter().any(|p| match p {
        Protocol::Ip4(ip) => IpNetwork::from(ip).is_global(),
        Protocol::Ip6(ip) => IpNetwork::from(ip).is_global(),
        _ => false,
    })
}
465
/// Check if the given `Multiaddr` contains a private IP address
/// (RFC 1918 ranges for IPv4, unique-local for IPv6).
#[cfg(not(any(test, feature = "test")))]
pub fn is_private(addr: &Multiaddr) -> bool {
    addr.iter().any(|p| match p {
        Protocol::Ip4(ip) => ip.is_private(),
        Protocol::Ip6(ip) => ip.is_unique_local(),
        _ => false,
    })
}
474
/// Check if the given `Multiaddr` contains a loopback IP address.
#[cfg(not(any(test, feature = "test")))]
pub fn is_loop_back(addr: &Multiaddr) -> bool {
    addr.iter().any(|p| match p {
        Protocol::Ip4(ip) => ip.is_loopback(),
        Protocol::Ip6(ip) => ip.is_loopback(),
        _ => false,
    })
}
483
/// Check if the given `Multiaddr` contains a DNS component
/// (`/dns`, `/dns4` or `/dns6`).
#[cfg(not(any(test, feature = "test")))]
pub fn is_dns(addr: &Multiaddr) -> bool {
    addr.iter().any(|p| {
        matches!(p, Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_))
    })
}
490
/// Check if the given `Multiaddr` contains a TCP component.
// NOTE(review): the original doc said "is a memory address", which does not
// match the code — the body only tests for `Protocol::Tcp`.
#[cfg(not(any(test, feature = "test")))]
pub fn is_tcp(addr: &Multiaddr) -> bool {
    addr.iter().any(|p| matches!(p, Protocol::Tcp(_)))
}
496
/// The configuration for a `Behaviour` protocol.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct ReqResConfig {
    /// Message timeout; deserialized from a plain number of seconds
    /// (see `deserialize_duration_secs`).
    #[serde(deserialize_with = "deserialize_duration_secs")]
    pub message_timeout: Duration,
    /// Upper bound for the number of concurrent inbound + outbound streams.
    pub max_concurrent_streams: usize,
}
507
508fn deserialize_duration_secs<'de, D>(
509    deserializer: D,
510) -> Result<Duration, D::Error>
511where
512    D: Deserializer<'de>,
513{
514    let u: u64 = u64::deserialize(deserializer)?;
515    Ok(Duration::from_secs(u))
516}
517
impl ReqResConfig {
    /// Create a ReqRes config with the given timeout and stream limit.
    pub const fn new(
        message_timeout: Duration,
        max_concurrent_streams: usize,
    ) -> Self {
        Self {
            message_timeout,
            max_concurrent_streams,
        }
    }
}
530
531impl Default for ReqResConfig {
532    fn default() -> Self {
533        Self {
534            message_timeout: Duration::from_secs(10),
535            max_concurrent_streams: 100,
536        }
537    }
538}
539
impl ReqResConfig {
    /// Sets the timeout for inbound and outbound requests.
    pub const fn with_message_timeout(mut self, timeout: Duration) -> Self {
        self.message_timeout = timeout;
        self
    }

    /// Sets the upper bound for the number of concurrent inbound + outbound streams.
    pub const fn with_max_concurrent_streams(
        mut self,
        num_streams: usize,
    ) -> Self {
        self.max_concurrent_streams = num_streams;
        self
    }

    /// Get message timeout
    // NOTE(review): Rust convention would name these `message_timeout()` /
    // `max_concurrent_streams()`; the `get_` prefix is kept to avoid breaking
    // existing callers.
    pub const fn get_message_timeout(&self) -> Duration {
        self.message_timeout
    }

    /// Get max concurrent streams
    pub const fn get_max_concurrent_streams(&self) -> usize {
        self.max_concurrent_streams
    }
}