Skip to main content

ethrex_p2p/discovery/
discv5_handlers.rs

1use crate::{
2    discovery::lookup::{IterativeLookup, LOOKUP_ALPHA, LOOKUP_BUCKET_SIZE},
3    discv5::{
4        messages::{
5            DISTANCES_PER_FIND_NODE_MSG, FindNodeMessage, Handshake, HandshakeAuthdata, Message,
6            NodesMessage, Ordinary, Packet, PacketTrait as _, PingMessage, PongMessage,
7            TalkResMessage, WhoAreYou, decrypt_message,
8        },
9        server::{Discv5Message, Discv5State, update_local_ip},
10        session::{
11            build_challenge_data, create_id_signature, derive_session_keys, verify_id_signature,
12        },
13    },
14    metrics::METRICS,
15    peer_table::{ContactValidation, DiscoveryProtocol, PeerTableServerProtocol as _},
16    rlpx::utils::compress_pubkey,
17    types::{Node, NodeRecord},
18    utils::{distance, node_id},
19};
20use bytes::{Bytes, BytesMut};
21use ethrex_common::{H256, H512};
22use rand::{Rng, rngs::OsRng};
23use secp256k1::{PublicKey, SecretKey, ecdsa::Signature};
24use std::{
25    net::SocketAddr,
26    time::{Duration, Instant},
27};
28use tracing::{debug, trace, warn};
29
30use super::server::{DiscoveryServer, DiscoveryServerError};
31
32/// Maximum number of ENRs per NODES message (limited by UDP packet size).
33const MAX_ENRS_PER_MESSAGE: usize = 3;
34/// Nodes not validated within this interval are candidates for revalidation.
35const REVALIDATION_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60); // 12 hours
36/// Minimum interval between WHOAREYOU packets to the same IP address.
37const WHOAREYOU_RATE_LIMIT: Duration = Duration::from_secs(1);
38/// Maximum number of WHOAREYOU packets sent globally per second.
39const GLOBAL_WHOAREYOU_RATE_LIMIT: u32 = 100;
40
41impl DiscoveryServer {
42    pub(crate) async fn discv5_handle_packet(
43        &mut self,
44        Discv5Message { packet, from }: Discv5Message,
45    ) -> Result<(), DiscoveryServerError> {
46        #[cfg(feature = "metrics")]
47        {
48            use ethrex_metrics::p2p::METRICS_P2P;
49            match packet.header.flag {
50                0x01 => METRICS_P2P.inc_discv5_incoming("WhoAreYou"),
51                0x02 => METRICS_P2P.inc_discv5_incoming("Handshake"),
52                _ => {}
53            }
54        }
55        match packet.header.flag {
56            0x00 => self.discv5_handle_ordinary(packet, from).await,
57            0x01 => self.discv5_handle_who_are_you(packet, from).await,
58            0x02 => self.discv5_handle_handshake(packet, from).await,
59            f => {
60                tracing::trace!(protocol = "discv5", "Unexpected flag {f}");
61                Err(crate::discv5::messages::PacketCodecError::MalformedData.into())
62            }
63        }
64    }
65
66    async fn discv5_handle_ordinary(
67        &mut self,
68        packet: Packet,
69        addr: SocketAddr,
70    ) -> Result<(), DiscoveryServerError> {
71        let src_id = H256::from_slice(&packet.header.authdata);
72
73        let decrypt_key = self
74            .peer_table
75            .get_session_info(src_id)
76            .await?
77            .map(|s| s.inbound_key);
78
79        let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
80
81        let ordinary = match decrypt_key {
82            Some(key) => match Ordinary::decode(&packet, &key) {
83                Ok(ordinary) => {
84                    if let Some(session_ip) = discv5.session_ips.get(&src_id)
85                        && addr.ip() != *session_ip
86                    {
87                        trace!(
88                            protocol = "discv5",
89                            from = %src_id,
90                            %addr,
91                            expected_ip = %session_ip,
92                            "IP mismatch for existing session, sending WhoAreYou"
93                        );
94                        discv5.whoareyou_rate_limit.pop(&(addr.ip(), src_id));
95                        return self
96                            .discv5_send_who_are_you(packet.header.nonce, src_id, addr)
97                            .await;
98                    }
99                    ordinary
100                }
101                Err(_) => {
102                    trace!(protocol = "discv5", from = %src_id, %addr, "Decryption failed, sending WhoAreYou");
103                    return self
104                        .discv5_send_who_are_you(packet.header.nonce, src_id, addr)
105                        .await;
106                }
107            },
108            None => {
109                trace!(protocol = "discv5", from = %src_id, %addr, "No session, sending WhoAreYou");
110                return self
111                    .discv5_send_who_are_you(packet.header.nonce, src_id, addr)
112                    .await;
113            }
114        };
115
116        tracing::trace!(protocol = "discv5", received = %ordinary.message, from = %src_id, %addr);
117
118        self.discv5_handle_message(ordinary, addr, None).await
119    }
120
121    async fn discv5_handle_who_are_you(
122        &mut self,
123        packet: Packet,
124        addr: SocketAddr,
125    ) -> Result<(), DiscoveryServerError> {
126        let nonce = packet.header.nonce;
127        let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
128        let Some((node, message, _)) = discv5.pending_by_nonce.remove(&nonce) else {
129            tracing::trace!(
130                protocol = "discv5",
131                "Received unexpected WhoAreYou packet. Ignoring it"
132            );
133            return Ok(());
134        };
135        tracing::trace!(protocol = "discv5", received = "WhoAreYou", from = %node.node_id(), %addr);
136
137        let challenge_data = build_challenge_data(
138            &packet.masking_iv,
139            &packet.header.static_header,
140            &packet.header.authdata,
141        );
142
143        let ephemeral_key = SecretKey::new(&mut OsRng);
144        let ephemeral_pubkey = ephemeral_key.public_key(secp256k1::SECP256K1).serialize();
145
146        let Some(dest_pubkey) = compress_pubkey(node.public_key) else {
147            return Err(DiscoveryServerError::CryptographyError(
148                "Invalid public key".to_string(),
149            ));
150        };
151
152        let session = derive_session_keys(
153            &ephemeral_key,
154            &dest_pubkey,
155            &self.local_node.node_id(),
156            &node.node_id(),
157            &challenge_data,
158            true,
159        );
160
161        let signature = create_id_signature(
162            &self.signer,
163            &challenge_data,
164            &ephemeral_pubkey,
165            &node.node_id(),
166        );
167
168        self.peer_table.set_session_info(node.node_id(), session)?;
169
170        let whoareyou = WhoAreYou::decode(&packet)?;
171        let record = (self.local_node_record.seq != whoareyou.enr_seq)
172            .then(|| self.local_node_record.clone());
173        self.discv5_send_handshake(message, signature, &ephemeral_pubkey, node, record)
174            .await
175    }
176
177    async fn discv5_handle_handshake(
178        &mut self,
179        packet: Packet,
180        addr: SocketAddr,
181    ) -> Result<(), DiscoveryServerError> {
182        let authdata = HandshakeAuthdata::decode(&packet.header.authdata)?;
183        let src_id = authdata.src_id;
184
185        let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
186        let Some((challenge_data, _, _)) = discv5.pending_challenges.remove(&src_id) else {
187            trace!(protocol = "discv5", from = %src_id, %addr, "Received unexpected Handshake packet");
188            return Ok(());
189        };
190
191        let eph_pubkey = PublicKey::from_slice(&authdata.eph_pubkey).map_err(|_| {
192            DiscoveryServerError::CryptographyError("Invalid ephemeral pubkey".into())
193        })?;
194
195        let src_pubkey = if let Some(contact) = self.peer_table.get_contact(src_id).await? {
196            compress_pubkey(contact.node.public_key)
197        } else if let Some(record) = &authdata.record {
198            if !record.verify_signature() {
199                trace!(from = %src_id, "Handshake ENR signature verification failed");
200                return Ok(());
201            }
202            let pairs = record.pairs();
203            let pubkey = pairs
204                .secp256k1
205                .and_then(|pk| PublicKey::from_slice(pk.as_bytes()).ok());
206
207            if let Some(pk) = &pubkey {
208                let uncompressed = pk.serialize_uncompressed();
209                let derived_node_id = node_id(&H512::from_slice(&uncompressed[1..]));
210                if derived_node_id != src_id {
211                    trace!(from = %src_id, "Handshake ENR node_id mismatch");
212                    return Ok(());
213                }
214            }
215
216            pubkey
217        } else {
218            None
219        };
220
221        let Some(src_pubkey) = src_pubkey else {
222            trace!(protocol = "discv5", from = %src_id, "Cannot verify handshake: unknown sender public key");
223            return Ok(());
224        };
225
226        let signature = Signature::from_compact(&authdata.id_signature).map_err(|_| {
227            DiscoveryServerError::CryptographyError("Invalid signature format".into())
228        })?;
229
230        if !verify_id_signature(
231            &src_pubkey,
232            &challenge_data,
233            &authdata.eph_pubkey,
234            &self.local_node.node_id(),
235            &signature,
236        ) {
237            trace!(protocol = "discv5", from = %src_id, "Handshake signature verification failed");
238            return Ok(());
239        }
240
241        if let Some(record) = &authdata.record {
242            self.peer_table.new_contact_records(vec![record.clone()])?;
243        }
244
245        let session = derive_session_keys(
246            &self.signer,
247            &eph_pubkey,
248            &src_id,
249            &self.local_node.node_id(),
250            &challenge_data,
251            false,
252        );
253
254        self.peer_table.set_session_info(src_id, session.clone())?;
255        let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
256        discv5.session_ips.insert(src_id, addr.ip());
257
258        let mut encrypted = packet.encrypted_message.clone();
259        decrypt_message(&session.inbound_key, &packet, &mut encrypted)?;
260        let message = Message::decode(&encrypted)?;
261        trace!(protocol = "discv5", received = %message, from = %src_id, %addr, "Handshake completed");
262
263        let ordinary = Ordinary { src_id, message };
264        self.discv5_handle_message(ordinary, addr, Some(session.outbound_key))
265            .await
266    }
267
268    pub(crate) async fn discv5_revalidate(&mut self) -> Result<(), DiscoveryServerError> {
269        if let Some(contact) = self
270            .peer_table
271            .get_contact_to_revalidate(REVALIDATION_INTERVAL, DiscoveryProtocol::Discv5)
272            .await?
273            && let Err(e) = self.discv5_send_ping(&contact.node).await
274        {
275            trace!(protocol = "discv5", node = %contact.node.node_id(), err = ?e, "Failed to send revalidation PING");
276        }
277        Ok(())
278    }
279
280    pub(crate) async fn discv5_lookup(&mut self) -> Result<(), DiscoveryServerError> {
281        if self.discv5.is_none() {
282            return Ok(());
283        }
284
285        // Remove finished lookups
286        self.discv5
287            .as_mut()
288            .expect("discv5 state must exist")
289            .active_lookups
290            .retain(|l| !l.is_finished());
291
292        // If a lookup is already active, advance it instead of starting a new
293        // one. Lookups are timer-driven: each tick sends the next alpha queries.
294        // Responses feed results into the lookup but don't trigger new queries,
295        // which naturally throttles traffic.
296        if !self
297            .discv5
298            .as_ref()
299            .expect("discv5 state must exist")
300            .active_lookups
301            .is_empty()
302        {
303            return self.advance_v5_lookup().await;
304        }
305
306        let mut rng = OsRng;
307        let target_id: H256 = rng.r#gen();
308
309        // Seed with closest known nodes from the connection pool
310        let seed = self
311            .peer_table
312            .get_closest_from_pool(target_id, LOOKUP_BUCKET_SIZE)
313            .await?;
314        if seed.is_empty() {
315            trace!(
316                protocol = "discv5",
317                "No seeds for lookup, connection pool empty"
318            );
319            return Ok(());
320        }
321
322        trace!(
323            protocol = "discv5",
324            seeds = seed.len(),
325            "Starting new iterative lookup"
326        );
327        let lookup = IterativeLookup::new(target_id, seed);
328        let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
329        discv5.active_lookups.push(lookup);
330
331        // Fire the initial queries for the new lookup
332        self.advance_v5_lookup().await
333    }
334
335    async fn advance_v5_lookup(&mut self) -> Result<(), DiscoveryServerError> {
336        let discv5 = match &mut self.discv5 {
337            Some(s) => s,
338            None => return Ok(()),
339        };
340
341        if discv5.active_lookups.is_empty() {
342            return Ok(());
343        }
344
345        // Collect queries from all active lookups
346        let mut queries: Vec<(usize, H256, H256, Node)> = Vec::new();
347        for (idx, lookup) in discv5.active_lookups.iter_mut().enumerate() {
348            let target = lookup.target;
349            for (node_id, node) in lookup.next_to_query(LOOKUP_ALPHA) {
350                queries.push((idx, target, node_id, node));
351            }
352        }
353
354        for (idx, target, node_id, node) in queries {
355            let find_node_msg = self.discv5_build_find_node_for_target(target, &node);
356            if let Err(e) = self.discv5_send_ordinary(find_node_msg, &node).await {
357                debug!(protocol = "discv5", sending = "FindNode", addr = ?node.udp_addr(), err=?e, "Error sending message");
358                self.peer_table.set_disposable(node_id)?;
359                METRICS.record_new_discarded_node();
360                if let Some(discv5) = &mut self.discv5
361                    && let Some(lookup) = discv5.active_lookups.get_mut(idx)
362                {
363                    lookup.record_timeout();
364                }
365            }
366        }
367        Ok(())
368    }
369
370    fn discv5_build_find_node_for_target(&self, target: H256, node: &Node) -> Message {
371        let center_distance = distance(&target, &node.node_id()) as u8;
372        let mut distances = Vec::new();
373        distances.push(center_distance as u32);
374        for i in 0..DISTANCES_PER_FIND_NODE_MSG / 2 {
375            if let Some(d) = center_distance.checked_add(i + 1) {
376                distances.push(d as u32)
377            }
378            if let Some(d) = center_distance.checked_sub(i + 1) {
379                distances.push(d as u32)
380            }
381        }
382        Message::FindNode(FindNodeMessage {
383            req_id: generate_req_id(),
384            distances,
385        })
386    }
387
388    async fn discv5_handle_ping(
389        &mut self,
390        ping_message: PingMessage,
391        sender_id: H256,
392        sender_addr: SocketAddr,
393        outbound_key: Option<[u8; 16]>,
394    ) -> Result<(), DiscoveryServerError> {
395        trace!(protocol = "discv5", from = %sender_id, enr_seq = ping_message.enr_seq, "Received PING");
396
397        let pong = Message::Pong(PongMessage {
398            req_id: ping_message.req_id,
399            enr_seq: self.local_node_record.seq,
400            recipient_addr: sender_addr,
401        });
402
403        if outbound_key.is_none()
404            && let Some(contact) = self.peer_table.get_contact(sender_id).await?
405        {
406            return self.discv5_send_ordinary(pong, &contact.node).await;
407        }
408        let key = self
409            .discv5_resolve_outbound_key(&sender_id, outbound_key)
410            .await?;
411        self.discv5_send_ordinary_to(pong, &sender_id, sender_addr, &key)
412            .await?;
413
414        Ok(())
415    }
416
417    pub async fn discv5_handle_pong(
418        &mut self,
419        pong_message: PongMessage,
420        sender_id: H256,
421    ) -> Result<(), DiscoveryServerError> {
422        self.peer_table
423            .record_pong_received(sender_id, pong_message.req_id)?;
424
425        if let Some(contact) = self.peer_table.get_contact(sender_id).await? {
426            let cached_seq = contact.record.as_ref().map_or(0, |r| r.seq);
427            if pong_message.enr_seq > cached_seq {
428                trace!(
429                    protocol = "discv5",
430                    from = %sender_id,
431                    cached_seq,
432                    pong_seq = pong_message.enr_seq,
433                    "ENR seq mismatch, requesting updated ENR (FINDNODE distance 0)"
434                );
435                let find_node = Message::FindNode(FindNodeMessage {
436                    req_id: generate_req_id(),
437                    distances: vec![0],
438                });
439                self.discv5_send_ordinary(find_node, &contact.node).await?;
440            }
441        }
442
443        let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
444        if let Some(winning_ip) = discv5.record_ip_vote(pong_message.recipient_addr.ip(), sender_id)
445            && winning_ip != self.local_node.ip
446        {
447            tracing::info!(
448                protocol = "discv5",
449                old_ip = %self.local_node.ip,
450                new_ip = %winning_ip,
451                "External IP detected via PONG voting, updating local ENR"
452            );
453            update_local_ip(
454                &mut self.local_node,
455                &mut self.local_node_record,
456                &self.signer,
457                winning_ip,
458            );
459        }
460
461        Ok(())
462    }
463
464    async fn discv5_handle_find_node(
465        &mut self,
466        find_node_message: FindNodeMessage,
467        sender_id: H256,
468        sender_addr: SocketAddr,
469        outbound_key: Option<[u8; 16]>,
470    ) -> Result<(), DiscoveryServerError> {
471        let send_to_contact = match self
472            .peer_table
473            .validate_contact(sender_id, sender_addr.ip())
474            .await?
475        {
476            ContactValidation::Valid(contact) => Some(*contact),
477            ContactValidation::UnknownContact => None,
478            reason => {
479                trace!(from = %sender_id, ?reason, "Rejected FINDNODE");
480                return Ok(());
481            }
482        };
483
484        let mut nodes = self
485            .peer_table
486            .get_nodes_at_distances(find_node_message.distances.clone())
487            .await?;
488        if find_node_message.distances.contains(&0) {
489            nodes.push(self.local_node_record.clone());
490        }
491
492        let key = self
493            .discv5_resolve_outbound_key(&sender_id, outbound_key)
494            .await?;
495
496        let chunks: Vec<_> = nodes.chunks(MAX_ENRS_PER_MESSAGE).collect();
497        if chunks.is_empty() {
498            let nodes_message = Message::Nodes(NodesMessage {
499                req_id: find_node_message.req_id,
500                total: 1,
501                nodes: vec![],
502            });
503            if let Some(contact) = &send_to_contact {
504                self.discv5_send_ordinary(nodes_message, &contact.node)
505                    .await?;
506            } else {
507                self.discv5_send_ordinary_to(nodes_message, &sender_id, sender_addr, &key)
508                    .await?;
509            }
510        } else {
511            for chunk in &chunks {
512                let nodes_message = Message::Nodes(NodesMessage {
513                    req_id: find_node_message.req_id.clone(),
514                    total: chunks.len() as u64,
515                    nodes: chunk.to_vec(),
516                });
517                if let Some(contact) = &send_to_contact {
518                    self.discv5_send_ordinary(nodes_message, &contact.node)
519                        .await?;
520                } else {
521                    self.discv5_send_ordinary_to(nodes_message, &sender_id, sender_addr, &key)
522                        .await?;
523                }
524            }
525        }
526
527        Ok(())
528    }
529
530    async fn discv5_handle_nodes_message(
531        &mut self,
532        nodes_message: NodesMessage,
533    ) -> Result<(), DiscoveryServerError> {
534        self.peer_table
535            .new_contact_records(nodes_message.nodes.clone())?;
536
537        // Feed results into ALL active lookups (but don't advance — the timer
538        // drives lookup progress so that traffic stays controlled).
539        if let Some(discv5) = &mut self.discv5 {
540            let entries: Vec<(H256, Node)> = nodes_message
541                .nodes
542                .iter()
543                .filter_map(|r| Node::from_enr(r).ok().map(|n| (n.node_id(), n)))
544                .collect();
545            for lookup in &mut discv5.active_lookups {
546                lookup.feed_results(entries.clone());
547            }
548            if let Some(lookup) = discv5.active_lookups.first_mut() {
549                lookup.record_response();
550            }
551        }
552
553        Ok(())
554    }
555
556    async fn discv5_send_ping(&mut self, node: &Node) -> Result<(), DiscoveryServerError> {
557        let req_id = generate_req_id();
558
559        let ping = Message::Ping(PingMessage {
560            req_id: req_id.clone(),
561            enr_seq: self.local_node_record.seq,
562        });
563
564        self.discv5_send_ordinary(ping, node).await?;
565        self.peer_table.record_ping_sent(node.node_id(), req_id)?;
566
567        Ok(())
568    }
569
570    async fn discv5_send_ordinary(
571        &mut self,
572        message: Message,
573        node: &Node,
574    ) -> Result<(), DiscoveryServerError> {
575        #[cfg(feature = "metrics")]
576        {
577            use ethrex_metrics::p2p::METRICS_P2P;
578            METRICS_P2P.inc_discv5_outgoing(message.metric_label());
579        }
580        let ordinary = Ordinary {
581            src_id: self.local_node.node_id(),
582            message: message.clone(),
583        };
584        let encrypt_key = match self.peer_table.get_session_info(node.node_id()).await? {
585            Some(s) => s.outbound_key,
586            None => {
587                trace!(
588                    protocol = "discv5",
589                    node = %node.node_id(),
590                    "No session found in send_ordinary, using zeroed key to trigger handshake"
591                );
592                [0; 16]
593            }
594        };
595
596        let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
597        let mut rng = OsRng;
598        let masking_iv: u128 = rng.r#gen();
599        let nonce = discv5.next_nonce(&mut rng);
600
601        let packet = ordinary.encode(&nonce, masking_iv.to_be_bytes(), &encrypt_key)?;
602
603        self.discv5_send_packet(&packet, &node.node_id(), node.udp_addr())
604            .await?;
605        let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
606        discv5
607            .pending_by_nonce
608            .insert(nonce, (node.clone(), message, Instant::now()));
609        Ok(())
610    }
611
612    async fn discv5_resolve_outbound_key(
613        &self,
614        node_id: &H256,
615        key: Option<[u8; 16]>,
616    ) -> Result<[u8; 16], DiscoveryServerError> {
617        if let Some(key) = key {
618            return Ok(key);
619        }
620        match self.peer_table.get_session_info(*node_id).await? {
621            Some(s) => Ok(s.outbound_key),
622            None => {
623                trace!(
624                    protocol = "discv5",
625                    node = %node_id,
626                    "No session found in resolve_outbound_key, using zeroed key"
627                );
628                Ok([0; 16])
629            }
630        }
631    }
632
633    async fn discv5_send_ordinary_to(
634        &mut self,
635        message: Message,
636        dest_id: &H256,
637        addr: SocketAddr,
638        encrypt_key: &[u8; 16],
639    ) -> Result<(), DiscoveryServerError> {
640        #[cfg(feature = "metrics")]
641        {
642            use ethrex_metrics::p2p::METRICS_P2P;
643            METRICS_P2P.inc_discv5_outgoing(message.metric_label());
644        }
645        let ordinary = Ordinary {
646            src_id: self.local_node.node_id(),
647            message,
648        };
649
650        let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
651        let mut rng = OsRng;
652        let masking_iv: u128 = rng.r#gen();
653        let nonce = discv5.next_nonce(&mut rng);
654
655        let packet = ordinary.encode(&nonce, masking_iv.to_be_bytes(), encrypt_key)?;
656
657        self.discv5_send_packet(&packet, dest_id, addr).await?;
658        Ok(())
659    }
660
661    async fn discv5_send_handshake(
662        &mut self,
663        message: Message,
664        signature: Signature,
665        eph_pubkey: &[u8],
666        node: Node,
667        record: Option<NodeRecord>,
668    ) -> Result<(), DiscoveryServerError> {
669        #[cfg(feature = "metrics")]
670        {
671            use ethrex_metrics::p2p::METRICS_P2P;
672            METRICS_P2P.inc_discv5_outgoing("Handshake");
673        }
674        let handshake = Handshake {
675            src_id: self.local_node.node_id(),
676            id_signature: signature.serialize_compact().to_vec(),
677            eph_pubkey: eph_pubkey.to_vec(),
678            record,
679            message: message.clone(),
680        };
681        let encrypt_key = match self.peer_table.get_session_info(node.node_id()).await? {
682            Some(s) => s.outbound_key,
683            None => {
684                trace!(
685                    protocol = "discv5",
686                    node = %node.node_id(),
687                    "No session found in send_handshake, using zeroed key"
688                );
689                [0; 16]
690            }
691        };
692
693        let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
694        let mut rng = OsRng;
695        let masking_iv: u128 = rng.r#gen();
696        let nonce = discv5.next_nonce(&mut rng);
697
698        let packet = handshake.encode(&nonce, masking_iv.to_be_bytes(), &encrypt_key)?;
699
700        self.discv5_send_packet(&packet, &node.node_id(), node.udp_addr())
701            .await?;
702        let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
703        discv5
704            .pending_by_nonce
705            .insert(nonce, (node, message, Instant::now()));
706        Ok(())
707    }
708
709    pub async fn discv5_send_who_are_you(
710        &mut self,
711        nonce: [u8; 12],
712        src_id: H256,
713        addr: SocketAddr,
714    ) -> Result<(), DiscoveryServerError> {
715        #[cfg(feature = "metrics")]
716        {
717            use ethrex_metrics::p2p::METRICS_P2P;
718            METRICS_P2P.inc_discv5_outgoing("WhoAreYou");
719        }
720        let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
721
722        let rate_key = (addr.ip(), src_id);
723        let now = Instant::now();
724
725        // Global rate limit
726        if now.duration_since(discv5.whoareyou_global_window_start) >= Duration::from_secs(1) {
727            discv5.whoareyou_global_count = 0;
728            discv5.whoareyou_global_window_start = now;
729        }
730        if discv5.whoareyou_global_count >= GLOBAL_WHOAREYOU_RATE_LIMIT {
731            if discv5.whoareyou_global_count == GLOBAL_WHOAREYOU_RATE_LIMIT {
732                discv5.whoareyou_global_count = GLOBAL_WHOAREYOU_RATE_LIMIT + 1;
733                warn!(
734                    protocol = "discv5",
735                    "Global WHOAREYOU rate limit reached ({GLOBAL_WHOAREYOU_RATE_LIMIT}/s), \
736                     dropping excess packets. This is normal during initial discovery or \
737                     network churn; persistent occurrences may indicate a DoS attempt"
738                );
739            }
740            return Ok(());
741        }
742
743        // Resend existing challenge if pending
744        if let Some((_, _, raw_bytes)) = discv5.pending_challenges.get(&src_id) {
745            trace!(
746                protocol = "discv5",
747                to = %src_id,
748                %addr,
749                "Resending existing WhoAreYou challenge"
750            );
751            self.udp_socket.send_to(raw_bytes, addr).await?;
752            return Ok(());
753        }
754
755        // Per-(IP, node) rate limit
756        if !Discv5State::is_private_ip(addr.ip())
757            && let Some(last_sent) = discv5.whoareyou_rate_limit.get(&rate_key)
758            && now.duration_since(*last_sent) < WHOAREYOU_RATE_LIMIT
759        {
760            trace!(
761                protocol = "discv5",
762                to_ip = %addr.ip(),
763                "Rate limiting WHOAREYOU packet (amplification attack prevention)"
764            );
765            return Ok(());
766        }
767
768        discv5.whoareyou_rate_limit.push(rate_key, now);
769        discv5.whoareyou_global_count += 1;
770
771        let mut rng = OsRng;
772
773        let enr_seq = self
774            .peer_table
775            .get_contact(src_id)
776            .await?
777            .map_or(0, |c| c.record.as_ref().map_or(0, |r| r.seq));
778
779        let who_are_you = WhoAreYou {
780            id_nonce: rng.r#gen(),
781            enr_seq,
782        };
783
784        let masking_iv: u128 = rng.r#gen();
785        let packet = who_are_you.encode(&nonce, masking_iv.to_be_bytes(), &[0; 16])?;
786
787        let mut raw_buf = BytesMut::new();
788        packet.encode(&mut raw_buf, &src_id)?;
789        let raw_bytes = raw_buf.to_vec();
790
791        let challenge_data = build_challenge_data(
792            &masking_iv.to_be_bytes(),
793            &packet.header.static_header,
794            &packet.header.authdata,
795        );
796        let discv5 = self.discv5.as_mut().expect("discv5 state must exist");
797        discv5
798            .pending_challenges
799            .insert(src_id, (challenge_data, Instant::now(), raw_bytes.clone()));
800
801        self.udp_socket.send_to(&raw_bytes, addr).await?;
802        trace!(protocol = "discv5", to = %src_id, %addr, flag = packet.header.flag, "Sent packet");
803
804        Ok(())
805    }
806
807    async fn discv5_send_packet(
808        &self,
809        packet: &Packet,
810        dest_id: &H256,
811        addr: SocketAddr,
812    ) -> Result<(), DiscoveryServerError> {
813        let mut buf = BytesMut::new();
814        packet.encode(&mut buf, dest_id)?;
815        self.udp_socket.send_to(&buf, addr).await?;
816        trace!(protocol = "discv5", to = %dest_id, %addr, flag = packet.header.flag, "Sent packet");
817        Ok(())
818    }
819
820    async fn discv5_handle_message(
821        &mut self,
822        ordinary: Ordinary,
823        sender_addr: SocketAddr,
824        outbound_key: Option<[u8; 16]>,
825    ) -> Result<(), DiscoveryServerError> {
826        let sender_id = ordinary.src_id;
827        if sender_id == self.local_node.node_id() {
828            return Ok(());
829        }
830        #[cfg(feature = "metrics")]
831        {
832            use ethrex_metrics::p2p::METRICS_P2P;
833            METRICS_P2P.inc_discv5_incoming(ordinary.message.metric_label());
834        }
835        match ordinary.message {
836            Message::Ping(ping_message) => {
837                if ping_message.req_id.len() > 8 {
838                    trace!(protocol = "discv5", from = %sender_id, "Dropping PING with oversized req_id");
839                    return Ok(());
840                }
841                self.discv5_handle_ping(ping_message, sender_id, sender_addr, outbound_key)
842                    .await?
843            }
844            Message::Pong(pong_message) => {
845                self.discv5_handle_pong(pong_message, sender_id).await?;
846            }
847            Message::FindNode(find_node_message) => {
848                if find_node_message.req_id.len() > 8 {
849                    trace!(protocol = "discv5", from = %sender_id, "Dropping FINDNODE with oversized req_id");
850                    return Ok(());
851                }
852                self.discv5_handle_find_node(
853                    find_node_message,
854                    sender_id,
855                    sender_addr,
856                    outbound_key,
857                )
858                .await?;
859            }
860            Message::Nodes(nodes_message) => {
861                self.discv5_handle_nodes_message(nodes_message).await?;
862            }
863            Message::TalkReq(talk_req_message) => {
864                if talk_req_message.req_id.len() > 8 {
865                    trace!(protocol = "discv5", from = %sender_id, "Dropping TALKREQ with oversized req_id");
866                    return Ok(());
867                }
868                let talk_res = Message::TalkRes(TalkResMessage {
869                    req_id: talk_req_message.req_id,
870                    response: vec![],
871                });
872                let key = self
873                    .discv5_resolve_outbound_key(&sender_id, outbound_key)
874                    .await?;
875                self.discv5_send_ordinary_to(talk_res, &sender_id, sender_addr, &key)
876                    .await?;
877            }
878            Message::TalkRes(_talk_res_message) => (),
879            Message::Ticket(_ticket_message) => (),
880        }
881        Ok(())
882    }
883}
884
885fn generate_req_id() -> Bytes {
886    let mut rng = OsRng;
887    Bytes::from(rng.r#gen::<u64>().to_be_bytes().to_vec())
888}