Skip to main content

ethrex_p2p/
peer_table.rs

1//! Unified peer table for both discv4 and discv5 discovery protocols.
2//!
3//! This module provides a protocol-agnostic peer table that stores contact
4//! information discovered through either discv4 or discv5. The key abstraction
5//! is using `Bytes` for ping identifiers:
6//! - discv4: converts H256 ping hash to Bytes
7//! - discv5: already uses Bytes for req_id
8//!
9//! Each contact is tagged with the protocol that discovered it, allowing
10//! protocol-specific lookups to only query compatible contacts.
11
12use crate::{
13    backend,
14    metrics::METRICS,
15    rlpx::{connection::server::PeerConnection, p2p::Capability},
16    types::{Node, NodeRecord},
17    utils::distance,
18};
19use bytes::Bytes;
20use ethrex_common::{H256, U256};
21use ethrex_storage::Store;
22use indexmap::IndexMap;
23use rand::distributions::WeightedIndex;
24use rand::prelude::Distribution;
25use rand::seq::{IteratorRandom, SliceRandom};
26use rustc_hash::{FxHashMap, FxHashSet};
27use spawned_concurrency::{
28    actor,
29    error::ActorError,
30    protocol,
31    tasks::{Actor, ActorRef, ActorStart as _, Context, Handler, Response, send_message_on},
32};
33use std::{
34    net::IpAddr,
35    time::{Duration, Instant},
36};
37
/// Upper bound of a peer's score; `handle_record_success` clamps to this value.
const MAX_SCORE: i64 = 50;
/// Lower bound that `handle_record_failure` clamps a peer's score to.
const MIN_SCORE: i64 = -50;
/// Score assigned to peers who are acting maliciously (e.g., returning a node with wrong hash)
const MIN_SCORE_CRITICAL: i64 = MIN_SCORE * 3;
/// Score weight for the load balancing function.
const SCORE_WEIGHT: i64 = 1;
/// Weight for amount of requests being handled by the peer for the load balancing function.
const REQUESTS_WEIGHT: i64 = 1;
/// Max amount of ongoing requests per peer.
const MAX_CONCURRENT_REQUESTS_PER_PEER: i64 = 100;
/// The target number of RLPx connections to reach.
pub const TARGET_PEERS: usize = 100;
/// Maximum number of nodes carried in a single discv4 Neighbors packet
/// (the reply to FindNode).
pub(crate) const MAX_NODES_IN_NEIGHBORS_PACKET: usize = 16;
/// Maximum number of ENRs to return in a discv5 FindNode response.
const MAX_ENRS_PER_FINDNODE_RESPONSE: usize = 16;

/// Number of k-buckets in the Kademlia routing table (one per bit of the 256-bit node ID).
const NUMBER_OF_BUCKETS: usize = 256;
/// Maximum number of contacts per k-bucket (Kademlia k parameter).
pub const MAX_NODES_PER_BUCKET: usize = 16;
/// Maximum number of replacement entries per k-bucket.
const MAX_REPLACEMENTS_PER_BUCKET: usize = 10;
/// Maximum number of entries in the flat connection candidate pool.
/// This pool is separate from the k-bucket routing table and retains
/// more contacts for RLPx connection initiation than the k-bucket
/// structure allows (256 × 16 = 4,096 vs this larger capacity).
/// 10K matches what Reth and Nethermind use for their candidate pools.
const MAX_CONNECTION_POOL_SIZE: usize = 10_000;
67
/// A single k-bucket in the Kademlia routing table.
/// Each bucket stores contacts at a specific XOR distance range from the local node.
#[derive(Debug, Clone, Default)]
pub struct KBucket {
    /// Active entries of the bucket, in insertion order
    /// (capped at `MAX_NODES_PER_BUCKET` by `KBucket::insert`).
    pub(crate) contacts: Vec<(H256, Contact)>,
    /// Overflow entries kept as stand-ins for removed contacts; oldest first
    /// (capped at `MAX_REPLACEMENTS_PER_BUCKET`).
    pub(crate) replacements: Vec<(H256, Contact)>,
}
75
76impl KBucket {
77    /// Find a contact by node ID in the main list.
78    fn get(&self, node_id: &H256) -> Option<&Contact> {
79        self.contacts
80            .iter()
81            .find(|(id, _)| id == node_id)
82            .map(|(_, c)| c)
83    }
84
85    /// Find a contact by node ID in either the main or replacement list.
86    fn get_any(&self, node_id: &H256) -> Option<&Contact> {
87        self.get(node_id).or_else(|| {
88            self.replacements
89                .iter()
90                .find(|(id, _)| id == node_id)
91                .map(|(_, c)| c)
92        })
93    }
94
95    /// Find a mutable reference to a contact by node ID (main or replacement list).
96    fn get_mut(&mut self, node_id: &H256) -> Option<&mut Contact> {
97        if let Some((_, c)) = self.contacts.iter_mut().find(|(id, _)| id == node_id) {
98            return Some(c);
99        }
100        self.replacements
101            .iter_mut()
102            .find(|(id, _)| id == node_id)
103            .map(|(_, c)| c)
104    }
105
106    /// Check if a contact exists in this bucket (main or replacement list).
107    fn contains(&self, node_id: &H256) -> bool {
108        self.contacts.iter().any(|(id, _)| id == node_id)
109            || self.replacements.iter().any(|(id, _)| id == node_id)
110    }
111
112    /// Insert a contact into the bucket. Returns true if inserted into main list.
113    /// If the bucket is full, the contact is added to the replacement list instead.
114    fn insert(&mut self, node_id: H256, contact: Contact) -> bool {
115        if self.contacts.len() < MAX_NODES_PER_BUCKET {
116            self.contacts.push((node_id, contact));
117            true
118        } else {
119            self.insert_replacement(node_id, contact);
120            false
121        }
122    }
123
124    /// Add a contact to the replacement list, evicting the oldest if full.
125    fn insert_replacement(&mut self, node_id: H256, contact: Contact) {
126        if self.replacements.len() >= MAX_REPLACEMENTS_PER_BUCKET {
127            self.replacements.remove(0);
128        }
129        self.replacements.push((node_id, contact));
130    }
131
132    /// Remove a contact from the main list and promote a replacement if available.
133    /// Returns the promoted replacement's node ID, if any.
134    fn remove_and_promote(&mut self, node_id: &H256) -> Option<H256> {
135        let idx = self.contacts.iter().position(|(id, _)| id == node_id)?;
136        self.contacts.remove(idx);
137        if !self.replacements.is_empty() {
138            let (replacement_id, replacement) = self.replacements.remove(0);
139            self.contacts.push((replacement_id, replacement));
140            Some(replacement_id)
141        } else {
142            None
143        }
144    }
145}
146
147/// Computes the bucket index for a node relative to the local node.
148/// Uses XOR distance: bucket = floor(log2(XOR(local, remote))), i.e. the
149/// position of the highest set bit minus 1.
150/// Returns None for the local node itself (XOR = 0).
151fn bucket_index(local_node_id: &H256, node_id: &H256) -> Option<usize> {
152    let xor = *local_node_id ^ *node_id;
153    let dist = U256::from_big_endian(xor.as_bytes());
154    if dist.is_zero() {
155        None
156    } else {
157        Some(dist.bits() - 1)
158    }
159}
160
161/// Computes the raw XOR distance between two node IDs.
162/// Used for comparing relative closeness: a is closer to target than b
163/// iff xor_distance(target, a) < xor_distance(target, b).
164pub(crate) fn xor_distance(a: &H256, b: &H256) -> H256 {
165    *a ^ *b
166}
167
/// Identifies which discovery protocol was used to find a contact.
/// This allows protocol-specific lookups to only query compatible contacts.
///
/// `Contact::new` uses this value to set the initial `is_discv4` /
/// `is_discv5` flag of a contact.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DiscoveryProtocol {
    /// Contact discovered via discv4 protocol
    Discv4,
    /// Contact discovered via discv5 protocol
    Discv5,
}
177
178/// Session information for discv5 protocol.
179/// Contains symmetric keys derived from ECDH for message encryption/decryption.
180pub use crate::discv5::session::Session;
181
/// A node learned through discovery, together with the per-node state the
/// table tracks for it (pending ping / ENR request, flags, discv5 session).
#[derive(Debug, Clone)]
pub struct Contact {
    pub node: Node,
    /// Whether this contact is reachable via discv4 protocol.
    pub is_discv4: bool,
    /// Whether this contact is reachable via discv5 protocol.
    pub is_discv5: bool,
    /// The timestamp when the contact was last sent a ping.
    /// If None, the contact has never been pinged.
    pub validation_timestamp: Option<Instant>,
    /// The identifier of the last unacknowledged ping sent to this contact, or
    /// None if no ping was sent yet or it was already acknowledged.
    /// - discv4: H256 hash converted to Bytes
    /// - discv5: request ID as Bytes
    pub ping_id: Option<Bytes>,

    /// The hash of the last unacknowledged ENRRequest sent to this contact, or
    /// None if no request was sent yet or it was already acknowledged.
    pub enr_request_hash: Option<H256>,

