Skip to main content

hashiverse_lib/client/peer_tracker/
peer_iterator.rs

1//! # Stateful peer walk by XOR distance
2//!
3//! `PeerIterator` is the execution primitive for every DHT operation: given a target
4//! [`crate::tools::types::Id`] and a
5//! [`crate::client::peer_tracker::peer_tracker::PeerTracker`], hand back peers in order
6//! of decreasing closeness while remembering which ones have already been tried.
7//!
8//! Two knobs tune the walk:
9//!
//! - **High-watermark** on [`crate::tools::tools::LeadingAgreementBits`] — the best
//!   (highest) agreement seen so far. Once more than `N`
//!   (`max_iterations_since_high_watermark`) consecutive peers fail to beat it, the
//!   iterator gives up. This is the standard Kademlia "no further progress possible"
//!   signal.
//! - **Cache radius** — supplied by
//!   [`crate::client::caching::cache_radius_tracker`]. Peers closer than the recorded
//!   cache radius are initially skipped, because whatever we'd fetch from them is already
//!   cached further out; fetching again just hammers the closest nodes. Each returned peer
//!   opens one more ring (one more bit of agreement), so closer peers are still visited
//!   eventually.
18
19use crate::client::peer_tracker::peer_tracker::PeerTracker;
20use crate::protocol::peer::Peer;
21use crate::tools::tools;
22use crate::tools::tools::LeadingAgreementBits;
23use crate::tools::types::Id;
24use log::warn;
25use std::collections::HashSet;
26
27pub struct PeerIterator<'a> {
28    tracker: &'a mut PeerTracker,
29    bucket_location_id: Id,
30    max_iterations_since_high_watermark: usize,
31    peers_already_queried: HashSet<Id>,
32    high_watermark: LeadingAgreementBits,
33    iterations_since_high_watermark: usize,
34    cache_radius: Option<LeadingAgreementBits>,
35}
36
37impl<'a> PeerIterator<'a> {
38    pub fn new(tracker: &'a mut PeerTracker, bucket_location_id: Id, max_iterations_since_high_watermark: usize, cache_radius: Option<LeadingAgreementBits>) -> Self {
39        Self {
40            tracker,
41            bucket_location_id,
42            max_iterations_since_high_watermark,
43            peers_already_queried: HashSet::new(),
44            high_watermark: 0,
45            iterations_since_high_watermark: 0,
46            cache_radius,
47        }
48    }
49    pub fn next_peer(&mut self) -> Option<(Peer, LeadingAgreementBits)> {
50        loop {
51            let nearest_peer = self
52                .tracker
53                .peers()
54                .iter()
55                .filter(|peer| !self.peers_already_queried.contains(&peer.id))
56                .map(|peer| (peer, tools::leading_agreement_bits_xor(&self.bucket_location_id.0, &peer.id.0)))
57                .filter(|(_, lab)| self.cache_radius.is_none_or(|r| *lab < r))
58                .max_by_key(|peer| peer.1);
59
60            match nearest_peer {
61                Some(nearest_peer) => {
62                    self.peers_already_queried.insert(nearest_peer.0.id);
63
64                    if nearest_peer.1 > self.high_watermark {
65                        self.high_watermark = nearest_peer.1;
66                        self.iterations_since_high_watermark = 0;
67                    }
68                    else {
69                        self.iterations_since_high_watermark += 1;
70                        if self.iterations_since_high_watermark > self.max_iterations_since_high_watermark {
71                            return None;
72                        }
73                    }
74
75                    // Each successful return opens one more ring of closer peers on the next call.
76                    if let Some(r) = &mut self.cache_radius {
77                        *r = (*r + 1).min(256);
78                    }
79
80                    return Some((nearest_peer.0.clone(), nearest_peer.1));
81                }
82                None => {
83                    // No unvisited peer passes the current radius filter.
84                    // If there are no unvisited peers at all, we are done.
85                    let any_unvisited = self.tracker.peers().iter().any(|p| !self.peers_already_queried.contains(&p.id));
86                    if !any_unvisited {
87                        return None;
88                    }
89                    // Allow the next ring of closer peers and retry.
90                    {
91                        let r = &mut self.cache_radius?;
92                        *r = (*r + 1).min(256)
93                    }
94                }
95            }
96        }
97    }
98
99    pub fn iterations_since_high_watermark(&self) -> usize {
100        self.iterations_since_high_watermark
101    }
102
103    pub fn add_peers(&mut self, peers: Vec<Peer>) {
104        for peer in peers {
105            if let Err(e) = self.tracker.add_peer(peer) {
106                warn!("not adding invalid peer: {}", e);
107            }
108        }
109    }
110    pub fn remove_peer(&mut self, peer: &Peer) {
111        self.tracker.remove_peer(peer);
112    }
113}
114
115pub struct ConvergeToLocationVisitResult {
116    pub done: bool,                  // Stop iterating
117    pub peer_unavailable: bool,      // Indicate that this peer has a problem of some sort and should be removed
118    pub peers_discovered: Vec<Peer>, // Supply some newly discovered Peers to add to the iterations
119}
120
121#[async_trait::async_trait]
122pub trait ConvergeToLocationVisitor: Send + Sync {
123    async fn on_peer(&mut self, peer: &Peer) -> anyhow::Result<ConvergeToLocationVisitResult>;
124}
125
#[cfg(test)]
mod tests {
    use crate::client::client_storage::mem_client_storage::MemClientStorage;
    use crate::client::peer_tracker::peer_tracker::PeerTracker;
    use crate::protocol::peer::Peer;
    use crate::tools::buckets::{BUCKET_DURATIONS, BucketLocation, BucketType, generate_bucket_location};
    use crate::tools::config;
    use crate::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
    use crate::tools::runtime_services::RuntimeServices;
    use crate::tools::server_id::ServerId;
    use crate::tools::time::{DurationMillis, TimeMillis};
    use crate::tools::time_provider::time_provider::RealTimeProvider;
    use crate::tools::tools::{self, LeadingAgreementBits};
    use crate::tools::types::{Id, Pow};
    use crate::transport::mem_transport::MemTransportFactory;
    use std::sync::Arc;

