1use crate::{
2 discovery::lookup::{IterativeLookup, LOOKUP_ALPHA, LOOKUP_BUCKET_SIZE},
3 discv5::{
4 messages::{
5 DISTANCES_PER_FIND_NODE_MSG, FindNodeMessage, Handshake, HandshakeAuthdata, Message,
6 NodesMessage, Ordinary, Packet, PacketTrait as _, PingMessage, PongMessage,
7 TalkResMessage, WhoAreYou, decrypt_message,
8 },
9 server::{Discv5Message, Discv5State, update_local_ip},
10 session::{
11 build_challenge_data, create_id_signature, derive_session_keys, verify_id_signature,
12 },
13 },
14 metrics::METRICS,
15 peer_table::{ContactValidation, DiscoveryProtocol, PeerTableServerProtocol as _},
16 rlpx::utils::compress_pubkey,
17 types::{Node, NodeRecord},
18 utils::{distance, node_id},
19};
20use bytes::{Bytes, BytesMut};
21use ethrex_common::{H256, H512};
22use rand::{Rng, rngs::OsRng};
23use secp256k1::{PublicKey, SecretKey, ecdsa::Signature};
24use std::{
25 net::SocketAddr,
26 time::{Duration, Instant},
27};
28use tracing::{debug, trace, warn};
29
30use super::server::{DiscoveryServer, DiscoveryServerError};
31
/// Largest number of ENRs placed in a single NODES message; FINDNODE results
/// are split into chunks of this size.
const MAX_ENRS_PER_MESSAGE: usize = 3;
/// Interval handed to the peer table when picking a contact to revalidate (12 hours).
const REVALIDATION_INTERVAL: Duration = Duration::from_secs(12 * 3_600);
/// Minimum spacing between two WHOAREYOU packets for the same `(IP, node id)`
/// pair. The check is skipped for private IPs.
const WHOAREYOU_RATE_LIMIT: Duration = Duration::from_secs(1);
/// Number of new WHOAREYOU challenges allowed per one-second window, summed
/// over all peers.
const GLOBAL_WHOAREYOU_RATE_LIMIT: u32 = 100;
40
41impl DiscoveryServer {
42 pub(crate) async fn discv5_handle_packet(
43 &mut self,
44 Discv5Message { packet, from }: Discv5Message,
45 ) -> Result<(), DiscoveryServerError> {
46 #[cfg(feature = "metrics")]
47 {
48 use ethrex_metrics::p2p::METRICS_P2P;
49 match packet.header.flag {
50 0x01 => METRICS_P2P.inc_discv5_incoming("WhoAreYou"),
51 0x02 => METRICS_P2P.inc_discv5_incoming("Handshake"),
52 _ => {}
53 }
54 }
55 match packet.header.flag {
56 0x00 => self.discv5_handle_ordinary(packet, from).await,
57 0x01 => self.discv5_handle_who_are_you(packet, from).await,
58 0x02 => self.discv5_handle_handshake(packet, from).await,
59 f => {
60 tracing::trace!(protocol = "discv5", "Unexpected flag {f}");
61 Err(crate::discv5::messages::PacketCodecError::MalformedData.into())
62 }
63 }
64 }
65
66 async fn discv5_handle_ordinary(
67 &mut self,
68 packet: Packet,
69 addr: SocketAddr,
70 ) -> Result<(), DiscoveryServerError> {
71 let src_id = H256::from_slice(&packet.header.authdata);
72
73 let decrypt_key = self
74 .peer_table
75 .get_session_info(src_id)
76 .await?
77 .map(|s| s.inbound_key);
78
79 let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
80
81 let ordinary = match decrypt_key {
82 Some(key) => match Ordinary::decode(&packet, &key) {
83 Ok(ordinary) => {
84 if let Some(session_ip) = discv5.session_ips.get(&src_id)
85 && addr.ip() != *session_ip
86 {
87 trace!(
88 protocol = "discv5",
89 from = %src_id,
90 %addr,
91 expected_ip = %session_ip,
92 "IP mismatch for existing session, sending WhoAreYou"
93 );
94 discv5.whoareyou_rate_limit.pop(&(addr.ip(), src_id));
95 return self
96 .discv5_send_who_are_you(packet.header.nonce, src_id, addr)
97 .await;
98 }
99 ordinary
100 }
101 Err(_) => {
102 trace!(protocol = "discv5", from = %src_id, %addr, "Decryption failed, sending WhoAreYou");
103 return self
104 .discv5_send_who_are_you(packet.header.nonce, src_id, addr)
105 .await;
106 }
107 },
108 None => {
109 trace!(protocol = "discv5", from = %src_id, %addr, "No session, sending WhoAreYou");
110 return self
111 .discv5_send_who_are_you(packet.header.nonce, src_id, addr)
112 .await;
113 }
114 };
115
116 tracing::trace!(protocol = "discv5", received = %ordinary.message, from = %src_id, %addr);
117
118 self.discv5_handle_message(ordinary, addr, None).await
119 }
120
121 async fn discv5_handle_who_are_you(
122 &mut self,
123 packet: Packet,
124 addr: SocketAddr,
125 ) -> Result<(), DiscoveryServerError> {
126 let nonce = packet.header.nonce;
127 let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
128 let Some((node, message, _)) = discv5.pending_by_nonce.remove(&nonce) else {
129 tracing::trace!(
130 protocol = "discv5",
131 "Received unexpected WhoAreYou packet. Ignoring it"
132 );
133 return Ok(());
134 };
135 tracing::trace!(protocol = "discv5", received = "WhoAreYou", from = %node.node_id(), %addr);
136
137 let challenge_data = build_challenge_data(
138 &packet.masking_iv,
139 &packet.header.static_header,
140 &packet.header.authdata,
141 );
142
143 let ephemeral_key = SecretKey::new(&mut OsRng);
144 let ephemeral_pubkey = ephemeral_key.public_key(secp256k1::SECP256K1).serialize();
145
146 let Some(dest_pubkey) = compress_pubkey(node.public_key) else {
147 return Err(DiscoveryServerError::CryptographyError(
148 "Invalid public key".to_string(),
149 ));
150 };
151
152 let session = derive_session_keys(
153 &ephemeral_key,
154 &dest_pubkey,
155 &self.local_node.node_id(),
156 &node.node_id(),
157 &challenge_data,
158 true,
159 );
160
161 let signature = create_id_signature(
162 &self.signer,
163 &challenge_data,
164 &ephemeral_pubkey,
165 &node.node_id(),
166 );
167
168 self.peer_table.set_session_info(node.node_id(), session)?;
169
170 let whoareyou = WhoAreYou::decode(&packet)?;
171 let record = (self.local_node_record.seq != whoareyou.enr_seq)
172 .then(|| self.local_node_record.clone());
173 self.discv5_send_handshake(message, signature, &ephemeral_pubkey, node, record)
174 .await
175 }
176
177 async fn discv5_handle_handshake(
178 &mut self,
179 packet: Packet,
180 addr: SocketAddr,
181 ) -> Result<(), DiscoveryServerError> {
182 let authdata = HandshakeAuthdata::decode(&packet.header.authdata)?;
183 let src_id = authdata.src_id;
184
185 let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
186 let Some((challenge_data, _, _)) = discv5.pending_challenges.remove(&src_id) else {
187 trace!(protocol = "discv5", from = %src_id, %addr, "Received unexpected Handshake packet");
188 return Ok(());
189 };
190
191 let eph_pubkey = PublicKey::from_slice(&authdata.eph_pubkey).map_err(|_| {
192 DiscoveryServerError::CryptographyError("Invalid ephemeral pubkey".into())
193 })?;
194
195 let src_pubkey = if let Some(contact) = self.peer_table.get_contact(src_id).await? {
196 compress_pubkey(contact.node.public_key)
197 } else if let Some(record) = &authdata.record {
198 if !record.verify_signature() {
199 trace!(from = %src_id, "Handshake ENR signature verification failed");
200 return Ok(());
201 }
202 let pairs = record.pairs();
203 let pubkey = pairs
204 .secp256k1
205 .and_then(|pk| PublicKey::from_slice(pk.as_bytes()).ok());
206
207 if let Some(pk) = &pubkey {
208 let uncompressed = pk.serialize_uncompressed();
209 let derived_node_id = node_id(&H512::from_slice(&uncompressed[1..]));
210 if derived_node_id != src_id {
211 trace!(from = %src_id, "Handshake ENR node_id mismatch");
212 return Ok(());
213 }
214 }
215
216 pubkey
217 } else {
218 None
219 };
220
221 let Some(src_pubkey) = src_pubkey else {
222 trace!(protocol = "discv5", from = %src_id, "Cannot verify handshake: unknown sender public key");
223 return Ok(());
224 };
225
226 let signature = Signature::from_compact(&authdata.id_signature).map_err(|_| {
227 DiscoveryServerError::CryptographyError("Invalid signature format".into())
228 })?;
229
230 if !verify_id_signature(
231 &src_pubkey,
232 &challenge_data,
233 &authdata.eph_pubkey,
234 &self.local_node.node_id(),
235 &signature,
236 ) {
237 trace!(protocol = "discv5", from = %src_id, "Handshake signature verification failed");
238 return Ok(());
239 }
240
241 if let Some(record) = &authdata.record {
242 self.peer_table.new_contact_records(vec![record.clone()])?;
243 }
244
245 let session = derive_session_keys(
246 &self.signer,
247 &eph_pubkey,
248 &src_id,
249 &self.local_node.node_id(),
250 &challenge_data,
251 false,
252 );
253
254 self.peer_table.set_session_info(src_id, session.clone())?;
255 let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
256 discv5.session_ips.insert(src_id, addr.ip());
257
258 let mut encrypted = packet.encrypted_message.clone();
259 decrypt_message(&session.inbound_key, &packet, &mut encrypted)?;
260 let message = Message::decode(&encrypted)?;
261 trace!(protocol = "discv5", received = %message, from = %src_id, %addr, "Handshake completed");
262
263 let ordinary = Ordinary { src_id, message };
264 self.discv5_handle_message(ordinary, addr, Some(session.outbound_key))
265 .await
266 }
267
268 pub(crate) async fn discv5_revalidate(&mut self) -> Result<(), DiscoveryServerError> {
269 if let Some(contact) = self
270 .peer_table
271 .get_contact_to_revalidate(REVALIDATION_INTERVAL, DiscoveryProtocol::Discv5)
272 .await?
273 && let Err(e) = self.discv5_send_ping(&contact.node).await
274 {
275 trace!(protocol = "discv5", node = %contact.node.node_id(), err = ?e, "Failed to send revalidation PING");
276 }
277 Ok(())
278 }
279
280 pub(crate) async fn discv5_lookup(&mut self) -> Result<(), DiscoveryServerError> {
281 if self.discv5.is_none() {
282 return Ok(());
283 }
284
285 self.discv5
287 .as_mut()
288 .expect("discv5 state must exist")
289 .active_lookups
290 .retain(|l| !l.is_finished());
291
292 if !self
297 .discv5
298 .as_ref()
299 .expect("discv5 state must exist")
300 .active_lookups
301 .is_empty()
302 {
303 return self.advance_v5_lookup().await;
304 }
305
306 let mut rng = OsRng;
307 let target_id: H256 = rng.r#gen();
308
309 let seed = self
311 .peer_table
312 .get_closest_from_pool(target_id, LOOKUP_BUCKET_SIZE)
313 .await?;
314 if seed.is_empty() {
315 trace!(
316 protocol = "discv5",
317 "No seeds for lookup, connection pool empty"
318 );
319 return Ok(());
320 }
321
322 trace!(
323 protocol = "discv5",
324 seeds = seed.len(),
325 "Starting new iterative lookup"
326 );
327 let lookup = IterativeLookup::new(target_id, seed);
328 let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
329 discv5.active_lookups.push(lookup);
330
331 self.advance_v5_lookup().await
333 }
334
335 async fn advance_v5_lookup(&mut self) -> Result<(), DiscoveryServerError> {
336 let discv5 = match &mut self.discv5 {
337 Some(s) => s,
338 None => return Ok(()),
339 };
340
341 if discv5.active_lookups.is_empty() {
342 return Ok(());
343 }
344
345 let mut queries: Vec<(usize, H256, H256, Node)> = Vec::new();
347 for (idx, lookup) in discv5.active_lookups.iter_mut().enumerate() {
348 let target = lookup.target;
349 for (node_id, node) in lookup.next_to_query(LOOKUP_ALPHA) {
350 queries.push((idx, target, node_id, node));
351 }
352 }
353
354 for (idx, target, node_id, node) in queries {
355 let find_node_msg = self.discv5_build_find_node_for_target(target, &node);
356 if let Err(e) = self.discv5_send_ordinary(find_node_msg, &node).await {
357 debug!(protocol = "discv5", sending = "FindNode", addr = ?node.udp_addr(), err=?e, "Error sending message");
358 self.peer_table.set_disposable(node_id)?;
359 METRICS.record_new_discarded_node();
360 if let Some(discv5) = &mut self.discv5
361 && let Some(lookup) = discv5.active_lookups.get_mut(idx)
362 {
363 lookup.record_timeout();
364 }
365 }
366 }
367 Ok(())
368 }
369
370 fn discv5_build_find_node_for_target(&self, target: H256, node: &Node) -> Message {
371 let center_distance = distance(&target, &node.node_id()) as u8;
372 let mut distances = Vec::new();
373 distances.push(center_distance as u32);
374 for i in 0..DISTANCES_PER_FIND_NODE_MSG / 2 {
375 if let Some(d) = center_distance.checked_add(i + 1) {
376 distances.push(d as u32)
377 }
378 if let Some(d) = center_distance.checked_sub(i + 1) {
379 distances.push(d as u32)
380 }
381 }
382 Message::FindNode(FindNodeMessage {
383 req_id: generate_req_id(),
384 distances,
385 })
386 }
387
388 async fn discv5_handle_ping(
389 &mut self,
390 ping_message: PingMessage,
391 sender_id: H256,
392 sender_addr: SocketAddr,
393 outbound_key: Option<[u8; 16]>,
394 ) -> Result<(), DiscoveryServerError> {
395 trace!(protocol = "discv5", from = %sender_id, enr_seq = ping_message.enr_seq, "Received PING");
396
397 let pong = Message::Pong(PongMessage {
398 req_id: ping_message.req_id,
399 enr_seq: self.local_node_record.seq,
400 recipient_addr: sender_addr,
401 });
402
403 if outbound_key.is_none()
404 && let Some(contact) = self.peer_table.get_contact(sender_id).await?
405 {
406 return self.discv5_send_ordinary(pong, &contact.node).await;
407 }
408 let key = self
409 .discv5_resolve_outbound_key(&sender_id, outbound_key)
410 .await?;
411 self.discv5_send_ordinary_to(pong, &sender_id, sender_addr, &key)
412 .await?;
413
414 Ok(())
415 }
416
417 pub async fn discv5_handle_pong(
418 &mut self,
419 pong_message: PongMessage,
420 sender_id: H256,
421 ) -> Result<(), DiscoveryServerError> {
422 self.peer_table
423 .record_pong_received(sender_id, pong_message.req_id)?;
424
425 if let Some(contact) = self.peer_table.get_contact(sender_id).await? {
426 let cached_seq = contact.record.as_ref().map_or(0, |r| r.seq);
427 if pong_message.enr_seq > cached_seq {
428 trace!(
429 protocol = "discv5",
430 from = %sender_id,
431 cached_seq,
432 pong_seq = pong_message.enr_seq,
433 "ENR seq mismatch, requesting updated ENR (FINDNODE distance 0)"
434 );
435 let find_node = Message::FindNode(FindNodeMessage {
436 req_id: generate_req_id(),
437 distances: vec![0],
438 });
439 self.discv5_send_ordinary(find_node, &contact.node).await?;
440 }
441 }
442
443 let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
444 if let Some(winning_ip) = discv5.record_ip_vote(pong_message.recipient_addr.ip(), sender_id)
445 && winning_ip != self.local_node.ip
446 {
447 tracing::info!(
448 protocol = "discv5",
449 old_ip = %self.local_node.ip,
450 new_ip = %winning_ip,
451 "External IP detected via PONG voting, updating local ENR"
452 );
453 update_local_ip(
454 &mut self.local_node,
455 &mut self.local_node_record,
456 &self.signer,
457 winning_ip,
458 );
459 }
460
461 Ok(())
462 }
463
464 async fn discv5_handle_find_node(
465 &mut self,
466 find_node_message: FindNodeMessage,
467 sender_id: H256,
468 sender_addr: SocketAddr,
469 outbound_key: Option<[u8; 16]>,
470 ) -> Result<(), DiscoveryServerError> {
471 let send_to_contact = match self
472 .peer_table
473 .validate_contact(sender_id, sender_addr.ip())
474 .await?
475 {
476 ContactValidation::Valid(contact) => Some(*contact),
477 ContactValidation::UnknownContact => None,
478 reason => {
479 trace!(from = %sender_id, ?reason, "Rejected FINDNODE");
480 return Ok(());
481 }
482 };
483
484 let mut nodes = self
485 .peer_table
486 .get_nodes_at_distances(find_node_message.distances.clone())
487 .await?;
488 if find_node_message.distances.contains(&0) {
489 nodes.push(self.local_node_record.clone());
490 }
491
492 let key = self
493 .discv5_resolve_outbound_key(&sender_id, outbound_key)
494 .await?;
495
496 let chunks: Vec<_> = nodes.chunks(MAX_ENRS_PER_MESSAGE).collect();
497 if chunks.is_empty() {
498 let nodes_message = Message::Nodes(NodesMessage {
499 req_id: find_node_message.req_id,
500 total: 1,
501 nodes: vec![],
502 });
503 if let Some(contact) = &send_to_contact {
504 self.discv5_send_ordinary(nodes_message, &contact.node)
505 .await?;
506 } else {
507 self.discv5_send_ordinary_to(nodes_message, &sender_id, sender_addr, &key)
508 .await?;
509 }
510 } else {
511 for chunk in &chunks {
512 let nodes_message = Message::Nodes(NodesMessage {
513 req_id: find_node_message.req_id.clone(),
514 total: chunks.len() as u64,
515 nodes: chunk.to_vec(),
516 });
517 if let Some(contact) = &send_to_contact {
518 self.discv5_send_ordinary(nodes_message, &contact.node)
519 .await?;
520 } else {
521 self.discv5_send_ordinary_to(nodes_message, &sender_id, sender_addr, &key)
522 .await?;
523 }
524 }
525 }
526
527 Ok(())
528 }
529
530 async fn discv5_handle_nodes_message(
531 &mut self,
532 nodes_message: NodesMessage,
533 ) -> Result<(), DiscoveryServerError> {
534 self.peer_table
535 .new_contact_records(nodes_message.nodes.clone())?;
536
537 if let Some(discv5) = &mut self.discv5 {
540 let entries: Vec<(H256, Node)> = nodes_message
541 .nodes
542 .iter()
543 .filter_map(|r| Node::from_enr(r).ok().map(|n| (n.node_id(), n)))
544 .collect();
545 for lookup in &mut discv5.active_lookups {
546 lookup.feed_results(entries.clone());
547 }
548 if let Some(lookup) = discv5.active_lookups.first_mut() {
549 lookup.record_response();
550 }
551 }
552
553 Ok(())
554 }
555
556 async fn discv5_send_ping(&mut self, node: &Node) -> Result<(), DiscoveryServerError> {
557 let req_id = generate_req_id();
558
559 let ping = Message::Ping(PingMessage {
560 req_id: req_id.clone(),
561 enr_seq: self.local_node_record.seq,
562 });
563
564 self.discv5_send_ordinary(ping, node).await?;
565 self.peer_table.record_ping_sent(node.node_id(), req_id)?;
566
567 Ok(())
568 }
569
570 async fn discv5_send_ordinary(
571 &mut self,
572 message: Message,
573 node: &Node,
574 ) -> Result<(), DiscoveryServerError> {
575 #[cfg(feature = "metrics")]
576 {
577 use ethrex_metrics::p2p::METRICS_P2P;
578 METRICS_P2P.inc_discv5_outgoing(message.metric_label());
579 }
580 let ordinary = Ordinary {
581 src_id: self.local_node.node_id(),
582 message: message.clone(),
583 };
584 let encrypt_key = match self.peer_table.get_session_info(node.node_id()).await? {
585 Some(s) => s.outbound_key,
586 None => {
587 trace!(
588 protocol = "discv5",
589 node = %node.node_id(),
590 "No session found in send_ordinary, using zeroed key to trigger handshake"
591 );
592 [0; 16]
593 }
594 };
595
596 let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
597 let mut rng = OsRng;
598 let masking_iv: u128 = rng.r#gen();
599 let nonce = discv5.next_nonce(&mut rng);
600
601 let packet = ordinary.encode(&nonce, masking_iv.to_be_bytes(), &encrypt_key)?;
602
603 self.discv5_send_packet(&packet, &node.node_id(), node.udp_addr())
604 .await?;
605 let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
606 discv5
607 .pending_by_nonce
608 .insert(nonce, (node.clone(), message, Instant::now()));
609 Ok(())
610 }
611
612 async fn discv5_resolve_outbound_key(
613 &self,
614 node_id: &H256,
615 key: Option<[u8; 16]>,
616 ) -> Result<[u8; 16], DiscoveryServerError> {
617 if let Some(key) = key {
618 return Ok(key);
619 }
620 match self.peer_table.get_session_info(*node_id).await? {
621 Some(s) => Ok(s.outbound_key),
622 None => {
623 trace!(
624 protocol = "discv5",
625 node = %node_id,
626 "No session found in resolve_outbound_key, using zeroed key"
627 );
628 Ok([0; 16])
629 }
630 }
631 }
632
633 async fn discv5_send_ordinary_to(
634 &mut self,
635 message: Message,
636 dest_id: &H256,
637 addr: SocketAddr,
638 encrypt_key: &[u8; 16],
639 ) -> Result<(), DiscoveryServerError> {
640 #[cfg(feature = "metrics")]
641 {
642 use ethrex_metrics::p2p::METRICS_P2P;
643 METRICS_P2P.inc_discv5_outgoing(message.metric_label());
644 }
645 let ordinary = Ordinary {
646 src_id: self.local_node.node_id(),
647 message,
648 };
649
650 let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
651 let mut rng = OsRng;
652 let masking_iv: u128 = rng.r#gen();
653 let nonce = discv5.next_nonce(&mut rng);
654
655 let packet = ordinary.encode(&nonce, masking_iv.to_be_bytes(), encrypt_key)?;
656
657 self.discv5_send_packet(&packet, dest_id, addr).await?;
658 Ok(())
659 }
660
661 async fn discv5_send_handshake(
662 &mut self,
663 message: Message,
664 signature: Signature,
665 eph_pubkey: &[u8],
666 node: Node,
667 record: Option<NodeRecord>,
668 ) -> Result<(), DiscoveryServerError> {
669 #[cfg(feature = "metrics")]
670 {
671 use ethrex_metrics::p2p::METRICS_P2P;
672 METRICS_P2P.inc_discv5_outgoing("Handshake");
673 }
674 let handshake = Handshake {
675 src_id: self.local_node.node_id(),
676 id_signature: signature.serialize_compact().to_vec(),
677 eph_pubkey: eph_pubkey.to_vec(),
678 record,
679 message: message.clone(),
680 };
681 let encrypt_key = match self.peer_table.get_session_info(node.node_id()).await? {
682 Some(s) => s.outbound_key,
683 None => {
684 trace!(
685 protocol = "discv5",
686 node = %node.node_id(),
687 "No session found in send_handshake, using zeroed key"
688 );
689 [0; 16]
690 }
691 };
692
693 let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
694 let mut rng = OsRng;
695 let masking_iv: u128 = rng.r#gen();
696 let nonce = discv5.next_nonce(&mut rng);
697
698 let packet = handshake.encode(&nonce, masking_iv.to_be_bytes(), &encrypt_key)?;
699
700 self.discv5_send_packet(&packet, &node.node_id(), node.udp_addr())
701 .await?;
702 let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
703 discv5
704 .pending_by_nonce
705 .insert(nonce, (node, message, Instant::now()));
706 Ok(())
707 }
708
709 pub async fn discv5_send_who_are_you(
710 &mut self,
711 nonce: [u8; 12],
712 src_id: H256,
713 addr: SocketAddr,
714 ) -> Result<(), DiscoveryServerError> {
715 #[cfg(feature = "metrics")]
716 {
717 use ethrex_metrics::p2p::METRICS_P2P;
718 METRICS_P2P.inc_discv5_outgoing("WhoAreYou");
719 }
720 let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
721
722 let rate_key = (addr.ip(), src_id);
723 let now = Instant::now();
724
725 if now.duration_since(discv5.whoareyou_global_window_start) >= Duration::from_secs(1) {
727 discv5.whoareyou_global_count = 0;
728 discv5.whoareyou_global_window_start = now;
729 }
730 if discv5.whoareyou_global_count >= GLOBAL_WHOAREYOU_RATE_LIMIT {
731 if discv5.whoareyou_global_count == GLOBAL_WHOAREYOU_RATE_LIMIT {
732 discv5.whoareyou_global_count = GLOBAL_WHOAREYOU_RATE_LIMIT + 1;
733 warn!(
734 protocol = "discv5",
735 "Global WHOAREYOU rate limit reached ({GLOBAL_WHOAREYOU_RATE_LIMIT}/s), \
736 dropping excess packets. This is normal during initial discovery or \
737 network churn; persistent occurrences may indicate a DoS attempt"
738 );
739 }
740 return Ok(());
741 }
742
743 if let Some((_, _, raw_bytes)) = discv5.pending_challenges.get(&src_id) {
745 trace!(
746 protocol = "discv5",
747 to = %src_id,
748 %addr,
749 "Resending existing WhoAreYou challenge"
750 );
751 self.udp_socket.send_to(raw_bytes, addr).await?;
752 return Ok(());
753 }
754
755 if !Discv5State::is_private_ip(addr.ip())
757 && let Some(last_sent) = discv5.whoareyou_rate_limit.get(&rate_key)
758 && now.duration_since(*last_sent) < WHOAREYOU_RATE_LIMIT
759 {
760 trace!(
761 protocol = "discv5",
762 to_ip = %addr.ip(),
763 "Rate limiting WHOAREYOU packet (amplification attack prevention)"
764 );
765 return Ok(());
766 }
767
768 discv5.whoareyou_rate_limit.push(rate_key, now);
769 discv5.whoareyou_global_count += 1;
770
771 let mut rng = OsRng;
772
773 let enr_seq = self
774 .peer_table
775 .get_contact(src_id)
776 .await?
777 .map_or(0, |c| c.record.as_ref().map_or(0, |r| r.seq));
778
779 let who_are_you = WhoAreYou {
780 id_nonce: rng.r#gen(),
781 enr_seq,
782 };
783
784 let masking_iv: u128 = rng.r#gen();
785 let packet = who_are_you.encode(&nonce, masking_iv.to_be_bytes(), &[0; 16])?;
786
787 let mut raw_buf = BytesMut::new();
788 packet.encode(&mut raw_buf, &src_id)?;
789 let raw_bytes = raw_buf.to_vec();
790
791 let challenge_data = build_challenge_data(
792 &masking_iv.to_be_bytes(),
793 &packet.header.static_header,
794 &packet.header.authdata,
795 );
796 let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
797 discv5
798 .pending_challenges
799 .insert(src_id, (challenge_data, Instant::now(), raw_bytes.clone()));
800
801 self.udp_socket.send_to(&raw_bytes, addr).await?;
802 trace!(protocol = "discv5", to = %src_id, %addr, flag = packet.header.flag, "Sent packet");
803
804 Ok(())
805 }
806
807 async fn discv5_send_packet(
808 &self,
809 packet: &Packet,
810 dest_id: &H256,
811 addr: SocketAddr,
812 ) -> Result<(), DiscoveryServerError> {
813 let mut buf = BytesMut::new();
814 packet.encode(&mut buf, dest_id)?;
815 self.udp_socket.send_to(&buf, addr).await?;
816 trace!(protocol = "discv5", to = %dest_id, %addr, flag = packet.header.flag, "Sent packet");
817 Ok(())
818 }
819
820 async fn discv5_handle_message(
821 &mut self,
822 ordinary: Ordinary,
823 sender_addr: SocketAddr,
824 outbound_key: Option<[u8; 16]>,
825 ) -> Result<(), DiscoveryServerError> {
826 let sender_id = ordinary.src_id;
827 if sender_id == self.local_node.node_id() {
828 return Ok(());
829 }
830 #[cfg(feature = "metrics")]
831 {
832 use ethrex_metrics::p2p::METRICS_P2P;
833 METRICS_P2P.inc_discv5_incoming(ordinary.message.metric_label());
834 }
835 match ordinary.message {
836 Message::Ping(ping_message) => {
837 if ping_message.req_id.len() > 8 {
838 trace!(protocol = "discv5", from = %sender_id, "Dropping PING with oversized req_id");
839 return Ok(());
840 }
841 self.discv5_handle_ping(ping_message, sender_id, sender_addr, outbound_key)
842 .await?
843 }
844 Message::Pong(pong_message) => {
845 self.discv5_handle_pong(pong_message, sender_id).await?;
846 }
847 Message::FindNode(find_node_message) => {
848 if find_node_message.req_id.len() > 8 {
849 trace!(protocol = "discv5", from = %sender_id, "Dropping FINDNODE with oversized req_id");
850 return Ok(());
851 }
852 self.discv5_handle_find_node(
853 find_node_message,
854 sender_id,
855 sender_addr,
856 outbound_key,
857 )
858 .await?;
859 }
860 Message::Nodes(nodes_message) => {
861 self.discv5_handle_nodes_message(nodes_message).await?;
862 }
863 Message::TalkReq(talk_req_message) => {
864 if talk_req_message.req_id.len() > 8 {
865 trace!(protocol = "discv5", from = %sender_id, "Dropping TALKREQ with oversized req_id");
866 return Ok(());
867 }
868 let talk_res = Message::TalkRes(TalkResMessage {
869 req_id: talk_req_message.req_id,
870 response: vec![],
871 });
872 let key = self
873 .discv5_resolve_outbound_key(&sender_id, outbound_key)
874 .await?;
875 self.discv5_send_ordinary_to(talk_res, &sender_id, sender_addr, &key)
876 .await?;
877 }
878 Message::TalkRes(_talk_res_message) => (),
879 Message::Ticket(_ticket_message) => (),
880 }
881 Ok(())
882 }
883}
884
885fn generate_req_id() -> Bytes {
886 let mut rng = OsRng;
887 Bytes::from(rng.r#gen::<u64>().to_be_bytes().to_vec())
888}