    /// ENR associated with this contact, if it was provided by the peer.
    pub record: Option<NodeRecord>,
    /// This contact failed to respond to our Ping.
    pub disposable: bool,
    /// Set to true after we send a successful ENRResponse to it.
    ///
    /// NOTE(review): `Contact::new` initializes this flag to `true`, so a
    /// fresh contact is already treated as knowing us — confirm that this
    /// optimistic default is intended.
    pub knows_us: bool,
    /// This is a known-bad peer (on another network, no matching capabilities, etc)
    pub unwanted: bool,
    /// Whether the last known fork ID is valid, None if unknown.
    pub is_fork_id_valid: Option<bool>,
    /// Session information for discv5 (None for discv4 contacts)
    session: Option<Session>,
}
215
216impl Contact {
217    pub fn was_validated(&self) -> bool {
218        self.validation_timestamp.is_some() && !self.has_pending_ping()
219    }
220
221    pub fn has_pending_ping(&self) -> bool {
222        self.ping_id.is_some()
223    }
224
225    pub fn record_ping_sent(&mut self, ping_id: Bytes) {
226        self.validation_timestamp = Some(Instant::now());
227        self.ping_id = Some(ping_id);
228    }
229
230    pub fn record_enr_request_sent(&mut self, request_hash: H256) {
231        self.enr_request_hash = Some(request_hash);
232    }
233
234    // If hash does not match, ignore. Otherwise, reset enr_request_hash
235    pub fn record_enr_response_received(&mut self, request_hash: H256, record: NodeRecord) {
236        if self
237            .enr_request_hash
238            .take_if(|h| *h == request_hash)
239            .is_some()
240        {
241            self.record = Some(record);
242        }
243    }
244
245    pub fn has_pending_enr_request(&self) -> bool {
246        self.enr_request_hash.is_some()
247    }
248}
249
250impl Contact {
251    pub fn new(node: Node, protocol: DiscoveryProtocol) -> Self {
252        Self {
253            node,
254            is_discv4: protocol == DiscoveryProtocol::Discv4,
255            is_discv5: protocol == DiscoveryProtocol::Discv5,
256            validation_timestamp: None,
257            ping_id: None,
258            enr_request_hash: None,
259            record: None,
260            disposable: false,
261            knows_us: true,
262            unwanted: false,
263            is_fork_id_valid: None,
264            session: None,
265        }
266    }
267
268    /// Check if this contact supports the given protocol.
269    pub fn supports_protocol(&self, protocol: DiscoveryProtocol) -> bool {
270        match protocol {
271            DiscoveryProtocol::Discv4 => self.is_discv4,
272            DiscoveryProtocol::Discv5 => self.is_discv5,
273        }
274    }
275
276    /// Mark this contact as supporting the given protocol.
277    pub fn add_protocol(&mut self, protocol: DiscoveryProtocol) {
278        match protocol {
279            DiscoveryProtocol::Discv4 => self.is_discv4 = true,
280            DiscoveryProtocol::Discv5 => self.is_discv5 = true,
281        }
282    }
283}
284
/// Bookkeeping for a peer tracked in `PeerTableServer::peers`: identity,
/// capabilities, connection handle, score and in-flight request counter.
#[derive(Debug, Clone)]
pub struct PeerData {
    pub node: Node,
    pub record: Option<NodeRecord>,
    pub supported_capabilities: Vec<Capability>,
    /// Set to true if the connection is inbound (aka the connection was started by the peer and not by this node)
    /// It is only valid as long as is_connected is true
    ///
    /// NOTE(review): this struct has no `is_connected` field; presumably the
    /// sentence above means "while `connection` is `Some`" — confirm.
    pub is_connection_inbound: bool,
    /// communication channels between the peer data and its active connection
    pub connection: Option<PeerConnection>,
    /// This tracks the score of a peer
    score: i64,
    /// Track the amount of concurrent requests this peer is handling
    requests: i64,
    /// Timestamp (seconds since UNIX epoch) of the last successful response from this peer
    pub last_response_time: Option<u64>,
}
302
303impl PeerData {
304    pub fn new(
305        node: Node,
306        record: Option<NodeRecord>,
307        connection: Option<PeerConnection>,
308        capabilities: Vec<Capability>,
309    ) -> Self {
310        Self {
311            node,
312            record,
313            supported_capabilities: capabilities,
314            is_connection_inbound: false,
315            connection,
316            score: Default::default(),
317            requests: Default::default(),
318            last_response_time: None,
319        }
320    }
321}
322
/// Diagnostic snapshot of a peer's state, used by admin RPC endpoints.
///
/// Serializable via `serde`. The field meanings noted below are inferred from
/// the names of the matching `PeerData` fields — TODO confirm against the
/// code that builds this struct.
#[derive(Debug, Clone, serde::Serialize)]
pub struct PeerDiagnostics {
    pub peer_id: H256,
    // presumably mirrors `PeerData::score`
    pub score: i64,
    // presumably mirrors `PeerData::requests`
    pub inflight_requests: i64,
    pub eligible: bool,
    pub capabilities: Vec<String>,
    pub ip: IpAddr,
    pub client_version: String,
    pub connection_direction: String,
    // presumably seconds since UNIX epoch, like `PeerData::last_response_time`
    pub last_response_time: Option<u64>,
}
336
/// Result of contact validation.
///
/// Returned by the `validate_contact` request, which takes a node ID and the
/// sender's IP address. The exact condition behind each variant lives in the
/// validator, which is not part of this section — TODO confirm there.
#[derive(Debug, Clone)]
pub enum ContactValidation {
    /// Validation succeeded; carries the (boxed) contact.
    Valid(Box<Contact>),
    InvalidContact,
    // presumably: no contact is stored for the given node ID
    UnknownContact,
    // presumably: the sender IP differs from the stored contact's IP
    IpMismatch,
}
345
/// Reservation handle for a peer request slot.
///
/// **Contract:** when a `RequestPermit` exists, the `requests` counter for
/// its peer has been incremented by one. Dropping the permit releases the
/// slot via a fire-and-forget `DecRequests` message. The handler that
/// returns the permit also bumps the counter atomically under `&mut self`,
/// so selection and reservation cannot be observed out of order.
///
/// The permit must travel with whatever code owns the outstanding request —
/// move it into spawned tasks, send it through channels alongside results,
/// etc. Dropping early releases the slot early.
#[must_use = "dropping this permit immediately releases the peer's request slot"]
pub struct RequestPermit {
    /// Handle to the peer table actor that receives the release message on drop.
    peer_table: PeerTable,
    /// Peer whose request slot this permit holds.
    peer_id: H256,
}
362
impl RequestPermit {
    /// Wraps an already-reserved slot for `peer_id`.
    ///
    /// This constructor does NOT increment the peer's `requests` counter; the
    /// caller must have done so before creating the permit (the
    /// `handle_get_best_*` handlers do it right before calling `new`).
    pub(crate) fn new(peer_table: PeerTable, peer_id: H256) -> Self {
        Self {
            peer_table,
            peer_id,
        }
    }
}
371
372impl std::fmt::Debug for RequestPermit {
373    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
374        f.debug_struct("RequestPermit")
375            .field("peer_id", &self.peer_id)
376            .finish_non_exhaustive()
377    }
378}
379
impl Drop for RequestPermit {
    /// Releases the reserved slot by sending `dec_requests` for this permit's
    /// peer; a failed send is deliberately ignored.
    fn drop(&mut self) {
        // Fire-and-forget. If the actor mailbox is closed, p2p is already
        // shutting down — the lost decrement is a non-issue.
        let _ = self.peer_table.dec_requests(self.peer_id);
    }
}
387
/// Message protocol of the peer table actor.
///
/// Methods returning `Result<(), ActorError>` are send (cast) methods: they
/// only report whether the message could be handed to the actor. Methods
/// returning `Response<T>` are request (call) methods that yield a value
/// produced by the matching `handle_*` handler on `PeerTableServer`.
#[protocol]
pub trait PeerTableServerProtocol: Send + Sync {
    // Send (cast) methods
    fn new_contacts(&self, nodes: Vec<Node>, protocol: DiscoveryProtocol)
    -> Result<(), ActorError>;
    fn new_contact_records(&self, node_records: Vec<NodeRecord>) -> Result<(), ActorError>;
    fn new_connected_peer(
        &self,
        node: Node,
        connection: PeerConnection,
        capabilities: Vec<Capability>,
    ) -> Result<(), ActorError>;
    fn set_session_info(&self, node_id: H256, session: Session) -> Result<(), ActorError>;
    fn remove_peer(&self, node_id: H256) -> Result<(), ActorError>;
    // Sent by `RequestPermit::drop` to release a reserved request slot.
    fn dec_requests(&self, node_id: H256) -> Result<(), ActorError>;
    fn set_unwanted(&self, node_id: H256) -> Result<(), ActorError>;
    fn set_is_fork_id_valid(&self, node_id: H256, valid: bool) -> Result<(), ActorError>;
    fn record_success(&self, node_id: H256) -> Result<(), ActorError>;
    fn record_failure(&self, node_id: H256) -> Result<(), ActorError>;
    fn record_critical_failure(&self, node_id: H256) -> Result<(), ActorError>;
    fn record_ping_sent(&self, node_id: H256, ping_id: Bytes) -> Result<(), ActorError>;
    fn record_pong_received(&self, node_id: H256, ping_id: Bytes) -> Result<(), ActorError>;
    fn record_enr_request_sent(&self, node_id: H256, request_hash: H256) -> Result<(), ActorError>;
    fn record_enr_response_received(
        &self,
        node_id: H256,
        request_hash: H256,
        record: NodeRecord,
    ) -> Result<(), ActorError>;
    fn set_disposable(&self, node_id: H256) -> Result<(), ActorError>;
    fn mark_knows_us(&self, node_id: H256) -> Result<(), ActorError>;
    fn prune_table(&self) -> Result<(), ActorError>;
    fn shutdown(&self) -> Result<(), ActorError>;

