use crate::{
backend,
metrics::METRICS,
rlpx::{connection::server::PeerConnection, p2p::Capability},
types::{Node, NodeRecord},
utils::distance,
};
use bytes::Bytes;
use ethrex_common::{H256, U256};
use ethrex_storage::Store;
use indexmap::IndexMap;
use rand::distributions::WeightedIndex;
use rand::prelude::Distribution;
use rand::seq::{IteratorRandom, SliceRandom};
use rustc_hash::{FxHashMap, FxHashSet};
use spawned_concurrency::{
actor,
error::ActorError,
protocol,
tasks::{Actor, ActorRef, ActorStart as _, Context, Handler, Response, send_message_on},
};
use std::{
net::IpAddr,
time::{Duration, Instant},
};
/// Upper bound a peer's score is clamped to when a success is recorded.
const MAX_SCORE: i64 = 50;
/// Lower bound a peer's score is clamped to when an ordinary failure is recorded.
const MIN_SCORE: i64 = -50;
/// Score assigned outright on a critical failure: three times `MIN_SCORE`, i.e. below the normal floor.
const MIN_SCORE_CRITICAL: i64 = MIN_SCORE * 3;
/// Multiplier applied to the score when ranking peers (see `weight_peer`).
const SCORE_WEIGHT: i64 = 1;
/// Multiplier applied to the in-flight request count when ranking peers (see `weight_peer`).
const REQUESTS_WEIGHT: i64 = 1;
/// Request budget of a peer at `MAX_SCORE`; scaled down linearly for lower scores (see `can_try_more_requests`).
const MAX_CONCURRENT_REQUESTS_PER_PEER: i64 = 100;
/// Public peer-count target. Not referenced in the visible part of this file; presumably the
/// default handed to `PeerTableServer::spawn` by callers (verify against callers).
pub const TARGET_PEERS: usize = 100;
/// Maximum number of nodes returned by `do_get_closest_nodes`.
pub(crate) const MAX_NODES_IN_NEIGHBORS_PACKET: usize = 16;
/// Maximum number of records returned by `do_get_nodes_at_distances`.
const MAX_ENRS_PER_FINDNODE_RESPONSE: usize = 16;
/// Number of buckets allocated in `PeerTableServer::new`.
const NUMBER_OF_BUCKETS: usize = 256;
/// Capacity of a bucket's main contact list (see `KBucket::insert`).
pub const MAX_NODES_PER_BUCKET: usize = 16;
/// Capacity of a bucket's replacement list (see `KBucket::insert_replacement`).
const MAX_REPLACEMENTS_PER_BUCKET: usize = 10;
/// Maximum number of entries kept in the connection pool before the oldest one is evicted.
const MAX_CONNECTION_POOL_SIZE: usize = 10_000;
/// One bucket of the contact table: a bounded list of main contacts plus a bounded
/// list of replacements, both stored as `(node id, contact)` pairs in insertion order.
#[derive(Debug, Clone, Default)]
pub struct KBucket {
// Main entries; at most `MAX_NODES_PER_BUCKET` when filled through `insert`.
pub(crate) contacts: Vec<(H256, Contact)>,
// Overflow entries; at most `MAX_REPLACEMENTS_PER_BUCKET`, oldest evicted first.
pub(crate) replacements: Vec<(H256, Contact)>,
}
impl KBucket {
    /// Looks `node_id` up among the main contacts only.
    fn get(&self, node_id: &H256) -> Option<&Contact> {
        self.contacts
            .iter()
            .find_map(|(id, contact)| (id == node_id).then_some(contact))
    }

    /// Looks `node_id` up among the main contacts first, then among the replacements.
    fn get_any(&self, node_id: &H256) -> Option<&Contact> {
        if let Some(contact) = self.get(node_id) {
            return Some(contact);
        }
        self.replacements
            .iter()
            .find_map(|(id, contact)| (id == node_id).then_some(contact))
    }

    /// Mutable lookup with the same search order as `get_any`.
    fn get_mut(&mut self, node_id: &H256) -> Option<&mut Contact> {
        self.contacts
            .iter_mut()
            .chain(self.replacements.iter_mut())
            .find_map(|(id, contact)| (*id == *node_id).then_some(contact))
    }

    /// Returns `true` when `node_id` is stored in either list.
    fn contains(&self, node_id: &H256) -> bool {
        self.get_any(node_id).is_some()
    }

    /// Stores the contact in the main list when it has room, otherwise in the
    /// replacement list. Returns `true` only when the main list was used.
    /// No duplicate check is performed here.
    fn insert(&mut self, node_id: H256, contact: Contact) -> bool {
        let has_room = self.contacts.len() < MAX_NODES_PER_BUCKET;
        if has_room {
            self.contacts.push((node_id, contact));
        } else {
            self.insert_replacement(node_id, contact);
        }
        has_room
    }

    /// Appends to the replacement list, evicting the oldest entry when the list is full.
    fn insert_replacement(&mut self, node_id: H256, contact: Contact) {
        let is_full = self.replacements.len() >= MAX_REPLACEMENTS_PER_BUCKET;
        if is_full {
            self.replacements.remove(0);
        }
        self.replacements.push((node_id, contact));
    }

    /// Removes `node_id` from the main list and, if a replacement is available,
    /// moves the oldest replacement to the end of the main list.
    ///
    /// Returns the id of the promoted replacement; `None` when `node_id` is not a
    /// main contact or when there was nothing to promote.
    fn remove_and_promote(&mut self, node_id: &H256) -> Option<H256> {
        let position = self.contacts.iter().position(|(id, _)| id == node_id)?;
        self.contacts.remove(position);
        if self.replacements.is_empty() {
            return None;
        }
        let promoted = self.replacements.remove(0);
        let promoted_id = promoted.0;
        self.contacts.push(promoted);
        Some(promoted_id)
    }
}
/// Maps `node_id` to a bucket index relative to `local_node_id`.
///
/// The index is the bit length of the XOR of both ids minus one; identical ids
/// (XOR of zero) have no bucket and yield `None`.
fn bucket_index(local_node_id: &H256, node_id: &H256) -> Option<usize> {
    let dist = U256::from_big_endian((*local_node_id ^ *node_id).as_bytes());
    (!dist.is_zero()).then(|| dist.bits() - 1)
}
/// Returns the bitwise XOR of the two ids; callers in this file compare the results to order nodes by closeness.
pub(crate) fn xor_distance(a: &H256, b: &H256) -> H256 {
*a ^ *b
}
/// Discovery protocol through which a contact was learned or is known to be reachable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DiscoveryProtocol {
// Tracked by `Contact::is_discv4`.
Discv4,
// Tracked by `Contact::is_discv5`.
Discv5,
}
pub use crate::discv5::session::Session;
/// A discovered node together with the bookkeeping the peer table keeps about it.
#[derive(Debug, Clone)]
pub struct Contact {
// Endpoint information of the node.
pub node: Node,
// Whether the contact is known through Discv4.
pub is_discv4: bool,
// Whether the contact is known through Discv5.
pub is_discv5: bool,
// Set to the current instant when a ping is sent; cleared when a newer record changes the endpoint.
pub validation_timestamp: Option<Instant>,
// Id of the ping still awaiting its pong, if any.
pub ping_id: Option<Bytes>,
// Hash of the ENR request still awaiting its response, if any.
pub enr_request_hash: Option<H256>,
// Latest node record stored for this contact.
pub record: Option<NodeRecord>,
// Contacts flagged here are removed by `PeerTableServer::prune`.
pub disposable: bool,
// Initialised to `true`; contacts with `false` are skipped when choosing a node to dial.
pub knows_us: bool,
// Contacts flagged here are skipped when choosing a node to dial.
pub unwanted: bool,
// Result of the fork-id check; `Some(false)` excludes the contact from dialing.
pub is_fork_id_valid: Option<bool>,
// Session stored by `handle_set_session_info` (type comes from `crate::discv5::session`).
session: Option<Session>,
}
impl Contact {
    /// A contact counts as validated once a ping has been sent and no ping is still pending.
    pub fn was_validated(&self) -> bool {
        !self.has_pending_ping() && self.validation_timestamp.is_some()
    }

    /// Returns `true` while a ping id is stored, i.e. no matching pong has cleared it.
    pub fn has_pending_ping(&self) -> bool {
        self.ping_id.is_some()
    }

    /// Remembers the outstanding ping and stamps the validation time with the current instant.
    pub fn record_ping_sent(&mut self, ping_id: Bytes) {
        self.ping_id = Some(ping_id);
        self.validation_timestamp = Some(Instant::now());
    }

    /// Remembers the hash of the ENR request that was just sent.
    pub fn record_enr_request_sent(&mut self, request_hash: H256) {
        self.enr_request_hash = Some(request_hash);
    }

    /// Stores `record` only when `request_hash` matches the pending ENR request,
    /// clearing the pending hash in that case. Non-matching responses are ignored.
    pub fn record_enr_response_received(&mut self, request_hash: H256, record: NodeRecord) {
        if self.enr_request_hash == Some(request_hash) {
            self.enr_request_hash = None;
            self.record = Some(record);
        }
    }

