1use crate::{Error, routing::RoutingNode};
2use bytes::Bytes;
3use futures::{StreamExt, stream};
4use ip_network::IpNetwork;
5use libp2p::{
6 Multiaddr, PeerId,
7 identity::{self},
8 multiaddr::Protocol,
9 multihash::Multihash,
10 swarm::ConnectionId,
11};
12use serde::{Deserialize, Deserializer, Serialize};
13use tokio::time::Instant;
14use tokio_util::sync::CancellationToken;
15use tracing::warn;
16
17use std::{
18 cmp::Ordering,
19 collections::{HashMap, HashSet, VecDeque},
20 str::FromStr,
21 time::Duration,
22};
23
/// `tracing` target attached to the log events emitted by this module.
const TARGET: &str = "ave::network::utils";

/// Name used for the Noise handshake.
/// NOTE(review): inferred from the constant name; usage is outside this file.
pub const NOISE_PROTOCOL: &str = "ave-p2p-v1";

/// Protocol identifier of the request/response behaviour.
pub const REQRES_PROTOCOL: &str = "/ave/reqres/1.0.0";

/// Protocol identifier of the routing behaviour.
pub const ROUTING_PROTOCOL: &str = "/ave/routing/1.0.0";

/// Protocol version string announced through identify.
pub const IDENTIFY_PROTOCOL: &str = "/ave/1.0.0";

