1use crate::{Error, routing::RoutingNode};
2use bytes::Bytes;
3use futures::{StreamExt, stream};
4use libp2p::{
5 Multiaddr, PeerId,
6 identity::{self},
7 multihash::Multihash,
8 swarm::ConnectionId,
9};
10use serde::{Deserialize, Deserializer, Serialize};
11use tokio::time::Instant;
12use tokio_util::sync::CancellationToken;
13use tracing::warn;
14
15use std::{
16 cmp::Ordering,
17 collections::{HashMap, HashSet, VecDeque},
18 str::FromStr,
19 time::Duration,
20};
21
22#[cfg(not(any(test, feature = "test")))]
23use ip_network::IpNetwork;
24#[cfg(not(any(test, feature = "test")))]
25use libp2p::multiaddr::Protocol;
26
/// `tracing` target attached to every log event emitted from this module.
const TARGET: &str = "ave::network::utils";

/// Protocol name used for the Noise handshake.
// NOTE(review): the consumer lives outside this file — confirm usage there.
pub const NOISE_PROTOCOL: &str = "ave-p2p-v1";

/// Protocol identifier of the request/response behaviour.
pub const REQRES_PROTOCOL: &str = "/ave/reqres/1.0.0";

/// Protocol identifier of the routing behaviour.
pub const ROUTING_PROTOCOL: &str = "/ave/routing/1.0.0";

/// Protocol identifier announced through identify.
pub const IDENTIFY_PROTOCOL: &str = "/ave/1.0.0";

