1use crate::discovery::lookup::IterativeLookup;
2use crate::discv5::messages::Message;
3use crate::{
4 discv5::messages::Packet,
5 types::{Node, NodeRecord},
6};
7use ethrex_common::H256;
8use lru::LruCache;
9use rand::RngCore;
10use rustc_hash::{FxHashMap, FxHashSet};
11use std::{
12 net::{IpAddr, SocketAddr},
13 num::NonZero,
14 time::{Duration, Instant},
15};
16use tracing::trace;
17
/// Capacity of the `whoareyou_rate_limit` LRU cache created in
/// `Discv5State::default`.
pub const MAX_WHOAREYOU_RATE_LIMIT_ENTRIES: usize = 10_000;
/// Length of an IP voting round once the first round has completed; a round
/// whose start is at least this old gets finalized.
const IP_VOTE_WINDOW: Duration = Duration::from_secs(300);
/// Minimum number of distinct voters an IP needs to win a round. The same
/// value is also the total vote count that ends the very first round.
const IP_VOTE_THRESHOLD: usize = 3;
/// Age at which entries in `pending_by_nonce` and `pending_challenges` are
/// dropped by `Discv5State::cleanup_stale_entries`.
const MESSAGE_CACHE_TIMEOUT: Duration = Duration::from_secs(2);
26
/// Mutable bookkeeping for the discv5 protocol: nonce generation, pending
/// message/challenge caches, WHOAREYOU rate limiting and external-IP voting.
#[derive(Debug)]
pub struct Discv5State {
    /// Monotonic counter written into the first four bytes of every nonce
    /// produced by `next_nonce`; wraps around on overflow.
    pub counter: u32,
    /// Entries keyed by 12-byte nonce, each holding a node, a message and the
    /// instant used for expiry. Entries older than `MESSAGE_CACHE_TIMEOUT`
    /// are removed by `cleanup_stale_entries`.
    /// NOTE(review): presumably outgoing messages awaiting a reply; confirm
    /// against the code that inserts into this map.
    pub pending_by_nonce: FxHashMap<[u8; 12], (Node, Message, Instant)>,
    /// Entries keyed by a 256-bit id (the cleanup code calls it `src_id`),
    /// each holding challenge data, the instant used for expiry and a raw
    /// byte buffer. Expired the same way as `pending_by_nonce`.
    /// NOTE(review): exact meaning of the two byte vectors is not visible in
    /// this file; confirm against the code that inserts into this map.
    pub pending_challenges: FxHashMap<H256, (Vec<u8>, Instant, Vec<u8>)>,
    /// LRU cache of instants keyed by `(IpAddr, H256)`, bounded to
    /// `MAX_WHOAREYOU_RATE_LIMIT_ENTRIES` entries.
    /// NOTE(review): presumably the last WHOAREYOU sent per (ip, node id);
    /// confirm against the rate-limiting code.
    pub whoareyou_rate_limit: LruCache<(IpAddr, H256), Instant>,
    /// Counter that starts at zero.
    /// NOTE(review): presumably WHOAREYOU packets sent in the current global
    /// window; it is not updated in this file.
    pub whoareyou_global_count: u32,
    /// Instant captured when the state is created.
    /// NOTE(review): presumably the start of the current global rate-limit
    /// window; it is not updated in this file.
    pub whoareyou_global_window_start: Instant,
    /// Map from a 256-bit id to an IP address.
    /// NOTE(review): presumably the IP each session was established from;
    /// it is not read or written in this file.
    pub session_ips: FxHashMap<H256, IpAddr>,
    /// Votes of the current round: reported IP -> set of voter ids. Using a
    /// set means a voter counts at most once per IP.
    pub ip_votes: FxHashMap<IpAddr, FxHashSet<H256>>,
    /// Start of the current voting round; `None` until the first vote is
    /// recorded, and reset to "now" whenever a round is finalized.
    pub ip_vote_period_start: Option<Instant>,
    /// Set to `true` the first time a round is finalized. While `false`, a
    /// round ends on vote count; afterwards it ends on elapsed time.
    pub first_ip_vote_round_completed: bool,
    /// Lookups stored alongside the rest of the state; starts empty and is
    /// not touched in this file.
    pub active_lookups: Vec<IterativeLookup>,
}
54
55impl Default for Discv5State {
56 fn default() -> Self {
57 Self {
58 counter: 0,
59 pending_by_nonce: Default::default(),
60 pending_challenges: Default::default(),
61 whoareyou_rate_limit: LruCache::new(
62 NonZero::new(MAX_WHOAREYOU_RATE_LIMIT_ENTRIES)
63 .expect("MAX_WHOAREYOU_RATE_LIMIT_ENTRIES must be non-zero"),
64 ),
65 whoareyou_global_count: 0,
66 whoareyou_global_window_start: Instant::now(),
67 session_ips: Default::default(),
68 ip_votes: Default::default(),
69 ip_vote_period_start: None,
70 first_ip_vote_round_completed: false,
71 active_lookups: Vec::new(),
72 }
73 }
74}
75
76impl Discv5State {
77 pub fn next_nonce<R: RngCore>(&mut self, rng: &mut R) -> [u8; 12] {
81 let counter = self.counter;
82 self.counter = self.counter.wrapping_add(1);
83
84 let mut nonce = [0u8; 12];
85 nonce[..4].copy_from_slice(&counter.to_be_bytes());
86 rng.fill_bytes(&mut nonce[4..]);
87 nonce
88 }
89
90 pub fn cleanup_stale_entries(&mut self) -> Option<IpAddr> {
93 let now = Instant::now();
94
95 let before_messages = self.pending_by_nonce.len();
96 self.pending_by_nonce
97 .retain(|_nonce, (_node, _message, timestamp)| {
98 now.duration_since(*timestamp) < MESSAGE_CACHE_TIMEOUT
99 });
100 let removed_messages = before_messages - self.pending_by_nonce.len();
101
102 let before_challenges = self.pending_challenges.len();
103 self.pending_challenges
104 .retain(|_src_id, (_challenge_data, timestamp, _raw)| {
105 now.duration_since(*timestamp) < MESSAGE_CACHE_TIMEOUT
106 });
107 let removed_challenges = before_challenges - self.pending_challenges.len();
108
109 let total_removed = removed_messages + removed_challenges;
110 if total_removed > 0 {
111 trace!(
112 protocol = "discv5",
113 "Cleaned up {} stale entries ({} messages, {} challenges)",
114 total_removed,
115 removed_messages,
116 removed_challenges,
117 );
118 }
119
120 if let Some(start) = self.ip_vote_period_start
121 && now.duration_since(start) >= IP_VOTE_WINDOW
122 {
123 return self.finalize_ip_vote_round();
124 }
125 None
126 }
127
128 pub fn record_ip_vote(&mut self, reported_ip: IpAddr, voter_id: H256) -> Option<IpAddr> {
131 if Self::is_private_ip(reported_ip) {
132 return None;
133 }
134
135 let now = Instant::now();
136
137 if self.ip_vote_period_start.is_none() {
138 self.ip_vote_period_start = Some(now);
139 }
140
141 self.ip_votes
142 .entry(reported_ip)
143 .or_default()
144 .insert(voter_id);
145
146 let total_votes: usize = self.ip_votes.values().map(|v| v.len()).sum();
147 let round_ended = if !self.first_ip_vote_round_completed {
148 total_votes >= IP_VOTE_THRESHOLD
149 } else {
150 self.ip_vote_period_start
151 .is_some_and(|start| now.duration_since(start) >= IP_VOTE_WINDOW)
152 };
153
154 if round_ended {
155 return self.finalize_ip_vote_round();
156 }
157 None
158 }
159
160 fn finalize_ip_vote_round(&mut self) -> Option<IpAddr> {
163 let winner = self
164 .ip_votes
165 .iter()
166 .map(|(ip, voters)| (*ip, voters.len()))
167 .max_by_key(|(_, count)| *count);
168
169 let result = winner.and_then(|(winning_ip, vote_count)| {
170 (vote_count >= IP_VOTE_THRESHOLD).then_some(winning_ip)
171 });
172
173 self.ip_votes.clear();
174 self.ip_vote_period_start = Some(Instant::now());
175 self.first_ip_vote_round_completed = true;
176
177 result
178 }
179
180 pub fn is_private_ip(ip: IpAddr) -> bool {
182 match ip {
183 IpAddr::V4(v4) => v4.is_private() || v4.is_loopback() || v4.is_link_local(),
184 IpAddr::V6(v6) => {
185 v6.is_loopback()
186 || v6.is_unspecified()
187 || (v6.segments()[0] & 0xfe00) == 0xfc00
189 || (v6.segments()[0] & 0xffc0) == 0xfe80
191 }
192 }
193 }
194}
195
196pub(crate) fn update_local_ip(
198 local_node: &mut Node,
199 local_node_record: &mut NodeRecord,
200 signer: &secp256k1::SecretKey,
201 new_ip: IpAddr,
202) {
203 let mut updated_node = local_node.clone();
204 updated_node.ip = new_ip;
205 let new_seq = local_node_record.seq + 1;
206 let Ok(mut new_record) = NodeRecord::from_node(&updated_node, new_seq, signer) else {
207 tracing::error!(%new_ip, "Failed to create new ENR for IP update");
208 return;
209 };
210 if let Some(fork_id) = local_node_record.get_fork_id().cloned()
211 && new_record.set_fork_id(fork_id, signer).is_err()
212 {
213 tracing::error!(%new_ip, "Failed to set fork_id in new ENR, aborting IP update");
214 return;
215 }
216 local_node.ip = new_ip;
217 *local_node_record = new_record;
218}
219
/// A discv5 packet paired with a socket address.
#[derive(Debug, Clone)]
pub struct Discv5Message {
    /// Socket address associated with the packet.
    /// NOTE(review): presumably the sender's address; confirm at the call site.
    pub(crate) from: SocketAddr,
    /// The packet itself.
    pub(crate) packet: Packet,
}
225
226impl Discv5Message {
227 pub fn from(packet: Packet, from: SocketAddr) -> Self {
228 Self { from, packet }
229 }
230}
231
// None of these tests awaits anything, so they run as plain synchronous
// `#[test]`s rather than spinning up an async runtime per test.
#[cfg(test)]
mod tests {
    use super::*;
    use rand::{SeedableRng, rngs::StdRng};