    // Request (call) methods
    fn peer_count(&self) -> Response<usize>;
    fn peer_count_by_capabilities(&self, capabilities: Vec<Capability>) -> Response<usize>;
    fn target_reached(&self) -> Response<bool>;
    fn target_peers_reached(&self) -> Response<bool>;
    fn target_peers_completion(&self) -> Response<f64>;
    fn get_contact_to_initiate(&self) -> Response<Option<Box<Contact>>>;
    fn get_contact_for_enr_lookup(&self) -> Response<Option<Box<Contact>>>;
    fn get_closest_from_pool(&self, target: H256, count: usize) -> Response<Vec<(H256, Node)>>;
    fn get_contact(&self, node_id: H256) -> Response<Option<Box<Contact>>>;
    fn get_contact_to_revalidate(
        &self,
        revalidation_interval: Duration,
        protocol: DiscoveryProtocol,
    ) -> Response<Option<Box<Contact>>>;
    // The `get_best_*` requests below also reserve a request slot on each
    // returned peer; the slot is released when the `RequestPermit` is dropped.
    fn get_best_peer(
        &self,
        capabilities: Vec<Capability>,
    ) -> Response<Option<(H256, PeerConnection, RequestPermit)>>;
    fn get_best_peer_excluding(
        &self,
        capabilities: Vec<Capability>,
        excluded: Vec<H256>,
    ) -> Response<Option<(H256, PeerConnection, RequestPermit)>>;
    fn get_best_n_peers(
        &self,
        capabilities: Vec<Capability>,
        n: usize,
    ) -> Response<Vec<(H256, PeerConnection, RequestPermit)>>;
    /// Read-only predicate: is there any eligible peer matching `capabilities`?
    /// Does not reserve a slot; use for capacity/rotation probes only.
    fn has_eligible_peer(&self, capabilities: Vec<Capability>) -> Response<bool>;
    fn get_score(&self, node_id: H256) -> Response<i64>;
    fn get_connected_nodes(&self) -> Response<Vec<Node>>;
    fn get_peers_with_capabilities(&self)
    -> Response<Vec<(H256, PeerConnection, Vec<Capability>)>>;
    fn insert_if_new(&self, node: Node, protocol: DiscoveryProtocol) -> Response<bool>;
    fn validate_contact(&self, node_id: H256, sender_ip: IpAddr) -> Response<ContactValidation>;
    fn get_closest_nodes(&self, node_id: H256) -> Response<Vec<Node>>;
    fn get_nodes_at_distances(&self, distances: Vec<u32>) -> Response<Vec<NodeRecord>>;
    fn get_peers_data(&self) -> Response<Vec<PeerData>>;
    fn get_random_peer(
        &self,
        capabilities: Vec<Capability>,
    ) -> Response<Option<(H256, PeerConnection, RequestPermit)>>;
    fn get_session_info(&self, node_id: H256) -> Response<Option<Session>>;
    fn get_peer_diagnostics(&self) -> Response<Vec<PeerDiagnostics>>;
    fn get_peer_connection(&self, peer_id: H256) -> Response<Option<PeerConnection>>;
}
471
/// Actor state behind the peer table: the Kademlia routing table, the set of
/// tracked peers, discv5 sessions and the connection candidate pool.
#[derive(Debug)]
pub struct PeerTableServer {
    /// ID of the local node.
    local_node_id: H256,
    /// K-buckets of the routing table; `new` allocates `NUMBER_OF_BUCKETS` of them.
    buckets: Vec<KBucket>,
    /// Peers keyed by node ID, with their score and request bookkeeping.
    peers: IndexMap<H256, PeerData>,
    already_tried_peers: FxHashSet<H256>,
    /// Peer count at which `target_reached` / `target_peers_reached` report true.
    target_peers: usize,
    store: Store,
    /// Standalone session store, independent of contacts.
    /// Allows sessions to be stored even before the contact's ENR is known/parseable.
    sessions: FxHashMap<H256, Session>,
    /// Flat pool of discovered contacts for RLPx connection initiation.
    /// Decoupled from the k-bucket routing table so that connection initiation
    /// has access to a much larger candidate pool than the k-bucket structure
    /// allows (k-buckets: 256 × 16 = 4,096 max; this pool: up to
    /// `MAX_CONNECTION_POOL_SIZE` = 10,000 entries).
    /// K-buckets are still used for all Kademlia protocol operations.
    connection_pool: IndexMap<H256, Node>,
}
490
491#[actor(protocol = PeerTableServerProtocol)]
492impl PeerTableServer {
493    pub fn spawn(local_node_id: H256, target_peers: usize, store: Store) -> PeerTable {
494        PeerTableServer::new(local_node_id, target_peers, store).start()
495    }
496
497    pub(crate) fn new(local_node_id: H256, target_peers: usize, store: Store) -> Self {
498        Self {
499            local_node_id,
500            buckets: vec![KBucket::default(); NUMBER_OF_BUCKETS],
501            peers: Default::default(),
502            already_tried_peers: Default::default(),
503            target_peers,
504            store,
505            sessions: Default::default(),
506            connection_pool: IndexMap::with_capacity(MAX_CONNECTION_POOL_SIZE),
507        }
508    }
509
    /// Actor start hook.
    ///
    /// Hands the `tokio::signal::ctrl_c()` future to `send_message_on`
    /// together with a `Shutdown` message — presumably so this actor receives
    /// `Shutdown` (see `handle_shutdown`) once Ctrl-C is pressed; confirm
    /// against the `spawned_concurrency` documentation.
    #[started]
    async fn started(&mut self, ctx: &Context<Self>) {
        send_message_on(
            ctx.clone(),
            tokio::signal::ctrl_c(),
            peer_table_server_protocol::Shutdown,
        );
    }
518
519    // === Send handlers ===
520
    /// Handles `NewContacts`: forwards the nodes and the protocol that
    /// discovered them to `do_new_contacts`.
    #[send_handler]
    async fn handle_new_contacts(
        &mut self,
        msg: peer_table_server_protocol::NewContacts,
        _ctx: &Context<Self>,
    ) {
        self.do_new_contacts(msg.nodes, msg.protocol).await;
    }
529
    /// Handles `NewContactRecords`: forwards the received node records to
    /// `do_new_contact_records`.
    #[send_handler]
    async fn handle_new_contact_records(
        &mut self,
        msg: peer_table_server_protocol::NewContactRecords,
        _ctx: &Context<Self>,
    ) {
        self.do_new_contact_records(msg.node_records).await;
    }
538
539    #[send_handler]
540    async fn handle_new_connected_peer(
541        &mut self,
542        msg: peer_table_server_protocol::NewConnectedPeer,
543        _ctx: &Context<Self>,
544    ) {
545        let new_peer_id = msg.node.node_id();
546        let new_peer = PeerData::new(msg.node, None, Some(msg.connection), msg.capabilities);
547        self.peers.insert(new_peer_id, new_peer);
548    }
549
550    #[send_handler]
551    async fn handle_set_session_info(
552        &mut self,
553        msg: peer_table_server_protocol::SetSessionInfo,
554        _ctx: &Context<Self>,
555    ) {
556        // Store in the standalone sessions map (always succeeds, no contact required).
557        self.sessions.insert(msg.node_id, msg.session.clone());
558        // Also update the contact's cached session if the contact exists.
559        if let Some(contact) = self.get_contact_mut(&msg.node_id) {
560            contact.session = Some(msg.session);
561        }
562    }
563
    /// Handles `RemovePeer`: drops the peer from `peers` if present.
    ///
    /// Uses `IndexMap::swap_remove`, so the last entry takes the removed
    /// entry's position — iteration order of `peers` is not preserved.
    #[send_handler]
    async fn handle_remove_peer(
        &mut self,
        msg: peer_table_server_protocol::RemovePeer,
        _ctx: &Context<Self>,
    ) {
        self.peers.swap_remove(&msg.node_id);
    }
572
573    #[send_handler]
574    async fn handle_dec_requests(
575        &mut self,
576        msg: peer_table_server_protocol::DecRequests,
577        _ctx: &Context<Self>,
578    ) {
579        self.peers.entry(msg.node_id).and_modify(|peer_data| {
580            if peer_data.requests <= 0 {
581                // Expected under the reconnect race (stale permit fires
582                // after remove_peer + new_connected_peer), self-heals.
583                // Otherwise points to a bookkeeping bug worth chasing.
584                tracing::debug!(
585                    peer_id = ?msg.node_id,
586                    requests = peer_data.requests,
587                    "dec_requests with counter already <= 0",
588                );
589            }
590            peer_data.requests = peer_data.requests.saturating_sub(1).max(0)
591        });
592    }
593
594    #[send_handler]
595    async fn handle_set_unwanted(
596        &mut self,
597        msg: peer_table_server_protocol::SetUnwanted,
598        _ctx: &Context<Self>,
599    ) {
600        if let Some(contact) = self.get_contact_mut(&msg.node_id) {
601            contact.unwanted = true;
602        }
603    }
604
605    #[send_handler]
606    async fn handle_set_is_fork_id_valid(
607        &mut self,
608        msg: peer_table_server_protocol::SetIsForkIdValid,
609        _ctx: &Context<Self>,
610    ) {
611        if let Some(contact) = self.get_contact_mut(&msg.node_id) {
612            contact.is_fork_id_valid = Some(msg.valid);
613        }
614    }
615
616    #[send_handler]
617    async fn handle_record_success(
618        &mut self,
619        msg: peer_table_server_protocol::RecordSuccess,
620        _ctx: &Context<Self>,
621    ) {
622        let now = std::time::SystemTime::now()
623            .duration_since(std::time::UNIX_EPOCH)
624            .unwrap_or_default()
625            .as_secs();
626        self.peers.entry(msg.node_id).and_modify(|peer_data| {
627            peer_data.score = (peer_data.score + 1).min(MAX_SCORE);
628            peer_data.last_response_time = Some(now);
629        });
630    }
631
632    #[send_handler]
633    async fn handle_record_failure(
634        &mut self,
635        msg: peer_table_server_protocol::RecordFailure,
636        _ctx: &Context<Self>,
637    ) {
638        self.peers
639            .entry(msg.node_id)
640            .and_modify(|peer_data| peer_data.score = (peer_data.score - 1).max(MIN_SCORE));
641    }
642
643    #[send_handler]
644    async fn handle_record_critical_failure(
645        &mut self,
646        msg: peer_table_server_protocol::RecordCriticalFailure,
647        _ctx: &Context<Self>,
648    ) {
649        self.peers
650            .entry(msg.node_id)
651            .and_modify(|peer_data| peer_data.score = MIN_SCORE_CRITICAL);
652    }
653
654    #[send_handler]
655    async fn handle_record_ping_sent(
656        &mut self,
657        msg: peer_table_server_protocol::RecordPingSent,
658        _ctx: &Context<Self>,
659    ) {
660        if let Some(contact) = self.get_contact_mut(&msg.node_id) {
661            contact.record_ping_sent(msg.ping_id);
662        }
663    }
664
665    #[send_handler]
666    async fn handle_record_pong_received(
667        &mut self,
668        msg: peer_table_server_protocol::RecordPongReceived,
669        _ctx: &Context<Self>,
670    ) {
671        if let Some(contact) = self.get_contact_mut(&msg.node_id)
672            && contact
673                .ping_id
674                .as_ref()
675                .map(|value| *value == msg.ping_id)
676                .unwrap_or(false)
677        {
678            contact.ping_id = None;
679        }
680    }
681
682    #[send_handler]
683    async fn handle_record_enr_request_sent(
684        &mut self,
685        msg: peer_table_server_protocol::RecordEnrRequestSent,
686        _ctx: &Context<Self>,
687    ) {
688        if let Some(contact) = self.get_contact_mut(&msg.node_id) {
689            contact.record_enr_request_sent(msg.request_hash);
690        }
691    }
692
693    #[send_handler]
694    async fn handle_record_enr_response_received(
695        &mut self,
696        msg: peer_table_server_protocol::RecordEnrResponseReceived,
697        _ctx: &Context<Self>,
698    ) {
699        if let Some(contact) = self.get_contact_mut(&msg.node_id) {
700            contact.record_enr_response_received(msg.request_hash, msg.record);
701        }
702    }
703
704    #[send_handler]
705    async fn handle_set_disposable(
706        &mut self,
707        msg: peer_table_server_protocol::SetDisposable,
708        _ctx: &Context<Self>,
709    ) {
710        if let Some(contact) = self.get_contact_mut(&msg.node_id) {
711            contact.disposable = true;
712        }
713    }
714
715    #[send_handler]
716    async fn handle_mark_knows_us(
717        &mut self,
718        msg: peer_table_server_protocol::MarkKnowsUs,
719        _ctx: &Context<Self>,
720    ) {
721        if let Some(contact) = self.get_contact_mut(&msg.node_id) {
722            contact.knows_us = true;
723        }
724    }
725
    /// Handles `PruneTable` by running `self.prune()`.
    #[send_handler]
    async fn handle_prune_table(
        &mut self,
        _msg: peer_table_server_protocol::PruneTable,
        _ctx: &Context<Self>,
    ) {
        self.prune();
    }
734
    /// Handles `Shutdown` by stopping this actor through its context.
    #[send_handler]
    async fn handle_shutdown(
        &mut self,
        _msg: peer_table_server_protocol::Shutdown,
        ctx: &Context<Self>,
    ) {
        ctx.stop();
    }
743
744    // === Request handlers ===
745
    /// Handles `PeerCount`: returns the number of entries in `peers`.
    #[request_handler]
    async fn handle_peer_count(
        &mut self,
        _msg: peer_table_server_protocol::PeerCount,
        _ctx: &Context<Self>,
    ) -> usize {
        self.peers.len()
    }
754
    /// Handles `PeerCountByCapabilities`: delegates the count to
    /// `do_peer_count_by_capabilities` with the requested capabilities.
    #[request_handler]
    async fn handle_peer_count_by_capabilities(
        &mut self,
        msg: peer_table_server_protocol::PeerCountByCapabilities,
        _ctx: &Context<Self>,
    ) -> usize {
        self.do_peer_count_by_capabilities(msg.capabilities)
    }
763
    /// Handles `TargetReached`: true once the number of tracked peers is at
    /// least `target_peers`.
    ///
    /// NOTE(review): this body is identical to `handle_target_peers_reached`;
    /// confirm whether this request was meant to check an additional target.
    #[request_handler]
    async fn handle_target_reached(
        &mut self,
        _msg: peer_table_server_protocol::TargetReached,
        _ctx: &Context<Self>,
    ) -> bool {
        self.peers.len() >= self.target_peers
    }
772
    /// Handles `TargetPeersReached`: true once the number of tracked peers is
    /// at least `target_peers`.
    #[request_handler]
    async fn handle_target_peers_reached(
        &mut self,
        _msg: peer_table_server_protocol::TargetPeersReached,
        _ctx: &Context<Self>,
    ) -> bool {
        self.peers.len() >= self.target_peers
    }
781
782    #[request_handler]
783    async fn handle_target_peers_completion(
784        &mut self,
785        _msg: peer_table_server_protocol::TargetPeersCompletion,
786        _ctx: &Context<Self>,
787    ) -> f64 {
788        self.peers.len() as f64 / self.target_peers as f64
789    }
790
    /// Handles `GetContactToInitiate`: returns the contact chosen by
    /// `do_get_contact_to_initiate`, boxed, or `None` when it yields nothing.
    #[request_handler]
    async fn handle_get_contact_to_initiate(
        &mut self,
        _msg: peer_table_server_protocol::GetContactToInitiate,
        _ctx: &Context<Self>,
    ) -> Option<Box<Contact>> {
        self.do_get_contact_to_initiate().map(Box::new)
    }
799
    /// Handles `GetClosestFromPool`: delegates to `do_get_closest_from_pool`
    /// with the requested target ID and count.
    #[request_handler]
    async fn handle_get_closest_from_pool(
        &mut self,
        msg: peer_table_server_protocol::GetClosestFromPool,
        _ctx: &Context<Self>,
    ) -> Vec<(H256, Node)> {
        self.do_get_closest_from_pool(msg.target, msg.count)
    }
808
    /// Handles `GetContactForEnrLookup`: returns the contact chosen by
    /// `do_get_contact_for_enr_lookup`, boxed, or `None` when it yields nothing.
    #[request_handler]
    async fn handle_get_contact_for_enr_lookup(
        &mut self,
        _msg: peer_table_server_protocol::GetContactForEnrLookup,
        _ctx: &Context<Self>,
    ) -> Option<Box<Contact>> {
        self.do_get_contact_for_enr_lookup().map(Box::new)
    }
817
    /// Handles `GetContact`: returns a boxed clone of the contact stored for
    /// `node_id`, or `None` when `get_contact` finds none. The caller gets a
    /// snapshot; later table updates are not reflected in it.
    #[request_handler]
    async fn handle_get_contact(
        &mut self,
        msg: peer_table_server_protocol::GetContact,
        _ctx: &Context<Self>,
    ) -> Option<Box<Contact>> {
        self.get_contact(&msg.node_id).cloned().map(Box::new)
    }
826
    /// Handles `GetContactToRevalidate`: delegates to
    /// `do_get_contact_to_revalidate` with the revalidation interval and the
    /// protocol to select for. The helper's result is returned as-is, so it
    /// already yields the boxed contact.
    #[request_handler]
    async fn handle_get_contact_to_revalidate(
        &mut self,
        msg: peer_table_server_protocol::GetContactToRevalidate,
        _ctx: &Context<Self>,
    ) -> Option<Box<Contact>> {
        self.do_get_contact_to_revalidate(msg.revalidation_interval, msg.protocol)
    }
835
836    #[request_handler]
837    async fn handle_get_best_peer(
838        &mut self,
839        msg: peer_table_server_protocol::GetBestPeer,
840        ctx: &Context<Self>,
841    ) -> Option<(H256, PeerConnection, RequestPermit)> {
842        let (peer_id, conn) = self.do_get_best_peer(&msg.capabilities)?;
843        self.peers
844            .get_mut(&peer_id)
845            .expect("peer returned by do_get_best_peer must be present in self.peers")
846            .requests += 1;
847        Some((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id)))
848    }
849
850    #[request_handler]
851    async fn handle_get_best_peer_excluding(
852        &mut self,
853        msg: peer_table_server_protocol::GetBestPeerExcluding,
854        ctx: &Context<Self>,
855    ) -> Option<(H256, PeerConnection, RequestPermit)> {
856        let (peer_id, conn) = self.do_get_best_peer_excluding(&msg.capabilities, &msg.excluded)?;
857        self.peers
858            .get_mut(&peer_id)
859            .expect("peer returned by do_get_best_peer_excluding must be present in self.peers")
860            .requests += 1;
861        Some((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id)))
862    }
863
864    #[request_handler]
865    async fn handle_get_best_n_peers(
866        &mut self,
867        msg: peer_table_server_protocol::GetBestNPeers,
868        ctx: &Context<Self>,
869    ) -> Vec<(H256, PeerConnection, RequestPermit)> {
870        let picks = self.do_get_best_n_peers(&msg.capabilities, msg.n);
871        let mut out = Vec::with_capacity(picks.len());
872        for (peer_id, conn) in picks {
873            self.peers
874                .get_mut(&peer_id)
875                .expect("peer returned by do_get_best_n_peers must be present in self.peers")
876                .requests += 1;
877            out.push((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id)));
878        }
879        out
880    }
881
882    #[request_handler]
883    async fn handle_has_eligible_peer(
884        &mut self,
885        msg: peer_table_server_protocol::HasEligiblePeer,
886        _ctx: &Context<Self>,
887    ) -> bool {
888        self.peers.values().any(|peer_data| {
889            peer_data.connection.is_some()
890                && self.can_try_more_requests(&peer_data.score, &peer_data.requests)
891                && msg
892                    .capabilities
893                    .iter()
894                    .any(|cap| peer_data.supported_capabilities.contains(cap))
895        })
896    }
897
898    #[request_handler]
899    async fn handle_get_score(
900        &mut self,
901        msg: peer_table_server_protocol::GetScore,
902        _ctx: &Context<Self>,
903    ) -> i64 {
904        self.peers
905            .get(&msg.node_id)
906            .map(|peer_data| peer_data.score)
907            .unwrap_or_default()
908    }
909
910    #[request_handler]
911    async fn handle_get_connected_nodes(
912        &mut self,
913        _msg: peer_table_server_protocol::GetConnectedNodes,
914        _ctx: &Context<Self>,
915    ) -> Vec<Node> {
916        self.peers
917            .values()
918            .map(|peer_data| peer_data.node.clone())
919            .collect()
920    }
921
922    #[request_handler]
923    async fn handle_get_peers_with_capabilities(
924        &mut self,
925        _msg: peer_table_server_protocol::GetPeersWithCapabilities,
926        _ctx: &Context<Self>,
927    ) -> Vec<(H256, PeerConnection, Vec<Capability>)> {
928        self.peers
929            .iter()
930            .filter_map(|(peer_id, peer_data)| {
931                peer_data.connection.clone().map(|connection| {
932                    (
933                        *peer_id,
934                        connection,
935                        peer_data.supported_capabilities.clone(),
936                    )
937                })
938            })
939            .collect()
940    }
941
942    #[request_handler]
943    async fn handle_insert_if_new(
944        &mut self,
945        msg: peer_table_server_protocol::InsertIfNew,
946        _ctx: &Context<Self>,
947    ) -> bool {
948        let node_id = msg.node.node_id();
949        // Always add to the connection pool
950        self.insert_to_connection_pool(node_id, msg.node.clone());
951        if self.contact_exists(&node_id) {
952            return false;
953        }
954        let contact = Contact::new(msg.node, msg.protocol);
955        // Return true for any genuinely new node, even if it overflows to the
956        // replacement list.  This ensures the caller sends a reciprocal ping
957        // which establishes the bond needed for FindNode validation.
958        self.insert_contact(node_id, contact);
959        METRICS.record_new_discovery().await;
960        true
961    }
962
963    #[request_handler]
964    async fn handle_validate_contact(
965        &mut self,
966        msg: peer_table_server_protocol::ValidateContact,
967        _ctx: &Context<Self>,
968    ) -> ContactValidation {
969        self.do_validate_contact(msg.node_id, msg.sender_ip)
970    }
971
972    #[request_handler]
973    async fn handle_get_closest_nodes(
974        &mut self,
975        msg: peer_table_server_protocol::GetClosestNodes,
976        _ctx: &Context<Self>,
977    ) -> Vec<Node> {
978        self.do_get_closest_nodes(msg.node_id)
979    }
980
981    #[request_handler]
982    async fn handle_get_nodes_at_distances(
983        &mut self,
984        msg: peer_table_server_protocol::GetNodesAtDistances,
985        _ctx: &Context<Self>,
986    ) -> Vec<NodeRecord> {
987        self.do_get_nodes_at_distances(&msg.distances)
988    }
989
990    #[request_handler]
991    async fn handle_get_peers_data(
992        &mut self,
993        _msg: peer_table_server_protocol::GetPeersData,
994        _ctx: &Context<Self>,
995    ) -> Vec<PeerData> {
996        self.peers.values().cloned().collect()
997    }
998
999    #[request_handler]
1000    async fn handle_get_random_peer(
1001        &mut self,
1002        msg: peer_table_server_protocol::GetRandomPeer,
1003        ctx: &Context<Self>,
1004    ) -> Option<(H256, PeerConnection, RequestPermit)> {
1005        let (peer_id, conn) = self.do_get_random_peer(msg.capabilities)?;
1006        self.peers
1007            .get_mut(&peer_id)
1008            .expect("peer returned by do_get_random_peer must be present in self.peers")
1009            .requests += 1;
1010        Some((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id)))
1011    }
1012
1013    #[request_handler]
1014    async fn handle_get_session_info(
1015        &mut self,
1016        msg: peer_table_server_protocol::GetSessionInfo,
1017        _ctx: &Context<Self>,
1018    ) -> Option<Session> {
1019        // Check standalone sessions map first; fall back to contact.session.
1020        self.sessions
1021            .get(&msg.node_id)
1022            .cloned()
1023            .or_else(|| self.get_contact(&msg.node_id)?.session.clone())
1024    }
1025
1026    #[request_handler]
1027    async fn handle_get_peer_connection(
1028        &mut self,
1029        msg: peer_table_server_protocol::GetPeerConnection,
1030        _ctx: &Context<Self>,
1031    ) -> Option<PeerConnection> {
1032        self.peers
1033            .get(&msg.peer_id)
1034            .and_then(|peer_data| peer_data.connection.clone())
1035    }
1036
1037    #[request_handler]
1038    async fn handle_get_peer_diagnostics(
1039        &mut self,
1040        _msg: peer_table_server_protocol::GetPeerDiagnostics,
1041        _ctx: &Context<Self>,
1042    ) -> Vec<PeerDiagnostics> {
1043        self.peers
1044            .iter()
1045            .map(|(id, peer_data)| PeerDiagnostics {
1046                peer_id: *id,
1047                score: peer_data.score,
1048                inflight_requests: peer_data.requests,
1049                eligible: self.can_try_more_requests(&peer_data.score, &peer_data.requests),
1050                capabilities: peer_data
1051                    .supported_capabilities
1052                    .iter()
1053                    .map(|c| format!("{}/{}", c.protocol(), c.version))
1054                    .collect(),
1055                ip: peer_data.node.ip,
1056                client_version: peer_data.node.version.clone().unwrap_or_default(),
1057                connection_direction: if peer_data.is_connection_inbound {
1058                    "inbound".to_string()
1059                } else {
1060                    "outbound".to_string()
1061                },
1062                last_response_time: peer_data.last_response_time,
1063            })
1064            .collect()
1065    }
1066
1067    // === Private helper methods ===
1068
1069    // --- K-bucket accessors ---
1070
1071    /// Get the bucket index for a node ID, or None if it's the local node.
1072    fn bucket_for(&self, node_id: &H256) -> Option<usize> {
1073        bucket_index(&self.local_node_id, node_id)
1074    }
1075
1076    /// Look up a contact by node ID in main or replacement list (O(K) within the bucket).
1077    fn get_contact(&self, node_id: &H256) -> Option<&Contact> {
1078        let idx = self.bucket_for(node_id)?;
1079        self.buckets[idx].get_any(node_id)
1080    }
1081
1082    /// Look up a mutable reference to a contact by node ID.
1083    fn get_contact_mut(&mut self, node_id: &H256) -> Option<&mut Contact> {
1084        let idx = self.bucket_for(node_id)?;
1085        self.buckets[idx].get_mut(node_id)
1086    }
1087
1088    /// Check if a contact exists in any bucket (main or replacement list).
1089    fn contact_exists(&self, node_id: &H256) -> bool {
1090        let Some(idx) = self.bucket_for(node_id) else {
1091            return false;
1092        };
1093        self.buckets[idx].contains(node_id)
1094    }
1095
1096    /// Insert a contact into the appropriate k-bucket. Returns true if inserted
1097    /// into the main list, false if the node went to the replacement list or is
1098    /// the local node.
1099    fn insert_contact(&mut self, node_id: H256, contact: Contact) -> bool {
1100        #[cfg(feature = "metrics")]
1101        let start = std::time::Instant::now();
1102
1103        let Some(idx) = self.bucket_for(&node_id) else {
1104            return false;
1105        };
1106        let result = self.buckets[idx].insert(node_id, contact);
1107
1108        #[cfg(feature = "metrics")]
1109        {
1110            use ethrex_metrics::p2p::METRICS_P2P;
1111            METRICS_P2P.observe_insert_contact_duration(start.elapsed().as_secs_f64());
1112        }
1113
1114        result
1115    }
1116
1117    /// Insert a node into the flat connection pool for RLPx initiation.
1118    /// Evicts the oldest entry when the pool is at capacity.
1119    fn insert_to_connection_pool(&mut self, node_id: H256, node: Node) {
1120        if self.connection_pool.contains_key(&node_id) {
1121            return;
1122        }
1123        if self.connection_pool.len() >= MAX_CONNECTION_POOL_SIZE {
1124            self.connection_pool.shift_remove_index(0);
1125        }
1126        self.connection_pool.insert(node_id, node);
1127    }
1128
1129    /// Look up a contact by node ID in either the main or replacement list.
1130    fn get_contact_or_replacement(&self, node_id: &H256) -> Option<&Contact> {
1131        let idx = self.bucket_for(node_id)?;
1132        self.buckets[idx].get_any(node_id)
1133    }
1134
1135    /// Look up a mutable reference in either the main or replacement list.
1136    fn get_contact_or_replacement_mut(&mut self, node_id: &H256) -> Option<&mut Contact> {
1137        let idx = self.bucket_for(node_id)?;
1138        let bucket = &mut self.buckets[idx];
1139        // Search main list first, then replacement list.
1140        // Done inline to avoid borrow-checker issues with or_else closures.
1141        if let Some(pos) = bucket.contacts.iter().position(|(id, _)| id == node_id) {
1142            return Some(&mut bucket.contacts[pos].1);
1143        }
1144        if let Some(pos) = bucket.replacements.iter().position(|(id, _)| id == node_id) {
1145            return Some(&mut bucket.replacements[pos].1);
1146        }
1147        None
1148    }
1149
1150    /// Iterate over all contacts across all buckets (main and replacement lists).
1151    fn iter_contacts(&self) -> impl Iterator<Item = (&H256, &Contact)> {
1152        self.buckets.iter().flat_map(|bucket| {
1153            bucket
1154                .contacts
1155                .iter()
1156                .chain(bucket.replacements.iter())
1157                .map(|(id, c)| (id, c))
1158        })
1159    }
1160
1161    // --- Peer selection ---
1162
1163    fn weight_peer(&self, score: &i64, requests: &i64) -> i64 {
1164        score * SCORE_WEIGHT - requests * REQUESTS_WEIGHT
1165    }
1166
1167    fn can_try_more_requests(&self, score: &i64, requests: &i64) -> bool {
1168        let score_ratio = (score - MIN_SCORE) as f64 / (MAX_SCORE - MIN_SCORE) as f64;
1169        let max_requests = (MAX_CONCURRENT_REQUESTS_PER_PEER as f64 * score_ratio).max(1.0);
1170        (*requests as f64) < max_requests
1171    }
1172
1173    fn do_get_best_peer(&self, capabilities: &[Capability]) -> Option<(H256, PeerConnection)> {
1174        self.do_get_best_peer_excluding(capabilities, &[])
1175    }
1176
1177    /// Like `do_get_best_peer`, but excludes specific peers from selection.
1178    /// Used by `update_pivot` to rotate through peers on repeated failures.
1179    fn do_get_best_peer_excluding(
1180        &self,
1181        capabilities: &[Capability],
1182        excluded: &[H256],
1183    ) -> Option<(H256, PeerConnection)> {
1184        self.peers
1185            .iter()
1186            .filter_map(|(id, peer_data)| {
1187                if excluded.contains(id)
1188                    || !self.can_try_more_requests(&peer_data.score, &peer_data.requests)
1189                    || !capabilities
1190                        .iter()
1191                        .any(|cap| peer_data.supported_capabilities.contains(cap))
1192                {
1193                    None
1194                } else {
1195                    let connection = peer_data.connection.clone()?;
1196                    Some((*id, peer_data.score, peer_data.requests, connection))
1197                }
1198            })
1199            .max_by_key(|(_, score, reqs, _)| self.weight_peer(score, reqs))
1200            .map(|(k, _, _, v)| (k, v))
1201    }
1202
1203    /// Returns up to `n` best peers with capability overlap, sorted by weight
1204    /// descending. Excludes peers at capacity. Does NOT mutate state — caller
1205    /// is responsible for incrementing `requests` on each returned peer. The
1206    /// sort uses a pre-increment snapshot: later picks don't see earlier
1207    /// picks' bumps, which is fine for small `n`.
1208    fn do_get_best_n_peers(
1209        &self,
1210        capabilities: &[Capability],
1211        n: usize,
1212    ) -> Vec<(H256, PeerConnection)> {
1213        let mut candidates: Vec<(H256, i64, i64, PeerConnection)> = self
1214            .peers
1215            .iter()
1216            .filter_map(|(id, peer_data)| {
1217                if !self.can_try_more_requests(&peer_data.score, &peer_data.requests)
1218                    || !capabilities
1219                        .iter()
1220                        .any(|cap| peer_data.supported_capabilities.contains(cap))
1221                {
1222                    None
1223                } else {
1224                    let connection = peer_data.connection.clone()?;
1225                    Some((*id, peer_data.score, peer_data.requests, connection))
1226                }
1227            })
1228            .collect();
1229
1230        candidates.sort_by_key(|(_, score, reqs, _)| -self.weight_peer(score, reqs));
1231        candidates
1232            .into_iter()
1233            .take(n)
1234            .map(|(id, _, _, conn)| (id, conn))
1235            .collect()
1236    }
1237
1238    // --- Contact operations ---
1239
1240    /// Prune disposable contacts from both main and replacement lists.
1241    /// When a main contact is removed, a replacement is automatically promoted.
1242    /// Pruned contacts remain in the connection pool so they can be retried
1243    /// later — the RLPx handshake will reject them if they're truly bad.
1244    fn prune(&mut self) {
1245        for bucket in &mut self.buckets {
1246            // Collect disposable contacts from main list
1247            let main_disposable: Vec<H256> = bucket
1248                .contacts
1249                .iter()
1250                .filter(|(_, c)| c.disposable)
1251                .map(|(id, _)| *id)
1252                .collect();
1253
1254            // Remove from main list and promote replacements
1255            for node_id in main_disposable {
1256                bucket.remove_and_promote(&node_id);
1257            }
1258
1259            // Remove disposable contacts from replacement list
1260            // (these don't get promoted, just removed)
1261            bucket.replacements.retain(|(_, c)| !c.disposable);
1262        }
1263    }
1264
1265    fn do_get_contact_to_initiate(&mut self) -> Option<Contact> {
1266        // Draw from the flat connection pool using O(1) random index probing.
1267        // Pick a random start index and scan forward (wrapping) until we find
1268        // an eligible candidate or complete a full loop.
1269        let pool_len = self.connection_pool.len();
1270        if pool_len == 0 {
1271            return None;
1272        }
1273
1274        let start = rand::random::<usize>() % pool_len;
1275        for offset in 0..pool_len {
1276            let idx = (start + offset) % pool_len;
1277            let Some((node_id, node)) = self.connection_pool.get_index(idx) else {
1278                continue;
1279            };
1280            let node_id = *node_id;
1281
1282            if self.peers.contains_key(&node_id)
1283                || self.already_tried_peers.contains(&node_id)
1284                || self
1285                    .get_contact_or_replacement(&node_id)
1286                    .map(|c| !c.knows_us || c.unwanted || c.is_fork_id_valid == Some(false))
1287                    .unwrap_or(false)
1288            {
1289                continue;
1290            }
1291
1292            let node = node.clone();
1293            self.already_tried_peers.insert(node_id);
1294            let contact = self
1295                .get_contact_or_replacement(&node_id)
1296                .cloned()
1297                .unwrap_or_else(|| Contact::new(node, DiscoveryProtocol::Discv4));
1298            return Some(contact);
1299        }
1300
1301        // Exhausted all candidates — reset tried set for next cycle.
1302        tracing::trace!("Resetting list of tried peers.");
1303        self.already_tried_peers.clear();
1304        None
1305    }
1306
1307    /// Get the `count` closest nodes from the connection pool, sorted by XOR distance to `target`.
1308    fn do_get_closest_from_pool(&self, target: H256, count: usize) -> Vec<(H256, Node)> {
1309        let mut nodes: Vec<(H256, Node, H256)> = Vec::with_capacity(count);
1310
1311        for (node_id, node) in &self.connection_pool {
1312            let dist = xor_distance(&target, node_id);
1313            if nodes.len() < count {
1314                nodes.push((*node_id, node.clone(), dist));
1315            } else if let Some((farthest_idx, _)) =
1316                nodes.iter().enumerate().max_by_key(|(_, (_, _, d))| *d)
1317                && dist < nodes[farthest_idx].2
1318            {
1319                nodes[farthest_idx] = (*node_id, node.clone(), dist);
1320            }
1321        }
1322
1323        nodes.sort_by(|a, b| a.2.cmp(&b.2));
1324        nodes.into_iter().map(|(id, node, _)| (id, node)).collect()
1325    }
1326
1327    /// Get contact for ENR lookup (discv4 only)
1328    fn do_get_contact_for_enr_lookup(&mut self) -> Option<Contact> {
1329        self.iter_contacts()
1330            .filter(|(_, c)| {
1331                c.is_discv4
1332                    && c.was_validated()
1333                    && !c.has_pending_enr_request()
1334                    && c.record.is_none()
1335                    && !c.disposable
1336            })
1337            .map(|(_, c)| c)
1338            .collect::<Vec<_>>()
1339            .choose(&mut rand::rngs::OsRng)
1340            .cloned()
1341            .cloned()
1342    }
1343
1344    fn do_get_contact_to_revalidate(
1345        &self,
1346        revalidation_interval: Duration,
1347        protocol: DiscoveryProtocol,
1348    ) -> Option<Box<Contact>> {
1349        self.iter_contacts()
1350            .filter(|(_, c)| {
1351                c.supports_protocol(protocol)
1352                    && Self::is_validation_needed(c, revalidation_interval)
1353            })
1354            .map(|(_, c)| c)
1355            .choose(&mut rand::rngs::OsRng)
1356            .cloned()
1357            .map(Box::new)
1358    }
1359
1360    fn do_validate_contact(&self, node_id: H256, sender_ip: IpAddr) -> ContactValidation {
1361        let Some(contact) = self.get_contact(&node_id) else {
1362            return ContactValidation::UnknownContact;
1363        };
1364        if !contact.was_validated() {
1365            return ContactValidation::InvalidContact;
1366        }
1367
1368        // Check that the IP address from which we receive the request matches the one we have stored
1369        // to prevent amplification attacks.
1370        if sender_ip != contact.node.ip {
1371            return ContactValidation::IpMismatch;
1372        }
1373        ContactValidation::Valid(Box::new(contact.clone()))
1374    }
1375
1376    /// Get closest nodes using raw XOR distance for accurate ordering.
1377    fn do_get_closest_nodes(&self, node_id: H256) -> Vec<Node> {
1378        #[cfg(feature = "metrics")]
1379        let scan_start = std::time::Instant::now();
1380
1381        let mut nodes: Vec<(Node, H256)> = vec![];
1382
1383        for (contact_id, contact) in self.iter_contacts() {
1384            let dist = xor_distance(&node_id, contact_id);
1385            if nodes.len() < MAX_NODES_IN_NEIGHBORS_PACKET {
1386                nodes.push((contact.node.clone(), dist));
1387            } else if let Some((farthest_idx, _)) =
1388                nodes.iter().enumerate().max_by_key(|(_, (_, d))| *d)
1389                && dist < nodes[farthest_idx].1
1390            {
1391                nodes[farthest_idx] = (contact.node.clone(), dist);
1392            }
1393        }
1394
1395        #[cfg(feature = "metrics")]
1396        {
1397            use ethrex_metrics::p2p::METRICS_P2P;
1398            METRICS_P2P.observe_iter_contacts_duration(scan_start.elapsed().as_secs_f64());
1399        }
1400
1401        nodes.into_iter().map(|(node, _)| node).collect()
1402    }
1403
1404    /// Get nodes at distances for discv5 (returns Vec<NodeRecord>).
1405    /// Uses the discv5 spec log-distance: `floor(log2(XOR))` for non-zero XOR.
1406    /// Distance 0 is reserved for the local node itself (handled by the caller),
1407    /// so contacts start at distance >= 1.
1408    fn do_get_nodes_at_distances(&self, distances: &[u32]) -> Vec<NodeRecord> {
1409        self.iter_contacts()
1410            .filter_map(|(contact_id, contact)| {
1411                let dist = distance(&self.local_node_id, contact_id) as u32;
1412                if distances.contains(&dist) {
1413                    contact.record.clone()
1414                } else {
1415                    None
1416                }
1417            })
1418            .take(MAX_ENRS_PER_FINDNODE_RESPONSE)
1419            .collect()
1420    }
1421
1422    async fn do_new_contacts(&mut self, nodes: Vec<Node>, protocol: DiscoveryProtocol) {
1423        for node in nodes {
1424            let node_id = node.node_id();
1425            if node_id == self.local_node_id {
1426                continue;
1427            }
1428            #[cfg(feature = "metrics")]
1429            let insert_start = std::time::Instant::now();
1430
1431            // Always add to the connection pool (regardless of k-bucket capacity)
1432            self.insert_to_connection_pool(node_id, node.clone());
1433
1434            if self.contact_exists(&node_id) {
1435                // Contact already exists (main or replacement list), update protocol
1436                if let Some(contact) = self.get_contact_or_replacement_mut(&node_id) {
1437                    contact.add_protocol(protocol);
1438                }
1439            } else {
1440                let contact = Contact::new(node, protocol);
1441                self.insert_contact(node_id, contact);
1442                METRICS.record_new_discovery().await;
1443            }
1444
1445            #[cfg(feature = "metrics")]
1446            {
1447                use ethrex_metrics::p2p::METRICS_P2P;
1448                METRICS_P2P.observe_insert_contact_duration(insert_start.elapsed().as_secs_f64());
1449            }
1450        }
1451    }
1452
1453    async fn do_new_contact_records(&mut self, node_records: Vec<NodeRecord>) {
1454        for node_record in node_records {
1455            if !node_record.verify_signature() {
1456                continue;
1457            }
1458            if let Ok(node) = Node::from_enr(&node_record) {
1459                let node_id = node.node_id();
1460                if node_id == self.local_node_id {
1461                    continue;
1462                }
1463
1464                // Always add to the connection pool (regardless of k-bucket capacity)
1465                self.insert_to_connection_pool(node_id, node.clone());
1466
1467                if self.contact_exists(&node_id) {
1468                    // Check if we need to evaluate fork_id before taking
1469                    // the mutable borrow.
1470                    let should_update = self
1471                        .get_contact_or_replacement(&node_id)
1472                        .map(|c| match c.record.as_ref() {
1473                            None => true,
1474                            Some(r) => node_record.seq > r.seq,
1475                        })
1476                        .unwrap_or(false);
1477                    let is_fork_id_valid = if should_update {
1478                        Self::evaluate_fork_id(&node_record, &self.store).await
1479                    } else {
1480                        None
1481                    };
1482                    if let Some(contact) = self.get_contact_or_replacement_mut(&node_id) {
1483                        contact.add_protocol(DiscoveryProtocol::Discv5);
1484                        if should_update {
1485                            if contact.node.ip != node.ip || contact.node.udp_port != node.udp_port
1486                            {
1487                                contact.validation_timestamp = None;
1488                                contact.ping_id = None;
1489                            }
1490                            contact.node = node;
1491                            contact.record = Some(node_record);
1492                            contact.is_fork_id_valid = is_fork_id_valid;
1493                        }
1494                    }
1495                } else {
1496                    let is_fork_id_valid = Self::evaluate_fork_id(&node_record, &self.store).await;
1497                    let mut contact = Contact::new(node, DiscoveryProtocol::Discv5);
1498                    contact.is_fork_id_valid = is_fork_id_valid;
1499                    contact.record = Some(node_record);
1500                    self.insert_contact(node_id, contact);
1501                    METRICS.record_new_discovery().await;
1502                }
1503            }
1504        }
1505    }
1506
1507    async fn evaluate_fork_id(record: &NodeRecord, store: &Store) -> Option<bool> {
1508        if let Some(remote_fork_id) = record.get_fork_id() {
1509            backend::is_fork_id_valid(store, remote_fork_id)
1510                .await
1511                .ok()
1512                .or(Some(false))
1513        } else {
1514            Some(false)
1515        }
1516    }
1517
1518    fn do_peer_count_by_capabilities(&self, capabilities: Vec<Capability>) -> usize {
1519        self.peers
1520            .values()
1521            .filter(|peer_data| {
1522                capabilities
1523                    .iter()
1524                    .any(|cap| peer_data.supported_capabilities.contains(cap))
1525            })
1526            .count()
1527    }
1528
1529    fn do_get_random_peer(&self, capabilities: Vec<Capability>) -> Option<(H256, PeerConnection)> {
1530        let peers: Vec<(H256, &PeerConnection, i64)> = self
1531            .peers
1532            .iter()
1533            .filter_map(|(node_id, peer_data)| {
1534                if !capabilities
1535                    .iter()
1536                    .any(|cap| peer_data.supported_capabilities.contains(cap))
1537                {
1538                    return None;
1539                }
1540                peer_data
1541                    .connection
1542                    .as_ref()
1543                    .map(|connection| (*node_id, connection, peer_data.score))
1544            })
1545            .collect();
1546        if peers.is_empty() {
1547            return None;
1548        }
1549        // Weight by score: maps [-150, 50] to [1, 201] so bad peers are unlikely but not excluded
1550        let weights: Vec<u64> = peers
1551            .iter()
1552            .map(|(_, _, score)| (score.max(&MIN_SCORE_CRITICAL) - MIN_SCORE_CRITICAL + 1) as u64)
1553            .collect();
1554        let dist = WeightedIndex::new(&weights).ok()?;
1555        let idx = dist.sample(&mut rand::rngs::OsRng);
1556        Some((peers[idx].0, peers[idx].1.clone()))
1557    }
1558
1559    fn is_validation_needed(contact: &Contact, revalidation_interval: Duration) -> bool {
1560        if contact.disposable {
1561            return false;
1562        }
1563
1564        let sent_ping_ttl = Duration::from_secs(30);
1565
1566        if contact.has_pending_ping() {
1567            // Outstanding ping — only re-ping if it timed out (stale).
1568            contact
1569                .validation_timestamp
1570                .map(|ts| Instant::now().saturating_duration_since(ts) > sent_ping_ttl)
1571                .unwrap_or(false)
1572        } else {
1573            // No pending ping — check if never validated or validation expired.
1574            !contact.was_validated()
1575                || contact
1576                    .validation_timestamp
1577                    .map(|ts| Instant::now().saturating_duration_since(ts) > revalidation_interval)
1578                    .unwrap_or(false)
1579        }
1580    }
1581}
1582
/// Handle to the peer table actor: an `ActorRef` addressing `PeerTableServer`.
pub type PeerTable = ActorRef<PeerTableServer>;
1584
#[cfg(test)]
mod tests {
    use super::*;
    use ethrex_common::H512;
    use std::net::Ipv4Addr;

