use crate::{Error, routing::RoutingNode};
use bytes::Bytes;
use futures::{StreamExt, stream};
use libp2p::{
Multiaddr, PeerId,
identity::{self},
multihash::Multihash,
swarm::ConnectionId,
};
use serde::{Deserialize, Deserializer, Serialize};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use std::{
cmp::Ordering,
collections::{HashMap, HashSet, VecDeque},
str::FromStr,
time::Duration,
};
#[cfg(not(any(test, feature = "test")))]
use ip_network::IpNetwork;
#[cfg(not(any(test, feature = "test")))]
use libp2p::multiaddr::Protocol;
/// Log target used by `tracing` macros in this module.
const TARGET: &str = "ave::network::utils";
/// Protocol name advertised for the Noise handshake (presumably the PSK/protocol tag).
pub const NOISE_PROTOCOL: &str = "ave-p2p-v1";
/// Request/response protocol identifier.
pub const REQRES_PROTOCOL: &str = "/ave/reqres/1.0.0";
/// Routing (discovery) protocol identifier.
pub const ROUTING_PROTOCOL: &str = "/ave/routing/1.0.0";
/// Identify protocol identifier.
pub const IDENTIFY_PROTOCOL: &str = "/ave/1.0.0";
/// User-agent string reported via identify.
pub const USER_AGENT: &str = "ave/0.8.0";
/// Maximum size of a single application message (1 MiB).
pub const MAX_APP_MESSAGE_BYTES: usize = 1024 * 1024;
/// Per-peer cap on queued outbound bytes (8 MiB).
pub const DEFAULT_MAX_PENDING_OUTBOUND_BYTES_PER_PEER: usize = 8 * 1024 * 1024;
/// Per-peer cap on queued inbound bytes (8 MiB).
pub const DEFAULT_MAX_PENDING_INBOUND_BYTES_PER_PEER: usize = 8 * 1024 * 1024;
/// Global outbound queue cap; 0 presumably means "unlimited" — TODO confirm at use site.
pub const DEFAULT_MAX_PENDING_OUTBOUND_BYTES_TOTAL: usize = 0;
/// Global inbound queue cap; 0 presumably means "unlimited" — TODO confirm at use site.
pub const DEFAULT_MAX_PENDING_INBOUND_BYTES_TOTAL: usize = 0;
/// Failure modes when recovering an ed25519 public key from a [`PeerId`]
/// (see [`peer_id_to_ed25519_pubkey_bytes`]).
#[derive(Debug, thiserror::Error)]
pub enum PeerIdToEd25519Error {
    // The peer id contains a hash of the key rather than the key itself,
    // so the original public key cannot be recovered from it.
    #[error(
        "peer id is not an identity multihash (public key is not recoverable)"
    )]
    NotIdentityMultihash,
    // Identity multihash present but its payload is empty.
    #[error("multihash digest is empty or invalid")]
    InvalidDigest,
    // The identity payload did not decode as a protobuf PublicKey.
    #[error("failed to decode protobuf-encoded public key: {0}")]
    Protobuf(#[from] identity::DecodingError),
    // The decoded key exists but uses a different key type.
    #[error("public key is not ed25519: {0}")]
    NotEd25519(#[from] identity::OtherVariantError),
}
/// Extracts the raw 32-byte ed25519 public key embedded in `peer_id`.
///
/// This only succeeds for identity-multihash peer ids, where the key is
/// inlined instead of hashed; any other multihash code is rejected because
/// the key cannot be recovered from a hash.
pub fn peer_id_to_ed25519_pubkey_bytes(
    peer_id: &PeerId,
) -> Result<[u8; 32], PeerIdToEd25519Error> {
    let multihash: &Multihash<64> = peer_id.as_ref();
    // 0x00 is the "identity" multihash code: the digest IS the payload.
    if multihash.code() != 0x00 {
        return Err(PeerIdToEd25519Error::NotIdentityMultihash);
    }
    let payload = multihash.digest();
    if payload.is_empty() {
        return Err(PeerIdToEd25519Error::InvalidDigest);
    }
    // The identity payload is a protobuf-encoded PublicKey; anything that
    // decodes but is not ed25519 is rejected by `try_into_ed25519`.
    let public_key = identity::PublicKey::try_decode_protobuf(payload)?;
    Ok(public_key.try_into_ed25519()?.to_bytes())
}
/// Connection, stream, and protocol limits derived from available hardware.
///
/// NOTE(review): field prefix `conn_limmits_` looks like a typo for
/// `conn_limits_`, but renaming would break callers, so it is kept.
#[derive(Clone)]
pub struct LimitsConfig {
    pub yamux_max_num_streams: usize,
    #[cfg_attr(any(test, feature = "test"), allow(dead_code))]
    pub tcp_listen_backlog: u32,
    #[cfg_attr(any(test, feature = "test"), allow(dead_code))]
    pub tcp_nodelay: bool,
    pub reqres_max_concurrent_streams: usize,
    /// Request/response timeout, in seconds.
    pub reqres_request_timeout: u64,
    pub identify_cache: usize,
    /// Kademlia query timeout, in seconds.
    pub kademlia_query_timeout: u64,
    pub conn_limmits_max_pending_incoming: Option<u32>,
    pub conn_limmits_max_pending_outgoing: Option<u32>,
    pub conn_limmits_max_established_incoming: Option<u32>,
    pub conn_limmits_max_established_outgoing: Option<u32>,
    pub conn_limmits_max_established_per_peer: Option<u32>,
    pub conn_limmits_max_established_total: Option<u32>,
}
impl LimitsConfig {
    /// Derives limits from the host's RAM (in MiB) and CPU core count.
    ///
    /// Roughly 10% of RAM is budgeted for connections at ~50 KiB each; the
    /// resulting connection count is split 80/20 between inbound and
    /// outbound, and every figure is clamped to a sane floor and ceiling.
    pub fn build(ram_mb: u64, cpu_cores: usize) -> Self {
        // Guard against a (mis)reported core count of zero.
        let core_count = cpu_cores.max(1);
        // 10% of total RAM, in bytes, reserved for connection buffers.
        let connection_budget = ram_mb * 1024 * 1024 * 10 / 100;
        let per_connection: u64 = 50 * 1024;
        let total_conns =
            ((connection_budget / per_connection) as u32).clamp(50, 9_000);
        // 80% inbound / 20% outbound split of the connection budget.
        let inbound = (total_conns * 80 / 100).clamp(30, 8_000);
        let outbound = (total_conns * 20 / 100).clamp(20, 1_000);
        // Pending (half-open) slots scale with established counts but are
        // additionally bounded by core count and hard caps.
        let pending_in = (inbound / 10)
            .max(10)
            .min(((core_count as u32) * 64).min(512));
        let pending_out = (outbound / 4).clamp(20, 128);
        let reqres_streams = (core_count * 512).clamp(64, 4_096);
        LimitsConfig {
            // Leave headroom above the reqres stream cap for other protocols.
            yamux_max_num_streams: (reqres_streams + 64).clamp(256, 8_192),
            tcp_listen_backlog: (inbound / 8).clamp(128, 8_192),
            tcp_nodelay: true,
            reqres_max_concurrent_streams: reqres_streams,
            reqres_request_timeout: 30,
            identify_cache: ((total_conns / 4) as usize).min(1_024),
            kademlia_query_timeout: 25,
            conn_limmits_max_pending_incoming: Some(pending_in),
            conn_limmits_max_pending_outgoing: Some(pending_out),
            conn_limmits_max_established_incoming: Some(inbound),
            conn_limmits_max_established_outgoing: Some(outbound),
            conn_limmits_max_established_per_peer: Some(2),
            conn_limmits_max_established_total: Some(total_conns),
        }
    }
}
/// How contact with a peer should be (re)established.
pub enum ScheduleType {
    /// Locate the peer via the discovery layer first (presumably because no
    /// usable addresses are known — confirm against the scheduler).
    Discover,
    /// Dial the supplied addresses directly.
    Dial(Vec<Multiaddr>),
}
/// Units of work processed for a peer by the connection scheduler.
#[derive(Copy, Clone, Debug)]
pub enum Action {
    /// Run discovery for the peer.
    Discover,
    /// Dial the peer.
    Dial,
    /// The peer was identified; carries the connection id involved
    /// (presumably from the identify event — confirm at the swarm handler).
    Identified(ConnectionId),
}
impl From<RetryKind> for Action {
fn from(value: RetryKind) -> Self {
match value {
RetryKind::Discover => Self::Discover,
RetryKind::Dial => Self::Dial,
}
}
}
/// Which kind of scheduled work a retry entry refers to; convertible into
/// the matching [`Action`] via `From`.
#[derive(Copy, Clone, Debug)]
pub enum RetryKind {
    /// Retry peer discovery.
    Discover,
    /// Retry dialing the peer.
    Dial,
}
/// Retry bookkeeping for a single peer.
#[derive(Clone, Debug)]
pub struct RetryState {
    // How many attempts have been made so far (drives backoff, presumably).
    pub attempts: u8,
    // When the next attempt is due.
    pub when: Instant,
    // Whether the retry is a discovery or a dial.
    pub kind: RetryKind,
    // Addresses to dial on retry; NOTE(review): likely empty for
    // `Discover` retries — confirm against the scheduler.
    pub addrs: Vec<Multiaddr>,
}
/// A peer paired with the instant its next action is due.
///
/// Equality and ordering deliberately look at the deadline only (the peer id
/// is ignored), and the ordering is *reversed*, so that a max-heap such as
/// `std::collections::BinaryHeap` pops the earliest deadline first —
/// presumably how the scheduler consumes these.
#[derive(Eq, Clone, Debug)]
pub struct Due(pub PeerId, pub Instant);
impl PartialEq for Due {
    fn eq(&self, other: &Self) -> bool {
        self.1 == other.1
    }
}
impl Ord for Due {
    fn cmp(&self, other: &Self) -> Ordering {
        // Natural deadline order, reversed (min-heap behavior in a max-heap).
        self.1.cmp(&other.1).reverse()
    }
}
impl PartialOrd for Due {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
/// Coarse lifecycle of the network service's connectivity.
/// (State semantics inferred from names — confirm against the state machine
/// that drives transitions.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum NetworkState {
    /// Initial state, before any connection attempt.
    Start,
    /// A dial has been requested.
    Dial,
    /// Dialing is in progress.
    Dialing,
    /// Connected and operating normally.
    Running,
    /// Lost connectivity.
    Disconnected,
}
/// Wrapper for outbound message payloads: either a single frame or a
/// buffered batch of frames.
pub enum MessagesHelper {
    /// One payload.
    Single(Bytes),
    /// Multiple payloads, kept in a double-ended queue.
    Vec(VecDeque<Bytes>),
}
/// Fetches one control list (allow or block) from the `service` URL.
///
/// Returns `Some(peers)` when the service answers with a success status and
/// a JSON array of strings. Returns `None` — after logging a warning — on an
/// error status, an undecodable body, a transport error, or a timeout; and
/// returns `None` silently when either cancellation token fires mid-request.
async fn request_peer_list(
    client: reqwest::Client,
    service: String,
    request_timeout: Duration,
    graceful_token: CancellationToken,
    crash_token: CancellationToken,
    list_kind: &'static str,
) -> Option<Vec<String>> {
    // Race the HTTP request against both shutdown tokens so a hung service
    // cannot delay shutdown. `cancelled_owned` consumes a token, hence the
    // clones (the originals are still needed for the body phase below).
    let response = tokio::select! {
        _ = graceful_token.clone().cancelled_owned() => return None,
        _ = crash_token.clone().cancelled_owned() => return None,
        response = client.get(&service).timeout(request_timeout).send() => response,
    };
    match response {
        Ok(res) => {
            if !res.status().is_success() {
                warn!(
                    target: TARGET,
                    list_kind = list_kind,
                    url = service,
                    status = %res.status(),
                    "control-list service returned error status"
                );
                return None;
            }
            // Downloading/decoding the body can also stall, so it races the
            // cancellation tokens too.
            let peers = tokio::select! {
                _ = graceful_token.clone().cancelled_owned() => return None,
                _ = crash_token.clone().cancelled_owned() => return None,
                peers = res.json::<Vec<String>>() => peers,
            };
            match peers {
                Ok(peers) => Some(peers),
                Err(e) => {
                    warn!(
                        target: TARGET,
                        list_kind = list_kind,
                        url = service,
                        error = %e,
                        "control-list service returned unexpected body"
                    );
                    None
                }
            }
        }
        Err(e) => {
            // Distinguish timeouts from other transport failures in the logs.
            if e.is_timeout() {
                warn!(
                    target: TARGET,
                    list_kind = list_kind,
                    url = service,
                    timeout_secs = request_timeout.as_secs_f64(),
                    "control-list service timed out"
                );
            } else {
                warn!(
                    target: TARGET,
                    list_kind = list_kind,
                    url = service,
                    error = %e,
                    "control-list service unreachable"
                );
            }
            None
        }
    }
}
/// Fans out [`request_peer_list`] over every configured service endpoint,
/// with at most `max_concurrent_requests` requests in flight.
///
/// Returns the concatenated peers from every successful endpoint together
/// with the number of endpoints that answered successfully. Returns
/// `(vec![], 0)` immediately when there are no services or shutdown has
/// already been requested.
async fn request_peer_lists(
    client: reqwest::Client,
    services: Vec<String>,
    request_timeout: Duration,
    max_concurrent_requests: usize,
    graceful_token: CancellationToken,
    crash_token: CancellationToken,
    list_kind: &'static str,
) -> (Vec<String>, u16) {
    if services.is_empty()
        || graceful_token.is_cancelled()
        || crash_token.is_cancelled()
    {
        return (vec![], 0);
    }
    let responses = stream::iter(services.into_iter().map(|service| {
        // Each in-flight request gets its own handles; the clones happen
        // once here, and the values are then moved into the future
        // (the previous extra `client.clone()` inside was redundant).
        let client = client.clone();
        let graceful_token = graceful_token.clone();
        let crash_token = crash_token.clone();
        async move {
            request_peer_list(
                client,
                service,
                request_timeout,
                graceful_token,
                crash_token,
                list_kind,
            )
            .await
        }
    }))
    // `max(1)` guards against a misconfigured zero, which would stall the stream.
    .buffer_unordered(max_concurrent_requests.max(1))
    .collect::<Vec<Option<Vec<String>>>>()
    .await;
    let mut peers = Vec::new();
    let mut successful = 0u16;
    // `flatten` skips the failed (None) endpoints; count only successes.
    for item in responses.into_iter().flatten() {
        peers.extend(item);
        successful = successful.saturating_add(1);
    }
    (peers, successful)
}
/// Fetches the allow and block control lists concurrently.
///
/// Returns `((allow_peers, block_peers), (allow_successes, block_successes))`,
/// where the success counts say how many endpoints of each list answered.
pub async fn request_update_lists(
    client: reqwest::Client,
    service_allow: Vec<String>,
    service_block: Vec<String>,
    request_timeout: Duration,
    max_concurrent_requests: usize,
    graceful_token: CancellationToken,
    crash_token: CancellationToken,
) -> ((Vec<String>, Vec<String>), (u16, u16)) {
    let (
        (vec_allow_peers, successful_allow),
        (vec_block_peers, successful_block),
    ) = tokio::join!(
        request_peer_lists(
            client.clone(),
            service_allow,
            request_timeout,
            max_concurrent_requests,
            graceful_token.clone(),
            crash_token.clone(),
            "allow"
        ),
        // Last use of the client and tokens: move them instead of cloning.
        request_peer_lists(
            client,
            service_block,
            request_timeout,
            max_concurrent_requests,
            graceful_token,
            crash_token,
            "block"
        )
    );
    (
        (vec_allow_peers, vec_block_peers),
        (successful_allow, successful_block),
    )
}
/// Converts configured routing nodes into a `PeerId -> addresses` map.
///
/// Nodes whose peer id fails base58 or multihash decoding are skipped, as
/// are individual addresses that do not parse; a node is only inserted when
/// at least one of its addresses is valid.
pub fn convert_boot_nodes(
    boot_nodes: &[RoutingNode],
) -> HashMap<PeerId, Vec<Multiaddr>> {
    let mut boot_nodes_aux = HashMap::new();
    for node in boot_nodes {
        // Decode the base58 peer id in place — no need to clone the String.
        let Ok(peer_bytes) = bs58::decode(node.peer_id.as_bytes()).into_vec()
        else {
            continue;
        };
        let Ok(peer) = PeerId::from_bytes(&peer_bytes) else {
            continue;
        };
        // Keep only the addresses that parse as valid multiaddrs.
        let addrs: Vec<Multiaddr> = node
            .address
            .iter()
            .filter_map(|addr| addr.parse().ok())
            .collect();
        if !addrs.is_empty() {
            boot_nodes_aux.insert(peer, addrs);
        }
    }
    boot_nodes_aux
}
pub fn convert_addresses(
addresses: &[String],
) -> Result<HashSet<Multiaddr>, Error> {
let mut addrs = HashSet::new();
for address in addresses {
if let Some(value) = multiaddr(address) {
addrs.insert(value);
} else {
return Err(Error::InvalidAddress(address.clone()));
}
}
Ok(addrs)
}
fn multiaddr(addr: &str) -> Option<Multiaddr> {
addr.parse::<Multiaddr>().ok()
}
/// True when any component of `addr` is a globally-routable IP address.
#[cfg(not(any(test, feature = "test")))]
pub fn is_global(addr: &Multiaddr) -> bool {
    addr.iter().any(|component| {
        let network = match component {
            Protocol::Ip4(ip) => IpNetwork::from(ip),
            Protocol::Ip6(ip) => IpNetwork::from(ip),
            _ => return false,
        };
        network.is_global()
    })
}
/// True when any component of `addr` is a private-range IP
/// (RFC 1918 for IPv4, unique-local fc00::/7 for IPv6).
#[cfg(not(any(test, feature = "test")))]
pub fn is_private(addr: &Multiaddr) -> bool {
    for component in addr.iter() {
        let private = match component {
            Protocol::Ip4(ip) => ip.is_private(),
            Protocol::Ip6(ip) => ip.is_unique_local(),
            _ => false,
        };
        if private {
            return true;
        }
    }
    false
}
/// True when any component of `addr` is a loopback IP (v4 or v6).
#[cfg(not(any(test, feature = "test")))]
pub fn is_loop_back(addr: &Multiaddr) -> bool {
    addr.iter().any(|component| {
        // Normalize both families into `IpAddr` and ask it once.
        let ip: std::net::IpAddr = match component {
            Protocol::Ip4(ip) => ip.into(),
            Protocol::Ip6(ip) => ip.into(),
            _ => return false,
        };
        ip.is_loopback()
    })
}
/// True when any component of `addr` is a DNS name (dns, dns4, or dns6).
#[cfg(not(any(test, feature = "test")))]
pub fn is_dns(addr: &Multiaddr) -> bool {
    for component in addr.iter() {
        if matches!(
            component,
            Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_)
        ) {
            return true;
        }
    }
    false
}
/// True when any component of `addr` is a TCP port.
#[cfg(not(any(test, feature = "test")))]
pub fn is_tcp(addr: &Multiaddr) -> bool {
    addr.iter()
        .any(|component| matches!(component, Protocol::Tcp(_)))
}
/// Tuning knobs for the request/response protocol, deserializable from
/// configuration; missing fields fall back to the [`Default`] values.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct ReqResConfig {
    // Stored as plain seconds in config files; converted on deserialize.
    #[serde(deserialize_with = "deserialize_duration_secs")]
    pub message_timeout: Duration,
    // Upper bound on simultaneously open reqres streams.
    pub max_concurrent_streams: usize,
}
/// Serde helper: decodes an integer number of seconds into a [`Duration`].
fn deserialize_duration_secs<'de, D>(
    deserializer: D,
) -> Result<Duration, D::Error>
where
    D: Deserializer<'de>,
{
    u64::deserialize(deserializer).map(Duration::from_secs)
}
impl ReqResConfig {
    /// Creates a config from an explicit message timeout and stream limit.
    pub const fn new(
        message_timeout: Duration,
        max_concurrent_streams: usize,
    ) -> Self {
        Self {
            message_timeout,
            max_concurrent_streams,
        }
    }
}
impl Default for ReqResConfig {
    /// Defaults: 10-second message timeout, 100 concurrent streams.
    fn default() -> Self {
        Self::new(Duration::from_secs(10), 100)
    }
}
impl ReqResConfig {
    /// Returns `self` with the message timeout replaced (builder-style).
    pub const fn with_message_timeout(mut self, timeout: Duration) -> Self {
        self.message_timeout = timeout;
        self
    }
    /// Returns `self` with the concurrent-stream cap replaced (builder-style).
    pub const fn with_max_concurrent_streams(
        mut self,
        num_streams: usize,
    ) -> Self {
        self.max_concurrent_streams = num_streams;
        self
    }
    /// Configured per-message timeout.
    pub const fn get_message_timeout(&self) -> Duration {
        self.message_timeout
    }
    /// Configured maximum number of concurrent reqres streams.
    pub const fn get_max_concurrent_streams(&self) -> usize {
        self.max_concurrent_streams
    }
}