    /// Fresh state with empty tables and no open voting round.
    fn make_test_state() -> Discv5State {
        Discv5State::default()
    }

    /// The nonce prefix is the big-endian counter; the suffix is random.
    #[test]
    fn test_next_nonce_counter() {
        let mut rng = StdRng::seed_from_u64(7);
        let mut state = make_test_state();

        let n1 = state.next_nonce(&mut rng);
        let n2 = state.next_nonce(&mut rng);

        assert_eq!(&n1[..4], &[0, 0, 0, 0]);
        assert_eq!(&n2[..4], &[0, 0, 0, 1]);
        assert_ne!(&n1[4..], &n2[4..]);
    }

    /// Three distinct voters for the same IP end the first round with a winner.
    #[test]
    fn test_ip_voting_returns_winning_ip() {
        let mut state = make_test_state();

        let new_ip: IpAddr = "203.0.113.50".parse().unwrap();
        let voter1 = H256::from_low_u64_be(1);
        let voter2 = H256::from_low_u64_be(2);
        let voter3 = H256::from_low_u64_be(3);

        assert_eq!(state.record_ip_vote(new_ip, voter1), None);
        assert_eq!(state.record_ip_vote(new_ip, voter2), None);
        assert_eq!(state.record_ip_vote(new_ip, voter3), Some(new_ip));
        // Finalizing the round clears the tally.
        assert!(state.ip_votes.is_empty());
    }