    /// Fixture: a Discv4 contact at `127.0.0.<seed>` (both port arguments 30303)
    /// whose public key is built from `seed + 1`, paired with the id reported by
    /// `Node::node_id`. Distinct seeds give distinct addresses and keys.
    fn dummy_contact(seed: u8) -> (H256, Contact) {
        let public_key = H512::from_low_u64_be(u64::from(seed) + 1);
        let address = IpAddr::V4(Ipv4Addr::new(127, 0, 0, seed));
        let node = Node::new(address, 30303, 30303, public_key);
        // Read the id before `node` is moved into the contact.
        let node_id = node.node_id();
        (node_id, Contact::new(node, DiscoveryProtocol::Discv4))
    }

    /// Inserts `dummy_contact(seed)` into `bucket`, ignoring which list it
    /// landed in, and returns its id.
    fn insert_seed(bucket: &mut KBucket, seed: u8) -> H256 {
        let (id, contact) = dummy_contact(seed);
        bucket.insert(id, contact);
        id
    }

    /// Inserts one dummy contact per seed, in order, and returns their ids in
    /// insertion order.
    fn insert_seeds(bucket: &mut KBucket, seeds: impl IntoIterator<Item = u8>) -> Vec<H256> {
        seeds
            .into_iter()
            .map(|seed| insert_seed(bucket, seed))
            .collect()
    }