    fn get_test_runtime_services() -> Arc<RuntimeServices> {
        Arc::new(RuntimeServices {
            time_provider: Arc::new(RealTimeProvider),
            transport_factory: MemTransportFactory::default(),
            pow_generator: Arc::new(SingleThreadedPowGenerator::new()),
        })
    }

    /// Mint a fresh server identity at the minimum accepted PoW and return it as a `Peer`.
    async fn new_peer(runtime_services: &RuntimeServices) -> anyhow::Result<Peer> {
        let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
        let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
        Ok(peer)
    }

    /// Add `count` freshly minted peers to `peer_tracker`.
    async fn add_random_peers(runtime_services: &RuntimeServices, peer_tracker: &mut PeerTracker, count: usize) -> anyhow::Result<()> {
        for _ in 0..count {
            peer_tracker.add_peer(new_peer(runtime_services).await?)?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn general_tests() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        assert!(peer_tracker.is_empty());
        assert_eq!(0, peer_tracker.len());

        // Don't accept insufficient PoW. If an identity minted at half the minimum happens to
        // meet the minimum anyway, mint another one.
        loop {
            let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), Pow(config::SERVER_KEY_POW_MIN.0 / 2), true, runtime_services.pow_generator.as_ref()).await?;
            if server_id.pow >= config::SERVER_KEY_POW_MIN {
                continue;
            }
            let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
            assert!(peer_tracker.add_peer(peer).is_err());
            assert_eq!(0, peer_tracker.len());
            break;
        }

        // Add an individual
        {
            let peer = new_peer(&runtime_services).await?;
            assert!(peer_tracker.add_peer(peer).is_ok());
            assert_eq!(1, peer_tracker.len());
        }

        // Can't add an individual twice
        {
            let peer = new_peer(&runtime_services).await?;
            assert!(peer_tracker.add_peer(peer.clone()).is_ok());
            assert_eq!(2, peer_tracker.len());
            assert!(peer_tracker.add_peer(peer.clone()).is_ok());
            assert_eq!(2, peer_tracker.len());
        }

        // Add an individual, then remove it
        {
            let peer = new_peer(&runtime_services).await?;
            assert!(peer_tracker.add_peer(peer.clone()).is_ok());
            assert_eq!(3, peer_tracker.len());
            peer_tracker.remove_peer(&peer);
            assert_eq!(2, peer_tracker.len());
        }

        // Remove an unknown individual
        {
            let peer = new_peer(&runtime_services).await?;
            peer_tracker.remove_peer(&peer);
            assert_eq!(2, peer_tracker.len());
        }

