Skip to main content

ethrex_p2p/discovery/
server.rs

1use crate::{
2    discv4::{
3        messages::Packet as Discv4Packet,
4        server::{Discv4Message, Discv4State},
5    },
6    discv5::{
7        messages::{Packet as Discv5Packet, PacketCodecError},
8        server::{Discv5Message, Discv5State, update_local_ip},
9    },
10    peer_table::{DiscoveryProtocol, PeerTable, PeerTableServerProtocol as _},
11    types::{INITIAL_ENR_SEQ, Node, NodeRecord},
12};
13use bytes::BytesMut;
14use ethrex_common::utils::keccak;
15use ethrex_storage::Store;
16use futures::StreamExt;
17use secp256k1::SecretKey;
18use spawned_concurrency::{
19    actor,
20    error::ActorError,
21    protocol,
22    tasks::{
23        Actor, ActorStart as _, Context, Handler, send_after, send_interval, send_message_on,
24        spawn_listener,
25    },
26};
27use std::{net::SocketAddr, sync::Arc, time::Duration};
28use thiserror::Error;
29use tokio::net::UdpSocket;
30use tokio_util::udp::UdpFramed;
31use tracing::{debug, error, info, trace};
32
33use super::{DiscoveryConfig, codec::DiscriminatingCodec, lookup_interval_function};
34
35/// Minimum packet size for a valid discv4 packet.
36/// hash (32) + signature (65) + type (1) = 98 bytes
37const DISCV4_MIN_PACKET_SIZE: usize = 98;
38
39// Shared constants
40const REVALIDATION_CHECK_INTERVAL: Duration = Duration::from_secs(1);
41const PRUNE_INTERVAL: Duration = Duration::from_secs(5);
42
43/// Lookup interval bounds for iterative lookups. Each iterative lookup
44/// generates ~16 FindNode messages (vs 1 in the old approach), so we use
45/// longer intervals to produce similar per-second traffic.
46const ITERATIVE_LOOKUP_INITIAL_MS: f64 = 500.0; // 6 FindNode/sec at startup (alpha=3 × 2 ticks/sec)
47const ITERATIVE_LOOKUP_INTERVAL_MS: f64 = 10_000.0; // ~6 lookups/min at steady-state
48
49#[derive(Debug, Error)]
50pub enum DiscoveryServerError {
51    #[error(transparent)]
52    IoError(#[from] std::io::Error),
53    #[error("Failed to decode discv4 packet")]
54    Discv4Decode(#[from] crate::discv4::messages::PacketDecodeErr),
55    #[error("Failed to decode discv5 packet")]
56    Discv5Decode(#[from] crate::discv5::messages::PacketCodecError),
57    #[error("Only partial message was sent")]
58    PartialMessageSent,
59    #[error("Unknown or invalid contact")]
60    InvalidContact,
61    #[error(transparent)]
62    PeerTable(#[from] ActorError),
63    #[error(transparent)]
64    Store(#[from] ethrex_storage::error::StoreError),
65    #[error("Internal error {0}")]
66    InternalError(String),
67    #[error("Cryptography Error {0}")]
68    CryptographyError(String),
69    #[error(transparent)]
70    RlpDecode(#[from] ethrex_rlp::error::RLPDecodeError),
71}
72
73#[protocol]
74pub trait DiscoveryServerProtocol: Send + Sync {
75    fn raw_packet(&self, data: BytesMut, from: SocketAddr) -> Result<(), ActorError>;
76    fn revalidate_v4(&self) -> Result<(), ActorError>;
77    fn revalidate_v5(&self) -> Result<(), ActorError>;
78    fn lookup_v4(&self) -> Result<(), ActorError>;
79    fn lookup_v5(&self) -> Result<(), ActorError>;
80    fn enr_lookup(&self) -> Result<(), ActorError>;
81    fn prune(&self) -> Result<(), ActorError>;
82    fn shutdown(&self) -> Result<(), ActorError>;
83}
84
85pub struct DiscoveryServer {
86    pub local_node: Node,
87    pub local_node_record: NodeRecord,
88    pub(crate) signer: SecretKey,
89    pub(crate) udp_socket: Arc<UdpSocket>,
90    pub(crate) store: Store,
91    pub peer_table: PeerTable,
92    pub(crate) config: DiscoveryConfig,
93    pub discv4: Option<Discv4State>,
94    pub discv5: Option<Discv5State>,
95}
96
97impl std::fmt::Debug for DiscoveryServer {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        f.debug_struct("DiscoveryServer")
100            .field("local_node", &self.local_node)
101            .field("discv4_enabled", &self.discv4.is_some())
102            .field("discv5_enabled", &self.discv5.is_some())
103            .finish()
104    }
105}
106
107#[actor(protocol = DiscoveryServerProtocol)]
108impl DiscoveryServer {
109    pub async fn spawn(
110        storage: Store,
111        local_node: Node,
112        signer: SecretKey,
113        udp_socket: Arc<UdpSocket>,
114        peer_table: PeerTable,
115        bootnodes: Vec<Node>,
116        config: DiscoveryConfig,
117    ) -> Result<(), DiscoveryServerError> {
118        debug!("Starting discovery server");
119
120        let mut local_node_record = NodeRecord::from_node(&local_node, INITIAL_ENR_SEQ, &signer)
121            .expect("Failed to create local node record");
122        if let Ok(fork_id) = storage.get_fork_id().await {
123            local_node_record
124                .set_fork_id(fork_id, &signer)
125                .expect("Failed to set fork_id on local node record");
126        }
127
128        let discv4 = if config.discv4_enabled {
129            info!(
130                protocol = "discv4",
131                count = bootnodes.len(),
132                "Adding bootnodes"
133            );
134            peer_table.new_contacts(bootnodes.clone(), DiscoveryProtocol::Discv4)?;
135            Some(Discv4State::default())
136        } else {
137            None
138        };
139
140        let discv5 = if config.discv5_enabled {
141            info!(
142                protocol = "discv5",
143                count = bootnodes.len(),
144                "Adding bootnodes"
145            );
146            peer_table.new_contacts(bootnodes.clone(), DiscoveryProtocol::Discv5)?;
147            Some(Discv5State::default())
148        } else {
149            None
150        };
151
152        let mut server = Self {
153            local_node: local_node.clone(),
154            local_node_record,
155            signer,
156            udp_socket: udp_socket.clone(),
157            store: storage,
158            peer_table: peer_table.clone(),
159            config,
160            discv4,
161            discv5,
162        };
163
164        // Ping discv4 bootnodes
165        if server.discv4.is_some() {
166            for bootnode in &bootnodes {
167                server.discv4_send_ping(bootnode).await?;
168            }
169        }
170
171        server.start();
172
173        Ok(())
174    }
175
176    #[started]
177    async fn started(&mut self, ctx: &Context<Self>) {
178        let local_addr = self.udp_socket.local_addr();
179        info!(
180            local_addr=?local_addr,
181            discv4_enabled=self.config.discv4_enabled,
182            discv5_enabled=self.config.discv5_enabled,
183            "Discovery server started, listening for UDP packets"
184        );
185
186        // Set up UDP listener
187        let stream = UdpFramed::new(self.udp_socket.clone(), DiscriminatingCodec::new());
188        spawn_listener(
189            ctx.clone(),
190            stream.filter_map(|result| async move {
191                match result {
192                    Ok((data, from)) => Some(discovery_server_protocol::RawPacket { data, from }),
193                    Err(e) => {
194                        debug!(error=?e, "Error receiving packet in discovery server");
195                        None
196                    }
197                }
198            }),
199        );
200
201        // Discv4 timers
202        if self.discv4.is_some() {
203            send_interval(
204                REVALIDATION_CHECK_INTERVAL,
205                ctx.clone(),
206                discovery_server_protocol::RevalidateV4,
207            );
208            let _ = ctx.send(discovery_server_protocol::LookupV4);
209            let _ = ctx.send(discovery_server_protocol::EnrLookup);
210        }
211
212        // Discv5 timers
213        if self.discv5.is_some() {
214            send_interval(
215                REVALIDATION_CHECK_INTERVAL,
216                ctx.clone(),
217                discovery_server_protocol::RevalidateV5,
218            );
219            let _ = ctx.send(discovery_server_protocol::LookupV5);
220        }
221
222        // Shared prune timer
223        send_interval(
224            PRUNE_INTERVAL,
225            ctx.clone(),
226            discovery_server_protocol::Prune,
227        );
228
229        // Shutdown handler
230        send_message_on(
231            ctx.clone(),
232            tokio::signal::ctrl_c(),
233            discovery_server_protocol::Shutdown,
234        );
235    }
236
237    #[send_handler]
238    async fn handle_raw_packet(
239        &mut self,
240        msg: discovery_server_protocol::RawPacket,
241        _ctx: &Context<Self>,
242    ) {
243        self.route_packet(&msg.data, msg.from).await;
244    }
245
246    #[send_handler]
247    async fn handle_revalidate_v4(
248        &mut self,
249        _msg: discovery_server_protocol::RevalidateV4,
250        _ctx: &Context<Self>,
251    ) {
252        trace!(protocol = "discv4", received = "Revalidate");
253        let _ = self.discv4_revalidate().await.inspect_err(
254            |e| error!(protocol = "discv4", err=?e, "Error revalidating discovered peers"),
255        );
256    }
257
258    #[send_handler]
259    async fn handle_revalidate_v5(
260        &mut self,
261        _msg: discovery_server_protocol::RevalidateV5,
262        _ctx: &Context<Self>,
263    ) {
264        trace!(protocol = "discv5", received = "Revalidate");
265        let _ = self.discv5_revalidate().await.inspect_err(
266            |e| error!(protocol = "discv5", err=?e, "Error revalidating discovered peers"),
267        );
268    }
269
270    #[send_handler]
271    async fn handle_lookup_v4(
272        &mut self,
273        _msg: discovery_server_protocol::LookupV4,
274        ctx: &Context<Self>,
275    ) {
276        trace!(protocol = "discv4", received = "Lookup");
277        let _ = self.discv4_lookup().await.inspect_err(
278            |e| error!(protocol = "discv4", err=?e, "Error performing Discovery lookup"),
279        );
280        let interval = self.get_lookup_interval().await;
281        send_after(interval, ctx.clone(), discovery_server_protocol::LookupV4);
282    }
283
284    #[send_handler]
285    async fn handle_lookup_v5(
286        &mut self,
287        _msg: discovery_server_protocol::LookupV5,
288        ctx: &Context<Self>,
289    ) {
290        trace!(protocol = "discv5", received = "Lookup");
291        let _ = self.discv5_lookup().await.inspect_err(
292            |e| error!(protocol = "discv5", err=?e, "Error performing Discovery lookup"),
293        );
294        let interval = self.get_lookup_interval().await;
295        send_after(interval, ctx.clone(), discovery_server_protocol::LookupV5);
296    }
297
298    #[send_handler]
299    async fn handle_enr_lookup(
300        &mut self,
301        _msg: discovery_server_protocol::EnrLookup,
302        ctx: &Context<Self>,
303    ) {
304        trace!(protocol = "discv4", received = "EnrLookup");
305        let _ = self.discv4_enr_lookup().await.inspect_err(
306            |e| error!(protocol = "discv4", err=?e, "Error performing Discovery lookup"),
307        );
308        let interval = self.get_lookup_interval().await;
309        send_after(interval, ctx.clone(), discovery_server_protocol::EnrLookup);
310    }
311
312    #[send_handler]
313    async fn handle_prune(&mut self, _msg: discovery_server_protocol::Prune, _ctx: &Context<Self>) {
314        trace!(received = "Prune");
315        let _ = self
316            .prune()
317            .await
318            .inspect_err(|e| error!(err=?e, "Error Pruning peer table"));
319    }
320
321    #[send_handler]
322    async fn handle_shutdown(
323        &mut self,
324        _msg: discovery_server_protocol::Shutdown,
325        ctx: &Context<Self>,
326    ) {
327        ctx.stop();
328    }
329
330    // --- Shared logic ---
331
332    async fn route_packet(&mut self, data: &[u8], from: SocketAddr) {
333        if is_discv4_packet(data) {
334            self.route_to_discv4(data, from).await;
335        } else {
336            self.route_to_discv5(data, from).await;
337        }
338    }
339
340    async fn route_to_discv4(&mut self, data: &[u8], from: SocketAddr) {
341        if self.discv4.is_none() {
342            return;
343        }
344        match Discv4Packet::decode(data) {
345            Ok(packet) => {
346                let msg = Discv4Message::from(packet, from);
347                let _ = self.discv4_process_message(msg).await.inspect_err(
348                    |e| error!(protocol = "discv4", err=?e, "Error handling discovery message"),
349                );
350            }
351            Err(e) => {
352                debug!(error=?e, "Failed to decode discv4 packet");
353            }
354        }
355    }
356
357    async fn route_to_discv5(&mut self, data: &[u8], from: SocketAddr) {
358        if self.discv5.is_none() {
359            return;
360        }
361        match Discv5Packet::decode(&self.local_node.node_id(), data) {
362            Ok(packet) => {
363                let msg = Discv5Message::from(packet, from);
364                let _ = self.discv5_handle_packet(msg).await.inspect_err(
365                    |e| trace!(protocol = "discv5", err=?e, "Error handling discovery message"),
366                );
367            }
368            Err(
369                PacketCodecError::InvalidProtocol(_)
370                | PacketCodecError::InvalidHeader
371                | PacketCodecError::InvalidSize
372                | PacketCodecError::CipherError(_),
373            ) => {
374                trace!(from=?from, "Dropping unrecognized UDP packet");
375            }
376            Err(e) => {
377                debug!(error=?e, "Failed to decode discv5 packet");
378            }
379        }
380    }
381
382    async fn prune(&mut self) -> Result<(), DiscoveryServerError> {
383        self.peer_table.prune_table()?;
384        if let Some(discv4) = &mut self.discv4 {
385            let expiration = Duration::from_secs(crate::discv4::server::EXPIRATION_SECONDS);
386            discv4
387                .pending_find_node
388                .retain(|_, sent_at| sent_at.elapsed() < expiration);
389        }
390        let winning_ip = self
391            .discv5
392            .as_mut()
393            .and_then(|discv5| discv5.cleanup_stale_entries());
394        if let Some(winning_ip) = winning_ip
395            && winning_ip != self.local_node.ip
396        {
397            info!(
398                protocol = "discv5",
399                old_ip = %self.local_node.ip,
400                new_ip = %winning_ip,
401                "External IP detected via PONG voting, updating local ENR"
402            );
403            update_local_ip(
404                &mut self.local_node,
405                &mut self.local_node_record,
406                &self.signer,
407                winning_ip,
408            );
409        }
410        Ok(())
411    }
412
413    pub(crate) async fn get_lookup_interval(&self) -> Duration {
414        let peer_completion = self
415            .peer_table
416            .target_peers_completion()
417            .await
418            .unwrap_or_default();
419        lookup_interval_function(
420            peer_completion,
421            ITERATIVE_LOOKUP_INITIAL_MS,
422            ITERATIVE_LOOKUP_INTERVAL_MS,
423        )
424    }
425}
426
427/// Check if a packet is a discv4 packet by verifying the hash.
428pub fn is_discv4_packet(data: &[u8]) -> bool {
429    if data.len() < DISCV4_MIN_PACKET_SIZE {
430        return false;
431    }
432    let packet_hash = &data[0..32];
433    let computed_hash = keccak(&data[32..]);
434    packet_hash == computed_hash.as_bytes()
435}
436
437#[cfg(any(test, feature = "test-utils"))]
438impl DiscoveryServer {
439    /// Builds a DiscoveryServer suitable for unit tests of discv5 handlers.
440    /// Only discv5 state is initialized; discv4 is disabled.
441    /// Uses an in-memory store and a dummy initial lookup interval.
442    pub fn new_for_discv5_test(
443        local_node: Node,
444        local_node_record: NodeRecord,
445        signer: SecretKey,
446        udp_socket: Arc<UdpSocket>,
447        peer_table: PeerTable,
448    ) -> Self {
449        Self {
450            local_node,
451            local_node_record,
452            signer,
453            udp_socket,
454            store: Store::new("", ethrex_storage::EngineType::InMemory)
455                .expect("Failed to create store"),
456            peer_table,
457            config: DiscoveryConfig {
458                discv4_enabled: false,
459                discv5_enabled: true,
460                initial_lookup_interval: 1000.0,
461            },
462            discv4: None,
463            discv5: Some(Discv5State::default()),
464        }
465    }
466}