    /// Inserts seeds `0..MAX_NODES_PER_BUCKET` and returns their ids in
    /// insertion order. On an empty bucket this fills the main list.
    fn fill_main_list(bucket: &mut KBucket) -> Vec<H256> {
        insert_seeds(bucket, 0..MAX_NODES_PER_BUCKET as u8)
    }

    // --- KBucket::insert ---

    /// The first contact inserted into a fresh bucket lands in the main list.
    #[test]
    fn insert_into_empty_bucket() {
        let (id, contact) = dummy_contact(1);
        let mut bucket = KBucket::default();

        let accepted = bucket.insert(id, contact);

        assert!(accepted);
        assert_eq!(bucket.contacts.len(), 1);
        assert!(bucket.replacements.is_empty());
    }

    /// Inserts are accepted into the main list up to capacity; the next one
    /// is reported as rejected and shows up in the replacement list.
    #[test]
    fn insert_fills_bucket_then_goes_to_replacements() {
        let mut bucket = KBucket::default();

        // Every insert up to capacity must be accepted into the main list.
        for i in 0..MAX_NODES_PER_BUCKET as u8 {
            let (id, contact) = dummy_contact(i);
            let accepted = bucket.insert(id, contact);
            assert!(accepted, "contact {i} should go to main");
        }
        assert_eq!(bucket.contacts.len(), MAX_NODES_PER_BUCKET);

        // One more contact overflows into the replacement list.
        let (overflow_id, overflow_contact) = dummy_contact(200);
        let accepted = bucket.insert(overflow_id, overflow_contact);
        assert!(!accepted);
        assert_eq!(bucket.contacts.len(), MAX_NODES_PER_BUCKET);
        assert_eq!(bucket.replacements.len(), 1);
    }

