Skip to main content

ethrex_p2p/discovery/
lookup.rs

1use crate::peer_table::xor_distance;
2use crate::types::Node;
3use ethrex_common::H256;
4use rustc_hash::FxHashSet;
5use std::time::{Duration, Instant};
6
/// How many not-yet-queried nodes are asked in each round of a lookup.
pub const LOOKUP_ALPHA: usize = 3;
/// Capacity of a lookup's result set (the Kademlia bucket size, `k`).
pub const LOOKUP_BUCKET_SIZE: usize = 16;
/// Wall-clock budget of a single lookup; once reached, the lookup counts as finished.
pub const LOOKUP_TIMEOUT: Duration = Duration::from_secs(20);
13
14/// A single entry in the iterative lookup result set.
15#[derive(Debug, Clone)]
16pub struct LookupEntry {
17    pub node_id: H256,
18    pub node: Node,
19    pub distance: H256,
20    pub queried: bool,
21}
22
23/// Iterative convergence lookup (geth-style).
24///
25/// Generates a random target, seeds with closest known nodes, queries alpha=3
26/// closest not-yet-asked nodes, feeds responses back, and iterates until
27/// convergence (no more unqueried entries closer than what we have, or timeout).
28#[derive(Debug)]
29pub struct IterativeLookup {
30    pub target: H256,
31    result: Vec<LookupEntry>,
32    seen: FxHashSet<H256>,
33    queries_in_flight: usize,
34    started_at: Instant,
35}
36
37impl IterativeLookup {
38    /// Create a new iterative lookup seeded with the given nodes.
39    pub fn new(target: H256, seed_nodes: Vec<(H256, Node)>) -> Self {
40        let mut seen = FxHashSet::default();
41        let mut result: Vec<LookupEntry> = Vec::with_capacity(LOOKUP_BUCKET_SIZE);
42
43        for (node_id, node) in seed_nodes {
44            if seen.insert(node_id) {
45                let distance = xor_distance(&target, &node_id);
46                result.push(LookupEntry {
47                    node_id,
48                    node,
49                    distance,
50                    queried: false,
51                });
52            }
53        }
54
55        // Sort by distance (ascending) and truncate to bucket size
56        result.sort_by(|a, b| a.distance.cmp(&b.distance));
57        result.truncate(LOOKUP_BUCKET_SIZE);
58
59        Self {
60            target,
61            result,
62            seen,
63            queries_in_flight: 0,
64            started_at: Instant::now(),
65        }
66    }
67
68    /// Returns up to `count` closest unqueried entries, marks them as queried,
69    /// and increments the in-flight counter.
70    pub fn next_to_query(&mut self, count: usize) -> Vec<(H256, Node)> {
71        let mut out = Vec::with_capacity(count);
72        for entry in &mut self.result {
73            if out.len() >= count {
74                break;
75            }
76            if !entry.queried {
77                entry.queried = true;
78                self.queries_in_flight += 1;
79                out.push((entry.node_id, entry.node.clone()));
80            }
81        }
82        out
83    }
84
85    /// Feed response nodes into the lookup. Inserts new nodes if they are
86    /// closer than the farthest entry (or the result set is not full yet).
87    /// Deduplicates via the `seen` set.
88    pub fn feed_results(&mut self, nodes: Vec<(H256, Node)>) {
89        for (node_id, node) in nodes {
90            if !self.seen.insert(node_id) {
91                continue;
92            }
93            let distance = xor_distance(&self.target, &node_id);
94
95            if self.result.len() < LOOKUP_BUCKET_SIZE {
96                self.result.push(LookupEntry {
97                    node_id,
98                    node,
99                    distance,
100                    queried: false,
101                });
102            } else if let Some(farthest) = self.result.last()
103                && distance < farthest.distance
104            {
105                // Replace the farthest entry
106                let last_idx = self.result.len() - 1;
107                self.result[last_idx] = LookupEntry {
108                    node_id,
109                    node,
110                    distance,
111                    queried: false,
112                };
113            } else {
114                continue;
115            }
116
117            // Re-sort after insertion
118            self.result.sort_by(|a, b| a.distance.cmp(&b.distance));
119        }
120    }
121
122    /// Record that a response was received (decrements in-flight counter).
123    pub fn record_response(&mut self) {
124        self.queries_in_flight = self.queries_in_flight.saturating_sub(1);
125    }
126
127    /// Record that a query timed out (same as record_response).
128    pub fn record_timeout(&mut self) {
129        self.queries_in_flight = self.queries_in_flight.saturating_sub(1);
130    }
131
132    /// Returns true if the lookup has converged:
133    /// - All entries in the result set have been queried (don't wait for
134    ///   stragglers — late responses still get processed via handle_neighbors
135    ///   and feed into the connection pool / next lookup), OR
136    /// - The lookup has timed out.
137    pub fn is_finished(&self) -> bool {
138        if self.started_at.elapsed() >= LOOKUP_TIMEOUT {
139            return true;
140        }
141        !self.result.iter().any(|e| !e.queried)
142    }
143}
144
#[cfg(test)]
mod tests {
    use super::*;
    use ethrex_common::H512;
    use std::net::{IpAddr, Ipv4Addr};

