Skip to main content

mainline/rpc/
closest_nodes.rs

1use std::{collections::HashSet, convert::TryInto};
2
3use crate::{common::MAX_BUCKET_SIZE_K, Id, Node};
4
5#[derive(Debug, Clone)]
6/// Manage closest nodes found in a query.
7///
8/// Useful to estimate the Dht size.
9pub struct ClosestNodes {
10    target: Id,
11    nodes: Vec<Node>,
12}
13
14impl ClosestNodes {
15    /// Create a new instance of [ClosestNodes].
16    pub fn new(target: Id) -> Self {
17        Self {
18            target,
19            nodes: Vec::with_capacity(200),
20        }
21    }
22
23    // === Getters ===
24
25    /// Returns the target of the query for these closest nodes.
26    pub fn target(&self) -> Id {
27        self.target
28    }
29
30    /// Returns a slice of the nodes array.
31    pub fn nodes(&self) -> &[Node] {
32        &self.nodes
33    }
34
35    /// Returns the number of nodes.
36    pub fn len(&self) -> usize {
37        self.nodes.len()
38    }
39
40    /// Returns true if there are no nodes.
41    pub fn is_empty(&self) -> bool {
42        self.nodes.is_empty()
43    }
44
45    // === Public Methods ===
46
47    /// Add a node.
48    pub fn add(&mut self, node: Node) {
49        let seek = node.id().xor(&self.target);
50
51        if node.already_exists(&self.nodes) {
52            return;
53        }
54
55        if let Err(pos) = self.nodes.binary_search_by(|prope| {
56            if prope.is_secure() && !node.is_secure() {
57                std::cmp::Ordering::Less
58            } else if !prope.is_secure() && node.is_secure() {
59                std::cmp::Ordering::Greater
60            } else if prope.id() == node.id() {
61                std::cmp::Ordering::Equal
62            } else {
63                prope.id().xor(&self.target).cmp(&seek)
64            }
65        }) {
66            self.nodes.insert(pos, node)
67        }
68    }
69
70    /// Take enough nodes closest to the target, until the following are satisfied:
71    /// 1. At least the closest `k` nodes (20).
72    /// 2. The last node should be at a distance `edk` which is the expected distance of the 20th
73    ///    node given previous estimations of the DHT size.
74    /// 3. The number of subnets with unique 6 bits prefix in nodes ipv4 addresses match or exceeds
75    ///    the average from previous queries.
76    ///
77    /// If one or more of these conditions are not met, then we just take all responding nodes
78    /// and store data at them.
79    pub fn take_until_secure(
80        &self,
81        previous_dht_size_estimate: usize,
82        average_subnets: usize,
83    ) -> &[Node] {
84        let mut until_secure = 0;
85
86        // 20 / dht_size_estimate == expected_dk / ID space
87        // so expected_dk = 20 * ID space / dht_size_estimate
88        let expected_dk =
89            (20.0 * u128::MAX as f64 / (previous_dht_size_estimate as f64 + 1.0)) as u128;
90
91        let mut subnets = HashSet::new();
92
93        for node in &self.nodes {
94            let distance = distance(&self.target, node);
95
96            subnets.insert(subnet(node));
97
98            if distance >= expected_dk && subnets.len() >= average_subnets {
99                break;
100            }
101
102            until_secure += 1;
103        }
104
105        &self.nodes[0..until_secure.max(MAX_BUCKET_SIZE_K).min(self.nodes().len())]
106    }
107
108    /// Count the number of subnets with unique 6 bits prefix in ipv4
109    pub fn subnets_count(&self) -> u8 {
110        if self.nodes.is_empty() {
111            return 20;
112        }
113
114        let mut subnets = HashSet::new();
115
116        for node in self.nodes.iter().take(MAX_BUCKET_SIZE_K) {
117            subnets.insert(subnet(node));
118        }
119
120        subnets.len() as u8
121    }
122
123    /// An estimation of the Dht from the distribution of closest nodes
124    /// responding to a query.
125    ///
126    /// [Read more](https://github.com/pubky/mainline/blob/main/docs/dht_size_estimate.md)
127    pub fn dht_size_estimate(&self) -> f64 {
128        dht_size_estimate(
129            self.nodes
130                .iter()
131                .take(MAX_BUCKET_SIZE_K)
132                .map(|node| distance(&self.target, node)),
133        )
134    }
135}
136
137fn subnet(node: &Node) -> u8 {
138    ((node.address().ip().to_bits() >> 26) & 0b0011_1111) as u8
139}
140
141fn distance(target: &Id, node: &Node) -> u128 {
142    let xor = node.id().xor(target);
143
144    // Round up the lower 4 bytes to get a u128 from u160.
145    u128::from_be_bytes(xor.as_bytes()[0..16].try_into().expect("infallible"))
146}
147
/// Estimates the Dht size from a sequence of distances to the target.
///
/// The `i`-th distance yielded (1-based) is weighted by `i`, and the estimate is
/// `sum(i^2) * ID space / sum(i * distance_i)`, where the ID space is `u128::MAX`.
///
/// Returns `0.0` when `distances` is empty. If every distance is zero the division
/// is by zero and the result is `f64::INFINITY`.
fn dht_size_estimate<I>(distances: I) -> f64
where
    I: IntoIterator<Item = u128>,
{
    let mut weighted_sum = 0.0_f64;
    let mut count: u64 = 0;

    // Every node takes part in the estimate, weighted by its 1-based position.
    for distance in distances {
        count += 1;

        weighted_sum += count as f64 * distance as f64;
    }

    if count == 0 {
        return 0.0;
    }

    // 1^2 + 2^2 + .. + count^2. Computed in u128 so that long inputs cannot overflow,
    // which the default i32 arithmetic did once `count` went past roughly a thousand.
    let n = u128::from(count);
    let lsq_constant = (n * (n + 1) * (2 * n + 1) / 6) as f64;

    lsq_constant * u128::MAX as f64 / weighted_sum
}
170
#[cfg(test)]
mod tests {
    use std::{collections::BTreeMap, net::SocketAddrV4, str::FromStr, sync::Arc, time::Instant};