    // --- KBucket::contains ---

    /// `contains` reports ids from both lists and rejects unknown ids.
    #[test]
    fn contains_checks_main_and_replacement() {
        let mut bucket = KBucket::default();

        let main_id = insert_seed(&mut bucket, 1);
        assert!(bucket.contains(&main_id));

        // Top the main list up to capacity so the next insert overflows.
        insert_seeds(&mut bucket, 2..=(MAX_NODES_PER_BUCKET as u8));
        let replacement_id = insert_seed(&mut bucket, 100);

        assert!(bucket.contains(&replacement_id));
        assert!(!bucket.contains(&H256::zero()));
    }

    // --- KBucket::get / get_any ---

    /// `get` finds a main-list contact and returns nothing for an unknown id.
    #[test]
    fn get_returns_main_list_only() {
        let mut bucket = KBucket::default();
        let id = insert_seed(&mut bucket, 1);

        assert!(bucket.get(&id).is_some());
        assert!(bucket.get(&H256::zero()).is_none());
    }

    /// A replacement-list contact is invisible to `get` but found by `get_any`.
    #[test]
    fn get_any_returns_from_replacement() {
        let mut bucket = KBucket::default();
        fill_main_list(&mut bucket);
        // With the main list full, this one ends up among the replacements.
        let replacement_id = insert_seed(&mut bucket, 200);

        assert!(bucket.get(&replacement_id).is_none()); // not in main
        assert!(bucket.get_any(&replacement_id).is_some()); // found via replacement
    }