    /// Repeated votes from one peer for one IP count only once.
    #[test]
    fn test_ip_voting_same_peer_votes_once() {
        let mut state = make_test_state();

        let new_ip: IpAddr = "203.0.113.50".parse().unwrap();
        let same_voter = H256::from_low_u64_be(1);

        state.record_ip_vote(new_ip, same_voter);
        state.record_ip_vote(new_ip, same_voter);
        state.record_ip_vote(new_ip, same_voter);

        assert_eq!(state.ip_votes.get(&new_ip).map(|v| v.len()), Some(1));
    }

    /// Private and loopback addresses never enter the tally.
    #[test]
    fn test_ip_voting_ignores_private_ips() {
        let mut state = make_test_state();

        let voter1 = H256::from_low_u64_be(1);

        let private_ip: IpAddr = "192.168.1.100".parse().unwrap();
        state.record_ip_vote(private_ip, voter1);
        assert!(state.ip_votes.is_empty());

        let loopback: IpAddr = "127.0.0.1".parse().unwrap();
        state.record_ip_vote(loopback, voter1);
        assert!(state.ip_votes.is_empty());

        let public_ip: IpAddr = "203.0.113.50".parse().unwrap();
        state.record_ip_vote(public_ip, voter1);
        assert_eq!(state.ip_votes.get(&public_ip).map(|v| v.len()), Some(1));
    }