    use crate::common::NodeInner;

    use super::*;

    /// Builds an [Id] from raw bytes by going through its hex string form.
    fn id_from_bytes(bytes: &[u8]) -> Id {
        let hex: String = bytes.iter().map(|byte| format!("{byte:02x}")).collect();

        Id::from_str(&hex).expect("hex encoding of the Id bytes should parse as an Id")
    }

    /// Adding every node twice keeps a single copy of each, and the stored order
    /// is ascending by distance to the target.
    #[test]
    fn add_sorted_by_id() {
        let target = Id::random();

        let mut closest_nodes = ClosestNodes::new(target);

        for i in 0..100 {
            let node = Node::unique(i);
            closest_nodes.add(node.clone());
            closest_nodes.add(node);
        }

        assert_eq!(closest_nodes.nodes().len(), 100);

        let distances = closest_nodes
            .nodes()
            .iter()
            .map(|n| n.id().distance(&target))
            .collect::<Vec<_>>();

        let mut sorted = distances.clone();
        sorted.sort();

        assert_eq!(sorted, distances);
    }

    /// The node built below is expected to sort ahead of a random node, even though the
    /// random node's own id is used as the target.
    #[test]
    fn order_by_secure_id() {
        let unsecure = Node::random();
        let secure = Node(Arc::new(NodeInner {
            id: Id::from_str("5a3ce9c14e7a08645677bbd1cfe7d8f956d53256").unwrap(),
            address: SocketAddrV4::new([21, 75, 31, 124].into(), 0),
            token: None,
            last_seen: Instant::now(),
        }));

        let mut closest_nodes = ClosestNodes::new(*unsecure.id());

        closest_nodes.add(unsecure.clone());
        closest_nodes.add(secure.clone());

        assert_eq!(closest_nodes.nodes(), vec![secure, unsecure])
    }

    /// Nodes crafted to sit right next to the target must not be the only ones selected:
    /// the selection has to extend to the distance expected for the Dht size.
    #[test]
    fn take_until_expected_distance_to_20th_node() {
        let target = Id::random();
        let dht_size_estimate = 200;

        let mut closest_nodes = ClosestNodes::new(target);

        let target_bytes = target.as_bytes();

        for i in 0..dht_size_estimate {
            let node = Node::unique(i);
            closest_nodes.add(node);
        }

        let mut sybil = ClosestNodes::new(target);

        for _ in 0..20 {
            // Sybil ids share the first 18 bytes with the target; only the tail is random.
            let mut bytes = target_bytes.to_vec();
            bytes[18..].copy_from_slice(&Id::random().as_bytes()[18..]);
            let node = Node::new(id_from_bytes(&bytes), SocketAddrV4::new(0.into(), 0));

            sybil.add(node.clone());
            closest_nodes.add(node);
        }

        let closest = closest_nodes.take_until_secure(dht_size_estimate, 0);

        // Written as an addition so that a short selection fails the assertion
        // instead of underflowing the subtraction.
        assert!(closest.len() > sybil.nodes().len() + 10);
    }

    /// The mean estimate over several simulated networks stays within 20% of the real size.
    #[test]
    fn simulation() {
        let lookups = 4;
        let acceptable_margin = 0.2;
        let sims = 10;
        let dht_size = 2500_f64;

        let mean = (0..sims)
            .map(|_| simulate(dht_size as usize, lookups) as f64)
            .sum::<f64>()
            / (sims as f64);

        let margin = (mean - dht_size).abs() / dht_size;

        assert!(margin <= acceptable_margin);
    }

    /// Builds a network of `dht_size` nodes, runs `lookups` queries for random targets
    /// and returns the average of the resulting size estimates.
    fn simulate(dht_size: usize, lookups: usize) -> usize {
        let mut nodes = BTreeMap::new();
        for i in 0..dht_size {
            let node = Node::unique(i);
            nodes.insert(*node.id(), node);
        }

        (0..lookups)
            .map(|_| {
                let target = Id::random();

                let mut closest_nodes = ClosestNodes::new(target);

                // Feed up to 100 neighbours from each side of the target in key order.
                for (_, node) in nodes.range(target..).take(100) {
                    closest_nodes.add(node.clone())
                }
                for (_, node) in nodes.range(..target).rev().take(100) {
                    closest_nodes.add(node.clone())
                }

                let estimate = closest_nodes.dht_size_estimate();

                estimate as usize
            })
            .sum::<usize>()
            / lookups
    }
}