    // --- KBucket::remove_and_promote ---

    /// Removing a main contact while a replacement is waiting promotes that
    /// replacement, keeping the main list at capacity.
    #[test]
    fn remove_and_promote_with_replacement() {
        let mut bucket = KBucket::default();
        let main_ids = fill_main_list(&mut bucket);
        let replacement_id = insert_seed(&mut bucket, 200);

        let removed_id = main_ids[0];
        let promoted = bucket.remove_and_promote(&removed_id);

        assert_eq!(promoted, Some(replacement_id));
        assert_eq!(bucket.contacts.len(), MAX_NODES_PER_BUCKET);
        assert!(bucket.replacements.is_empty());
        assert!(!bucket.contains(&removed_id));
        assert!(bucket.contains(&replacement_id));
    }

    /// With no replacement available, removal just shrinks the main list.
    #[test]
    fn remove_and_promote_without_replacement() {
        let mut bucket = KBucket::default();
        let id = insert_seed(&mut bucket, 1);

        assert!(bucket.remove_and_promote(&id).is_none());
        assert!(bucket.contacts.is_empty());
    }

    /// Removing an id the bucket never held yields `None`.
    #[test]
    fn remove_nonexistent_returns_none() {
        let mut bucket = KBucket::default();
        let unknown_id = H256::zero();

        let promoted = bucket.remove_and_promote(&unknown_id);

        assert!(promoted.is_none());
    }