    /// The first round ends on total vote count even when no IP reaches the
    /// threshold on its own; in that case there is no winner.
    #[test]
    fn test_ip_voting_split_votes_no_winner() {
        let mut state = make_test_state();

        let ip1: IpAddr = "203.0.113.50".parse().unwrap();
        let ip2: IpAddr = "203.0.113.51".parse().unwrap();
        let voter1 = H256::from_low_u64_be(1);
        let voter2 = H256::from_low_u64_be(2);
        let voter3 = H256::from_low_u64_be(3);

        state.record_ip_vote(ip1, voter1);
        state.record_ip_vote(ip2, voter2);
        assert_eq!(state.record_ip_vote(ip1, voter3), None);
        assert!(state.ip_votes.is_empty());
        assert!(state.first_ip_vote_round_completed);
    }

    /// Cleanup leaves a voting round alone while its window is still open.
    #[test]
    fn test_ip_vote_cleanup() {
        let mut state = make_test_state();

        let ip: IpAddr = "203.0.113.50".parse().unwrap();
        let voter1 = H256::from_low_u64_be(1);

        let mut voters = FxHashSet::default();
        voters.insert(voter1);
        state.ip_votes.insert(ip, voters);
        state.ip_vote_period_start = Some(Instant::now());
        assert_eq!(state.ip_votes.len(), 1);

        assert_eq!(state.cleanup_stale_entries(), None);
        assert_eq!(state.ip_votes.len(), 1);
        assert!(!state.first_ip_vote_round_completed);
    }

    /// Cleanup drops challenges older than the cache timeout and keeps fresh ones.
    #[test]
    fn test_cleanup_removes_stale_challenges() {
        let mut state = make_test_state();

        let now = Instant::now();
        // `Instant` cannot always represent a point before "now" (e.g. right
        // after boot); skip rather than panic in that case.
        let Some(stale) = now.checked_sub(MESSAGE_CACHE_TIMEOUT * 2) else {
            return;
        };

        let stale_id = H256::from_low_u64_be(1);
        let fresh_id = H256::from_low_u64_be(2);
        state
            .pending_challenges
            .insert(stale_id, (vec![1], stale, Vec::new()));
        state
            .pending_challenges
            .insert(fresh_id, (vec![2], now, Vec::new()));

        assert_eq!(state.cleanup_stale_entries(), None);
        assert_eq!(state.pending_challenges.len(), 1);
        assert!(state.pending_challenges.contains_key(&fresh_id));
    }

    /// Spot-checks the address classification for both IP families.
    #[test]
    fn test_is_private_ip_ranges() {
        let non_public = [
            "10.0.0.1",
            "172.16.0.1",
            "192.168.1.100",
            "127.0.0.1",
            "169.254.1.1",
            "::1",
            "::",
            "fc00::1",
            "fd12:3456::1",
            "fe80::1",
        ];
        for text in non_public {
            let ip: IpAddr = text.parse().unwrap();
            assert!(Discv5State::is_private_ip(ip), "{text} should be rejected");
        }

        let public = ["203.0.113.50", "8.8.8.8", "2001:4860:4860::8888"];
        for text in public {
            let ip: IpAddr = text.parse().unwrap();
            assert!(!Discv5State::is_private_ip(ip), "{text} should be accepted");
        }
    }
}