    /// Returns `true` while an ENR request is awaiting its response.
    pub fn has_pending_enr_request(&self) -> bool {
        self.enr_request_hash.is_some()
    }
}
impl Contact {
    /// Builds a fresh contact for `node`, flagged as known through `protocol` only.
    /// Everything else starts empty/false, except `knows_us`, which starts as `true`.
    pub fn new(node: Node, protocol: DiscoveryProtocol) -> Self {
        let is_discv4 = matches!(protocol, DiscoveryProtocol::Discv4);
        let is_discv5 = matches!(protocol, DiscoveryProtocol::Discv5);
        Self {
            node,
            is_discv4,
            is_discv5,
            validation_timestamp: None,
            ping_id: None,
            enr_request_hash: None,
            record: None,
            disposable: false,
            knows_us: true,
            unwanted: false,
            is_fork_id_valid: None,
            session: None,
        }
    }

    /// Returns whether the contact is flagged for `protocol`.
    pub fn supports_protocol(&self, protocol: DiscoveryProtocol) -> bool {
        if protocol == DiscoveryProtocol::Discv4 {
            self.is_discv4
        } else {
            self.is_discv5
        }
    }

    /// Flags the contact as also known through `protocol`; existing flags are kept.
    pub fn add_protocol(&mut self, protocol: DiscoveryProtocol) {
        let flag = match protocol {
            DiscoveryProtocol::Discv4 => &mut self.is_discv4,
            DiscoveryProtocol::Discv5 => &mut self.is_discv5,
        };
        *flag = true;
    }
}
/// State kept for every connected peer.
#[derive(Debug, Clone)]
pub struct PeerData {
// Endpoint information of the peer.
pub node: Node,
// Node record of the peer, when one is known.
pub record: Option<NodeRecord>,
// Capabilities the peer advertised.
pub supported_capabilities: Vec<Capability>,
// Reported as "inbound"/"outbound" in diagnostics; `PeerData::new` sets it to `false`.
pub is_connection_inbound: bool,
// Handle to the peer's connection; peers without one are never selected for requests.
pub connection: Option<PeerConnection>,
// Reputation, raised on success and lowered on failure (see the `record_*` handlers).
score: i64,
// Number of request permits currently handed out for this peer.
requests: i64,
// Seconds since the Unix epoch at which the last success was recorded.
pub last_response_time: Option<u64>,
}
impl PeerData {
    /// Creates the entry for a peer with a zero score, no in-flight requests,
    /// no recorded response time, and the connection marked as not inbound.
    pub fn new(
        node: Node,
        record: Option<NodeRecord>,
        connection: Option<PeerConnection>,
        capabilities: Vec<Capability>,
    ) -> Self {
        let supported_capabilities = capabilities;
        Self {
            node,
            record,
            supported_capabilities,
            is_connection_inbound: false,
            connection,
            score: 0,
            requests: 0,
            last_response_time: None,
        }
    }
}
/// Serializable per-peer snapshot produced by `handle_get_peer_diagnostics`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct PeerDiagnostics {
// Node id of the peer.
pub peer_id: H256,
// Current score.
pub score: i64,
// Current number of outstanding request permits.
pub inflight_requests: i64,
// Result of `can_try_more_requests` for the peer's score and request count.
pub eligible: bool,
// Capabilities rendered as "<protocol>/<version>".
pub capabilities: Vec<String>,
// IP address taken from the peer's node entry.
pub ip: IpAddr,
// Client version string; empty when the node carries none.
pub client_version: String,
// Either "inbound" or "outbound".
pub connection_direction: String,
// Seconds since the Unix epoch of the last recorded success.
pub last_response_time: Option<u64>,
}
/// Outcome of `do_validate_contact`.
#[derive(Debug, Clone)]
pub enum ContactValidation {
// The contact is known, validated, and the sender IP matches; carries a copy of it.
Valid(Box<Contact>),
// The contact is known but `was_validated()` is false.
InvalidContact,
// No contact is stored for the node id.
UnknownContact,
// The sender IP differs from the IP stored for the contact.
IpMismatch,
}
/// Guard representing one in-flight request slot of a peer.
///
/// The selection handlers increment the peer's request counter before handing a
/// permit out; dropping the permit sends `dec_requests` for that peer.
#[must_use = "dropping this permit immediately releases the peer's request slot"]
pub struct RequestPermit {
// Handle used to notify the peer table on drop.
peer_table: PeerTable,
// Peer whose counter is decremented on drop.
peer_id: H256,
}
impl RequestPermit {
/// Wraps the table handle and peer id. It does not touch the request counter itself;
/// the caller is expected to have incremented it already.
pub(crate) fn new(peer_table: PeerTable, peer_id: H256) -> Self {
Self {
peer_table,
peer_id,
}
}
}
/// Manual `Debug` that prints only `peer_id` and marks the struct as non-exhaustive,
/// leaving the `peer_table` handle out of the output.
impl std::fmt::Debug for RequestPermit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RequestPermit")
.field("peer_id", &self.peer_id)
.finish_non_exhaustive()
}
}
/// Releases the request slot when the permit goes out of scope.
impl Drop for RequestPermit {
fn drop(&mut self) {
// Best effort: a send error (e.g. the actor is already gone) is deliberately ignored.
let _ = self.peer_table.dec_requests(self.peer_id);
}
}
// Message protocol of the peer table actor. Methods returning `Result<(), ActorError>`
// are handled by `#[send_handler]` functions in `PeerTableServer`; methods returning
// `Response<T>` are handled by `#[request_handler]` functions and yield a value.
#[protocol]
pub trait PeerTableServerProtocol: Send + Sync {
// --- Contact and peer registration ---
fn new_contacts(&self, nodes: Vec<Node>, protocol: DiscoveryProtocol)
-> Result<(), ActorError>;
fn new_contact_records(&self, node_records: Vec<NodeRecord>) -> Result<(), ActorError>;
fn new_connected_peer(
&self,
node: Node,
connection: PeerConnection,
capabilities: Vec<Capability>,
) -> Result<(), ActorError>;
fn set_session_info(&self, node_id: H256, session: Session) -> Result<(), ActorError>;
fn remove_peer(&self, node_id: H256) -> Result<(), ActorError>;
// --- Peer request accounting and scoring ---
fn dec_requests(&self, node_id: H256) -> Result<(), ActorError>;
fn set_unwanted(&self, node_id: H256) -> Result<(), ActorError>;
fn set_is_fork_id_valid(&self, node_id: H256, valid: bool) -> Result<(), ActorError>;
fn record_success(&self, node_id: H256) -> Result<(), ActorError>;
fn record_failure(&self, node_id: H256) -> Result<(), ActorError>;
fn record_critical_failure(&self, node_id: H256) -> Result<(), ActorError>;
// --- Contact validation bookkeeping ---
fn record_ping_sent(&self, node_id: H256, ping_id: Bytes) -> Result<(), ActorError>;
fn record_pong_received(&self, node_id: H256, ping_id: Bytes) -> Result<(), ActorError>;
fn record_enr_request_sent(&self, node_id: H256, request_hash: H256) -> Result<(), ActorError>;
fn record_enr_response_received(
&self,
node_id: H256,
request_hash: H256,
record: NodeRecord,
) -> Result<(), ActorError>;
fn set_disposable(&self, node_id: H256) -> Result<(), ActorError>;
fn mark_knows_us(&self, node_id: H256) -> Result<(), ActorError>;
// --- Maintenance and lifecycle ---
fn prune_table(&self) -> Result<(), ActorError>;
fn shutdown(&self) -> Result<(), ActorError>;
// --- Queries: peer counts and targets ---
fn peer_count(&self) -> Response<usize>;
fn peer_count_by_capabilities(&self, capabilities: Vec<Capability>) -> Response<usize>;
fn target_reached(&self) -> Response<bool>;
fn target_peers_reached(&self) -> Response<bool>;
fn target_peers_completion(&self) -> Response<f64>;
// --- Queries: contacts ---
fn get_contact_to_initiate(&self) -> Response<Option<Box<Contact>>>;
fn get_contact_for_enr_lookup(&self) -> Response<Option<Box<Contact>>>;
fn get_closest_from_pool(&self, target: H256, count: usize) -> Response<Vec<(H256, Node)>>;
fn get_contact(&self, node_id: H256) -> Response<Option<Box<Contact>>>;
fn get_contact_to_revalidate(
&self,
revalidation_interval: Duration,
protocol: DiscoveryProtocol,
) -> Response<Option<Box<Contact>>>;
// --- Queries: peer selection (each returned peer comes with a RequestPermit) ---
fn get_best_peer(
&self,
capabilities: Vec<Capability>,
) -> Response<Option<(H256, PeerConnection, RequestPermit)>>;
fn get_best_peer_excluding(
&self,
capabilities: Vec<Capability>,
excluded: Vec<H256>,
) -> Response<Option<(H256, PeerConnection, RequestPermit)>>;
fn get_best_n_peers(
&self,
capabilities: Vec<Capability>,
n: usize,
) -> Response<Vec<(H256, PeerConnection, RequestPermit)>>;
fn has_eligible_peer(&self, capabilities: Vec<Capability>) -> Response<bool>;
fn get_score(&self, node_id: H256) -> Response<i64>;
fn get_connected_nodes(&self) -> Response<Vec<Node>>;
fn get_peers_with_capabilities(&self)
-> Response<Vec<(H256, PeerConnection, Vec<Capability>)>>;
// --- Queries: discovery support ---
fn insert_if_new(&self, node: Node, protocol: DiscoveryProtocol) -> Response<bool>;
fn validate_contact(&self, node_id: H256, sender_ip: IpAddr) -> Response<ContactValidation>;
fn get_closest_nodes(&self, node_id: H256) -> Response<Vec<Node>>;
fn get_nodes_at_distances(&self, distances: Vec<u32>) -> Response<Vec<NodeRecord>>;
fn get_peers_data(&self) -> Response<Vec<PeerData>>;
fn get_random_peer(
&self,
capabilities: Vec<Capability>,
) -> Response<Option<(H256, PeerConnection, RequestPermit)>>;
fn get_session_info(&self, node_id: H256) -> Response<Option<Session>>;
fn get_peer_diagnostics(&self) -> Response<Vec<PeerDiagnostics>>;
fn get_peer_connection(&self, peer_id: H256) -> Response<Option<PeerConnection>>;
}
/// Actor state holding discovered contacts, connected peers and dialing candidates.
#[derive(Debug)]
pub struct PeerTableServer {
// Id of the local node; bucket indices are computed relative to it.
local_node_id: H256,
// Contact buckets, indexed by `bucket_index`.
buckets: Vec<KBucket>,
// Connected peers keyed by node id.
peers: IndexMap<H256, PeerData>,
// Node ids already handed out by `do_get_contact_to_initiate`; cleared when no candidate is left.
already_tried_peers: FxHashSet<H256>,
// Peer count at which the table reports its target as reached.
target_peers: usize,
// Store handle passed to the fork-id check.
store: Store,
// Sessions keyed by node id; kept even when no contact exists for the id.
sessions: FxHashMap<H256, Session>,
// Insertion-ordered pool of dialing candidates, capped at `MAX_CONNECTION_POOL_SIZE`.
connection_pool: IndexMap<H256, Node>,
}
#[actor(protocol = PeerTableServerProtocol)]
impl PeerTableServer {
/// Builds a `PeerTableServer` with `new` and starts it as an actor, returning its handle.
pub fn spawn(local_node_id: H256, target_peers: usize, store: Store) -> PeerTable {
PeerTableServer::new(local_node_id, target_peers, store).start()
}
pub(crate) fn new(local_node_id: H256, target_peers: usize, store: Store) -> Self {
Self {
local_node_id,
buckets: vec![KBucket::default(); NUMBER_OF_BUCKETS],
peers: Default::default(),
already_tried_peers: Default::default(),
target_peers,
store,
sessions: Default::default(),
connection_pool: IndexMap::with_capacity(MAX_CONNECTION_POOL_SIZE),
}
}
/// Start-up hook of the actor.
///
/// Hands the Ctrl-C signal future and a `Shutdown` message to `send_message_on`;
/// presumably the message is delivered to this actor once the signal fires
/// (confirm against the `spawned_concurrency` documentation).
#[started]
async fn started(&mut self, ctx: &Context<Self>) {
send_message_on(
ctx.clone(),
tokio::signal::ctrl_c(),
peer_table_server_protocol::Shutdown,
);
}
/// Registers the given nodes as contacts learned through `msg.protocol` (see `do_new_contacts`).
#[send_handler]
async fn handle_new_contacts(
&mut self,
msg: peer_table_server_protocol::NewContacts,
_ctx: &Context<Self>,
) {
self.do_new_contacts(msg.nodes, msg.protocol).await;
}
/// Registers or refreshes contacts from node records (see `do_new_contact_records`).
#[send_handler]
async fn handle_new_contact_records(
&mut self,
msg: peer_table_server_protocol::NewContactRecords,
_ctx: &Context<Self>,
) {
self.do_new_contact_records(msg.node_records).await;
}
/// Stores a freshly built `PeerData` (no record, zero score, zero requests) for the node.
///
/// NOTE(review): the map insert replaces any entry already stored under the same id,
/// which resets that peer's score and request counter. Confirm this is intended for reconnects.
#[send_handler]
async fn handle_new_connected_peer(
&mut self,
msg: peer_table_server_protocol::NewConnectedPeer,
_ctx: &Context<Self>,
) {
let new_peer_id = msg.node.node_id();
let new_peer = PeerData::new(msg.node, None, Some(msg.connection), msg.capabilities);
self.peers.insert(new_peer_id, new_peer);
}
/// Stores the session for `msg.node_id` in the session map and, when a contact
/// exists for that id, on the contact as well.
#[send_handler]
async fn handle_set_session_info(
&mut self,
msg: peer_table_server_protocol::SetSessionInfo,
_ctx: &Context<Self>,
) {
// The map is always updated, so the session is retrievable even without a contact.
self.sessions.insert(msg.node_id, msg.session.clone());
if let Some(contact) = self.get_contact_mut(&msg.node_id) {
contact.session = Some(msg.session);
}
}
/// Removes the peer entry for `msg.node_id`, if any. Contacts and the connection pool are untouched.
#[send_handler]
async fn handle_remove_peer(
&mut self,
msg: peer_table_server_protocol::RemovePeer,
_ctx: &Context<Self>,
) {
// `swap_remove` does not preserve the order of the remaining peers.
self.peers.swap_remove(&msg.node_id);
}
/// Releases one request slot of the peer; the counter never drops below zero.
/// Unknown peers are ignored. A debug event is logged when the counter was
/// already at or below zero before the decrement.
#[send_handler]
async fn handle_dec_requests(
    &mut self,
    msg: peer_table_server_protocol::DecRequests,
    _ctx: &Context<Self>,
) {
    let Some(peer_data) = self.peers.get_mut(&msg.node_id) else {
        return;
    };
    if peer_data.requests <= 0 {
        tracing::debug!(
            peer_id = ?msg.node_id,
            requests = peer_data.requests,
            "dec_requests with counter already <= 0",
        );
    }
    let decremented = peer_data.requests.saturating_sub(1);
    peer_data.requests = decremented.max(0);
}
/// Flags the contact as unwanted, if it exists.
#[send_handler]
async fn handle_set_unwanted(
&mut self,
msg: peer_table_server_protocol::SetUnwanted,
_ctx: &Context<Self>,
) {
if let Some(contact) = self.get_contact_mut(&msg.node_id) {
contact.unwanted = true;
}
}
/// Records the outcome of a fork-id check on the contact, if it exists.
#[send_handler]
async fn handle_set_is_fork_id_valid(
&mut self,
msg: peer_table_server_protocol::SetIsForkIdValid,
_ctx: &Context<Self>,
) {
if let Some(contact) = self.get_contact_mut(&msg.node_id) {
contact.is_fork_id_valid = Some(msg.valid);
}
}
/// Rewards a peer: raises its score by one (capped at `MAX_SCORE`) and stamps the
/// last response time with the current Unix time in seconds. Unknown peers are ignored.
#[send_handler]
async fn handle_record_success(
    &mut self,
    msg: peer_table_server_protocol::RecordSuccess,
    _ctx: &Context<Self>,
) {
    // A clock set before the Unix epoch yields 0 instead of an error.
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());
    if let Some(peer_data) = self.peers.get_mut(&msg.node_id) {
        let raised = peer_data.score + 1;
        peer_data.score = raised.min(MAX_SCORE);
        peer_data.last_response_time = Some(now);
    }
}
/// Penalises a peer: lowers its score by one, never going below `MIN_SCORE`.
/// Unknown peers are ignored.
#[send_handler]
async fn handle_record_failure(
    &mut self,
    msg: peer_table_server_protocol::RecordFailure,
    _ctx: &Context<Self>,
) {
    if let Some(peer_data) = self.peers.get_mut(&msg.node_id) {
        let lowered = peer_data.score - 1;
        peer_data.score = lowered.max(MIN_SCORE);
    }
}
/// Sets the peer's score directly to `MIN_SCORE_CRITICAL`, regardless of its
/// previous value. Unknown peers are ignored.
#[send_handler]
async fn handle_record_critical_failure(
    &mut self,
    msg: peer_table_server_protocol::RecordCriticalFailure,
    _ctx: &Context<Self>,
) {
    if let Some(peer_data) = self.peers.get_mut(&msg.node_id) {
        peer_data.score = MIN_SCORE_CRITICAL;
    }
}
/// Records on the contact, if it exists, that a ping with `msg.ping_id` was sent.
#[send_handler]
async fn handle_record_ping_sent(
&mut self,
msg: peer_table_server_protocol::RecordPingSent,
_ctx: &Context<Self>,
) {
if let Some(contact) = self.get_contact_mut(&msg.node_id) {
contact.record_ping_sent(msg.ping_id);
}
}
/// Clears the contact's pending ping when the pong's id matches it.
/// Pongs for unknown contacts or with a non-matching id are ignored.
#[send_handler]
async fn handle_record_pong_received(
    &mut self,
    msg: peer_table_server_protocol::RecordPongReceived,
    _ctx: &Context<Self>,
) {
    let Some(contact) = self.get_contact_mut(&msg.node_id) else {
        return;
    };
    if contact.ping_id.as_ref() == Some(&msg.ping_id) {
        contact.ping_id = None;
    }
}
/// Records on the contact, if it exists, the hash of the ENR request just sent.
#[send_handler]
async fn handle_record_enr_request_sent(
&mut self,
msg: peer_table_server_protocol::RecordEnrRequestSent,
_ctx: &Context<Self>,
) {
if let Some(contact) = self.get_contact_mut(&msg.node_id) {
contact.record_enr_request_sent(msg.request_hash);
}
}
/// Forwards an ENR response to the contact, if it exists; the contact stores the
/// record only when the request hash matches its pending request.
#[send_handler]
async fn handle_record_enr_response_received(
&mut self,
msg: peer_table_server_protocol::RecordEnrResponseReceived,
_ctx: &Context<Self>,
) {
if let Some(contact) = self.get_contact_mut(&msg.node_id) {
contact.record_enr_response_received(msg.request_hash, msg.record);
}
}
/// Flags the contact as disposable, if it exists, so the next prune removes it.
#[send_handler]
async fn handle_set_disposable(
&mut self,
msg: peer_table_server_protocol::SetDisposable,
_ctx: &Context<Self>,
) {
if let Some(contact) = self.get_contact_mut(&msg.node_id) {
contact.disposable = true;
}
}
/// Sets the contact's `knows_us` flag, if the contact exists.
#[send_handler]
async fn handle_mark_knows_us(
&mut self,
msg: peer_table_server_protocol::MarkKnowsUs,
_ctx: &Context<Self>,
) {
if let Some(contact) = self.get_contact_mut(&msg.node_id) {
contact.knows_us = true;
}
}
/// Removes disposable contacts from all buckets (see `prune`).
#[send_handler]
async fn handle_prune_table(
&mut self,
_msg: peer_table_server_protocol::PruneTable,
_ctx: &Context<Self>,
) {
self.prune();
}
/// Stops the actor by calling `ctx.stop()`.
#[send_handler]
async fn handle_shutdown(
&mut self,
_msg: peer_table_server_protocol::Shutdown,
ctx: &Context<Self>,
) {
ctx.stop();
}
/// Returns the number of entries in the peer map.
#[request_handler]
async fn handle_peer_count(
&mut self,
_msg: peer_table_server_protocol::PeerCount,
_ctx: &Context<Self>,
) -> usize {
self.peers.len()
}
/// Returns the peer count computed by `do_peer_count_by_capabilities` for the given capabilities.
#[request_handler]
async fn handle_peer_count_by_capabilities(
&mut self,
msg: peer_table_server_protocol::PeerCountByCapabilities,
_ctx: &Context<Self>,
) -> usize {
self.do_peer_count_by_capabilities(msg.capabilities)
}
/// Returns `true` when the number of peers is at least `target_peers`.
/// Currently computes exactly the same as `handle_target_peers_reached`.
#[request_handler]
async fn handle_target_reached(
&mut self,
_msg: peer_table_server_protocol::TargetReached,
_ctx: &Context<Self>,
) -> bool {
self.peers.len() >= self.target_peers
}
/// Returns `true` when the number of peers is at least `target_peers`.
#[request_handler]
async fn handle_target_peers_reached(
&mut self,
_msg: peer_table_server_protocol::TargetPeersReached,
_ctx: &Context<Self>,
) -> bool {
self.peers.len() >= self.target_peers
}
/// Returns the ratio of connected peers to `target_peers`; values above 1.0 are possible.
///
/// NOTE(review): with `target_peers == 0` the floating-point division yields NaN
/// (no peers) or infinity. Confirm callers never configure a zero target.
#[request_handler]
async fn handle_target_peers_completion(
&mut self,
_msg: peer_table_server_protocol::TargetPeersCompletion,
_ctx: &Context<Self>,
) -> f64 {
self.peers.len() as f64 / self.target_peers as f64
}
/// Returns a boxed candidate to dial, if any (see `do_get_contact_to_initiate`).
#[request_handler]
async fn handle_get_contact_to_initiate(
&mut self,
_msg: peer_table_server_protocol::GetContactToInitiate,
_ctx: &Context<Self>,
) -> Option<Box<Contact>> {
self.do_get_contact_to_initiate().map(Box::new)
}
/// Returns up to `msg.count` connection-pool nodes closest to `msg.target` (see `do_get_closest_from_pool`).
#[request_handler]
async fn handle_get_closest_from_pool(
&mut self,
msg: peer_table_server_protocol::GetClosestFromPool,
_ctx: &Context<Self>,
) -> Vec<(H256, Node)> {
self.do_get_closest_from_pool(msg.target, msg.count)
}
/// Returns a boxed contact suitable for an ENR request, if any (see `do_get_contact_for_enr_lookup`).
#[request_handler]
async fn handle_get_contact_for_enr_lookup(
&mut self,
_msg: peer_table_server_protocol::GetContactForEnrLookup,
_ctx: &Context<Self>,
) -> Option<Box<Contact>> {
self.do_get_contact_for_enr_lookup().map(Box::new)
}
/// Returns a boxed copy of the contact stored for `msg.node_id`, looking at main
/// contacts and replacements alike.
#[request_handler]
async fn handle_get_contact(
&mut self,
msg: peer_table_server_protocol::GetContact,
_ctx: &Context<Self>,
) -> Option<Box<Contact>> {
self.get_contact(&msg.node_id).cloned().map(Box::new)
}
/// Returns a contact of the given protocol that needs revalidation, if any
/// (see `do_get_contact_to_revalidate`).
#[request_handler]
async fn handle_get_contact_to_revalidate(
&mut self,
msg: peer_table_server_protocol::GetContactToRevalidate,
_ctx: &Context<Self>,
) -> Option<Box<Contact>> {
self.do_get_contact_to_revalidate(msg.revalidation_interval, msg.protocol)
}
/// Picks the best eligible peer for the given capabilities and reserves one
/// request slot on it. The returned permit releases the slot when dropped.
#[request_handler]
async fn handle_get_best_peer(
    &mut self,
    msg: peer_table_server_protocol::GetBestPeer,
    ctx: &Context<Self>,
) -> Option<(H256, PeerConnection, RequestPermit)> {
    let (peer_id, conn) = self.do_get_best_peer(&msg.capabilities)?;
    let peer_data = self
        .peers
        .get_mut(&peer_id)
        .expect("peer returned by do_get_best_peer must be present in self.peers");
    // Reserve the slot before the permit exists so drop always has something to release.
    peer_data.requests += 1;
    let permit = RequestPermit::new(ctx.actor_ref(), peer_id);
    Some((peer_id, conn, permit))
}
/// Same as `handle_get_best_peer`, but never returns a peer listed in `msg.excluded`.
#[request_handler]
async fn handle_get_best_peer_excluding(
    &mut self,
    msg: peer_table_server_protocol::GetBestPeerExcluding,
    ctx: &Context<Self>,
) -> Option<(H256, PeerConnection, RequestPermit)> {
    let (peer_id, conn) = self.do_get_best_peer_excluding(&msg.capabilities, &msg.excluded)?;
    let peer_data = self
        .peers
        .get_mut(&peer_id)
        .expect("peer returned by do_get_best_peer_excluding must be present in self.peers");
    // Reserve the slot before the permit exists so drop always has something to release.
    peer_data.requests += 1;
    let permit = RequestPermit::new(ctx.actor_ref(), peer_id);
    Some((peer_id, conn, permit))
}
/// Picks up to `msg.n` eligible peers, best first, reserving one request slot on
/// each. Every returned permit releases its slot when dropped.
#[request_handler]
async fn handle_get_best_n_peers(
    &mut self,
    msg: peer_table_server_protocol::GetBestNPeers,
    ctx: &Context<Self>,
) -> Vec<(H256, PeerConnection, RequestPermit)> {
    self.do_get_best_n_peers(&msg.capabilities, msg.n)
        .into_iter()
        .map(|(peer_id, conn)| {
            let peer_data = self
                .peers
                .get_mut(&peer_id)
                .expect("peer returned by do_get_best_n_peers must be present in self.peers");
            peer_data.requests += 1;
            (peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id))
        })
        .collect()
}
/// Returns `true` when at least one peer has a connection, still has request
/// budget, and supports any of the requested capabilities.
#[request_handler]
async fn handle_has_eligible_peer(
    &mut self,
    msg: peer_table_server_protocol::HasEligiblePeer,
    _ctx: &Context<Self>,
) -> bool {
    let wanted = &msg.capabilities;
    self.peers.values().any(|peer_data| {
        if peer_data.connection.is_none() {
            return false;
        }
        if !self.can_try_more_requests(&peer_data.score, &peer_data.requests) {
            return false;
        }
        wanted
            .iter()
            .any(|cap| peer_data.supported_capabilities.contains(cap))
    })
}
/// Returns the peer's score, or `0` when the peer is unknown.
#[request_handler]
async fn handle_get_score(
    &mut self,
    msg: peer_table_server_protocol::GetScore,
    _ctx: &Context<Self>,
) -> i64 {
    match self.peers.get(&msg.node_id) {
        Some(peer_data) => peer_data.score,
        None => 0,
    }
}
/// Returns a copy of the node entry of every peer in the peer map.
#[request_handler]
async fn handle_get_connected_nodes(
&mut self,
_msg: peer_table_server_protocol::GetConnectedNodes,
_ctx: &Context<Self>,
) -> Vec<Node> {
self.peers
.values()
.map(|peer_data| peer_data.node.clone())
.collect()
}
/// Lists every peer that has a connection, together with that connection and the
/// peer's capabilities. Peers without a connection are left out.
#[request_handler]
async fn handle_get_peers_with_capabilities(
    &mut self,
    _msg: peer_table_server_protocol::GetPeersWithCapabilities,
    _ctx: &Context<Self>,
) -> Vec<(H256, PeerConnection, Vec<Capability>)> {
    self.peers
        .iter()
        .filter_map(|(peer_id, peer_data)| {
            let connection = peer_data.connection.clone()?;
            let capabilities = peer_data.supported_capabilities.clone();
            Some((*peer_id, connection, capabilities))
        })
        .collect()
}
/// Adds `msg.node` to the connection pool and, when no contact exists for it yet,
/// creates one flagged with `msg.protocol`.
///
/// Returns `true` only when a new contact was created. The local node is rejected
/// up front, matching `do_new_contacts` and `do_new_contact_records`: it has no
/// bucket, so it could never be stored as a contact, and it must not end up in the
/// connection pool where it could be selected as a node to dial.
#[request_handler]
async fn handle_insert_if_new(
    &mut self,
    msg: peer_table_server_protocol::InsertIfNew,
    _ctx: &Context<Self>,
) -> bool {
    let node_id = msg.node.node_id();
    if node_id == self.local_node_id {
        return false;
    }
    // The pool is fed even for already-known contacts; it deduplicates by id itself.
    self.insert_to_connection_pool(node_id, msg.node.clone());
    if self.contact_exists(&node_id) {
        return false;
    }
    let contact = Contact::new(msg.node, msg.protocol);
    self.insert_contact(node_id, contact);
    METRICS.record_new_discovery().await;
    true
}
/// Classifies the sender of a message against the stored contact (see `do_validate_contact`).
#[request_handler]
async fn handle_validate_contact(
&mut self,
msg: peer_table_server_protocol::ValidateContact,
_ctx: &Context<Self>,
) -> ContactValidation {
self.do_validate_contact(msg.node_id, msg.sender_ip)
}
/// Returns the stored contacts closest to `msg.node_id` (see `do_get_closest_nodes`).
#[request_handler]
async fn handle_get_closest_nodes(
&mut self,
msg: peer_table_server_protocol::GetClosestNodes,
_ctx: &Context<Self>,
) -> Vec<Node> {
self.do_get_closest_nodes(msg.node_id)
}
/// Returns records of contacts at the requested distances (see `do_get_nodes_at_distances`).
#[request_handler]
async fn handle_get_nodes_at_distances(
&mut self,
msg: peer_table_server_protocol::GetNodesAtDistances,
_ctx: &Context<Self>,
) -> Vec<NodeRecord> {
self.do_get_nodes_at_distances(&msg.distances)
}
/// Returns a copy of every `PeerData` entry in the peer map.
#[request_handler]
async fn handle_get_peers_data(
&mut self,
_msg: peer_table_server_protocol::GetPeersData,
_ctx: &Context<Self>,
) -> Vec<PeerData> {
self.peers.values().cloned().collect()
}
/// Picks a peer via `do_get_random_peer` and reserves one request slot on it.
/// The returned permit releases the slot when dropped.
#[request_handler]
async fn handle_get_random_peer(
    &mut self,
    msg: peer_table_server_protocol::GetRandomPeer,
    ctx: &Context<Self>,
) -> Option<(H256, PeerConnection, RequestPermit)> {
    let (peer_id, conn) = self.do_get_random_peer(msg.capabilities)?;
    let peer_data = self
        .peers
        .get_mut(&peer_id)
        .expect("peer returned by do_get_random_peer must be present in self.peers");
    // Reserve the slot before the permit exists so drop always has something to release.
    peer_data.requests += 1;
    let permit = RequestPermit::new(ctx.actor_ref(), peer_id);
    Some((peer_id, conn, permit))
}
/// Returns the session for `msg.node_id`: the session map is consulted first, and
/// the session stored on the contact is the fallback.
#[request_handler]
async fn handle_get_session_info(
    &mut self,
    msg: peer_table_server_protocol::GetSessionInfo,
    _ctx: &Context<Self>,
) -> Option<Session> {
    if let Some(session) = self.sessions.get(&msg.node_id) {
        return Some(session.clone());
    }
    self.get_contact(&msg.node_id)
        .and_then(|contact| contact.session.clone())
}
/// Returns a clone of the peer's connection; `None` when the peer is unknown or
/// has no connection.
#[request_handler]
async fn handle_get_peer_connection(
    &mut self,
    msg: peer_table_server_protocol::GetPeerConnection,
    _ctx: &Context<Self>,
) -> Option<PeerConnection> {
    let peer_data = self.peers.get(&msg.peer_id)?;
    peer_data.connection.clone()
}
/// Builds one `PeerDiagnostics` snapshot per peer, in peer-map order.
#[request_handler]
async fn handle_get_peer_diagnostics(
    &mut self,
    _msg: peer_table_server_protocol::GetPeerDiagnostics,
    _ctx: &Context<Self>,
) -> Vec<PeerDiagnostics> {
    let mut report = Vec::with_capacity(self.peers.len());
    for (id, peer_data) in &self.peers {
        let capabilities = peer_data
            .supported_capabilities
            .iter()
            .map(|c| format!("{}/{}", c.protocol(), c.version))
            .collect();
        let direction = if peer_data.is_connection_inbound {
            "inbound"
        } else {
            "outbound"
        };
        report.push(PeerDiagnostics {
            peer_id: *id,
            score: peer_data.score,
            inflight_requests: peer_data.requests,
            eligible: self.can_try_more_requests(&peer_data.score, &peer_data.requests),
            capabilities,
            ip: peer_data.node.ip,
            // An absent client version is reported as an empty string.
            client_version: peer_data.node.version.clone().unwrap_or_default(),
            connection_direction: direction.to_string(),
            last_response_time: peer_data.last_response_time,
        });
    }
    report
}
/// Returns the index of the bucket `node_id` belongs to, or `None` when it equals the local node id.
fn bucket_for(&self, node_id: &H256) -> Option<usize> {
bucket_index(&self.local_node_id, node_id)
}
/// Looks the contact up in its bucket, considering main contacts and replacements.
fn get_contact(&self, node_id: &H256) -> Option<&Contact> {
    self.bucket_for(node_id)
        .and_then(|idx| self.buckets[idx].get_any(node_id))
}
/// Mutable counterpart of `get_contact`: searches the main contacts, then the replacements.
fn get_contact_mut(&mut self, node_id: &H256) -> Option<&mut Contact> {
let idx = self.bucket_for(node_id)?;
self.buckets[idx].get_mut(node_id)
}
/// Returns `true` when a contact (main or replacement) is stored for `node_id`.
/// The local node id has no bucket and therefore never exists.
fn contact_exists(&self, node_id: &H256) -> bool {
    self.bucket_for(node_id)
        .is_some_and(|idx| self.buckets[idx].contains(node_id))
}
/// Inserts the contact into the bucket matching `node_id`.
///
/// Returns `true` when it was placed among the bucket's main contacts. Returns `false`
/// when the id has no bucket (it equals the local id) or when the bucket was full and
/// the contact went to the replacement list instead. No duplicate check is done here.
fn insert_contact(&mut self, node_id: H256, contact: Contact) -> bool {
// With the `metrics` feature the duration of the insertion is measured.
#[cfg(feature = "metrics")]
let start = std::time::Instant::now();
let Some(idx) = self.bucket_for(&node_id) else {
// Early return: no duration is observed for ids without a bucket.
return false;
};
let result = self.buckets[idx].insert(node_id, contact);
#[cfg(feature = "metrics")]
{
use ethrex_metrics::p2p::METRICS_P2P;
METRICS_P2P.observe_insert_contact_duration(start.elapsed().as_secs_f64());
}
result
}
/// Adds the node to the connection pool unless its id is already present.
/// When the pool is full, the oldest entry is evicted first.
fn insert_to_connection_pool(&mut self, node_id: H256, node: Node) {
    let already_pooled = self.connection_pool.contains_key(&node_id);
    if !already_pooled {
        let is_full = self.connection_pool.len() >= MAX_CONNECTION_POOL_SIZE;
        if is_full {
            // `shift_remove_index` keeps the insertion order of the remaining entries.
            self.connection_pool.shift_remove_index(0);
        }
        self.connection_pool.insert(node_id, node);
    }
}
/// Looks the contact up among main contacts and replacements.
/// Behaves exactly like `get_contact`, which it delegates to.
fn get_contact_or_replacement(&self, node_id: &H256) -> Option<&Contact> {
    self.get_contact(node_id)
}
/// Mutable lookup among main contacts first, then replacements.
/// Behaves exactly like `get_contact_mut`, which it delegates to.
fn get_contact_or_replacement_mut(&mut self, node_id: &H256) -> Option<&mut Contact> {
    self.get_contact_mut(node_id)
}
/// Iterates over every stored contact, bucket by bucket; within a bucket the main
/// contacts come before the replacements.
fn iter_contacts(&self) -> impl Iterator<Item = (&H256, &Contact)> {
    self.buckets
        .iter()
        .flat_map(|bucket| bucket.contacts.iter().chain(&bucket.replacements))
        .map(|(id, contact)| (id, contact))
}
/// Ranking value of a peer: weighted score minus weighted in-flight request count.
/// Higher is better.
fn weight_peer(&self, score: &i64, requests: &i64) -> i64 {
score * SCORE_WEIGHT - requests * REQUESTS_WEIGHT
}
/// Returns whether a peer with the given score may take another request.
///
/// The request budget scales linearly with the score between `MIN_SCORE` and
/// `MAX_SCORE`, up to `MAX_CONCURRENT_REQUESTS_PER_PEER`, and is never below 1.
fn can_try_more_requests(&self, score: &i64, requests: &i64) -> bool {
// 0.0 at MIN_SCORE, 1.0 at MAX_SCORE; negative for scores below MIN_SCORE.
let score_ratio = (score - MIN_SCORE) as f64 / (MAX_SCORE - MIN_SCORE) as f64;
// The floor of 1.0 means any peer with no request in flight can still be given one.
let max_requests = (MAX_CONCURRENT_REQUESTS_PER_PEER as f64 * score_ratio).max(1.0);
(*requests as f64) < max_requests
}
/// Returns the best eligible peer for the capabilities; `do_get_best_peer_excluding` with no exclusions.
fn do_get_best_peer(&self, capabilities: &[Capability]) -> Option<(H256, PeerConnection)> {
self.do_get_best_peer_excluding(capabilities, &[])
}
/// Returns the highest-weighted peer that is not excluded, still has request budget,
/// supports at least one of `capabilities`, and has a connection.
/// Among equally weighted peers the one latest in the peer map wins.
fn do_get_best_peer_excluding(
    &self,
    capabilities: &[Capability],
    excluded: &[H256],
) -> Option<(H256, PeerConnection)> {
    self.peers
        .iter()
        .filter_map(|(id, peer_data)| {
            if excluded.contains(id) {
                return None;
            }
            if !self.can_try_more_requests(&peer_data.score, &peer_data.requests) {
                return None;
            }
            let supports_any = capabilities
                .iter()
                .any(|cap| peer_data.supported_capabilities.contains(cap));
            if !supports_any {
                return None;
            }
            let connection = peer_data.connection.clone()?;
            let weight = self.weight_peer(&peer_data.score, &peer_data.requests);
            Some((*id, weight, connection))
        })
        .max_by_key(|(_, weight, _)| *weight)
        .map(|(id, _, connection)| (id, connection))
}
/// Returns up to `n` peers ordered from highest to lowest weight. Only peers with
/// request budget, a matching capability and a connection are considered; equally
/// weighted peers keep their peer-map order.
fn do_get_best_n_peers(
    &self,
    capabilities: &[Capability],
    n: usize,
) -> Vec<(H256, PeerConnection)> {
    let mut ranked: Vec<(i64, H256, PeerConnection)> = Vec::new();
    for (id, peer_data) in &self.peers {
        if !self.can_try_more_requests(&peer_data.score, &peer_data.requests) {
            continue;
        }
        let supports_any = capabilities
            .iter()
            .any(|cap| peer_data.supported_capabilities.contains(cap));
        if !supports_any {
            continue;
        }
        let Some(connection) = peer_data.connection.clone() else {
            continue;
        };
        let weight = self.weight_peer(&peer_data.score, &peer_data.requests);
        ranked.push((weight, *id, connection));
    }
    // Stable sort, descending by weight.
    ranked.sort_by(|a, b| b.0.cmp(&a.0));
    ranked.truncate(n);
    ranked
        .into_iter()
        .map(|(_, id, connection)| (id, connection))
        .collect()
}
/// Removes every contact flagged as disposable from all buckets.
///
/// For each disposable main contact that is removed, the oldest remaining
/// replacement (if any) is promoted into the main list.
fn prune(&mut self) {
    for bucket in &mut self.buckets {
        // Purge disposable replacements first: otherwise one of them could be
        // promoted into the main list below and survive this prune.
        bucket.replacements.retain(|(_, c)| !c.disposable);
        // Collect the ids up front because removal mutates the list being scanned.
        let main_disposable: Vec<H256> = bucket
            .contacts
            .iter()
            .filter(|(_, c)| c.disposable)
            .map(|(id, _)| *id)
            .collect();
        for node_id in main_disposable {
            bucket.remove_and_promote(&node_id);
        }
    }
}
/// Picks the next node to dial from the connection pool.
///
/// The pool is scanned once, starting at a random index and wrapping around. A node is
/// skipped when it is already a peer, was already handed out since the last reset, or
/// has a stored contact that does not know us, is unwanted, or failed the fork-id check.
/// The first remaining node is marked as tried and returned. When the scan finds nothing,
/// the tried set is cleared and `None` is returned, so a later call may hand nodes out again.
fn do_get_contact_to_initiate(&mut self) -> Option<Contact> {
let pool_len = self.connection_pool.len();
if pool_len == 0 {
return None;
}
// Random starting point so successive scans do not always favour the oldest entries.
let start = rand::random::<usize>() % pool_len;
for offset in 0..pool_len {
let idx = (start + offset) % pool_len;
let Some((node_id, node)) = self.connection_pool.get_index(idx) else {
continue;
};
let node_id = *node_id;
if self.peers.contains_key(&node_id)
|| self.already_tried_peers.contains(&node_id)
|| self
.get_contact_or_replacement(&node_id)
.map(|c| !c.knows_us || c.unwanted || c.is_fork_id_valid == Some(false))
// Pool entries without a stored contact are not filtered out.
.unwrap_or(false)
{
continue;
}
// Clone before mutating `self`, since `node` borrows from the pool.
let node = node.clone();
self.already_tried_peers.insert(node_id);
let contact = self
.get_contact_or_replacement(&node_id)
.cloned()
// No stored contact: build a fresh one, flagged as Discv4.
.unwrap_or_else(|| Contact::new(node, DiscoveryProtocol::Discv4));
return Some(contact);
}
tracing::trace!("Resetting list of tried peers.");
self.already_tried_peers.clear();
None
}
/// Returns up to `count` nodes from the connection pool with the smallest XOR distance
/// to `target`, sorted from closest to farthest.
fn do_get_closest_from_pool(&self, target: H256, count: usize) -> Vec<(H256, Node)> {
// Running selection of the closest nodes seen so far, with their distances.
let mut nodes: Vec<(H256, Node, H256)> = Vec::with_capacity(count);
for (node_id, node) in &self.connection_pool {
let dist = xor_distance(&target, node_id);
if nodes.len() < count {
nodes.push((*node_id, node.clone(), dist));
} else if let Some((farthest_idx, _)) =
nodes.iter().enumerate().max_by_key(|(_, (_, _, d))| *d)
&& dist < nodes[farthest_idx].2
{
// Selection is full: replace its farthest member with the closer candidate.
nodes[farthest_idx] = (*node_id, node.clone(), dist);
}
}
nodes.sort_by(|a, b| a.2.cmp(&b.2));
nodes.into_iter().map(|(id, node, _)| (id, node)).collect()
}
/// Picks, uniformly at random, a contact to request an ENR from.
///
/// Candidates are Discv4 contacts that are validated, have no ENR request pending,
/// have no record stored yet and are not disposable. Returns a copy of the chosen
/// contact, or `None` when there is no candidate.
fn do_get_contact_for_enr_lookup(&mut self) -> Option<Contact> {
    // Sample straight from the iterator (as `do_get_contact_to_revalidate` does)
    // instead of collecting the candidates into a temporary vector first.
    self.iter_contacts()
        .map(|(_, contact)| contact)
        .filter(|c| {
            c.is_discv4
                && c.was_validated()
                && !c.has_pending_enr_request()
                && c.record.is_none()
                && !c.disposable
        })
        .choose(&mut rand::rngs::OsRng)
        .cloned()
}
/// Picks, at random, a contact that supports `protocol` and for which
/// `Self::is_validation_needed` returns true for `revalidation_interval`.
/// Returns a boxed copy of it, or `None` when no contact qualifies.
fn do_get_contact_to_revalidate(
&self,
revalidation_interval: Duration,
protocol: DiscoveryProtocol,
) -> Option<Box<Contact>> {
self.iter_contacts()
.filter(|(_, c)| {
c.supports_protocol(protocol)
&& Self::is_validation_needed(c, revalidation_interval)
})
.map(|(_, c)| c)
.choose(&mut rand::rngs::OsRng)
.cloned()
.map(Box::new)
}
/// Classifies a sender against the stored contact. Checks are applied in this order:
/// unknown contact, contact not validated, sender IP differing from the stored IP.
/// When all pass, a copy of the contact is returned.
fn do_validate_contact(&self, node_id: H256, sender_ip: IpAddr) -> ContactValidation {
    match self.get_contact(&node_id) {
        None => ContactValidation::UnknownContact,
        Some(contact) if !contact.was_validated() => ContactValidation::InvalidContact,
        Some(contact) if sender_ip != contact.node.ip => ContactValidation::IpMismatch,
        Some(contact) => ContactValidation::Valid(Box::new(contact.clone())),
    }
}
/// Returns up to `MAX_NODES_IN_NEIGHBORS_PACKET` stored contacts (main and replacement)
/// with the smallest XOR distance to `node_id`.
///
/// The result is not sorted by distance; it is in selection order.
fn do_get_closest_nodes(&self, node_id: H256) -> Vec<Node> {
#[cfg(feature = "metrics")]
let scan_start = std::time::Instant::now();
// Running selection of the closest contacts seen so far, with their distances.
let mut nodes: Vec<(Node, H256)> = vec![];
for (contact_id, contact) in self.iter_contacts() {
let dist = xor_distance(&node_id, contact_id);
if nodes.len() < MAX_NODES_IN_NEIGHBORS_PACKET {
nodes.push((contact.node.clone(), dist));
} else if let Some((farthest_idx, _)) =
nodes.iter().enumerate().max_by_key(|(_, (_, d))| *d)
&& dist < nodes[farthest_idx].1
{
// Selection is full: replace its farthest member with the closer candidate.
nodes[farthest_idx] = (contact.node.clone(), dist);
}
}
#[cfg(feature = "metrics")]
{
use ethrex_metrics::p2p::METRICS_P2P;
METRICS_P2P.observe_iter_contacts_duration(scan_start.elapsed().as_secs_f64());
}
nodes.into_iter().map(|(node, _)| node).collect()
}
/// Returns the stored records of contacts whose distance from the local node is
/// listed in `distances`, at most `MAX_ENRS_PER_FINDNODE_RESPONSE` of them.
/// Contacts without a record are skipped.
fn do_get_nodes_at_distances(&self, distances: &[u32]) -> Vec<NodeRecord> {
    self.iter_contacts()
        .filter(|&(contact_id, _)| {
            let dist = distance(&self.local_node_id, contact_id) as u32;
            distances.contains(&dist)
        })
        .filter_map(|(_, contact)| contact.record.clone())
        .take(MAX_ENRS_PER_FINDNODE_RESPONSE)
        .collect()
}
/// Registers nodes learned through `protocol`.
///
/// Every node except the local one is added to the connection pool. A node that already
/// has a contact only gets `protocol` added to its flags; otherwise a new contact is
/// created and a new discovery is recorded in the metrics.
async fn do_new_contacts(&mut self, nodes: Vec<Node>, protocol: DiscoveryProtocol) {
for node in nodes {
let node_id = node.node_id();
if node_id == self.local_node_id {
continue;
}
#[cfg(feature = "metrics")]
let insert_start = std::time::Instant::now();
self.insert_to_connection_pool(node_id, node.clone());
if self.contact_exists(&node_id) {
if let Some(contact) = self.get_contact_or_replacement_mut(&node_id) {
contact.add_protocol(protocol);
}
} else {
let contact = Contact::new(node, protocol);
self.insert_contact(node_id, contact);
METRICS.record_new_discovery().await;
}
// NOTE(review): `insert_contact` observes this same duration metric itself, so
// newly inserted contacts are observed twice. Confirm this is intended.
#[cfg(feature = "metrics")]
{
use ethrex_metrics::p2p::METRICS_P2P;
METRICS_P2P.observe_insert_contact_duration(insert_start.elapsed().as_secs_f64());
}
}
}
/// Registers contacts from a batch of node records.
///
/// Records that fail `verify_signature`, cannot be converted with
/// `Node::from_enr`, or describe the local node are skipped. Every remaining
/// node is added to the connection pool, then:
///
/// - Unknown node: a new Discv5 contact is inserted carrying the record and
///   its fork-id evaluation, and a discovery is recorded in `METRICS`.
/// - Known node: the entry is tagged with Discv5. If it had no record, or the
///   incoming `seq` is greater than the stored one, its node, record and
///   fork-id flag are replaced; when the IP or UDP port changed, the
///   validation timestamp and ping id are cleared as well.
async fn do_new_contact_records(&mut self, node_records: Vec<NodeRecord>) {
    for node_record in node_records {
        if !node_record.verify_signature() {
            continue;
        }
        let Ok(node) = Node::from_enr(&node_record) else {
            continue;
        };
        let node_id = node.node_id();
        if node_id == self.local_node_id {
            continue;
        }
        self.insert_to_connection_pool(node_id, node.clone());

        // Unknown node: create the contact from scratch.
        if !self.contact_exists(&node_id) {
            let is_fork_id_valid = Self::evaluate_fork_id(&node_record, &self.store).await;
            let mut contact = Contact::new(node, DiscoveryProtocol::Discv5);
            contact.is_fork_id_valid = is_fork_id_valid;
            contact.record = Some(node_record);
            self.insert_contact(node_id, contact);
            METRICS.record_new_discovery().await;
            continue;
        }

        // Known node: refresh only if we hold no record or the new one is newer.
        let should_update = self
            .get_contact_or_replacement(&node_id)
            .is_some_and(|c| c.record.as_ref().is_none_or(|r| node_record.seq > r.seq));

        // The fork id is evaluated before taking the mutable borrow below, and
        // only when the record will actually be stored.
        let is_fork_id_valid = if should_update {
            Self::evaluate_fork_id(&node_record, &self.store).await
        } else {
            None
        };

        let Some(contact) = self.get_contact_or_replacement_mut(&node_id) else {
            continue;
        };
        contact.add_protocol(DiscoveryProtocol::Discv5);
        if !should_update {
            continue;
        }

        let endpoint_changed =
            contact.node.ip != node.ip || contact.node.udp_port != node.udp_port;
        if endpoint_changed {
            contact.validation_timestamp = None;
            contact.ping_id = None;
        }
        contact.node = node;
        contact.record = Some(node_record);
        contact.is_fork_id_valid = is_fork_id_valid;
    }
}
/// Evaluates the fork id carried by `record` against `store`.
///
/// Always returns `Some`: the result of `backend::is_fork_id_valid` when it
/// succeeds, and `Some(false)` when the record has no fork id or the check
/// returns an error.
async fn evaluate_fork_id(record: &NodeRecord, store: &Store) -> Option<bool> {
    let Some(remote_fork_id) = record.get_fork_id() else {
        return Some(false);
    };
    let verdict = backend::is_fork_id_valid(store, remote_fork_id)
        .await
        .unwrap_or(false);
    Some(verdict)
}
/// Counts the peers that support at least one of `capabilities`.
///
/// An empty `capabilities` list matches no peer, so the count is 0.
fn do_peer_count_by_capabilities(&self, capabilities: Vec<Capability>) -> usize {
    let mut matching = 0usize;
    for peer_data in self.peers.values() {
        let supports_any = capabilities
            .iter()
            .any(|cap| peer_data.supported_capabilities.contains(cap));
        if supports_any {
            matching += 1;
        }
    }
    matching
}
/// Picks a random connected peer supporting at least one of `capabilities`,
/// weighted by score.
///
/// Peers without a connection are never candidates. Each candidate's weight is
/// `max(score, MIN_SCORE_CRITICAL) - MIN_SCORE_CRITICAL + 1`, so every
/// candidate has a weight of at least 1 and higher scores are more likely to
/// be drawn. Returns `None` when there is no candidate or `WeightedIndex`
/// rejects the weights; otherwise returns the peer's id and a clone of its
/// connection.
fn do_get_random_peer(&self, capabilities: Vec<Capability>) -> Option<(H256, PeerConnection)> {
    // `candidates[i]` and `weights[i]` always describe the same peer.
    let mut candidates: Vec<(H256, &PeerConnection)> = Vec::new();
    let mut weights: Vec<u64> = Vec::new();

    for (node_id, peer_data) in self.peers.iter() {
        let supports_any = capabilities
            .iter()
            .any(|cap| peer_data.supported_capabilities.contains(cap));
        if !supports_any {
            continue;
        }
        let Some(connection) = peer_data.connection.as_ref() else {
            continue;
        };
        let floored_score = peer_data.score.max(MIN_SCORE_CRITICAL);
        weights.push((floored_score - MIN_SCORE_CRITICAL + 1) as u64);
        candidates.push((*node_id, connection));
    }

    if candidates.is_empty() {
        return None;
    }

    let dist = WeightedIndex::new(&weights).ok()?;
    let (node_id, connection) = candidates[dist.sample(&mut rand::rngs::OsRng)];
    Some((node_id, connection.clone()))
}
/// Decides whether `contact` should be (re)validated now.
///
/// - Disposable contacts never need validation.
/// - With a ping pending, validation is needed only once the validation
///   timestamp is more than 30 seconds old.
/// - Otherwise it is needed when the contact was never validated, or when its
///   validation timestamp is older than `revalidation_interval`.
///
/// A missing validation timestamp never counts as "too old".
fn is_validation_needed(contact: &Contact, revalidation_interval: Duration) -> bool {
    if contact.disposable {
        return false;
    }

    let sent_ping_ttl = Duration::from_secs(30);

    // True only when a timestamp exists and lies more than `limit` in the past.
    let timestamp_older_than = |limit: Duration| {
        contact
            .validation_timestamp
            .is_some_and(|ts| Instant::now().saturating_duration_since(ts) > limit)
    };

    if contact.has_pending_ping() {
        timestamp_older_than(sent_ping_ttl)
    } else {
        !contact.was_validated() || timestamp_older_than(revalidation_interval)
    }
}
}
/// Alias for an [`ActorRef`] to the [`PeerTableServer`] actor.
pub type PeerTable = ActorRef<PeerTableServer>;
#[cfg(test)]
mod tests {
    use super::*;
    use ethrex_common::H512;
    use std::net::Ipv4Addr;

