Skip to main content

ethrex_p2p/discv5/
server.rs

1use crate::discovery::lookup::IterativeLookup;
2use crate::discv5::messages::Message;
3use crate::{
4    discv5::messages::Packet,
5    types::{Node, NodeRecord},
6};
7use ethrex_common::H256;
8use lru::LruCache;
9use rand::RngCore;
10use rustc_hash::{FxHashMap, FxHashSet};
11use std::{
12    net::{IpAddr, SocketAddr},
13    num::NonZero,
14    time::{Duration, Instant},
15};
16use tracing::trace;
17
18/// Maximum number of entries in the per-IP WHOAREYOU rate limit cache.
19pub const MAX_WHOAREYOU_RATE_LIMIT_ENTRIES: usize = 10_000;
20/// Time window for collecting IP votes from PONG recipient_addr.
21const IP_VOTE_WINDOW: Duration = Duration::from_secs(300);
22/// Minimum number of agreeing votes required to update external IP.
23const IP_VOTE_THRESHOLD: usize = 3;
24/// Timeout for pending messages awaiting WhoAreYou response.
25const MESSAGE_CACHE_TIMEOUT: Duration = Duration::from_secs(2);
26
27/// Discv5-specific state held within the unified DiscoveryServer.
28#[derive(Debug)]
29pub struct Discv5State {
30    /// Outgoing message count, used for nonce generation as per the spec.
31    pub counter: u32,
32    /// Pending outgoing messages awaiting WhoAreYou response, keyed by nonce.
33    pub pending_by_nonce: FxHashMap<[u8; 12], (Node, Message, Instant)>,
34    /// Pending WhoAreYou challenges awaiting Handshake response, keyed by src_id.
35    /// Tuple: (challenge_data, timestamp, encoded_packet_bytes).
36    pub pending_challenges: FxHashMap<H256, (Vec<u8>, Instant, Vec<u8>)>,
37    /// Tracks last WHOAREYOU send time per (source IP, node ID) to prevent amplification attacks.
38    pub whoareyou_rate_limit: LruCache<(IpAddr, H256), Instant>,
39    /// Global WHOAREYOU rate limit: count of packets sent in the current second.
40    pub whoareyou_global_count: u32,
41    /// Start of the current global rate limit window.
42    pub whoareyou_global_window_start: Instant,
43    /// Tracks the source IP that each session was established from.
44    pub session_ips: FxHashMap<H256, IpAddr>,
45    /// Collects recipient_addr IPs from PONGs for external IP detection via majority voting.
46    pub ip_votes: FxHashMap<IpAddr, FxHashSet<H256>>,
47    /// When the current IP voting period started. None if no votes received yet.
48    pub ip_vote_period_start: Option<Instant>,
49    /// Whether the first (fast) voting round has completed.
50    pub first_ip_vote_round_completed: bool,
51    /// Currently active iterative lookups.
52    pub active_lookups: Vec<IterativeLookup>,
53}
54
55impl Default for Discv5State {
56    fn default() -> Self {
57        Self {
58            counter: 0,
59            pending_by_nonce: Default::default(),
60            pending_challenges: Default::default(),
61            whoareyou_rate_limit: LruCache::new(
62                NonZero::new(MAX_WHOAREYOU_RATE_LIMIT_ENTRIES)
63                    .expect("MAX_WHOAREYOU_RATE_LIMIT_ENTRIES must be non-zero"),
64            ),
65            whoareyou_global_count: 0,
66            whoareyou_global_window_start: Instant::now(),
67            session_ips: Default::default(),
68            ip_votes: Default::default(),
69            ip_vote_period_start: None,
70            first_ip_vote_round_completed: false,
71            active_lookups: Vec::new(),
72        }
73    }
74}
75
76impl Discv5State {
77    /// Generates a 96-bit AES-GCM nonce.
78    /// Encodes the current outgoing message count into the first 32 bits
79    /// and fills the remaining 64 bits with random data.
80    pub fn next_nonce<R: RngCore>(&mut self, rng: &mut R) -> [u8; 12] {
81        let counter = self.counter;
82        self.counter = self.counter.wrapping_add(1);
83
84        let mut nonce = [0u8; 12];
85        nonce[..4].copy_from_slice(&counter.to_be_bytes());
86        rng.fill_bytes(&mut nonce[4..]);
87        nonce
88    }
89
90    /// Remove stale entries from caches.
91    /// Returns `Some(ip)` if a timed-out IP voting round produced a winning IP to apply.
92    pub fn cleanup_stale_entries(&mut self) -> Option<IpAddr> {
93        let now = Instant::now();
94
95        let before_messages = self.pending_by_nonce.len();
96        self.pending_by_nonce
97            .retain(|_nonce, (_node, _message, timestamp)| {
98                now.duration_since(*timestamp) < MESSAGE_CACHE_TIMEOUT
99            });
100        let removed_messages = before_messages - self.pending_by_nonce.len();
101
102        let before_challenges = self.pending_challenges.len();
103        self.pending_challenges
104            .retain(|_src_id, (_challenge_data, timestamp, _raw)| {
105                now.duration_since(*timestamp) < MESSAGE_CACHE_TIMEOUT
106            });
107        let removed_challenges = before_challenges - self.pending_challenges.len();
108
109        let total_removed = removed_messages + removed_challenges;
110        if total_removed > 0 {
111            trace!(
112                protocol = "discv5",
113                "Cleaned up {} stale entries ({} messages, {} challenges)",
114                total_removed,
115                removed_messages,
116                removed_challenges,
117            );
118        }
119
120        if let Some(start) = self.ip_vote_period_start
121            && now.duration_since(start) >= IP_VOTE_WINDOW
122        {
123            return self.finalize_ip_vote_round();
124        }
125        None
126    }
127
128    /// Records an IP vote from a PONG recipient_addr.
129    /// Returns `Some(ip)` if the voting round ended with a winning IP to apply.
130    pub fn record_ip_vote(&mut self, reported_ip: IpAddr, voter_id: H256) -> Option<IpAddr> {
131        if Self::is_private_ip(reported_ip) {
132            return None;
133        }
134
135        let now = Instant::now();
136
137        if self.ip_vote_period_start.is_none() {
138            self.ip_vote_period_start = Some(now);
139        }
140
141        self.ip_votes
142            .entry(reported_ip)
143            .or_default()
144            .insert(voter_id);
145
146        let total_votes: usize = self.ip_votes.values().map(|v| v.len()).sum();
147        let round_ended = if !self.first_ip_vote_round_completed {
148            total_votes >= IP_VOTE_THRESHOLD
149        } else {
150            self.ip_vote_period_start
151                .is_some_and(|start| now.duration_since(start) >= IP_VOTE_WINDOW)
152        };
153
154        if round_ended {
155            return self.finalize_ip_vote_round();
156        }
157        None
158    }
159
160    /// Finalizes the current voting round.
161    /// Returns `Some(winning_ip)` if a winner reached the threshold and should be applied.
162    fn finalize_ip_vote_round(&mut self) -> Option<IpAddr> {
163        let winner = self
164            .ip_votes
165            .iter()
166            .map(|(ip, voters)| (*ip, voters.len()))
167            .max_by_key(|(_, count)| *count);
168
169        let result = winner.and_then(|(winning_ip, vote_count)| {
170            (vote_count >= IP_VOTE_THRESHOLD).then_some(winning_ip)
171        });
172
173        self.ip_votes.clear();
174        self.ip_vote_period_start = Some(Instant::now());
175        self.first_ip_vote_round_completed = true;
176
177        result
178    }
179
180    /// Returns true if the IP is private/local (not useful for external connectivity).
181    pub fn is_private_ip(ip: IpAddr) -> bool {
182        match ip {
183            IpAddr::V4(v4) => v4.is_private() || v4.is_loopback() || v4.is_link_local(),
184            IpAddr::V6(v6) => {
185                v6.is_loopback()
186                    || v6.is_unspecified()
187                    // unique local (fc00::/7)
188                    || (v6.segments()[0] & 0xfe00) == 0xfc00
189                    // link-local (fe80::/10)
190                    || (v6.segments()[0] & 0xffc0) == 0xfe80
191            }
192        }
193    }
194}
195
196/// Updates local node IP and re-signs the ENR with incremented seq.
197pub(crate) fn update_local_ip(
198    local_node: &mut Node,
199    local_node_record: &mut NodeRecord,
200    signer: &secp256k1::SecretKey,
201    new_ip: IpAddr,
202) {
203    let mut updated_node = local_node.clone();
204    updated_node.ip = new_ip;
205    let new_seq = local_node_record.seq + 1;
206    let Ok(mut new_record) = NodeRecord::from_node(&updated_node, new_seq, signer) else {
207        tracing::error!(%new_ip, "Failed to create new ENR for IP update");
208        return;
209    };
210    if let Some(fork_id) = local_node_record.get_fork_id().cloned()
211        && new_record.set_fork_id(fork_id, signer).is_err()
212    {
213        tracing::error!(%new_ip, "Failed to set fork_id in new ENR, aborting IP update");
214        return;
215    }
216    local_node.ip = new_ip;
217    *local_node_record = new_record;
218}
219
220#[derive(Debug, Clone)]
221pub struct Discv5Message {
222    pub(crate) from: SocketAddr,
223    pub(crate) packet: Packet,
224}
225
226impl Discv5Message {
227    pub fn from(packet: Packet, from: SocketAddr) -> Self {
228        Self { from, packet }
229    }
230}
231
232#[cfg(test)]
233mod tests {
234    use super::*;
235    use rand::{SeedableRng, rngs::StdRng};
236
237    fn make_test_state() -> Discv5State {
238        Discv5State::default()
239    }
240
241    #[tokio::test]
242    async fn test_next_nonce_counter() {
243        let mut rng = StdRng::seed_from_u64(7);
244        let mut state = make_test_state();
245
246        let n1 = state.next_nonce(&mut rng);
247        let n2 = state.next_nonce(&mut rng);
248
249        assert_eq!(&n1[..4], &[0, 0, 0, 0]);
250        assert_eq!(&n2[..4], &[0, 0, 0, 1]);
251        assert_ne!(&n1[4..], &n2[4..]);
252    }
253
254    #[tokio::test]
255    async fn test_ip_voting_returns_winning_ip() {
256        let mut state = make_test_state();
257
258        let new_ip: IpAddr = "203.0.113.50".parse().unwrap();
259        let voter1 = H256::from_low_u64_be(1);
260        let voter2 = H256::from_low_u64_be(2);
261        let voter3 = H256::from_low_u64_be(3);
262
263        assert_eq!(state.record_ip_vote(new_ip, voter1), None);
264        assert_eq!(state.record_ip_vote(new_ip, voter2), None);
265        // Third vote triggers round end, returns the winning IP
266        assert_eq!(state.record_ip_vote(new_ip, voter3), Some(new_ip));
267        assert!(state.ip_votes.is_empty());
268    }
269
270    #[tokio::test]
271    async fn test_ip_voting_same_peer_votes_once() {
272        let mut state = make_test_state();
273
274        let new_ip: IpAddr = "203.0.113.50".parse().unwrap();
275        let same_voter = H256::from_low_u64_be(1);
276
277        state.record_ip_vote(new_ip, same_voter);
278        state.record_ip_vote(new_ip, same_voter);
279        state.record_ip_vote(new_ip, same_voter);
280
281        assert_eq!(state.ip_votes.get(&new_ip).map(|v| v.len()), Some(1));
282    }
283
284    #[tokio::test]
285    async fn test_ip_voting_ignores_private_ips() {
286        let mut state = make_test_state();
287
288        let voter1 = H256::from_low_u64_be(1);
289
290        let private_ip: IpAddr = "192.168.1.100".parse().unwrap();
291        state.record_ip_vote(private_ip, voter1);
292        assert!(state.ip_votes.is_empty());
293
294        let loopback: IpAddr = "127.0.0.1".parse().unwrap();
295        state.record_ip_vote(loopback, voter1);
296        assert!(state.ip_votes.is_empty());
297
298        let public_ip: IpAddr = "203.0.113.50".parse().unwrap();
299        state.record_ip_vote(public_ip, voter1);
300        assert_eq!(state.ip_votes.get(&public_ip).map(|v| v.len()), Some(1));
301    }
302
303    #[tokio::test]
304    async fn test_ip_voting_split_votes_no_winner() {
305        let mut state = make_test_state();
306
307        let ip1: IpAddr = "203.0.113.50".parse().unwrap();
308        let ip2: IpAddr = "203.0.113.51".parse().unwrap();
309        let voter1 = H256::from_low_u64_be(1);
310        let voter2 = H256::from_low_u64_be(2);
311        let voter3 = H256::from_low_u64_be(3);
312
313        state.record_ip_vote(ip1, voter1);
314        state.record_ip_vote(ip2, voter2);
315        // ip1 has 2 votes, ip2 has 1 — ip1 wins but only has 2 < threshold 3
316        assert_eq!(state.record_ip_vote(ip1, voter3), None);
317        assert!(state.ip_votes.is_empty());
318        assert!(state.first_ip_vote_round_completed);
319    }
320
321    #[tokio::test]
322    async fn test_ip_vote_cleanup() {
323        let mut state = make_test_state();
324
325        let ip: IpAddr = "203.0.113.50".parse().unwrap();
326        let voter1 = H256::from_low_u64_be(1);
327
328        let mut voters = FxHashSet::default();
329        voters.insert(voter1);
330        state.ip_votes.insert(ip, voters);
331        state.ip_vote_period_start = Some(Instant::now());
332        assert_eq!(state.ip_votes.len(), 1);
333
334        // Cleanup should retain votes (round hasn't timed out yet)
335        assert_eq!(state.cleanup_stale_entries(), None);
336        assert_eq!(state.ip_votes.len(), 1);
337        assert!(!state.first_ip_vote_round_completed);
338    }
339}