1use crate::{
13 backend,
14 metrics::METRICS,
15 rlpx::{connection::server::PeerConnection, p2p::Capability},
16 types::{Node, NodeRecord},
17 utils::distance,
18};
19use bytes::Bytes;
20use ethrex_common::{H256, U256};
21use ethrex_storage::Store;
22use indexmap::IndexMap;
23use rand::distributions::WeightedIndex;
24use rand::prelude::Distribution;
25use rand::seq::{IteratorRandom, SliceRandom};
26use rustc_hash::{FxHashMap, FxHashSet};
27use spawned_concurrency::{
28 actor,
29 error::ActorError,
30 protocol,
31 tasks::{Actor, ActorRef, ActorStart as _, Context, Handler, Response, send_message_on},
32};
33use std::{
34 net::IpAddr,
35 time::{Duration, Instant},
36};
37
38const MAX_SCORE: i64 = 50;
39const MIN_SCORE: i64 = -50;
40const MIN_SCORE_CRITICAL: i64 = MIN_SCORE * 3;
42const SCORE_WEIGHT: i64 = 1;
44const REQUESTS_WEIGHT: i64 = 1;
46const MAX_CONCURRENT_REQUESTS_PER_PEER: i64 = 100;
48pub const TARGET_PEERS: usize = 100;
50pub(crate) const MAX_NODES_IN_NEIGHBORS_PACKET: usize = 16;
52const MAX_ENRS_PER_FINDNODE_RESPONSE: usize = 16;
54
55const NUMBER_OF_BUCKETS: usize = 256;
57pub const MAX_NODES_PER_BUCKET: usize = 16;
59const MAX_REPLACEMENTS_PER_BUCKET: usize = 10;
61const MAX_CONNECTION_POOL_SIZE: usize = 10_000;
67
68#[derive(Debug, Clone, Default)]
71pub struct KBucket {
72 pub(crate) contacts: Vec<(H256, Contact)>,
73 pub(crate) replacements: Vec<(H256, Contact)>,
74}
75
76impl KBucket {
77 fn get(&self, node_id: &H256) -> Option<&Contact> {
79 self.contacts
80 .iter()
81 .find(|(id, _)| id == node_id)
82 .map(|(_, c)| c)
83 }
84
85 fn get_any(&self, node_id: &H256) -> Option<&Contact> {
87 self.get(node_id).or_else(|| {
88 self.replacements
89 .iter()
90 .find(|(id, _)| id == node_id)
91 .map(|(_, c)| c)
92 })
93 }
94
95 fn get_mut(&mut self, node_id: &H256) -> Option<&mut Contact> {
97 if let Some((_, c)) = self.contacts.iter_mut().find(|(id, _)| id == node_id) {
98 return Some(c);
99 }
100 self.replacements
101 .iter_mut()
102 .find(|(id, _)| id == node_id)
103 .map(|(_, c)| c)
104 }
105
106 fn contains(&self, node_id: &H256) -> bool {
108 self.contacts.iter().any(|(id, _)| id == node_id)
109 || self.replacements.iter().any(|(id, _)| id == node_id)
110 }
111
112 fn insert(&mut self, node_id: H256, contact: Contact) -> bool {
115 if self.contacts.len() < MAX_NODES_PER_BUCKET {
116 self.contacts.push((node_id, contact));
117 true
118 } else {
119 self.insert_replacement(node_id, contact);
120 false
121 }
122 }
123
124 fn insert_replacement(&mut self, node_id: H256, contact: Contact) {
126 if self.replacements.len() >= MAX_REPLACEMENTS_PER_BUCKET {
127 self.replacements.remove(0);
128 }
129 self.replacements.push((node_id, contact));
130 }
131
132 fn remove_and_promote(&mut self, node_id: &H256) -> Option<H256> {
135 let idx = self.contacts.iter().position(|(id, _)| id == node_id)?;
136 self.contacts.remove(idx);
137 if !self.replacements.is_empty() {
138 let (replacement_id, replacement) = self.replacements.remove(0);
139 self.contacts.push((replacement_id, replacement));
140 Some(replacement_id)
141 } else {
142 None
143 }
144 }
145}
146
147fn bucket_index(local_node_id: &H256, node_id: &H256) -> Option<usize> {
152 let xor = *local_node_id ^ *node_id;
153 let dist = U256::from_big_endian(xor.as_bytes());
154 if dist.is_zero() {
155 None
156 } else {
157 Some(dist.bits() - 1)
158 }
159}
160
161pub(crate) fn xor_distance(a: &H256, b: &H256) -> H256 {
165 *a ^ *b
166}
167
168#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
171pub enum DiscoveryProtocol {
172 Discv4,
174 Discv5,
176}
177
178pub use crate::discv5::session::Session;
181
182#[derive(Debug, Clone)]
183pub struct Contact {
184 pub node: Node,
185 pub is_discv4: bool,
187 pub is_discv5: bool,
189 pub validation_timestamp: Option<Instant>,
192 pub ping_id: Option<Bytes>,
197
198 pub enr_request_hash: Option<H256>,
201
202 pub record: Option<NodeRecord>,
204 pub disposable: bool,
206 pub knows_us: bool,
208 pub unwanted: bool,
210 pub is_fork_id_valid: Option<bool>,
212 session: Option<Session>,
214}
215
216impl Contact {
217 pub fn was_validated(&self) -> bool {
218 self.validation_timestamp.is_some() && !self.has_pending_ping()
219 }
220
221 pub fn has_pending_ping(&self) -> bool {
222 self.ping_id.is_some()
223 }
224
225 pub fn record_ping_sent(&mut self, ping_id: Bytes) {
226 self.validation_timestamp = Some(Instant::now());
227 self.ping_id = Some(ping_id);
228 }
229
230 pub fn record_enr_request_sent(&mut self, request_hash: H256) {
231 self.enr_request_hash = Some(request_hash);
232 }
233
234 pub fn record_enr_response_received(&mut self, request_hash: H256, record: NodeRecord) {
236 if self
237 .enr_request_hash
238 .take_if(|h| *h == request_hash)
239 .is_some()
240 {
241 self.record = Some(record);
242 }
243 }
244
245 pub fn has_pending_enr_request(&self) -> bool {
246 self.enr_request_hash.is_some()
247 }
248}
249
250impl Contact {
251 pub fn new(node: Node, protocol: DiscoveryProtocol) -> Self {
252 Self {
253 node,
254 is_discv4: protocol == DiscoveryProtocol::Discv4,
255 is_discv5: protocol == DiscoveryProtocol::Discv5,
256 validation_timestamp: None,
257 ping_id: None,
258 enr_request_hash: None,
259 record: None,
260 disposable: false,
261 knows_us: true,
262 unwanted: false,
263 is_fork_id_valid: None,
264 session: None,
265 }
266 }
267
268 pub fn supports_protocol(&self, protocol: DiscoveryProtocol) -> bool {
270 match protocol {
271 DiscoveryProtocol::Discv4 => self.is_discv4,
272 DiscoveryProtocol::Discv5 => self.is_discv5,
273 }
274 }
275
276 pub fn add_protocol(&mut self, protocol: DiscoveryProtocol) {
278 match protocol {
279 DiscoveryProtocol::Discv4 => self.is_discv4 = true,
280 DiscoveryProtocol::Discv5 => self.is_discv5 = true,
281 }
282 }
283}
284
285#[derive(Debug, Clone)]
286pub struct PeerData {
287 pub node: Node,
288 pub record: Option<NodeRecord>,
289 pub supported_capabilities: Vec<Capability>,
290 pub is_connection_inbound: bool,
293 pub connection: Option<PeerConnection>,
295 score: i64,
297 requests: i64,
299 pub last_response_time: Option<u64>,
301}
302
303impl PeerData {
304 pub fn new(
305 node: Node,
306 record: Option<NodeRecord>,
307 connection: Option<PeerConnection>,
308 capabilities: Vec<Capability>,
309 ) -> Self {
310 Self {
311 node,
312 record,
313 supported_capabilities: capabilities,
314 is_connection_inbound: false,
315 connection,
316 score: Default::default(),
317 requests: Default::default(),
318 last_response_time: None,
319 }
320 }
321}
322
323#[derive(Debug, Clone, serde::Serialize)]
325pub struct PeerDiagnostics {
326 pub peer_id: H256,
327 pub score: i64,
328 pub inflight_requests: i64,
329 pub eligible: bool,
330 pub capabilities: Vec<String>,
331 pub ip: IpAddr,
332 pub client_version: String,
333 pub connection_direction: String,
334 pub last_response_time: Option<u64>,
335}
336
337#[derive(Debug, Clone)]
339pub enum ContactValidation {
340 Valid(Box<Contact>),
341 InvalidContact,
342 UnknownContact,
343 IpMismatch,
344}
345
346#[must_use = "dropping this permit immediately releases the peer's request slot"]
358pub struct RequestPermit {
359 peer_table: PeerTable,
360 peer_id: H256,
361}
362
363impl RequestPermit {
364 pub(crate) fn new(peer_table: PeerTable, peer_id: H256) -> Self {
365 Self {
366 peer_table,
367 peer_id,
368 }
369 }
370}
371
372impl std::fmt::Debug for RequestPermit {
373 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
374 f.debug_struct("RequestPermit")
375 .field("peer_id", &self.peer_id)
376 .finish_non_exhaustive()
377 }
378}
379
380impl Drop for RequestPermit {
381 fn drop(&mut self) {
382 let _ = self.peer_table.dec_requests(self.peer_id);
385 }
386}
387
388#[protocol]
389pub trait PeerTableServerProtocol: Send + Sync {
390 fn new_contacts(&self, nodes: Vec<Node>, protocol: DiscoveryProtocol)
392 -> Result<(), ActorError>;
393 fn new_contact_records(&self, node_records: Vec<NodeRecord>) -> Result<(), ActorError>;
394 fn new_connected_peer(
395 &self,
396 node: Node,
397 connection: PeerConnection,
398 capabilities: Vec<Capability>,
399 ) -> Result<(), ActorError>;
400 fn set_session_info(&self, node_id: H256, session: Session) -> Result<(), ActorError>;
401 fn remove_peer(&self, node_id: H256) -> Result<(), ActorError>;
402 fn dec_requests(&self, node_id: H256) -> Result<(), ActorError>;
403 fn set_unwanted(&self, node_id: H256) -> Result<(), ActorError>;
404 fn set_is_fork_id_valid(&self, node_id: H256, valid: bool) -> Result<(), ActorError>;
405 fn record_success(&self, node_id: H256) -> Result<(), ActorError>;
406 fn record_failure(&self, node_id: H256) -> Result<(), ActorError>;
407 fn record_critical_failure(&self, node_id: H256) -> Result<(), ActorError>;
408 fn record_ping_sent(&self, node_id: H256, ping_id: Bytes) -> Result<(), ActorError>;
409 fn record_pong_received(&self, node_id: H256, ping_id: Bytes) -> Result<(), ActorError>;
410 fn record_enr_request_sent(&self, node_id: H256, request_hash: H256) -> Result<(), ActorError>;
411 fn record_enr_response_received(
412 &self,
413 node_id: H256,
414 request_hash: H256,
415 record: NodeRecord,
416 ) -> Result<(), ActorError>;
417 fn set_disposable(&self, node_id: H256) -> Result<(), ActorError>;
418 fn mark_knows_us(&self, node_id: H256) -> Result<(), ActorError>;
419 fn prune_table(&self) -> Result<(), ActorError>;
420 fn shutdown(&self) -> Result<(), ActorError>;
421
422 fn peer_count(&self) -> Response<usize>;
424 fn peer_count_by_capabilities(&self, capabilities: Vec<Capability>) -> Response<usize>;
425 fn target_reached(&self) -> Response<bool>;
426 fn target_peers_reached(&self) -> Response<bool>;
427 fn target_peers_completion(&self) -> Response<f64>;
428 fn get_contact_to_initiate(&self) -> Response<Option<Box<Contact>>>;
429 fn get_contact_for_enr_lookup(&self) -> Response<Option<Box<Contact>>>;
430 fn get_closest_from_pool(&self, target: H256, count: usize) -> Response<Vec<(H256, Node)>>;
431 fn get_contact(&self, node_id: H256) -> Response<Option<Box<Contact>>>;
432 fn get_contact_to_revalidate(
433 &self,
434 revalidation_interval: Duration,
435 protocol: DiscoveryProtocol,
436 ) -> Response<Option<Box<Contact>>>;
437 fn get_best_peer(
438 &self,
439 capabilities: Vec<Capability>,
440 ) -> Response<Option<(H256, PeerConnection, RequestPermit)>>;
441 fn get_best_peer_excluding(
442 &self,
443 capabilities: Vec<Capability>,
444 excluded: Vec<H256>,
445 ) -> Response<Option<(H256, PeerConnection, RequestPermit)>>;
446 fn get_best_n_peers(
447 &self,
448 capabilities: Vec<Capability>,
449 n: usize,
450 ) -> Response<Vec<(H256, PeerConnection, RequestPermit)>>;
451 fn has_eligible_peer(&self, capabilities: Vec<Capability>) -> Response<bool>;
454 fn get_score(&self, node_id: H256) -> Response<i64>;
455 fn get_connected_nodes(&self) -> Response<Vec<Node>>;
456 fn get_peers_with_capabilities(&self)
457 -> Response<Vec<(H256, PeerConnection, Vec<Capability>)>>;
458 fn insert_if_new(&self, node: Node, protocol: DiscoveryProtocol) -> Response<bool>;
459 fn validate_contact(&self, node_id: H256, sender_ip: IpAddr) -> Response<ContactValidation>;
460 fn get_closest_nodes(&self, node_id: H256) -> Response<Vec<Node>>;
461 fn get_nodes_at_distances(&self, distances: Vec<u32>) -> Response<Vec<NodeRecord>>;
462 fn get_peers_data(&self) -> Response<Vec<PeerData>>;
463 fn get_random_peer(
464 &self,
465 capabilities: Vec<Capability>,
466 ) -> Response<Option<(H256, PeerConnection, RequestPermit)>>;
467 fn get_session_info(&self, node_id: H256) -> Response<Option<Session>>;
468 fn get_peer_diagnostics(&self) -> Response<Vec<PeerDiagnostics>>;
469 fn get_peer_connection(&self, peer_id: H256) -> Response<Option<PeerConnection>>;
470}
471
472#[derive(Debug)]
473pub struct PeerTableServer {
474 local_node_id: H256,
475 buckets: Vec<KBucket>,
476 peers: IndexMap<H256, PeerData>,
477 already_tried_peers: FxHashSet<H256>,
478 target_peers: usize,
479 store: Store,
480 sessions: FxHashMap<H256, Session>,
483 connection_pool: IndexMap<H256, Node>,
489}
490
491#[actor(protocol = PeerTableServerProtocol)]
492impl PeerTableServer {
493 pub fn spawn(local_node_id: H256, target_peers: usize, store: Store) -> PeerTable {
494 PeerTableServer::new(local_node_id, target_peers, store).start()
495 }
496
497 pub(crate) fn new(local_node_id: H256, target_peers: usize, store: Store) -> Self {
498 Self {
499 local_node_id,
500 buckets: vec![KBucket::default(); NUMBER_OF_BUCKETS],
501 peers: Default::default(),
502 already_tried_peers: Default::default(),
503 target_peers,
504 store,
505 sessions: Default::default(),
506 connection_pool: IndexMap::with_capacity(MAX_CONNECTION_POOL_SIZE),
507 }
508 }
509
510 #[started]
511 async fn started(&mut self, ctx: &Context<Self>) {
512 send_message_on(
513 ctx.clone(),
514 tokio::signal::ctrl_c(),
515 peer_table_server_protocol::Shutdown,
516 );
517 }
518
519 #[send_handler]
522 async fn handle_new_contacts(
523 &mut self,
524 msg: peer_table_server_protocol::NewContacts,
525 _ctx: &Context<Self>,
526 ) {
527 self.do_new_contacts(msg.nodes, msg.protocol).await;
528 }
529
530 #[send_handler]
531 async fn handle_new_contact_records(
532 &mut self,
533 msg: peer_table_server_protocol::NewContactRecords,
534 _ctx: &Context<Self>,
535 ) {
536 self.do_new_contact_records(msg.node_records).await;
537 }
538
539 #[send_handler]
540 async fn handle_new_connected_peer(
541 &mut self,
542 msg: peer_table_server_protocol::NewConnectedPeer,
543 _ctx: &Context<Self>,
544 ) {
545 let new_peer_id = msg.node.node_id();
546 let new_peer = PeerData::new(msg.node, None, Some(msg.connection), msg.capabilities);
547 self.peers.insert(new_peer_id, new_peer);
548 }
549
550 #[send_handler]
551 async fn handle_set_session_info(
552 &mut self,
553 msg: peer_table_server_protocol::SetSessionInfo,
554 _ctx: &Context<Self>,
555 ) {
556 self.sessions.insert(msg.node_id, msg.session.clone());
558 if let Some(contact) = self.get_contact_mut(&msg.node_id) {
560 contact.session = Some(msg.session);
561 }
562 }
563
564 #[send_handler]
565 async fn handle_remove_peer(
566 &mut self,
567 msg: peer_table_server_protocol::RemovePeer,
568 _ctx: &Context<Self>,
569 ) {
570 self.peers.swap_remove(&msg.node_id);
571 }
572
573 #[send_handler]
574 async fn handle_dec_requests(
575 &mut self,
576 msg: peer_table_server_protocol::DecRequests,
577 _ctx: &Context<Self>,
578 ) {
579 self.peers.entry(msg.node_id).and_modify(|peer_data| {
580 if peer_data.requests <= 0 {
581 tracing::debug!(
585 peer_id = ?msg.node_id,
586 requests = peer_data.requests,
587 "dec_requests with counter already <= 0",
588 );
589 }
590 peer_data.requests = peer_data.requests.saturating_sub(1).max(0)
591 });
592 }
593
594 #[send_handler]
595 async fn handle_set_unwanted(
596 &mut self,
597 msg: peer_table_server_protocol::SetUnwanted,
598 _ctx: &Context<Self>,
599 ) {
600 if let Some(contact) = self.get_contact_mut(&msg.node_id) {
601 contact.unwanted = true;
602 }
603 }
604
605 #[send_handler]
606 async fn handle_set_is_fork_id_valid(
607 &mut self,
608 msg: peer_table_server_protocol::SetIsForkIdValid,
609 _ctx: &Context<Self>,
610 ) {
611 if let Some(contact) = self.get_contact_mut(&msg.node_id) {
612 contact.is_fork_id_valid = Some(msg.valid);
613 }
614 }
615
616 #[send_handler]
617 async fn handle_record_success(
618 &mut self,
619 msg: peer_table_server_protocol::RecordSuccess,
620 _ctx: &Context<Self>,
621 ) {
622 let now = std::time::SystemTime::now()
623 .duration_since(std::time::UNIX_EPOCH)
624 .unwrap_or_default()
625 .as_secs();
626 self.peers.entry(msg.node_id).and_modify(|peer_data| {
627 peer_data.score = (peer_data.score + 1).min(MAX_SCORE);
628 peer_data.last_response_time = Some(now);
629 });
630 }
631
632 #[send_handler]
633 async fn handle_record_failure(
634 &mut self,
635 msg: peer_table_server_protocol::RecordFailure,
636 _ctx: &Context<Self>,
637 ) {
638 self.peers
639 .entry(msg.node_id)
640 .and_modify(|peer_data| peer_data.score = (peer_data.score - 1).max(MIN_SCORE));
641 }
642
643 #[send_handler]
644 async fn handle_record_critical_failure(
645 &mut self,
646 msg: peer_table_server_protocol::RecordCriticalFailure,
647 _ctx: &Context<Self>,
648 ) {
649 self.peers
650 .entry(msg.node_id)
651 .and_modify(|peer_data| peer_data.score = MIN_SCORE_CRITICAL);
652 }
653
654 #[send_handler]
655 async fn handle_record_ping_sent(
656 &mut self,
657 msg: peer_table_server_protocol::RecordPingSent,
658 _ctx: &Context<Self>,
659 ) {
660 if let Some(contact) = self.get_contact_mut(&msg.node_id) {
661 contact.record_ping_sent(msg.ping_id);
662 }
663 }
664
665 #[send_handler]
666 async fn handle_record_pong_received(
667 &mut self,
668 msg: peer_table_server_protocol::RecordPongReceived,
669 _ctx: &Context<Self>,
670 ) {
671 if let Some(contact) = self.get_contact_mut(&msg.node_id)
672 && contact
673 .ping_id
674 .as_ref()
675 .map(|value| *value == msg.ping_id)
676 .unwrap_or(false)
677 {
678 contact.ping_id = None;
679 }
680 }
681
682 #[send_handler]
683 async fn handle_record_enr_request_sent(
684 &mut self,
685 msg: peer_table_server_protocol::RecordEnrRequestSent,
686 _ctx: &Context<Self>,
687 ) {
688 if let Some(contact) = self.get_contact_mut(&msg.node_id) {
689 contact.record_enr_request_sent(msg.request_hash);
690 }
691 }
692
693 #[send_handler]
694 async fn handle_record_enr_response_received(
695 &mut self,
696 msg: peer_table_server_protocol::RecordEnrResponseReceived,
697 _ctx: &Context<Self>,
698 ) {
699 if let Some(contact) = self.get_contact_mut(&msg.node_id) {
700 contact.record_enr_response_received(msg.request_hash, msg.record);
701 }
702 }
703
704 #[send_handler]
705 async fn handle_set_disposable(
706 &mut self,
707 msg: peer_table_server_protocol::SetDisposable,
708 _ctx: &Context<Self>,
709 ) {
710 if let Some(contact) = self.get_contact_mut(&msg.node_id) {
711 contact.disposable = true;
712 }
713 }
714
715 #[send_handler]
716 async fn handle_mark_knows_us(
717 &mut self,
718 msg: peer_table_server_protocol::MarkKnowsUs,
719 _ctx: &Context<Self>,
720 ) {
721 if let Some(contact) = self.get_contact_mut(&msg.node_id) {
722 contact.knows_us = true;
723 }
724 }
725
726 #[send_handler]
727 async fn handle_prune_table(
728 &mut self,
729 _msg: peer_table_server_protocol::PruneTable,
730 _ctx: &Context<Self>,
731 ) {
732 self.prune();
733 }
734
735 #[send_handler]
736 async fn handle_shutdown(
737 &mut self,
738 _msg: peer_table_server_protocol::Shutdown,
739 ctx: &Context<Self>,
740 ) {
741 ctx.stop();
742 }
743
744 #[request_handler]
747 async fn handle_peer_count(
748 &mut self,
749 _msg: peer_table_server_protocol::PeerCount,
750 _ctx: &Context<Self>,
751 ) -> usize {
752 self.peers.len()
753 }
754
755 #[request_handler]
756 async fn handle_peer_count_by_capabilities(
757 &mut self,
758 msg: peer_table_server_protocol::PeerCountByCapabilities,
759 _ctx: &Context<Self>,
760 ) -> usize {
761 self.do_peer_count_by_capabilities(msg.capabilities)
762 }
763
764 #[request_handler]
765 async fn handle_target_reached(
766 &mut self,
767 _msg: peer_table_server_protocol::TargetReached,
768 _ctx: &Context<Self>,
769 ) -> bool {
770 self.peers.len() >= self.target_peers
771 }
772
773 #[request_handler]
774 async fn handle_target_peers_reached(
775 &mut self,
776 _msg: peer_table_server_protocol::TargetPeersReached,
777 _ctx: &Context<Self>,
778 ) -> bool {
779 self.peers.len() >= self.target_peers
780 }
781
782 #[request_handler]
783 async fn handle_target_peers_completion(
784 &mut self,
785 _msg: peer_table_server_protocol::TargetPeersCompletion,
786 _ctx: &Context<Self>,
787 ) -> f64 {
788 self.peers.len() as f64 / self.target_peers as f64
789 }
790
791 #[request_handler]
792 async fn handle_get_contact_to_initiate(
793 &mut self,
794 _msg: peer_table_server_protocol::GetContactToInitiate,
795 _ctx: &Context<Self>,
796 ) -> Option<Box<Contact>> {
797 self.do_get_contact_to_initiate().map(Box::new)
798 }
799
800 #[request_handler]
801 async fn handle_get_closest_from_pool(
802 &mut self,
803 msg: peer_table_server_protocol::GetClosestFromPool,
804 _ctx: &Context<Self>,
805 ) -> Vec<(H256, Node)> {
806 self.do_get_closest_from_pool(msg.target, msg.count)
807 }
808
809 #[request_handler]
810 async fn handle_get_contact_for_enr_lookup(
811 &mut self,
812 _msg: peer_table_server_protocol::GetContactForEnrLookup,
813 _ctx: &Context<Self>,
814 ) -> Option<Box<Contact>> {
815 self.do_get_contact_for_enr_lookup().map(Box::new)
816 }
817
818 #[request_handler]
819 async fn handle_get_contact(
820 &mut self,
821 msg: peer_table_server_protocol::GetContact,
822 _ctx: &Context<Self>,
823 ) -> Option<Box<Contact>> {
824 self.get_contact(&msg.node_id).cloned().map(Box::new)
825 }
826
827 #[request_handler]
828 async fn handle_get_contact_to_revalidate(
829 &mut self,
830 msg: peer_table_server_protocol::GetContactToRevalidate,
831 _ctx: &Context<Self>,
832 ) -> Option<Box<Contact>> {
833 self.do_get_contact_to_revalidate(msg.revalidation_interval, msg.protocol)
834 }
835
836 #[request_handler]
837 async fn handle_get_best_peer(
838 &mut self,
839 msg: peer_table_server_protocol::GetBestPeer,
840 ctx: &Context<Self>,
841 ) -> Option<(H256, PeerConnection, RequestPermit)> {
842 let (peer_id, conn) = self.do_get_best_peer(&msg.capabilities)?;
843 self.peers
844 .get_mut(&peer_id)
845 .expect("peer returned by do_get_best_peer must be present in self.peers")
846 .requests += 1;
847 Some((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id)))
848 }
849
850 #[request_handler]
851 async fn handle_get_best_peer_excluding(
852 &mut self,
853 msg: peer_table_server_protocol::GetBestPeerExcluding,
854 ctx: &Context<Self>,
855 ) -> Option<(H256, PeerConnection, RequestPermit)> {
856 let (peer_id, conn) = self.do_get_best_peer_excluding(&msg.capabilities, &msg.excluded)?;
857 self.peers
858 .get_mut(&peer_id)
859 .expect("peer returned by do_get_best_peer_excluding must be present in self.peers")
860 .requests += 1;
861 Some((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id)))
862 }
863
864 #[request_handler]
865 async fn handle_get_best_n_peers(
866 &mut self,
867 msg: peer_table_server_protocol::GetBestNPeers,
868 ctx: &Context<Self>,
869 ) -> Vec<(H256, PeerConnection, RequestPermit)> {
870 let picks = self.do_get_best_n_peers(&msg.capabilities, msg.n);
871 let mut out = Vec::with_capacity(picks.len());
872 for (peer_id, conn) in picks {
873 self.peers
874 .get_mut(&peer_id)
875 .expect("peer returned by do_get_best_n_peers must be present in self.peers")
876 .requests += 1;
877 out.push((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id)));
878 }
879 out
880 }
881
882 #[request_handler]
883 async fn handle_has_eligible_peer(
884 &mut self,
885 msg: peer_table_server_protocol::HasEligiblePeer,
886 _ctx: &Context<Self>,
887 ) -> bool {
888 self.peers.values().any(|peer_data| {
889 peer_data.connection.is_some()
890 && self.can_try_more_requests(&peer_data.score, &peer_data.requests)
891 && msg
892 .capabilities
893 .iter()
894 .any(|cap| peer_data.supported_capabilities.contains(cap))
895 })
896 }
897
898 #[request_handler]
899 async fn handle_get_score(
900 &mut self,
901 msg: peer_table_server_protocol::GetScore,
902 _ctx: &Context<Self>,
903 ) -> i64 {
904 self.peers
905 .get(&msg.node_id)
906 .map(|peer_data| peer_data.score)
907 .unwrap_or_default()
908 }
909
910 #[request_handler]
911 async fn handle_get_connected_nodes(
912 &mut self,
913 _msg: peer_table_server_protocol::GetConnectedNodes,
914 _ctx: &Context<Self>,
915 ) -> Vec<Node> {
916 self.peers
917 .values()
918 .map(|peer_data| peer_data.node.clone())
919 .collect()
920 }
921
922 #[request_handler]
923 async fn handle_get_peers_with_capabilities(
924 &mut self,
925 _msg: peer_table_server_protocol::GetPeersWithCapabilities,
926 _ctx: &Context<Self>,
927 ) -> Vec<(H256, PeerConnection, Vec<Capability>)> {
928 self.peers
929 .iter()
930 .filter_map(|(peer_id, peer_data)| {
931 peer_data.connection.clone().map(|connection| {
932 (
933 *peer_id,
934 connection,
935 peer_data.supported_capabilities.clone(),
936 )
937 })
938 })
939 .collect()
940 }
941
942 #[request_handler]
943 async fn handle_insert_if_new(
944 &mut self,
945 msg: peer_table_server_protocol::InsertIfNew,
946 _ctx: &Context<Self>,
947 ) -> bool {
948 let node_id = msg.node.node_id();
949 self.insert_to_connection_pool(node_id, msg.node.clone());
951 if self.contact_exists(&node_id) {
952 return false;
953 }
954 let contact = Contact::new(msg.node, msg.protocol);
955 self.insert_contact(node_id, contact);
959 METRICS.record_new_discovery().await;
960 true
961 }
962
963 #[request_handler]
964 async fn handle_validate_contact(
965 &mut self,
966 msg: peer_table_server_protocol::ValidateContact,
967 _ctx: &Context<Self>,
968 ) -> ContactValidation {
969 self.do_validate_contact(msg.node_id, msg.sender_ip)
970 }
971
972 #[request_handler]
973 async fn handle_get_closest_nodes(
974 &mut self,
975 msg: peer_table_server_protocol::GetClosestNodes,
976 _ctx: &Context<Self>,
977 ) -> Vec<Node> {
978 self.do_get_closest_nodes(msg.node_id)
979 }
980
981 #[request_handler]
982 async fn handle_get_nodes_at_distances(
983 &mut self,
984 msg: peer_table_server_protocol::GetNodesAtDistances,
985 _ctx: &Context<Self>,
986 ) -> Vec<NodeRecord> {
987 self.do_get_nodes_at_distances(&msg.distances)
988 }
989
990 #[request_handler]
991 async fn handle_get_peers_data(
992 &mut self,
993 _msg: peer_table_server_protocol::GetPeersData,
994 _ctx: &Context<Self>,
995 ) -> Vec<PeerData> {
996 self.peers.values().cloned().collect()
997 }
998
999 #[request_handler]
1000 async fn handle_get_random_peer(
1001 &mut self,
1002 msg: peer_table_server_protocol::GetRandomPeer,
1003 ctx: &Context<Self>,
1004 ) -> Option<(H256, PeerConnection, RequestPermit)> {
1005 let (peer_id, conn) = self.do_get_random_peer(msg.capabilities)?;
1006 self.peers
1007 .get_mut(&peer_id)
1008 .expect("peer returned by do_get_random_peer must be present in self.peers")
1009 .requests += 1;
1010 Some((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id)))
1011 }
1012
1013 #[request_handler]
1014 async fn handle_get_session_info(
1015 &mut self,
1016 msg: peer_table_server_protocol::GetSessionInfo,
1017 _ctx: &Context<Self>,
1018 ) -> Option<Session> {
1019 self.sessions
1021 .get(&msg.node_id)
1022 .cloned()
1023 .or_else(|| self.get_contact(&msg.node_id)?.session.clone())
1024 }
1025
1026 #[request_handler]
1027 async fn handle_get_peer_connection(
1028 &mut self,
1029 msg: peer_table_server_protocol::GetPeerConnection,
1030 _ctx: &Context<Self>,
1031 ) -> Option<PeerConnection> {
1032 self.peers
1033 .get(&msg.peer_id)
1034 .and_then(|peer_data| peer_data.connection.clone())
1035 }
1036
1037 #[request_handler]
1038 async fn handle_get_peer_diagnostics(
1039 &mut self,
1040 _msg: peer_table_server_protocol::GetPeerDiagnostics,
1041 _ctx: &Context<Self>,
1042 ) -> Vec<PeerDiagnostics> {
1043 self.peers
1044 .iter()
1045 .map(|(id, peer_data)| PeerDiagnostics {
1046 peer_id: *id,
1047 score: peer_data.score,
1048 inflight_requests: peer_data.requests,
1049 eligible: self.can_try_more_requests(&peer_data.score, &peer_data.requests),
1050 capabilities: peer_data
1051 .supported_capabilities
1052 .iter()
1053 .map(|c| format!("{}/{}", c.protocol(), c.version))
1054 .collect(),
1055 ip: peer_data.node.ip,
1056 client_version: peer_data.node.version.clone().unwrap_or_default(),
1057 connection_direction: if peer_data.is_connection_inbound {
1058 "inbound".to_string()
1059 } else {
1060 "outbound".to_string()
1061 },
1062 last_response_time: peer_data.last_response_time,
1063 })
1064 .collect()
1065 }
1066
1067 fn bucket_for(&self, node_id: &H256) -> Option<usize> {
1073 bucket_index(&self.local_node_id, node_id)
1074 }
1075
1076 fn get_contact(&self, node_id: &H256) -> Option<&Contact> {
1078 let idx = self.bucket_for(node_id)?;
1079 self.buckets[idx].get_any(node_id)
1080 }
1081
1082 fn get_contact_mut(&mut self, node_id: &H256) -> Option<&mut Contact> {
1084 let idx = self.bucket_for(node_id)?;
1085 self.buckets[idx].get_mut(node_id)
1086 }
1087
1088 fn contact_exists(&self, node_id: &H256) -> bool {
1090 let Some(idx) = self.bucket_for(node_id) else {
1091 return false;
1092 };
1093 self.buckets[idx].contains(node_id)
1094 }
1095
1096 fn insert_contact(&mut self, node_id: H256, contact: Contact) -> bool {
1100 #[cfg(feature = "metrics")]
1101 let start = std::time::Instant::now();
1102
1103 let Some(idx) = self.bucket_for(&node_id) else {
1104 return false;
1105 };
1106 let result = self.buckets[idx].insert(node_id, contact);
1107
1108 #[cfg(feature = "metrics")]
1109 {
1110 use ethrex_metrics::p2p::METRICS_P2P;
1111 METRICS_P2P.observe_insert_contact_duration(start.elapsed().as_secs_f64());
1112 }
1113
1114 result
1115 }
1116
1117 fn insert_to_connection_pool(&mut self, node_id: H256, node: Node) {
1120 if self.connection_pool.contains_key(&node_id) {
1121 return;
1122 }
1123 if self.connection_pool.len() >= MAX_CONNECTION_POOL_SIZE {
1124 self.connection_pool.shift_remove_index(0);
1125 }
1126 self.connection_pool.insert(node_id, node);
1127 }
1128
1129 fn get_contact_or_replacement(&self, node_id: &H256) -> Option<&Contact> {
1131 let idx = self.bucket_for(node_id)?;
1132 self.buckets[idx].get_any(node_id)
1133 }
1134
1135 fn get_contact_or_replacement_mut(&mut self, node_id: &H256) -> Option<&mut Contact> {
1137 let idx = self.bucket_for(node_id)?;
1138 let bucket = &mut self.buckets[idx];
1139 if let Some(pos) = bucket.contacts.iter().position(|(id, _)| id == node_id) {
1142 return Some(&mut bucket.contacts[pos].1);
1143 }
1144 if let Some(pos) = bucket.replacements.iter().position(|(id, _)| id == node_id) {
1145 return Some(&mut bucket.replacements[pos].1);
1146 }
1147 None
1148 }
1149
1150 fn iter_contacts(&self) -> impl Iterator<Item = (&H256, &Contact)> {
1152 self.buckets.iter().flat_map(|bucket| {
1153 bucket
1154 .contacts
1155 .iter()
1156 .chain(bucket.replacements.iter())
1157 .map(|(id, c)| (id, c))
1158 })
1159 }
1160
1161 fn weight_peer(&self, score: &i64, requests: &i64) -> i64 {
1164 score * SCORE_WEIGHT - requests * REQUESTS_WEIGHT
1165 }
1166
1167 fn can_try_more_requests(&self, score: &i64, requests: &i64) -> bool {
1168 let score_ratio = (score - MIN_SCORE) as f64 / (MAX_SCORE - MIN_SCORE) as f64;
1169 let max_requests = (MAX_CONCURRENT_REQUESTS_PER_PEER as f64 * score_ratio).max(1.0);
1170 (*requests as f64) < max_requests
1171 }
1172
1173 fn do_get_best_peer(&self, capabilities: &[Capability]) -> Option<(H256, PeerConnection)> {
1174 self.do_get_best_peer_excluding(capabilities, &[])
1175 }
1176
1177 fn do_get_best_peer_excluding(
1180 &self,
1181 capabilities: &[Capability],
1182 excluded: &[H256],
1183 ) -> Option<(H256, PeerConnection)> {
1184 self.peers
1185 .iter()
1186 .filter_map(|(id, peer_data)| {
1187 if excluded.contains(id)
1188 || !self.can_try_more_requests(&peer_data.score, &peer_data.requests)
1189 || !capabilities
1190 .iter()
1191 .any(|cap| peer_data.supported_capabilities.contains(cap))
1192 {
1193 None
1194 } else {
1195 let connection = peer_data.connection.clone()?;
1196 Some((*id, peer_data.score, peer_data.requests, connection))
1197 }
1198 })
1199 .max_by_key(|(_, score, reqs, _)| self.weight_peer(score, reqs))
1200 .map(|(k, _, _, v)| (k, v))
1201 }
1202
1203 fn do_get_best_n_peers(
1209 &self,
1210 capabilities: &[Capability],
1211 n: usize,
1212 ) -> Vec<(H256, PeerConnection)> {
1213 let mut candidates: Vec<(H256, i64, i64, PeerConnection)> = self
1214 .peers
1215 .iter()
1216 .filter_map(|(id, peer_data)| {
1217 if !self.can_try_more_requests(&peer_data.score, &peer_data.requests)
1218 || !capabilities
1219 .iter()
1220 .any(|cap| peer_data.supported_capabilities.contains(cap))
1221 {
1222 None
1223 } else {
1224 let connection = peer_data.connection.clone()?;
1225 Some((*id, peer_data.score, peer_data.requests, connection))
1226 }
1227 })
1228 .collect();
1229
1230 candidates.sort_by_key(|(_, score, reqs, _)| -self.weight_peer(score, reqs));
1231 candidates
1232 .into_iter()
1233 .take(n)
1234 .map(|(id, _, _, conn)| (id, conn))
1235 .collect()
1236 }
1237
1238 fn prune(&mut self) {
1245 for bucket in &mut self.buckets {
1246 let main_disposable: Vec<H256> = bucket
1248 .contacts
1249 .iter()
1250 .filter(|(_, c)| c.disposable)
1251 .map(|(id, _)| *id)
1252 .collect();
1253
1254 for node_id in main_disposable {
1256 bucket.remove_and_promote(&node_id);
1257 }
1258
1259 bucket.replacements.retain(|(_, c)| !c.disposable);
1262 }
1263 }
1264
1265 fn do_get_contact_to_initiate(&mut self) -> Option<Contact> {
1266 let pool_len = self.connection_pool.len();
1270 if pool_len == 0 {
1271 return None;
1272 }
1273
1274 let start = rand::random::<usize>() % pool_len;
1275 for offset in 0..pool_len {
1276 let idx = (start + offset) % pool_len;
1277 let Some((node_id, node)) = self.connection_pool.get_index(idx) else {
1278 continue;
1279 };
1280 let node_id = *node_id;
1281
1282 if self.peers.contains_key(&node_id)
1283 || self.already_tried_peers.contains(&node_id)
1284 || self
1285 .get_contact_or_replacement(&node_id)
1286 .map(|c| !c.knows_us || c.unwanted || c.is_fork_id_valid == Some(false))
1287 .unwrap_or(false)
1288 {
1289 continue;
1290 }
1291
1292 let node = node.clone();
1293 self.already_tried_peers.insert(node_id);
1294 let contact = self
1295 .get_contact_or_replacement(&node_id)
1296 .cloned()
1297 .unwrap_or_else(|| Contact::new(node, DiscoveryProtocol::Discv4));
1298 return Some(contact);
1299 }
1300
1301 tracing::trace!("Resetting list of tried peers.");
1303 self.already_tried_peers.clear();
1304 None
1305 }
1306
1307 fn do_get_closest_from_pool(&self, target: H256, count: usize) -> Vec<(H256, Node)> {
1309 let mut nodes: Vec<(H256, Node, H256)> = Vec::with_capacity(count);
1310
1311 for (node_id, node) in &self.connection_pool {
1312 let dist = xor_distance(&target, node_id);
1313 if nodes.len() < count {
1314 nodes.push((*node_id, node.clone(), dist));
1315 } else if let Some((farthest_idx, _)) =
1316 nodes.iter().enumerate().max_by_key(|(_, (_, _, d))| *d)
1317 && dist < nodes[farthest_idx].2
1318 {
1319 nodes[farthest_idx] = (*node_id, node.clone(), dist);
1320 }
1321 }
1322
1323 nodes.sort_by(|a, b| a.2.cmp(&b.2));
1324 nodes.into_iter().map(|(id, node, _)| (id, node)).collect()
1325 }
1326
1327 fn do_get_contact_for_enr_lookup(&mut self) -> Option<Contact> {
1329 self.iter_contacts()
1330 .filter(|(_, c)| {
1331 c.is_discv4
1332 && c.was_validated()
1333 && !c.has_pending_enr_request()
1334 && c.record.is_none()
1335 && !c.disposable
1336 })
1337 .map(|(_, c)| c)
1338 .collect::<Vec<_>>()
1339 .choose(&mut rand::rngs::OsRng)
1340 .cloned()
1341 .cloned()
1342 }
1343
1344 fn do_get_contact_to_revalidate(
1345 &self,
1346 revalidation_interval: Duration,
1347 protocol: DiscoveryProtocol,
1348 ) -> Option<Box<Contact>> {
1349 self.iter_contacts()
1350 .filter(|(_, c)| {
1351 c.supports_protocol(protocol)
1352 && Self::is_validation_needed(c, revalidation_interval)
1353 })
1354 .map(|(_, c)| c)
1355 .choose(&mut rand::rngs::OsRng)
1356 .cloned()
1357 .map(Box::new)
1358 }
1359
1360 fn do_validate_contact(&self, node_id: H256, sender_ip: IpAddr) -> ContactValidation {
1361 let Some(contact) = self.get_contact(&node_id) else {
1362 return ContactValidation::UnknownContact;
1363 };
1364 if !contact.was_validated() {
1365 return ContactValidation::InvalidContact;
1366 }
1367
1368 if sender_ip != contact.node.ip {
1371 return ContactValidation::IpMismatch;
1372 }
1373 ContactValidation::Valid(Box::new(contact.clone()))
1374 }
1375
1376 fn do_get_closest_nodes(&self, node_id: H256) -> Vec<Node> {
1378 #[cfg(feature = "metrics")]
1379 let scan_start = std::time::Instant::now();
1380
1381 let mut nodes: Vec<(Node, H256)> = vec![];
1382
1383 for (contact_id, contact) in self.iter_contacts() {
1384 let dist = xor_distance(&node_id, contact_id);
1385 if nodes.len() < MAX_NODES_IN_NEIGHBORS_PACKET {
1386 nodes.push((contact.node.clone(), dist));
1387 } else if let Some((farthest_idx, _)) =
1388 nodes.iter().enumerate().max_by_key(|(_, (_, d))| *d)
1389 && dist < nodes[farthest_idx].1
1390 {
1391 nodes[farthest_idx] = (contact.node.clone(), dist);
1392 }
1393 }
1394
1395 #[cfg(feature = "metrics")]
1396 {
1397 use ethrex_metrics::p2p::METRICS_P2P;
1398 METRICS_P2P.observe_iter_contacts_duration(scan_start.elapsed().as_secs_f64());
1399 }
1400
1401 nodes.into_iter().map(|(node, _)| node).collect()
1402 }
1403
1404 fn do_get_nodes_at_distances(&self, distances: &[u32]) -> Vec<NodeRecord> {
1409 self.iter_contacts()
1410 .filter_map(|(contact_id, contact)| {
1411 let dist = distance(&self.local_node_id, contact_id) as u32;
1412 if distances.contains(&dist) {
1413 contact.record.clone()
1414 } else {
1415 None
1416 }
1417 })
1418 .take(MAX_ENRS_PER_FINDNODE_RESPONSE)
1419 .collect()
1420 }
1421
1422 async fn do_new_contacts(&mut self, nodes: Vec<Node>, protocol: DiscoveryProtocol) {
1423 for node in nodes {
1424 let node_id = node.node_id();
1425 if node_id == self.local_node_id {
1426 continue;
1427 }
1428 #[cfg(feature = "metrics")]
1429 let insert_start = std::time::Instant::now();
1430
1431 self.insert_to_connection_pool(node_id, node.clone());
1433
1434 if self.contact_exists(&node_id) {
1435 if let Some(contact) = self.get_contact_or_replacement_mut(&node_id) {
1437 contact.add_protocol(protocol);
1438 }
1439 } else {
1440 let contact = Contact::new(node, protocol);
1441 self.insert_contact(node_id, contact);
1442 METRICS.record_new_discovery().await;
1443 }
1444
1445 #[cfg(feature = "metrics")]
1446 {
1447 use ethrex_metrics::p2p::METRICS_P2P;
1448 METRICS_P2P.observe_insert_contact_duration(insert_start.elapsed().as_secs_f64());
1449 }
1450 }
1451 }
1452
1453 async fn do_new_contact_records(&mut self, node_records: Vec<NodeRecord>) {
1454 for node_record in node_records {
1455 if !node_record.verify_signature() {
1456 continue;
1457 }
1458 if let Ok(node) = Node::from_enr(&node_record) {
1459 let node_id = node.node_id();
1460 if node_id == self.local_node_id {
1461 continue;
1462 }
1463
1464 self.insert_to_connection_pool(node_id, node.clone());
1466
1467 if self.contact_exists(&node_id) {
1468 let should_update = self
1471 .get_contact_or_replacement(&node_id)
1472 .map(|c| match c.record.as_ref() {
1473 None => true,
1474 Some(r) => node_record.seq > r.seq,
1475 })
1476 .unwrap_or(false);
1477 let is_fork_id_valid = if should_update {
1478 Self::evaluate_fork_id(&node_record, &self.store).await
1479 } else {
1480 None
1481 };
1482 if let Some(contact) = self.get_contact_or_replacement_mut(&node_id) {
1483 contact.add_protocol(DiscoveryProtocol::Discv5);
1484 if should_update {
1485 if contact.node.ip != node.ip || contact.node.udp_port != node.udp_port
1486 {
1487 contact.validation_timestamp = None;
1488 contact.ping_id = None;
1489 }
1490 contact.node = node;
1491 contact.record = Some(node_record);
1492 contact.is_fork_id_valid = is_fork_id_valid;
1493 }
1494 }
1495 } else {
1496 let is_fork_id_valid = Self::evaluate_fork_id(&node_record, &self.store).await;
1497 let mut contact = Contact::new(node, DiscoveryProtocol::Discv5);
1498 contact.is_fork_id_valid = is_fork_id_valid;
1499 contact.record = Some(node_record);
1500 self.insert_contact(node_id, contact);
1501 METRICS.record_new_discovery().await;
1502 }
1503 }
1504 }
1505 }
1506
1507 async fn evaluate_fork_id(record: &NodeRecord, store: &Store) -> Option<bool> {
1508 if let Some(remote_fork_id) = record.get_fork_id() {
1509 backend::is_fork_id_valid(store, remote_fork_id)
1510 .await
1511 .ok()
1512 .or(Some(false))
1513 } else {
1514 Some(false)
1515 }
1516 }
1517
1518 fn do_peer_count_by_capabilities(&self, capabilities: Vec<Capability>) -> usize {
1519 self.peers
1520 .values()
1521 .filter(|peer_data| {
1522 capabilities
1523 .iter()
1524 .any(|cap| peer_data.supported_capabilities.contains(cap))
1525 })
1526 .count()
1527 }
1528
1529 fn do_get_random_peer(&self, capabilities: Vec<Capability>) -> Option<(H256, PeerConnection)> {
1530 let peers: Vec<(H256, &PeerConnection, i64)> = self
1531 .peers
1532 .iter()
1533 .filter_map(|(node_id, peer_data)| {
1534 if !capabilities
1535 .iter()
1536 .any(|cap| peer_data.supported_capabilities.contains(cap))
1537 {
1538 return None;
1539 }
1540 peer_data
1541 .connection
1542 .as_ref()
1543 .map(|connection| (*node_id, connection, peer_data.score))
1544 })
1545 .collect();
1546 if peers.is_empty() {
1547 return None;
1548 }
1549 let weights: Vec<u64> = peers
1551 .iter()
1552 .map(|(_, _, score)| (score.max(&MIN_SCORE_CRITICAL) - MIN_SCORE_CRITICAL + 1) as u64)
1553 .collect();
1554 let dist = WeightedIndex::new(&weights).ok()?;
1555 let idx = dist.sample(&mut rand::rngs::OsRng);
1556 Some((peers[idx].0, peers[idx].1.clone()))
1557 }
1558
1559 fn is_validation_needed(contact: &Contact, revalidation_interval: Duration) -> bool {
1560 if contact.disposable {
1561 return false;
1562 }
1563
1564 let sent_ping_ttl = Duration::from_secs(30);
1565
1566 if contact.has_pending_ping() {
1567 contact
1569 .validation_timestamp
1570 .map(|ts| Instant::now().saturating_duration_since(ts) > sent_ping_ttl)
1571 .unwrap_or(false)
1572 } else {
1573 !contact.was_validated()
1575 || contact
1576 .validation_timestamp
1577 .map(|ts| Instant::now().saturating_duration_since(ts) > revalidation_interval)
1578 .unwrap_or(false)
1579 }
1580 }
1581}
1582
1583pub type PeerTable = ActorRef<PeerTableServer>;
1584
1585#[cfg(test)]
1586mod tests {
1587 use super::*;
1588 use ethrex_common::H512;
1589 use std::net::Ipv4Addr;
1590
1591 fn dummy_contact(seed: u8) -> (H256, Contact) {
1593 let pk = H512::from_low_u64_be(seed as u64 + 1);
1594 let node = Node::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, seed)), 30303, 30303, pk);
1595 let node_id = node.node_id();
1596 let contact = Contact::new(node, DiscoveryProtocol::Discv4);
1597 (node_id, contact)
1598 }
1599
1600 #[test]
1603 fn insert_into_empty_bucket() {
1604 let mut bucket = KBucket::default();
1605 let (id, contact) = dummy_contact(1);
1606 assert!(bucket.insert(id, contact));
1607 assert_eq!(bucket.contacts.len(), 1);
1608 assert!(bucket.replacements.is_empty());
1609 }
1610
1611 #[test]
1612 fn insert_fills_bucket_then_goes_to_replacements() {
1613 let mut bucket = KBucket::default();
1614
1615 for i in 0..MAX_NODES_PER_BUCKET as u8 {
1617 let (id, contact) = dummy_contact(i);
1618 assert!(bucket.insert(id, contact), "contact {i} should go to main");
1619 }
1620 assert_eq!(bucket.contacts.len(), MAX_NODES_PER_BUCKET);
1621
1622 let (id, contact) = dummy_contact(200);
1624 assert!(!bucket.insert(id, contact));
1625 assert_eq!(bucket.contacts.len(), MAX_NODES_PER_BUCKET);
1626 assert_eq!(bucket.replacements.len(), 1);
1627 }
1628
1629 #[test]
1632 fn contains_checks_main_and_replacement() {
1633 let mut bucket = KBucket::default();
1634
1635 let (id_main, contact_main) = dummy_contact(1);
1636 bucket.insert(id_main, contact_main);
1637 assert!(bucket.contains(&id_main));
1638
1639 for i in 2..=(MAX_NODES_PER_BUCKET as u8) {
1641 let (id, c) = dummy_contact(i);
1642 bucket.insert(id, c);
1643 }
1644 let (id_repl, contact_repl) = dummy_contact(100);
1645 bucket.insert(id_repl, contact_repl);
1646
1647 assert!(bucket.contains(&id_repl));
1648 assert!(!bucket.contains(&H256::zero()));
1649 }
1650
1651 #[test]
1654 fn get_returns_main_list_only() {
1655 let mut bucket = KBucket::default();
1656 let (id, contact) = dummy_contact(1);
1657 bucket.insert(id, contact);
1658 assert!(bucket.get(&id).is_some());
1659 assert!(bucket.get(&H256::zero()).is_none());
1660 }
1661
1662 #[test]
1663 fn get_any_returns_from_replacement() {
1664 let mut bucket = KBucket::default();
1665 for i in 0..MAX_NODES_PER_BUCKET as u8 {
1667 let (id, c) = dummy_contact(i);
1668 bucket.insert(id, c);
1669 }
1670 let (id_repl, c_repl) = dummy_contact(200);
1672 bucket.insert(id_repl, c_repl);
1673
1674 assert!(bucket.get(&id_repl).is_none()); assert!(bucket.get_any(&id_repl).is_some()); }
1677
1678 #[test]
1681 fn remove_and_promote_with_replacement() {
1682 let mut bucket = KBucket::default();
1683
1684 let mut main_ids = Vec::new();
1686 for i in 0..MAX_NODES_PER_BUCKET as u8 {
1687 let (id, c) = dummy_contact(i);
1688 main_ids.push(id);
1689 bucket.insert(id, c);
1690 }
1691
1692 let (repl_id, repl_contact) = dummy_contact(200);
1694 bucket.insert(repl_id, repl_contact);
1695
1696 let promoted = bucket.remove_and_promote(&main_ids[0]);
1698 assert_eq!(promoted, Some(repl_id));
1699 assert_eq!(bucket.contacts.len(), MAX_NODES_PER_BUCKET);
1700 assert!(bucket.replacements.is_empty());
1701 assert!(!bucket.contains(&main_ids[0]));
1702 assert!(bucket.contains(&repl_id));
1703 }
1704
1705 #[test]
1706 fn remove_and_promote_without_replacement() {
1707 let mut bucket = KBucket::default();
1708 let (id, c) = dummy_contact(1);
1709 bucket.insert(id, c);
1710
1711 let promoted = bucket.remove_and_promote(&id);
1712 assert!(promoted.is_none());
1713 assert!(bucket.contacts.is_empty());
1714 }
1715
1716 #[test]
1717 fn remove_nonexistent_returns_none() {
1718 let mut bucket = KBucket::default();
1719 assert!(bucket.remove_and_promote(&H256::zero()).is_none());
1720 }
1721
1722 #[test]
1725 fn replacement_list_evicts_oldest_when_full() {
1726 let mut bucket = KBucket::default();
1727 for i in 0..MAX_NODES_PER_BUCKET as u8 {
1729 let (id, c) = dummy_contact(i);
1730 bucket.insert(id, c);
1731 }
1732
1733 let mut repl_ids = Vec::new();
1735 for i in 0..(MAX_REPLACEMENTS_PER_BUCKET + 2) as u8 {
1736 let seed = 100 + i;
1737 let (id, c) = dummy_contact(seed);
1738 repl_ids.push(id);
1739 bucket.insert(id, c);
1740 }
1741
1742 assert_eq!(bucket.replacements.len(), MAX_REPLACEMENTS_PER_BUCKET);
1743 assert!(!bucket.contains(&repl_ids[0]));
1745 assert!(!bucket.contains(&repl_ids[1]));
1746 assert!(bucket.contains(repl_ids.last().unwrap()));
1748 }
1749
1750 #[test]
1753 fn bucket_index_self_is_none() {
1754 let id = H256::random();
1755 assert_eq!(bucket_index(&id, &id), None);
1756 }
1757
1758 #[test]
1759 fn bucket_index_minimal_distance() {
1760 let local = H256::zero();
1761 let mut remote = H256::zero();
1763 remote.0[31] = 1;
1764 assert_eq!(bucket_index(&local, &remote), Some(0));
1765 }
1766
1767 #[test]
1768 fn bucket_index_maximal_distance() {
1769 let local = H256::zero();
1770 let mut remote = H256::zero();
1772 remote.0[0] = 0x80;
1773 assert_eq!(bucket_index(&local, &remote), Some(255));
1774 }
1775}