    /// Builds a Discv4 contact derived from `seed`: the public key is
    /// `seed + 1` and the address is `127.0.0.<seed>`.
    fn dummy_contact(seed: u8) -> (H256, Contact) {
        let public_key = H512::from_low_u64_be(u64::from(seed) + 1);
        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, seed));
        let node = Node::new(ip, 30303, 30303, public_key);
        let node_id = node.node_id();
        (node_id, Contact::new(node, DiscoveryProtocol::Discv4))
    }

    /// Inserts one dummy contact per seed into a fresh bucket and returns the
    /// bucket together with the inserted ids, in insertion order.
    fn bucket_seeded_with(seeds: impl IntoIterator<Item = u8>) -> (KBucket, Vec<H256>) {
        let mut bucket = KBucket::default();
        let ids = seeds
            .into_iter()
            .map(|seed| {
                let (id, contact) = dummy_contact(seed);
                bucket.insert(id, contact);
                id
            })
            .collect();
        (bucket, ids)
    }

    /// Returns an all-zero hash with the byte at `index` set to `value`.
    fn hash_with_byte(index: usize, value: u8) -> H256 {
        let mut bytes = [0u8; 32];
        bytes[index] = value;
        H256(bytes)
    }

    /// The first insert lands in the main list and reports `true`.
    #[test]
    fn insert_into_empty_bucket() {
        let (id, contact) = dummy_contact(1);
        let mut bucket = KBucket::default();
        let went_to_main = bucket.insert(id, contact);
        assert!(went_to_main);
        assert_eq!(bucket.contacts.len(), 1);
        assert!(bucket.replacements.is_empty());
    }

    /// Once the main list is full, the next insert reports `false` and is
    /// stored as a replacement.
    #[test]
    fn insert_fills_bucket_then_goes_to_replacements() {
        let mut bucket = KBucket::default();
        for i in 0..MAX_NODES_PER_BUCKET as u8 {
            let (id, contact) = dummy_contact(i);
            assert!(bucket.insert(id, contact), "contact {i} should go to main");
        }
        assert_eq!(bucket.contacts.len(), MAX_NODES_PER_BUCKET);

        let (overflow_id, overflow_contact) = dummy_contact(200);
        let went_to_main = bucket.insert(overflow_id, overflow_contact);
        assert!(!went_to_main);
        assert_eq!(bucket.contacts.len(), MAX_NODES_PER_BUCKET);
        assert_eq!(bucket.replacements.len(), 1);
    }

    /// `contains` sees ids in both the main list and the replacement list.
    #[test]
    fn contains_checks_main_and_replacement() {
        let (mut bucket, ids) = bucket_seeded_with([1]);
        let id_main = ids[0];
        assert!(bucket.contains(&id_main));

        // Fill the rest of the main list so the next insert overflows.
        for seed in 2..=(MAX_NODES_PER_BUCKET as u8) {
            let (id, contact) = dummy_contact(seed);
            bucket.insert(id, contact);
        }
        let (id_repl, contact_repl) = dummy_contact(100);
        bucket.insert(id_repl, contact_repl);

        assert!(bucket.contains(&id_repl));
        assert!(!bucket.contains(&H256::zero()));
    }

    /// `get` finds an id in the main list and returns nothing for an unknown id.
    #[test]
    fn get_returns_main_list_only() {
        let (bucket, ids) = bucket_seeded_with([1]);
        assert!(bucket.get(&ids[0]).is_some());
        assert!(bucket.get(&H256::zero()).is_none());
    }

    /// `get_any` finds a replacement entry that `get` does not return.
    #[test]
    fn get_any_returns_from_replacement() {
        let (mut bucket, _) = bucket_seeded_with(0..MAX_NODES_PER_BUCKET as u8);
        let (id_repl, contact_repl) = dummy_contact(200);
        bucket.insert(id_repl, contact_repl);

        assert!(bucket.get(&id_repl).is_none());
        assert!(bucket.get_any(&id_repl).is_some());
    }

    /// Removing a main contact promotes the waiting replacement.
    #[test]
    fn remove_and_promote_with_replacement() {
        let (mut bucket, main_ids) = bucket_seeded_with(0..MAX_NODES_PER_BUCKET as u8);
        let (repl_id, repl_contact) = dummy_contact(200);
        bucket.insert(repl_id, repl_contact);

        let removed_id = main_ids[0];
        let promoted = bucket.remove_and_promote(&removed_id);

        assert_eq!(promoted, Some(repl_id));
        assert_eq!(bucket.contacts.len(), MAX_NODES_PER_BUCKET);
        assert!(bucket.replacements.is_empty());
        assert!(!bucket.contains(&removed_id));
        assert!(bucket.contains(&repl_id));
    }

    /// With no replacement waiting, removal just empties the slot.
    #[test]
    fn remove_and_promote_without_replacement() {
        let (mut bucket, ids) = bucket_seeded_with([1]);
        let promoted = bucket.remove_and_promote(&ids[0]);
        assert!(promoted.is_none());
        assert!(bucket.contacts.is_empty());
    }

    /// Removing an id that was never inserted yields `None`.
    #[test]
    fn remove_nonexistent_returns_none() {
        let mut bucket = KBucket::default();
        let promoted = bucket.remove_and_promote(&H256::zero());
        assert!(promoted.is_none());
    }

    /// Overflowing the replacement list by two drops the two oldest entries.
    #[test]
    fn replacement_list_evicts_oldest_when_full() {
        let (mut bucket, _) = bucket_seeded_with(0..MAX_NODES_PER_BUCKET as u8);

        let overflow = (MAX_REPLACEMENTS_PER_BUCKET + 2) as u8;
        let mut repl_ids = Vec::new();
        for seed in (0..overflow).map(|i| 100 + i) {
            let (id, contact) = dummy_contact(seed);
            repl_ids.push(id);
            bucket.insert(id, contact);
        }

        assert_eq!(bucket.replacements.len(), MAX_REPLACEMENTS_PER_BUCKET);
        assert!(!bucket.contains(&repl_ids[0]));
        assert!(!bucket.contains(&repl_ids[1]));
        assert!(bucket.contains(repl_ids.last().unwrap()));
    }

    /// A node has no bucket relative to itself.
    #[test]
    fn bucket_index_self_is_none() {
        let id = H256::random();
        assert_eq!(bucket_index(&id, &id), None);
    }

    /// Ids differing only in the lowest bit map to bucket 0.
    #[test]
    fn bucket_index_minimal_distance() {
        let local = H256::zero();
        let remote = hash_with_byte(31, 1);
        assert_eq!(bucket_index(&local, &remote), Some(0));
    }

    /// Ids differing in the highest bit map to bucket 255.
    #[test]
    fn bucket_index_maximal_distance() {
        let local = H256::zero();
        let remote = hash_with_byte(0, 0x80);
        assert_eq!(bucket_index(&local, &remote), Some(255));
    }
}