/// User-agent string advertised by this node.
pub const USER_AGENT: &str = "ave/0.8.0";
33pub const MAX_APP_MESSAGE_BYTES: usize = 1024 * 1024; pub const DEFAULT_MAX_PENDING_OUTBOUND_BYTES_PER_PEER: usize = 8 * 1024 * 1024; pub const DEFAULT_MAX_PENDING_INBOUND_BYTES_PER_PEER: usize = 8 * 1024 * 1024; pub const DEFAULT_MAX_PENDING_OUTBOUND_BYTES_TOTAL: usize = 0; pub const DEFAULT_MAX_PENDING_INBOUND_BYTES_TOTAL: usize = 0; #[derive(Debug, thiserror::Error)]
40pub enum PeerIdToEd25519Error {
41 #[error(
42 "peer id is not an identity multihash (public key is not recoverable)"
43 )]
44 NotIdentityMultihash,
45 #[error("multihash digest is empty or invalid")]
46 InvalidDigest,
47 #[error("failed to decode protobuf-encoded public key: {0}")]
48 Protobuf(#[from] identity::DecodingError),
49 #[error("public key is not ed25519: {0}")]
50 NotEd25519(#[from] identity::OtherVariantError),
51}
52
53pub fn peer_id_to_ed25519_pubkey_bytes(
54 peer_id: &PeerId,
55) -> Result<[u8; 32], PeerIdToEd25519Error> {
56 let mh: &Multihash<64> = peer_id.as_ref();
58
59 if mh.code() != 0x00 {
61 return Err(PeerIdToEd25519Error::NotIdentityMultihash);
62 }
63
64 let digest = mh.digest();
65 if digest.is_empty() {
66 return Err(PeerIdToEd25519Error::InvalidDigest);
67 }
68
69 let pk = identity::PublicKey::try_decode_protobuf(digest)?;
71 let ed_pk = pk.try_into_ed25519()?;
72 Ok(ed_pk.to_bytes())
73}
74
/// Resource limits for the networking stack.
///
/// Use [`LimitsConfig::build`] to derive a consistent set of values from the
/// host's RAM and CPU core count.
#[derive(Clone)]
pub struct LimitsConfig {
    /// Maximum number of yamux streams.
    pub yamux_max_num_streams: usize,
    /// TCP listen backlog.
    #[cfg_attr(any(test, feature = "test"), allow(dead_code))]
    pub tcp_listen_backlog: u32,
    /// Whether `TCP_NODELAY` is requested.
    #[cfg_attr(any(test, feature = "test"), allow(dead_code))]
    pub tcp_nodelay: bool,
    /// Maximum number of concurrent request/response streams.
    pub reqres_max_concurrent_streams: usize,
    /// Request/response timeout.
    // NOTE(review): unit is presumably seconds — confirm with the consumer.
    pub reqres_request_timeout: u64,
    /// Size of the identify cache.
    pub identify_cache: usize,
    /// Kademlia query timeout.
    // NOTE(review): unit is presumably seconds — confirm with the consumer.
    pub kademlia_query_timeout: u64,
    /// Maximum number of pending incoming connections (`None` = not set).
    pub conn_limmits_max_pending_incoming: Option<u32>,
    /// Maximum number of pending outgoing connections (`None` = not set).
    pub conn_limmits_max_pending_outgoing: Option<u32>,
    /// Maximum number of established incoming connections (`None` = not set).
    pub conn_limmits_max_established_incoming: Option<u32>,
    /// Maximum number of established outgoing connections (`None` = not set).
    pub conn_limmits_max_established_outgoing: Option<u32>,
    /// Maximum number of established connections per peer (`None` = not set).
    pub conn_limmits_max_established_per_peer: Option<u32>,
    /// Maximum number of established connections overall (`None` = not set).
    pub conn_limmits_max_established_total: Option<u32>,
}

impl LimitsConfig {
    /// Derives limits from the machine size.
    ///
    /// * `ram_mb` – available RAM in MiB; 10 % of it is budgeted for
    ///   connections at an estimated 50 KiB each.
    /// * `cpu_cores` – number of CPU cores; `0` is treated as `1`.
    ///
    /// The total connection count is kept within `50..=9_000`, split roughly
    /// 80 % incoming / 20 % outgoing (each side with its own floor and
    /// ceiling). All arithmetic saturates, so arbitrarily large inputs yield
    /// the upper bounds instead of overflowing or wrapping.
    pub fn build(ram_mb: u64, cpu_cores: usize) -> Self {
        const BYTES_PER_MIB: u64 = 1024 * 1024;
        const BYTES_PER_CONN: u64 = 50 * 1024;

        let cores = cpu_cores.max(1);

        // 10 % of RAM. `bytes / 10` is exactly `bytes * 10 / 100` in integer
        // arithmetic but cannot overflow on the multiplication by ten.
        let budget_bytes = ram_mb.saturating_mul(BYTES_PER_MIB) / 10;

        // Clamp while still in u64 so the narrowing cast is lossless.
        let max_total = (budget_bytes / BYTES_PER_CONN).clamp(50, 9_000) as u32;

        let max_incoming = (max_total * 80 / 100).clamp(30, 8_000);
        let max_outgoing = (max_total * 20 / 100).clamp(20, 1_000);

        // Pending incoming: a tenth of the established limit (at least 10),
        // capped at 64 per core and 512 overall. The cap is at most 512, so
        // the cast to u32 cannot truncate.
        let pending_incoming_cap = cores.saturating_mul(64).min(512) as u32;
        let pending_incoming = (max_incoming / 10).max(10).min(pending_incoming_cap);
        let pending_outgoing = (max_outgoing / 4).clamp(20, 128);

        let reqres_streams = cores.saturating_mul(512).clamp(64, 4_096);

        // Leave headroom of 64 streams above the request/response limit.
        let yamux_streams = (reqres_streams + 64).clamp(256, 8_192);

        let tcp_backlog = (max_incoming / 8).clamp(128, 8_192);

        let identify_cache = ((max_total / 4) as usize).min(1_024);

        Self {
            yamux_max_num_streams: yamux_streams,
            tcp_listen_backlog: tcp_backlog,
            tcp_nodelay: true,
            reqres_max_concurrent_streams: reqres_streams,
            reqres_request_timeout: 30,
            identify_cache,
            kademlia_query_timeout: 25,
            conn_limmits_max_pending_incoming: Some(pending_incoming),
            conn_limmits_max_pending_outgoing: Some(pending_outgoing),
            conn_limmits_max_established_incoming: Some(max_incoming),
            conn_limmits_max_established_outgoing: Some(max_outgoing),
            conn_limmits_max_established_per_peer: Some(2),
            conn_limmits_max_established_total: Some(max_total),
        }
    }
}
167
168pub enum ScheduleType {
169 Discover,
170 Dial(Vec<Multiaddr>),
171}
172
173#[derive(Copy, Clone, Debug)]
174pub enum Action {
175 Discover,
176 Dial,
177 Identified(ConnectionId),
178}
179
180impl From<RetryKind> for Action {
181 fn from(value: RetryKind) -> Self {
182 match value {
183 RetryKind::Discover => Self::Discover,
184 RetryKind::Dial => Self::Dial,
185 }
186 }
187}
188
/// Operation that a retry refers to.
#[derive(Copy, Clone, Debug)]
pub enum RetryKind {
    /// Retry a discovery.
    Discover,
    /// Retry a dial.
    Dial,
}
194
195#[derive(Clone, Debug)]
196pub struct RetryState {
197 pub attempts: u8,
198 pub when: Instant,
199 pub kind: RetryKind,
200 pub addrs: Vec<Multiaddr>,
201}
202
203#[derive(Eq, Clone, Debug)]
204pub struct Due(pub PeerId, pub Instant);
205impl PartialEq for Due {
206 fn eq(&self, o: &Self) -> bool {
207 self.1.eq(&o.1)
208 }
209}
210impl Ord for Due {
211 fn cmp(&self, o: &Self) -> Ordering {
212 o.1.cmp(&self.1)
213 }
214}
215impl PartialOrd for Due {
216 fn partial_cmp(&self, o: &Self) -> Option<Ordering> {
217 Some(self.cmp(o))
218 }
219}
220
221#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
223pub enum NetworkState {
224 Start,
226 Dial,
228 Dialing,
230 Running,
232 Disconnected,
234}
235
236pub enum MessagesHelper {
237 Single(Bytes),
238 Vec(VecDeque<Bytes>),
239}
240
241async fn request_peer_list(
243 client: reqwest::Client,
244 service: String,
245 request_timeout: Duration,
246 graceful_token: CancellationToken,
247 crash_token: CancellationToken,
248 list_kind: &'static str,
249) -> Option<Vec<String>> {
250 let response = tokio::select! {
251 _ = graceful_token.clone().cancelled_owned() => return None,
252 _ = crash_token.clone().cancelled_owned() => return None,
253 response = client.get(&service).timeout(request_timeout).send() => response,
254 };
255
256 match response {
257 Ok(res) => {
258 if !res.status().is_success() {
259 warn!(
260 target: TARGET,
261 list_kind = list_kind,
262 url = service,
263 status = %res.status(),
264 "control-list service returned error status"
265 );
266 return None;
267 }
268
269 let peers = tokio::select! {
270 _ = graceful_token.clone().cancelled_owned() => return None,
271 _ = crash_token.clone().cancelled_owned() => return None,
272 peers = res.json::<Vec<String>>() => peers,
273 };
274
275 match peers {
276 Ok(peers) => Some(peers),
277 Err(e) => {
278 warn!(
279 target: TARGET,
280 list_kind = list_kind,
281 url = service,
282 error = %e,
283 "control-list service returned unexpected body"
284 );
285 None
286 }
287 }
288 }
289 Err(e) => {
290 if e.is_timeout() {
291 warn!(
292 target: TARGET,
293 list_kind = list_kind,
294 url = service,
295 timeout_secs = request_timeout.as_secs_f64(),
296 "control-list service timed out"
297 );
298 } else {
299 warn!(
300 target: TARGET,
301 list_kind = list_kind,
302 url = service,
303 error = %e,
304 "control-list service unreachable"
305 );
306 }
307 None
308 }
309 }
310}
311
312async fn request_peer_lists(
313 client: reqwest::Client,
314 services: Vec<String>,
315 request_timeout: Duration,
316 max_concurrent_requests: usize,
317 graceful_token: CancellationToken,
318 crash_token: CancellationToken,
319 list_kind: &'static str,
320) -> (Vec<String>, u16) {
321 if services.is_empty()
322 || graceful_token.is_cancelled()
323 || crash_token.is_cancelled()
324 {
325 return (vec![], 0);
326 }
327
328 let responses = stream::iter(services.into_iter().map(|service| {
329 let client = client.clone();
330 let graceful_token = graceful_token.clone();
331 let crash_token = crash_token.clone();
332
333 async move {
334 request_peer_list(
335 client.clone(),
336 service,
337 request_timeout,
338 graceful_token,
339 crash_token,
340 list_kind,
341 )
342 .await
343 }
344 }))
345 .buffer_unordered(max_concurrent_requests.max(1))
346 .collect::<Vec<Option<Vec<String>>>>()
347 .await;
348
349 let mut peers = Vec::new();
350 let mut successful = 0u16;
351
352 for item in responses.into_iter().flatten() {
353 peers.extend(item);
354 successful = successful.saturating_add(1);
355 }
356
357 (peers, successful)
358}
359
360pub async fn request_update_lists(
361 client: reqwest::Client,
362 service_allow: Vec<String>,
363 service_block: Vec<String>,
364 request_timeout: Duration,
365 max_concurrent_requests: usize,
366 graceful_token: CancellationToken,
367 crash_token: CancellationToken,
368) -> ((Vec<String>, Vec<String>), (u16, u16)) {
369 let (
370 (vec_allow_peers, successful_allow),
371 (vec_block_peers, successful_block),
372 ) = tokio::join!(
373 request_peer_lists(
374 client.clone(),
375 service_allow,
376 request_timeout,
377 max_concurrent_requests,
378 graceful_token.clone(),
379 crash_token.clone(),
380 "allow"
381 ),
382 request_peer_lists(
383 client,
384 service_block,
385 request_timeout,
386 max_concurrent_requests,
387 graceful_token.clone(),
388 crash_token.clone(),
389 "block"
390 )
391 );
392
393 (
394 (vec_allow_peers, vec_block_peers),
395 (successful_allow, successful_block),
396 )
397}
398
399pub fn convert_boot_nodes(
401 boot_nodes: &[RoutingNode],
402) -> HashMap<PeerId, Vec<Multiaddr>> {
403 let mut boot_nodes_aux = HashMap::new();
404
405 for node in boot_nodes {
406 let Ok(peer) = bs58::decode(node.peer_id.clone()).into_vec() else {
407 continue;
408 };
409
410 let Ok(peer) = PeerId::from_bytes(peer.as_slice()) else {
411 continue;
412 };
413
414 let mut aux_addrs = vec![];
415 for addr in node.address.iter() {
416 let Ok(addr) = Multiaddr::from_str(addr) else {
417 continue;
418 };
419
420 aux_addrs.push(addr);
421 }
422
423 if !aux_addrs.is_empty() {
424 boot_nodes_aux.insert(peer, aux_addrs);
425 }
426 }
427
428 boot_nodes_aux
429}
430
431pub fn convert_addresses(
433 addresses: &[String],
434) -> Result<HashSet<Multiaddr>, Error> {
435 let mut addrs = HashSet::new();
436 for address in addresses {
437 if let Some(value) = multiaddr(address) {
438 addrs.insert(value);
439 } else {
440 return Err(Error::InvalidAddress(address.clone()));
441 }
442 }
443 Ok(addrs)
444}
445
446fn multiaddr(addr: &str) -> Option<Multiaddr> {
448 addr.parse::<Multiaddr>().ok()
449}
450
451#[cfg(not(any(test, feature = "test")))]
458pub fn is_global(addr: &Multiaddr) -> bool {
459 addr.iter().any(|p| match p {
460 Protocol::Ip4(ip) => IpNetwork::from(ip).is_global(),
461 Protocol::Ip6(ip) => IpNetwork::from(ip).is_global(),
462 _ => false,
463 })
464}
465
466#[cfg(not(any(test, feature = "test")))]
467pub fn is_private(addr: &Multiaddr) -> bool {
468 addr.iter().any(|p| match p {
469 Protocol::Ip4(ip) => ip.is_private(),
470 Protocol::Ip6(ip) => ip.is_unique_local(),
471 _ => false,
472 })
473}
474
475#[cfg(not(any(test, feature = "test")))]
476pub fn is_loop_back(addr: &Multiaddr) -> bool {
477 addr.iter().any(|p| match p {
478 Protocol::Ip4(ip) => ip.is_loopback(),
479 Protocol::Ip6(ip) => ip.is_loopback(),
480 _ => false,
481 })
482}
483
484#[cfg(not(any(test, feature = "test")))]
485pub fn is_dns(addr: &Multiaddr) -> bool {
486 addr.iter().any(|p| {
487 matches!(p, Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_))
488 })
489}
490
491#[cfg(not(any(test, feature = "test")))]
493pub fn is_tcp(addr: &Multiaddr) -> bool {
494 addr.iter().any(|p| matches!(p, Protocol::Tcp(_)))
495}
496
497#[derive(Debug, Clone, Deserialize)]
499#[serde(default)]
500pub struct ReqResConfig {
501 #[serde(deserialize_with = "deserialize_duration_secs")]
503 pub message_timeout: Duration,
504 pub max_concurrent_streams: usize,
506}
507
508fn deserialize_duration_secs<'de, D>(
509 deserializer: D,
510) -> Result<Duration, D::Error>
511where
512 D: Deserializer<'de>,
513{
514 let u: u64 = u64::deserialize(deserializer)?;
515 Ok(Duration::from_secs(u))
516}
517
518impl ReqResConfig {
519 pub const fn new(
521 message_timeout: Duration,
522 max_concurrent_streams: usize,
523 ) -> Self {
524 Self {
525 message_timeout,
526 max_concurrent_streams,
527 }
528 }
529}
530
531impl Default for ReqResConfig {
532 fn default() -> Self {
533 Self {
534 message_timeout: Duration::from_secs(10),
535 max_concurrent_streams: 100,
536 }
537 }
538}
539
540impl ReqResConfig {
541 pub const fn with_message_timeout(mut self, timeout: Duration) -> Self {
543 self.message_timeout = timeout;
544 self
545 }
546
547 pub const fn with_max_concurrent_streams(
549 mut self,
550 num_streams: usize,
551 ) -> Self {
552 self.max_concurrent_streams = num_streams;
553 self
554 }
555
556 pub const fn get_message_timeout(&self) -> Duration {
558 self.message_timeout
559 }
560
561 pub const fn get_max_concurrent_streams(&self) -> usize {
563 self.max_concurrent_streams
564 }
565}