1use crate::{
2 discv4::{
3 messages::Packet as Discv4Packet,
4 server::{Discv4Message, Discv4State},
5 },
6 discv5::{
7 messages::{Packet as Discv5Packet, PacketCodecError},
8 server::{Discv5Message, Discv5State, update_local_ip},
9 },
10 peer_table::{DiscoveryProtocol, PeerTable, PeerTableServerProtocol as _},
11 types::{INITIAL_ENR_SEQ, Node, NodeRecord},
12};
13use bytes::BytesMut;
14use ethrex_common::utils::keccak;
15use ethrex_storage::Store;
16use futures::StreamExt;
17use secp256k1::SecretKey;
18use spawned_concurrency::{
19 actor,
20 error::ActorError,
21 protocol,
22 tasks::{
23 Actor, ActorStart as _, Context, Handler, send_after, send_interval, send_message_on,
24 spawn_listener,
25 },
26};
27use std::{net::SocketAddr, sync::Arc, time::Duration};
28use thiserror::Error;
29use tokio::net::UdpSocket;
30use tokio_util::udp::UdpFramed;
31use tracing::{debug, error, info, trace};
32
33use super::{DiscoveryConfig, codec::DiscriminatingCodec, lookup_interval_function};
34
/// Minimum length, in bytes, a datagram must have to be considered a discv4 packet by
/// [`is_discv4_packet`]. Shorter datagrams are never routed to discv4.
// NOTE(review): presumably 32-byte hash + 65-byte signature + 1-byte packet type — confirm
// against the discv4 wire format.
const DISCV4_MIN_PACKET_SIZE: usize = 98;

/// Period of the recurring `RevalidateV4` / `RevalidateV5` messages scheduled in `started`.
const REVALIDATION_CHECK_INTERVAL: Duration = Duration::from_secs(1);
/// Period of the recurring `Prune` message scheduled in `started`.
const PRUNE_INTERVAL: Duration = Duration::from_secs(5);
42
/// Passed as the second argument of `lookup_interval_function` in `get_lookup_interval`.
// NOTE(review): by its name this is the initial lookup interval in milliseconds — confirm
// against `lookup_interval_function`.
const ITERATIVE_LOOKUP_INITIAL_MS: f64 = 500.0;
/// Passed as the third argument of `lookup_interval_function` in `get_lookup_interval`.
// NOTE(review): by its name this is the steady-state lookup interval in milliseconds — confirm.
const ITERATIVE_LOOKUP_INTERVAL_MS: f64 = 10_000.0;