    // --- Replacement eviction ---

    /// Overfilling the replacement list by two drops the two oldest entries
    /// and keeps the newest.
    #[test]
    fn replacement_list_evicts_oldest_when_full() {
        let mut bucket = KBucket::default();
        fill_main_list(&mut bucket);

        // Push two more replacements than the list can hold (seeds 100, 101, ...).
        let overflow_seeds = (0..(MAX_REPLACEMENTS_PER_BUCKET + 2) as u8).map(|i| 100 + i);
        let replacement_ids = insert_seeds(&mut bucket, overflow_seeds);

        assert_eq!(bucket.replacements.len(), MAX_REPLACEMENTS_PER_BUCKET);
        // The two oldest replacements are gone...
        for evicted_id in &replacement_ids[..2] {
            assert!(!bucket.contains(evicted_id));
        }
        // ...while the most recent one is still tracked.
        let newest_id = replacement_ids.last().unwrap();
        assert!(bucket.contains(newest_id));
    }

    // --- bucket_index ---

    /// An id compared against itself has no bucket.
    #[test]
    fn bucket_index_self_is_none() {
        let id = H256::random();
        let index = bucket_index(&id, &id);
        assert_eq!(index, None);
    }

    /// Ids differing only in the lowest bit map to bucket 0.
    #[test]
    fn bucket_index_minimal_distance() {
        // XOR with the all-zero id is 1: only bit 0 is set.
        let mut remote_bytes = [0u8; 32];
        remote_bytes[31] = 1;
        let remote = H256(remote_bytes);
        let local = H256::zero();

        assert_eq!(bucket_index(&local, &remote), Some(0));
    }

    /// Ids differing in the highest bit map to bucket 255.
    #[test]
    fn bucket_index_maximal_distance() {
        // XOR with the all-zero id has bit 255 (the top bit of byte 0) set.
        let mut remote_bytes = [0u8; 32];
        remote_bytes[0] = 0x80;
        let remote = H256(remote_bytes);
        let local = H256::zero();

        assert_eq!(bucket_index(&local, &remote), Some(255));
    }
}