        Ok(())
    }

    #[tokio::test]
    async fn converge_basics_test() -> anyhow::Result<()> {
        let runtime_services = get_test_runtime_services();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        const NUM_PEERS: usize = 100;

        add_random_peers(&runtime_services, &mut peer_tracker, NUM_PEERS).await?;
        assert_eq!(NUM_PEERS, peer_tracker.len());

        // Now iterate through them all
        {
            let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
            while peer_iter.next_peer().is_some() {
                count += 1;
            }
            assert_eq!(NUM_PEERS, count);
        }

        // Now iterate through them, but bail after the first
        {
            let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
            if peer_iter.next_peer().is_some() {
                count += 1;
            }
            assert_eq!(1, count);
        }

        // Now iterate through them, but delete half of them
        {
            let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
            while let Some((peer, _)) = peer_iter.next_peer() {
                count += 1;
                if 0 == count % 2 {
                    peer_iter.remove_peer(&peer);
                }
            }
            assert_eq!(NUM_PEERS, count);
            assert_eq!(NUM_PEERS / 2, peer_tracker.len());
        }

        Ok(())
    }

    #[tokio::test]
    async fn converge_termination_test() -> anyhow::Result<()> {
        let runtime_services = get_test_runtime_services();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        const NUM_PEERS: usize = 100;

        add_random_peers(&runtime_services, &mut peer_tracker, NUM_PEERS).await?;
        assert_eq!(NUM_PEERS, peer_tracker.len());

        {
            let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 3, None).await?;
            while peer_iter.next_peer().is_some() {
                count += 1;
            }
            // One peer sets the high-watermark, then three non-improving peers are tolerated.
            assert_eq!(3 + 1, count);
        }

        Ok(())
    }

    #[tokio::test]
    async fn converge_insertions_test() -> anyhow::Result<()> {
        let runtime_services = get_test_runtime_services();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        const NUM_PEERS: usize = 100;

        add_random_peers(&runtime_services, &mut peer_tracker, NUM_PEERS).await?;
        assert_eq!(NUM_PEERS, peer_tracker.len());

        // Now iterate through them, but add a few more peers
        {
            let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
            while peer_iter.next_peer().is_some() {
                count += 1;

                if 0 == count % 10 {
                    let peer = new_peer(&runtime_services).await?;
                    peer_iter.add_peers(vec![peer]);
                }

                if 50 == count {
                    break;
                }
            }

            assert_eq!(50, count);
            assert_eq!(NUM_PEERS + 5, peer_tracker.len());
        }

        Ok(())
    }

    #[tokio::test]
    async fn converge_targeting_test() -> anyhow::Result<()> {
        let runtime_services = get_test_runtime_services();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        const NUM_PEERS: usize = 100;

        add_random_peers(&runtime_services, &mut peer_tracker, NUM_PEERS).await?;
        assert_eq!(NUM_PEERS, peer_tracker.len());

        // This is the peer we are actually targeting
        let target_peer = new_peer(&runtime_services).await?;

        {
            const PEER_DISCOVERY_I: usize = 37usize;
            const PEER_DISCOVERY_I_PLUS_1: usize = PEER_DISCOVERY_I + 1;

            let bucket_location = {
                // A location that shares the target's first 10 bytes, so the target (once
                // discovered) is closer than any randomly minted peer.
                let mut location_id = target_peer.id;
                location_id.0[10..31].fill(0u8);
                BucketLocation {
                    bucket_type: BucketType::User,
                    base_id: location_id,
                    duration: DurationMillis::zero(),
                    bucket_time_millis: TimeMillis::zero(),
                    location_id,
                }
            };

            let mut count = 0;
            let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
            while let Some((peer, _)) = peer_iter.next_peer() {
                count += 1;
                match count {
                    PEER_DISCOVERY_I => {
                        peer_iter.add_peers(vec![target_peer.clone()]);
                    }
                    PEER_DISCOVERY_I_PLUS_1 => {
                        if peer.id != target_peer.id {
                            anyhow::bail!("peer is not the one we expected");
                        }
                        break;
                    }
                    _ => {}
                }
            }

            assert_eq!(PEER_DISCOVERY_I_PLUS_1, count);
            assert_eq!(NUM_PEERS + 1, peer_tracker.len());
        }

        Ok(())
    }

    /// Verify that `cache_radius` starts by skipping peers inside the radius, then opens up
    /// one ring per step so that closer peers are eventually visited too.
    #[tokio::test]
    async fn converge_cache_radius_test() -> anyhow::Result<()> {
        let runtime_services = get_test_runtime_services();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        let location_id = Id::zero();

        const NUM_PEERS: usize = 100;
        let mut labs_added: Vec<LeadingAgreementBits> = Vec::with_capacity(NUM_PEERS);
        for _ in 0..NUM_PEERS {
            let peer = new_peer(&runtime_services).await?;
            labs_added.push(tools::leading_agreement_bits_xor(&location_id.0, &peer.id.0));
            peer_tracker.add_peer(peer)?;
        }
        assert_eq!(NUM_PEERS, peer_tracker.len());

        let mut sorted_labs = labs_added.clone();
        sorted_labs.sort_unstable();
        let cache_radius = sorted_labs[NUM_PEERS / 2];

        let mut labs_visited: Vec<LeadingAgreementBits> = Vec::with_capacity(NUM_PEERS);
        let mut peer_iter = peer_tracker.iterate_to_location(location_id, usize::MAX, Some(cache_radius)).await?;
        while let Some((_, lab)) = peer_iter.next_peer() {
            labs_visited.push(lab);
        }

        assert_eq!(NUM_PEERS, labs_visited.len(), "all peers should be visited");

        let has_outside_peers = labs_added.iter().any(|&lab| lab < cache_radius);
        if has_outside_peers {
            assert!(labs_visited[0] < cache_radius, "first peer should be outside the initial cache zone, got lab={} cache_radius={}", labs_visited[0], cache_radius);
        }

        Ok(())
    }
}