/// Errors produced by the discovery server.
///
/// Variants marked `#[from]` are created automatically by `?` from the wrapped error type.
#[derive(Debug, Error)]
pub enum DiscoveryServerError {
    /// An underlying I/O error; displayed transparently.
    #[error(transparent)]
    IoError(#[from] std::io::Error),
    /// A discv4 packet could not be decoded.
    #[error("Failed to decode discv4 packet")]
    Discv4Decode(#[from] crate::discv4::messages::PacketDecodeErr),
    /// A discv5 packet could not be encoded/decoded by the packet codec.
    #[error("Failed to decode discv5 packet")]
    Discv5Decode(#[from] crate::discv5::messages::PacketCodecError),
    /// Fewer bytes than the full message were sent.
    #[error("Only partial message was sent")]
    PartialMessageSent,
    /// The contact is unknown or invalid.
    #[error("Unknown or invalid contact")]
    InvalidContact,
    /// An actor operation failed (wraps `ActorError`); displayed transparently.
    #[error(transparent)]
    PeerTable(#[from] ActorError),
    /// A storage operation failed; displayed transparently.
    #[error(transparent)]
    Store(#[from] ethrex_storage::error::StoreError),
    /// An internal error described by the contained message.
    #[error("Internal error {0}")]
    InternalError(String),
    /// A cryptographic operation failed, described by the contained message.
    #[error("Cryptography Error {0}")]
    CryptographyError(String),
    /// RLP decoding failed; displayed transparently.
    #[error(transparent)]
    RlpDecode(#[from] ethrex_rlp::error::RLPDecodeError),
}
72
/// Message protocol of the [`DiscoveryServer`] actor.
///
/// Each method has a matching message type in the `discovery_server_protocol` module
/// (for example `raw_packet` and `discovery_server_protocol::RawPacket { data, from }`),
/// and each message is handled by the corresponding `handle_*` method of `DiscoveryServer`.
#[protocol]
pub trait DiscoveryServerProtocol: Send + Sync {
    /// Delivers a datagram `data` received from `from`; handled by routing it to discv4 or
    /// discv5.
    fn raw_packet(&self, data: BytesMut, from: SocketAddr) -> Result<(), ActorError>;
    /// Triggers a discv4 revalidation pass.
    fn revalidate_v4(&self) -> Result<(), ActorError>;
    /// Triggers a discv5 revalidation pass.
    fn revalidate_v5(&self) -> Result<(), ActorError>;
    /// Triggers a discv4 lookup; the handler re-schedules this message itself.
    fn lookup_v4(&self) -> Result<(), ActorError>;
    /// Triggers a discv5 lookup; the handler re-schedules this message itself.
    fn lookup_v5(&self) -> Result<(), ActorError>;
    /// Triggers a discv4 ENR lookup; the handler re-schedules this message itself.
    fn enr_lookup(&self) -> Result<(), ActorError>;
    /// Triggers pruning of the peer table and of stale per-protocol state.
    fn prune(&self) -> Result<(), ActorError>;
    /// Asks the actor to stop.
    fn shutdown(&self) -> Result<(), ActorError>;
}
84
/// Actor state of the discovery server, serving discv4 and/or discv5 on a single UDP socket.
pub struct DiscoveryServer {
    /// The local node. Its IP may be replaced by `prune` when discv5 reports a different one.
    pub local_node: Node,
    /// Node record of the local node, built from `local_node` and signed with `signer`.
    pub local_node_record: NodeRecord,
    /// Secret key used to sign the local node record.
    pub(crate) signer: SecretKey,
    /// UDP socket on which incoming packets are received.
    pub(crate) udp_socket: Arc<UdpSocket>,
    /// Storage handle (read in `spawn` to obtain the fork id).
    pub(crate) store: Store,
    /// Handle to the peer table that stores discovered contacts.
    pub peer_table: PeerTable,
    /// Discovery configuration, including which protocols are enabled.
    pub(crate) config: DiscoveryConfig,
    /// discv4 state; `None` when discv4 is disabled.
    pub discv4: Option<Discv4State>,
    /// discv5 state; `None` when discv5 is disabled.
    pub discv5: Option<Discv5State>,
}
96
97impl std::fmt::Debug for DiscoveryServer {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 f.debug_struct("DiscoveryServer")
100 .field("local_node", &self.local_node)
101 .field("discv4_enabled", &self.discv4.is_some())
102 .field("discv5_enabled", &self.discv5.is_some())
103 .finish()
104 }
105}
106
107#[actor(protocol = DiscoveryServerProtocol)]
108impl DiscoveryServer {
109 pub async fn spawn(
110 storage: Store,
111 local_node: Node,
112 signer: SecretKey,
113 udp_socket: Arc<UdpSocket>,
114 peer_table: PeerTable,
115 bootnodes: Vec<Node>,
116 config: DiscoveryConfig,
117 ) -> Result<(), DiscoveryServerError> {
118 debug!("Starting discovery server");
119
120 let mut local_node_record = NodeRecord::from_node(&local_node, INITIAL_ENR_SEQ, &signer)
121 .expect("Failed to create local node record");
122 if let Ok(fork_id) = storage.get_fork_id().await {
123 local_node_record
124 .set_fork_id(fork_id, &signer)
125 .expect("Failed to set fork_id on local node record");
126 }
127
128 let discv4 = if config.discv4_enabled {
129 info!(
130 protocol = "discv4",
131 count = bootnodes.len(),
132 "Adding bootnodes"
133 );
134 peer_table.new_contacts(bootnodes.clone(), DiscoveryProtocol::Discv4)?;
135 Some(Discv4State::default())
136 } else {
137 None
138 };
139
140 let discv5 = if config.discv5_enabled {
141 info!(
142 protocol = "discv5",
143 count = bootnodes.len(),
144 "Adding bootnodes"
145 );
146 peer_table.new_contacts(bootnodes.clone(), DiscoveryProtocol::Discv5)?;
147 Some(Discv5State::default())
148 } else {
149 None
150 };
151
152 let mut server = Self {
153 local_node: local_node.clone(),
154 local_node_record,
155 signer,
156 udp_socket: udp_socket.clone(),
157 store: storage,
158 peer_table: peer_table.clone(),
159 config,
160 discv4,
161 discv5,
162 };
163
164 if server.discv4.is_some() {
166 for bootnode in &bootnodes {
167 server.discv4_send_ping(bootnode).await?;
168 }
169 }
170
171 server.start();
172
173 Ok(())
174 }
175
176 #[started]
177 async fn started(&mut self, ctx: &Context<Self>) {
178 let local_addr = self.udp_socket.local_addr();
179 info!(
180 local_addr=?local_addr,
181 discv4_enabled=self.config.discv4_enabled,
182 discv5_enabled=self.config.discv5_enabled,
183 "Discovery server started, listening for UDP packets"
184 );
185
186 let stream = UdpFramed::new(self.udp_socket.clone(), DiscriminatingCodec::new());
188 spawn_listener(
189 ctx.clone(),
190 stream.filter_map(|result| async move {
191 match result {
192 Ok((data, from)) => Some(discovery_server_protocol::RawPacket { data, from }),
193 Err(e) => {
194 debug!(error=?e, "Error receiving packet in discovery server");
195 None
196 }
197 }
198 }),
199 );
200
201 if self.discv4.is_some() {
203 send_interval(
204 REVALIDATION_CHECK_INTERVAL,
205 ctx.clone(),
206 discovery_server_protocol::RevalidateV4,
207 );
208 let _ = ctx.send(discovery_server_protocol::LookupV4);
209 let _ = ctx.send(discovery_server_protocol::EnrLookup);
210 }
211
212 if self.discv5.is_some() {
214 send_interval(
215 REVALIDATION_CHECK_INTERVAL,
216 ctx.clone(),
217 discovery_server_protocol::RevalidateV5,
218 );
219 let _ = ctx.send(discovery_server_protocol::LookupV5);
220 }
221
222 send_interval(
224 PRUNE_INTERVAL,
225 ctx.clone(),
226 discovery_server_protocol::Prune,
227 );
228
229 send_message_on(
231 ctx.clone(),
232 tokio::signal::ctrl_c(),
233 discovery_server_protocol::Shutdown,
234 );
235 }
236
237 #[send_handler]
238 async fn handle_raw_packet(
239 &mut self,
240 msg: discovery_server_protocol::RawPacket,
241 _ctx: &Context<Self>,
242 ) {
243 self.route_packet(&msg.data, msg.from).await;
244 }
245
246 #[send_handler]
247 async fn handle_revalidate_v4(
248 &mut self,
249 _msg: discovery_server_protocol::RevalidateV4,
250 _ctx: &Context<Self>,
251 ) {
252 trace!(protocol = "discv4", received = "Revalidate");
253 let _ = self.discv4_revalidate().await.inspect_err(
254 |e| error!(protocol = "discv4", err=?e, "Error revalidating discovered peers"),
255 );
256 }
257
258 #[send_handler]
259 async fn handle_revalidate_v5(
260 &mut self,
261 _msg: discovery_server_protocol::RevalidateV5,
262 _ctx: &Context<Self>,
263 ) {
264 trace!(protocol = "discv5", received = "Revalidate");
265 let _ = self.discv5_revalidate().await.inspect_err(
266 |e| error!(protocol = "discv5", err=?e, "Error revalidating discovered peers"),
267 );
268 }
269
270 #[send_handler]
271 async fn handle_lookup_v4(
272 &mut self,
273 _msg: discovery_server_protocol::LookupV4,
274 ctx: &Context<Self>,
275 ) {
276 trace!(protocol = "discv4", received = "Lookup");
277 let _ = self.discv4_lookup().await.inspect_err(
278 |e| error!(protocol = "discv4", err=?e, "Error performing Discovery lookup"),
279 );
280 let interval = self.get_lookup_interval().await;
281 send_after(interval, ctx.clone(), discovery_server_protocol::LookupV4);
282 }
283
284 #[send_handler]
285 async fn handle_lookup_v5(
286 &mut self,
287 _msg: discovery_server_protocol::LookupV5,
288 ctx: &Context<Self>,
289 ) {
290 trace!(protocol = "discv5", received = "Lookup");
291 let _ = self.discv5_lookup().await.inspect_err(
292 |e| error!(protocol = "discv5", err=?e, "Error performing Discovery lookup"),
293 );
294 let interval = self.get_lookup_interval().await;
295 send_after(interval, ctx.clone(), discovery_server_protocol::LookupV5);
296 }
297
298 #[send_handler]
299 async fn handle_enr_lookup(
300 &mut self,
301 _msg: discovery_server_protocol::EnrLookup,
302 ctx: &Context<Self>,
303 ) {
304 trace!(protocol = "discv4", received = "EnrLookup");
305 let _ = self.discv4_enr_lookup().await.inspect_err(
306 |e| error!(protocol = "discv4", err=?e, "Error performing Discovery lookup"),
307 );
308 let interval = self.get_lookup_interval().await;
309 send_after(interval, ctx.clone(), discovery_server_protocol::EnrLookup);
310 }
311
312 #[send_handler]
313 async fn handle_prune(&mut self, _msg: discovery_server_protocol::Prune, _ctx: &Context<Self>) {
314 trace!(received = "Prune");
315 let _ = self
316 .prune()
317 .await
318 .inspect_err(|e| error!(err=?e, "Error Pruning peer table"));
319 }
320
    /// Handles `Shutdown` by stopping the actor through its context.
    ///
    /// `started` registers this message against `tokio::signal::ctrl_c()`.
    #[send_handler]
    async fn handle_shutdown(
        &mut self,
        _msg: discovery_server_protocol::Shutdown,
        ctx: &Context<Self>,
    ) {
        ctx.stop();
    }
329
330 async fn route_packet(&mut self, data: &[u8], from: SocketAddr) {
333 if is_discv4_packet(data) {
334 self.route_to_discv4(data, from).await;
335 } else {
336 self.route_to_discv5(data, from).await;
337 }
338 }
339
340 async fn route_to_discv4(&mut self, data: &[u8], from: SocketAddr) {
341 if self.discv4.is_none() {
342 return;
343 }
344 match Discv4Packet::decode(data) {
345 Ok(packet) => {
346 let msg = Discv4Message::from(packet, from);
347 let _ = self.discv4_process_message(msg).await.inspect_err(
348 |e| error!(protocol = "discv4", err=?e, "Error handling discovery message"),
349 );
350 }
351 Err(e) => {
352 debug!(error=?e, "Failed to decode discv4 packet");
353 }
354 }
355 }
356
357 async fn route_to_discv5(&mut self, data: &[u8], from: SocketAddr) {
358 if self.discv5.is_none() {
359 return;
360 }
361 match Discv5Packet::decode(&self.local_node.node_id(), data) {
362 Ok(packet) => {
363 let msg = Discv5Message::from(packet, from);
364 let _ = self.discv5_handle_packet(msg).await.inspect_err(
365 |e| trace!(protocol = "discv5", err=?e, "Error handling discovery message"),
366 );
367 }
368 Err(
369 PacketCodecError::InvalidProtocol(_)
370 | PacketCodecError::InvalidHeader
371 | PacketCodecError::InvalidSize
372 | PacketCodecError::CipherError(_),
373 ) => {
374 trace!(from=?from, "Dropping unrecognized UDP packet");
375 }
376 Err(e) => {
377 debug!(error=?e, "Failed to decode discv5 packet");
378 }
379 }
380 }
381
382 async fn prune(&mut self) -> Result<(), DiscoveryServerError> {
383 self.peer_table.prune_table()?;
384 if let Some(discv4) = &mut self.discv4 {
385 let expiration = Duration::from_secs(crate::discv4::server::EXPIRATION_SECONDS);
386 discv4
387 .pending_find_node
388 .retain(|_, sent_at| sent_at.elapsed() < expiration);
389 }
390 let winning_ip = self
391 .discv5
392 .as_mut()
393 .and_then(|discv5| discv5.cleanup_stale_entries());
394 if let Some(winning_ip) = winning_ip
395 && winning_ip != self.local_node.ip
396 {
397 info!(
398 protocol = "discv5",
399 old_ip = %self.local_node.ip,
400 new_ip = %winning_ip,
401 "External IP detected via PONG voting, updating local ENR"
402 );
403 update_local_ip(
404 &mut self.local_node,
405 &mut self.local_node_record,
406 &self.signer,
407 winning_ip,
408 );
409 }
410 Ok(())
411 }
412
413 pub(crate) async fn get_lookup_interval(&self) -> Duration {
414 let peer_completion = self
415 .peer_table
416 .target_peers_completion()
417 .await
418 .unwrap_or_default();
419 lookup_interval_function(
420 peer_completion,
421 ITERATIVE_LOOKUP_INITIAL_MS,
422 ITERATIVE_LOOKUP_INTERVAL_MS,
423 )
424 }
425}
426
427pub fn is_discv4_packet(data: &[u8]) -> bool {
429 if data.len() < DISCV4_MIN_PACKET_SIZE {
430 return false;
431 }
432 let packet_hash = &data[0..32];
433 let computed_hash = keccak(&data[32..]);
434 packet_hash == computed_hash.as_bytes()
435}
436
#[cfg(any(test, feature = "test-utils"))]
impl DiscoveryServer {
    /// Builds a discv5-only server for tests, without starting the actor.
    ///
    /// The store is a fresh in-memory one, discv4 is disabled, discv5 starts from its default
    /// state, and `initial_lookup_interval` is set to `1000.0`.
    ///
    /// # Panics
    /// Panics if the in-memory store cannot be created.
    pub fn new_for_discv5_test(
        local_node: Node,
        local_node_record: NodeRecord,
        signer: SecretKey,
        udp_socket: Arc<UdpSocket>,
        peer_table: PeerTable,
    ) -> Self {
        let store = Store::new("", ethrex_storage::EngineType::InMemory)
            .expect("Failed to create store");
        let config = DiscoveryConfig {
            discv4_enabled: false,
            discv5_enabled: true,
            initial_lookup_interval: 1000.0,
        };

        Self {
            local_node,
            local_node_record,
            signer,
            udp_socket,
            store,
            peer_table,
            config,
            discv4: None,
            discv5: Some(Discv5State::default()),
        }
    }
}
466}