/// User agent string announced by this node.
pub const USER_AGENT: &str = "ave/0.8.0";
30pub const MAX_APP_MESSAGE_BYTES: usize = 1024 * 1024; pub const DEFAULT_MAX_PENDING_OUTBOUND_BYTES_PER_PEER: usize = 8 * 1024 * 1024; pub const DEFAULT_MAX_PENDING_INBOUND_BYTES_PER_PEER: usize = 8 * 1024 * 1024; pub const DEFAULT_MAX_PENDING_OUTBOUND_BYTES_TOTAL: usize = 0; pub const DEFAULT_MAX_PENDING_INBOUND_BYTES_TOTAL: usize = 0; #[derive(Debug, thiserror::Error)]
37pub enum PeerIdToEd25519Error {
38 #[error(
39 "peer id is not an identity multihash (public key is not recoverable)"
40 )]
41 NotIdentityMultihash,
42 #[error("multihash digest is empty or invalid")]
43 InvalidDigest,
44 #[error("failed to decode protobuf-encoded public key: {0}")]
45 Protobuf(#[from] identity::DecodingError),
46 #[error("public key is not ed25519: {0}")]
47 NotEd25519(#[from] identity::OtherVariantError),
48}
49
50pub fn peer_id_to_ed25519_pubkey_bytes(
51 peer_id: &PeerId,
52) -> Result<[u8; 32], PeerIdToEd25519Error> {
53 let mh: &Multihash<64> = peer_id.as_ref();
55
56 if mh.code() != 0x00 {
58 return Err(PeerIdToEd25519Error::NotIdentityMultihash);
59 }
60
61 let digest = mh.digest();
62 if digest.is_empty() {
63 return Err(PeerIdToEd25519Error::InvalidDigest);
64 }
65
66 let pk = identity::PublicKey::try_decode_protobuf(digest)?;
68 let ed_pk = pk.try_into_ed25519()?;
69 Ok(ed_pk.to_bytes())
70}
71
/// Resource limits for the network stack, derived from the resources of the
/// host by [`LimitsConfig::build`].
#[derive(Clone)]
pub struct LimitsConfig {
    /// Maximum number of yamux streams.
    pub yamux_max_num_streams: usize,
    /// Backlog of the TCP listener.
    pub tcp_listen_backlog: u32,
    /// Whether `TCP_NODELAY` is enabled.
    pub tcp_nodelay: bool,
    /// Maximum number of concurrent request/response streams.
    pub reqres_max_concurrent_streams: usize,
    /// Request/response timeout (presumably seconds — confirm at use site).
    pub reqres_request_timeout: u64,
    /// Size of the identify cache.
    pub identify_cache: usize,
    /// Kademlia query timeout (presumably seconds — confirm at use site).
    pub kademlia_query_timeout: u64,
    /// Limit of pending incoming connections (`None` = not set).
    pub conn_limmits_max_pending_incoming: Option<u32>,
    /// Limit of pending outgoing connections (`None` = not set).
    pub conn_limmits_max_pending_outgoing: Option<u32>,
    /// Limit of established incoming connections (`None` = not set).
    pub conn_limmits_max_established_incoming: Option<u32>,
    /// Limit of established outgoing connections (`None` = not set).
    pub conn_limmits_max_established_outgoing: Option<u32>,
    /// Limit of established connections per peer (`None` = not set).
    pub conn_limmits_max_established_per_peer: Option<u32>,
    /// Limit of established connections in total (`None` = not set).
    pub conn_limmits_max_established_total: Option<u32>,
}

impl LimitsConfig {
    /// Derives a set of limits from the RAM (in MiB) and CPU cores available.
    ///
    /// The total connection budget is 10% of the RAM divided by an estimated
    /// 50 KiB per connection, clamped to `50..=9_000`. 80% of it goes to
    /// incoming and 20% to outgoing connections, each with its own clamp.
    /// Stream and pending-connection limits scale with the number of cores.
    ///
    /// `cpu_cores == 0` is treated as a single core. All arithmetic
    /// saturates, so arbitrarily large inputs yield the upper bounds instead
    /// of overflowing or wrapping around.
    pub fn build(ram_mb: u64, cpu_cores: usize) -> Self {
        const BYTES_PER_MB: u64 = 1024 * 1024;
        /// Estimated memory cost of one connection.
        const BYTES_PER_CONN: u64 = 50 * 1024;
        const MIN_TOTAL: u64 = 50;
        const MAX_TOTAL: u64 = 9_000;

        let cores = cpu_cores.max(1);
        let cores_u32 = u32::try_from(cores).unwrap_or(u32::MAX);

        // 10% of the RAM. `x * 10 / 100` equals `x / 10` for integers, and
        // dividing directly avoids the extra overflow-prone multiplication.
        let budget_bytes = ram_mb.saturating_mul(BYTES_PER_MB) / 10;

        // Clamp while still in `u64`: narrowing first would wrap for very
        // large budgets and could turn a huge value into the minimum.
        let max_total = u32::try_from(
            (budget_bytes / BYTES_PER_CONN).clamp(MIN_TOTAL, MAX_TOTAL),
        )
        .unwrap_or(9_000);

        let max_incoming = (max_total * 80 / 100).clamp(30, 8_000);
        let max_outgoing = (max_total * 20 / 100).clamp(20, 1_000);

        let pending_incoming = (max_incoming / 10)
            .max(10)
            .min(cores_u32.saturating_mul(64))
            .min(512);
        let pending_outgoing = (max_outgoing / 4).clamp(20, 128);

        let reqres_streams = cores.saturating_mul(512).clamp(64, 4_096);

        // Leave room for 64 streams on top of request/response.
        let yamux_streams = (reqres_streams + 64).clamp(256, 8_192);

        let tcp_backlog = (max_incoming / 8).clamp(128, 8_192);

        let identify_cache =
            usize::try_from((max_total / 4).min(1_024)).unwrap_or(1_024);

        Self {
            yamux_max_num_streams: yamux_streams,
            tcp_listen_backlog: tcp_backlog,
            tcp_nodelay: true,
            reqres_max_concurrent_streams: reqres_streams,
            reqres_request_timeout: 30,
            identify_cache,
            kademlia_query_timeout: 25,
            conn_limmits_max_pending_incoming: Some(pending_incoming),
            conn_limmits_max_pending_outgoing: Some(pending_outgoing),
            conn_limmits_max_established_incoming: Some(max_incoming),
            conn_limmits_max_established_outgoing: Some(max_outgoing),
            conn_limmits_max_established_per_peer: Some(2),
            conn_limmits_max_established_total: Some(max_total),
        }
    }
}
162
163pub enum ScheduleType {
164 Discover,
165 Dial(Vec<Multiaddr>),
166}
167
168#[derive(Copy, Clone, Debug)]
169pub enum Action {
170 Discover,
171 Dial,
172 Identified(ConnectionId),
173}
174
175impl From<RetryKind> for Action {
176 fn from(value: RetryKind) -> Self {
177 match value {
178 RetryKind::Discover => Self::Discover,
179 RetryKind::Dial => Self::Dial,
180 }
181 }
182}
183
/// Kind of operation that a retry refers to.
#[derive(Copy, Clone, Debug)]
pub enum RetryKind {
    /// Retry a discovery.
    Discover,
    /// Retry a dial.
    Dial,
}
189
190#[derive(Clone, Debug)]
191pub struct RetryState {
192 pub attempts: u8,
193 pub when: Instant,
194 pub kind: RetryKind,
195 pub addrs: Vec<Multiaddr>,
196}
197
198#[derive(Eq, Clone, Debug)]
199pub struct Due(pub PeerId, pub Instant);
200impl PartialEq for Due {
201 fn eq(&self, o: &Self) -> bool {
202 self.1.eq(&o.1)
203 }
204}
205impl Ord for Due {
206 fn cmp(&self, o: &Self) -> Ordering {
207 o.1.cmp(&self.1)
208 }
209}
210impl PartialOrd for Due {
211 fn partial_cmp(&self, o: &Self) -> Option<Ordering> {
212 Some(self.cmp(o))
213 }
214}
215
216#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
218pub enum NetworkState {
219 Start,
221 Dial,
223 Dialing,
225 Running,
227 Disconnected,
229}
230
231pub enum MessagesHelper {
232 Single(Bytes),
233 Vec(VecDeque<Bytes>),
234}
235
236async fn request_peer_list(
238 client: reqwest::Client,
239 service: String,
240 request_timeout: Duration,
241 graceful_token: CancellationToken,
242 crash_token: CancellationToken,
243 list_kind: &'static str,
244) -> Option<Vec<String>> {
245 let response = tokio::select! {
246 _ = graceful_token.clone().cancelled_owned() => return None,
247 _ = crash_token.clone().cancelled_owned() => return None,
248 response = client.get(&service).timeout(request_timeout).send() => response,
249 };
250
251 match response {
252 Ok(res) => {
253 if !res.status().is_success() {
254 warn!(
255 target: TARGET,
256 list_kind = list_kind,
257 url = service,
258 status = %res.status(),
259 "control-list service returned error status"
260 );
261 return None;
262 }
263
264 let peers = tokio::select! {
265 _ = graceful_token.clone().cancelled_owned() => return None,
266 _ = crash_token.clone().cancelled_owned() => return None,
267 peers = res.json::<Vec<String>>() => peers,
268 };
269
270 match peers {
271 Ok(peers) => Some(peers),
272 Err(e) => {
273 warn!(
274 target: TARGET,
275 list_kind = list_kind,
276 url = service,
277 error = %e,
278 "control-list service returned unexpected body"
279 );
280 None
281 }
282 }
283 }
284 Err(e) => {
285 if e.is_timeout() {
286 warn!(
287 target: TARGET,
288 list_kind = list_kind,
289 url = service,
290 timeout_secs = request_timeout.as_secs_f64(),
291 "control-list service timed out"
292 );
293 } else {
294 warn!(
295 target: TARGET,
296 list_kind = list_kind,
297 url = service,
298 error = %e,
299 "control-list service unreachable"
300 );
301 }
302 None
303 }
304 }
305}
306
307async fn request_peer_lists(
308 client: reqwest::Client,
309 services: Vec<String>,
310 request_timeout: Duration,
311 max_concurrent_requests: usize,
312 graceful_token: CancellationToken,
313 crash_token: CancellationToken,
314 list_kind: &'static str,
315) -> (Vec<String>, u16) {
316 if services.is_empty()
317 || graceful_token.is_cancelled()
318 || crash_token.is_cancelled()
319 {
320 return (vec![], 0);
321 }
322
323 let responses = stream::iter(services.into_iter().map(|service| {
324 let client = client.clone();
325 let graceful_token = graceful_token.clone();
326 let crash_token = crash_token.clone();
327
328 async move {
329 request_peer_list(
330 client.clone(),
331 service,
332 request_timeout,
333 graceful_token,
334 crash_token,
335 list_kind,
336 )
337 .await
338 }
339 }))
340 .buffer_unordered(max_concurrent_requests.max(1))
341 .collect::<Vec<Option<Vec<String>>>>()
342 .await;
343
344 let mut peers = Vec::new();
345 let mut successful = 0u16;
346
347 for item in responses.into_iter().flatten() {
348 peers.extend(item);
349 successful = successful.saturating_add(1);
350 }
351
352 (peers, successful)
353}
354
355pub async fn request_update_lists(
356 client: reqwest::Client,
357 service_allow: Vec<String>,
358 service_block: Vec<String>,
359 request_timeout: Duration,
360 max_concurrent_requests: usize,
361 graceful_token: CancellationToken,
362 crash_token: CancellationToken,
363) -> ((Vec<String>, Vec<String>), (u16, u16)) {
364 let (
365 (vec_allow_peers, successful_allow),
366 (vec_block_peers, successful_block),
367 ) = tokio::join!(
368 request_peer_lists(
369 client.clone(),
370 service_allow,
371 request_timeout,
372 max_concurrent_requests,
373 graceful_token.clone(),
374 crash_token.clone(),
375 "allow"
376 ),
377 request_peer_lists(
378 client,
379 service_block,
380 request_timeout,
381 max_concurrent_requests,
382 graceful_token.clone(),
383 crash_token.clone(),
384 "block"
385 )
386 );
387
388 (
389 (vec_allow_peers, vec_block_peers),
390 (successful_allow, successful_block),
391 )
392}
393
394pub fn convert_boot_nodes(
396 boot_nodes: &[RoutingNode],
397) -> HashMap<PeerId, Vec<Multiaddr>> {
398 let mut boot_nodes_aux = HashMap::new();
399
400 for node in boot_nodes {
401 let Ok(peer) = bs58::decode(node.peer_id.clone()).into_vec() else {
402 continue;
403 };
404
405 let Ok(peer) = PeerId::from_bytes(peer.as_slice()) else {
406 continue;
407 };
408
409 let mut aux_addrs = vec![];
410 for addr in node.address.iter() {
411 let Ok(addr) = Multiaddr::from_str(addr) else {
412 continue;
413 };
414
415 aux_addrs.push(addr);
416 }
417
418 if !aux_addrs.is_empty() {
419 boot_nodes_aux.insert(peer, aux_addrs);
420 }
421 }
422
423 boot_nodes_aux
424}
425
426pub fn convert_addresses(
428 addresses: &[String],
429) -> Result<HashSet<Multiaddr>, Error> {
430 let mut addrs = HashSet::new();
431 for address in addresses {
432 if let Some(value) = multiaddr(address) {
433 addrs.insert(value);
434 } else {
435 return Err(Error::InvalidAddress(address.clone()));
436 }
437 }
438 Ok(addrs)
439}
440
441fn multiaddr(addr: &str) -> Option<Multiaddr> {
443 addr.parse::<Multiaddr>().ok()
444}
445
446#[cfg(not(any(test, feature = "test")))]
453pub fn is_global(addr: &Multiaddr) -> bool {
454 addr.iter().any(|p| match p {
455 Protocol::Ip4(ip) => IpNetwork::from(ip).is_global(),
456 Protocol::Ip6(ip) => IpNetwork::from(ip).is_global(),
457 _ => false,
458 })
459}
460
461#[cfg(not(any(test, feature = "test")))]
462pub fn is_private(addr: &Multiaddr) -> bool {
463 addr.iter().any(|p| match p {
464 Protocol::Ip4(ip) => ip.is_private(),
465 Protocol::Ip6(ip) => ip.is_unique_local(),
466 _ => false,
467 })
468}
469
470#[cfg(not(any(test, feature = "test")))]
471pub fn is_loop_back(addr: &Multiaddr) -> bool {
472 addr.iter().any(|p| match p {
473 Protocol::Ip4(ip) => ip.is_loopback(),
474 Protocol::Ip6(ip) => ip.is_loopback(),
475 _ => false,
476 })
477}
478
479#[cfg(not(any(test, feature = "test")))]
480pub fn is_dns(addr: &Multiaddr) -> bool {
481 addr.iter().any(|p| {
482 matches!(p, Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_))
483 })
484}
485
486#[cfg(not(any(test, feature = "test")))]
488pub fn is_tcp(addr: &Multiaddr) -> bool {
489 addr.iter().any(|p| matches!(p, Protocol::Tcp(_)))
490}
491
492#[derive(Debug, Clone, Deserialize)]
494#[serde(default)]
495pub struct ReqResConfig {
496 #[serde(deserialize_with = "deserialize_duration_secs")]
498 pub message_timeout: Duration,
499 pub max_concurrent_streams: usize,
501}
502
503fn deserialize_duration_secs<'de, D>(
504 deserializer: D,
505) -> Result<Duration, D::Error>
506where
507 D: Deserializer<'de>,
508{
509 let u: u64 = u64::deserialize(deserializer)?;
510 Ok(Duration::from_secs(u))
511}
512
513impl ReqResConfig {
514 pub const fn new(
516 message_timeout: Duration,
517 max_concurrent_streams: usize,
518 ) -> Self {
519 Self {
520 message_timeout,
521 max_concurrent_streams,
522 }
523 }
524}
525
526impl Default for ReqResConfig {
527 fn default() -> Self {
528 Self {
529 message_timeout: Duration::from_secs(10),
530 max_concurrent_streams: 100,
531 }
532 }
533}
534
535impl ReqResConfig {
536 pub const fn with_message_timeout(mut self, timeout: Duration) -> Self {
538 self.message_timeout = timeout;
539 self
540 }
541
542 pub const fn with_max_concurrent_streams(
544 mut self,
545 num_streams: usize,
546 ) -> Self {
547 self.max_concurrent_streams = num_streams;
548 self
549 }
550
551 pub const fn get_message_timeout(&self) -> Duration {
553 self.message_timeout
554 }
555
556 pub const fn get_max_concurrent_streams(&self) -> usize {
558 self.max_concurrent_streams
559 }
560}