    /// Builds a test node whose IP and public key are both derived from `seed`, so
    /// distinct seeds give distinct public keys.
    fn make_node(seed: u8) -> (H256, Node) {
        let pk = H512::from_low_u64_be(seed as u64 + 1);
        let node = Node::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, seed)), 30303, 30303, pk);
        (node.node_id(), node)
    }

    /// Ids of the `LOOKUP_BUCKET_SIZE` nodes in `nodes` closest to `target`,
    /// nearest first.
    fn closest_ids(target: &H256, nodes: &[(H256, Node)]) -> Vec<H256> {
        let mut ids: Vec<H256> = nodes.iter().map(|(id, _)| *id).collect();
        ids.sort_by_key(|id| xor_distance(target, id));
        ids.truncate(LOOKUP_BUCKET_SIZE);
        ids
    }

    /// Ids currently held in the lookup's result set, in stored order.
    fn result_ids(lookup: &IterativeLookup) -> Vec<H256> {
        lookup.result.iter().map(|e| e.node_id).collect()
    }

    /// Asserts that the result set is sorted by ascending distance.
    fn assert_sorted(lookup: &IterativeLookup) {
        for w in lookup.result.windows(2) {
            assert!(w[0].distance <= w[1].distance);
        }
    }

    #[test]
    fn new_sorts_by_distance_and_truncates() {
        let target = H256::zero();
        let seeds: Vec<_> = (1..=20).map(make_node).collect();
        let lookup = IterativeLookup::new(target, seeds.clone());

        // 20 distinct seeds must be cut down to exactly the bucket size, keeping
        // the closest ones.
        assert_eq!(lookup.result.len(), LOOKUP_BUCKET_SIZE);
        assert_sorted(&lookup);
        assert_eq!(result_ids(&lookup), closest_ids(&target, &seeds));
    }

    #[test]
    fn next_to_query_returns_alpha_entries() {
        let target = H256::zero();
        let seeds: Vec<_> = (1..=10).map(make_node).collect();
        let mut lookup = IterativeLookup::new(target, seeds);

        let batch = lookup.next_to_query(LOOKUP_ALPHA);
        assert_eq!(batch.len(), LOOKUP_ALPHA);
        assert_eq!(lookup.queries_in_flight, LOOKUP_ALPHA);

        // The batch consists of the closest entries, which are now marked queried.
        let batch_ids: Vec<H256> = batch.iter().map(|(id, _)| *id).collect();
        assert_eq!(batch_ids, result_ids(&lookup)[..LOOKUP_ALPHA].to_vec());
        assert!(lookup.result[..LOOKUP_ALPHA].iter().all(|e| e.queried));
        assert!(lookup.result[LOOKUP_ALPHA..].iter().all(|e| !e.queried));

        // A second call skips what was already handed out.
        let second = lookup.next_to_query(LOOKUP_ALPHA);
        assert_eq!(second.len(), LOOKUP_ALPHA);
        assert!(second.iter().all(|(id, _)| !batch_ids.contains(id)));
        assert_eq!(lookup.queries_in_flight, 2 * LOOKUP_ALPHA);
    }

    #[test]
    fn feed_results_deduplicates() {
        let target = H256::zero();
        let seeds: Vec<_> = (1..=3).map(make_node).collect();
        let mut lookup = IterativeLookup::new(target, seeds.clone());

        let initial_len = lookup.result.len();
        // Feed the same nodes again
        lookup.feed_results(seeds);
        assert_eq!(lookup.result.len(), initial_len);
    }

    #[test]
    fn feed_results_keeps_closest_within_bucket() {
        let target = H256::zero();
        let all: Vec<_> = (1..=40).map(make_node).collect();
        let (seeds, extra) = all.split_at(LOOKUP_BUCKET_SIZE);
        let mut lookup = IterativeLookup::new(target, seeds.to_vec());

        // Closer responses must displace the farthest seeds, without ever growing
        // the set past the bucket size.
        lookup.feed_results(extra.to_vec());

        assert_eq!(lookup.result.len(), LOOKUP_BUCKET_SIZE);
        assert_sorted(&lookup);
        assert_eq!(result_ids(&lookup), closest_ids(&target, &all));
    }

    #[test]
    fn in_flight_counter_saturates_at_zero() {
        let mut lookup = IterativeLookup::new(H256::zero(), vec![make_node(1)]);
        lookup.record_response();
        lookup.record_timeout();
        assert_eq!(lookup.queries_in_flight, 0);

        let _ = lookup.next_to_query(LOOKUP_ALPHA);
        assert_eq!(lookup.queries_in_flight, 1);
        lookup.record_timeout();
        assert_eq!(lookup.queries_in_flight, 0);
    }

    #[test]
    fn is_finished_when_all_queried() {
        let target = H256::zero();
        let seeds: Vec<_> = (1..=2).map(make_node).collect();
        let mut lookup = IterativeLookup::new(target, seeds);

        assert!(!lookup.is_finished());

        let _ = lookup.next_to_query(10);
        // Finished once all entries are queried (don't wait for in-flight)
        assert!(lookup.is_finished());
    }

    #[test]
    fn empty_lookup_is_finished() {
        let lookup = IterativeLookup::new(H256::zero(), Vec::new());
        assert!(lookup.is_finished());
    }
}