Skip to main content

hashiverse_lib/client/peer_tracker/
peer_iterator.rs

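//! # Stateful peer walk by XOR distance
//!
//! `PeerIterator` is the execution primitive for every DHT operation: given a target
//! [`crate::tools::types::Id`] and a
//! [`crate::client::peer_tracker::peer_tracker::PeerTracker`], it hands back peers in order
//! of decreasing closeness while remembering which ones have already been tried.
//!
//! Two knobs tune the walk:
//!
//! - **High-watermark** on [`crate::tools::tools::LeadingAgreementBits`] — once `N`
//!   iterations have passed without finding a peer closer than the best so far, the
//!   iterator gives up. This is the standard Kademlia "no further progress possible"
//!   signal.
//! - **Cache radius** — supplied by
//!   [`crate::client::caching::cache_radius_tracker`]. Peers at or inside the recorded
//!   cache radius are skipped at first, because whatever we'd fetch from them is already
//!   cached further out; fetching again just hammers the closest nodes. The radius widens
//!   by one ring after every peer handed back, and keeps widening whenever no unvisited
//!   peer lies outside it, so closer peers are still reached eventually.
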
14//! - **Cache radius** — supplied by
15//!   [`crate::client::caching::cache_radius_tracker`]. Peers closer than the recorded
16//!   cache radius are skipped, because whatever we'd fetch from them is already cached
17//!   further out; fetching again just hammers the closest nodes.
18
19use crate::client::peer_tracker::peer_tracker::PeerTracker;
20use crate::protocol::peer::Peer;
21use crate::tools::tools;
22use crate::tools::tools::LeadingAgreementBits;
23use crate::tools::types::Id;
24use log::warn;
25use std::collections::HashSet;
26
27pub struct PeerIterator<'a> {
28    tracker: &'a mut PeerTracker,
29    bucket_location_id: Id,
30    max_iterations_since_high_watermark: usize,
31    peers_already_queried: HashSet<Id>,
32    high_watermark: LeadingAgreementBits,
33    iterations_since_high_watermark: usize,
34    cache_radius: Option<LeadingAgreementBits>,
35}
36
37impl<'a> PeerIterator<'a> {
38    pub fn new(tracker: &'a mut PeerTracker, bucket_location_id: Id, max_iterations_since_high_watermark: usize, cache_radius: Option<LeadingAgreementBits>) -> Self {
39        Self {
40            tracker,
41            bucket_location_id,
42            max_iterations_since_high_watermark,
43            peers_already_queried: HashSet::new(),
44            high_watermark: 0,
45            iterations_since_high_watermark: 0,
46            cache_radius,
47        }
48    }
49    pub fn next_peer(&mut self) -> Option<(Peer, LeadingAgreementBits)> {
50        loop {
51            let nearest_peer = self
52                .tracker
53                .peers()
54                .iter()
55                .filter(|peer| !self.peers_already_queried.contains(&peer.id))
56                .map(|peer| (peer, tools::leading_agreement_bits_xor(&self.bucket_location_id.0, &peer.id.0)))
57                .filter(|(_, lab)| self.cache_radius.is_none_or(|r| *lab < r))
58                .max_by_key(|peer| peer.1);
59
60            match nearest_peer {
61                Some(nearest_peer) => {
62                    self.peers_already_queried.insert(nearest_peer.0.id);
63
64                    if nearest_peer.1 > self.high_watermark {
65                        self.high_watermark = nearest_peer.1;
66                        self.iterations_since_high_watermark = 0;
67                    }
68                    else {
69                        self.iterations_since_high_watermark += 1;
70                        if self.iterations_since_high_watermark > self.max_iterations_since_high_watermark {
71                            return None;
72                        }
73                    }
74
75                    // Each successful return opens one more ring of closer peers on the next call.
76                    if let Some(r) = &mut self.cache_radius {
77                        *r = (*r + 1).min(256);
78                    }
79
80                    return Some((nearest_peer.0.clone(), nearest_peer.1));
81                }
82                None => {
83                    // No unvisited peer passes the current radius filter.
84                    // If there are no unvisited peers at all, we are done.
85                    let any_unvisited = self.tracker.peers().iter().any(|p| !self.peers_already_queried.contains(&p.id));
86                    if !any_unvisited {
87                        return None;
88                    }
89                    // Allow the next ring of closer peers and retry.
90                    //
91                    // NOTE: must take `&mut` via `as_mut()`, NOT `&mut self.cache_radius?`.
92                    // The latter applies `?` first, which copies the inner `i32` out (because
93                    // `LeadingAgreementBits: Copy`), so `&mut` then borrows a stack temporary
94                    // and the mutation never reaches `self.cache_radius`. That regression
95                    // (introduced in commit c1e734e) caused an infinite loop whenever the
96                    // initial cache_radius was 0.
97                    let r = self.cache_radius.as_mut()?;
98                    *r = (*r + 1).min(256);
99                }
100            }
101        }
102    }
103
104    pub fn iterations_since_high_watermark(&self) -> usize {
105        self.iterations_since_high_watermark
106    }
107
108    pub fn add_peers(&mut self, peers: Vec<Peer>) {
109        for peer in peers {
110            if let Err(e) = self.tracker.add_peer(peer) {
111                warn!("not adding invalid peer: {}", e);
112            }
113        }
114    }
115    pub fn remove_peer(&mut self, peer: &Peer) {
116        self.tracker.remove_peer(peer);
117    }
118}
119
120pub struct ConvergeToLocationVisitResult {
121    pub done: bool,                  // Stop iterating
122    pub peer_unavailable: bool,      // Indicate that this peer has a problem of some sort and should be removed
123    pub peers_discovered: Vec<Peer>, // Supply some newly discovered Peers to add to the iterations
124}
125
126#[async_trait::async_trait]
127pub trait ConvergeToLocationVisitor: Send + Sync {
128    async fn on_peer(&mut self, peer: &Peer) -> anyhow::Result<ConvergeToLocationVisitResult>;
129}
130
#[cfg(test)]
mod tests {
    use crate::client::client_storage::mem_client_storage::MemClientStorage;
    use crate::client::peer_tracker::peer_tracker::PeerTracker;
    use crate::tools::buckets::{BUCKET_DURATIONS, BucketLocation, BucketType, generate_bucket_location};
    use crate::tools::config;
    use crate::tools::runtime_services::RuntimeServices;
    use crate::tools::server_id::ServerId;
    use crate::tools::time::{DurationMillis, TimeMillis};
    use crate::tools::types::Id;

    /// Number of peers each test seeds its tracker with.
    const NUM_PEERS: usize = 100;

    /// Creates `count` fresh server identities and registers each resulting peer with `peer_tracker`.
    async fn add_fresh_peers(runtime_services: &RuntimeServices, peer_tracker: &mut PeerTracker, count: usize) -> anyhow::Result<()> {
        for _ in 0..count {
            let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
            peer_tracker.add_peer(server_id.to_peer(runtime_services.time_provider.as_ref())?)?;
        }
        Ok(())
    }

    /// Generates a `User` bucket location around a random base id, using the first
    /// `BUCKET_DURATIONS` entry and the current time, and returns its location id.
    fn random_user_location_id(runtime_services: &RuntimeServices) -> anyhow::Result<Id> {
        let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
        Ok(bucket_location.location_id)
    }

    #[tokio::test]
    async fn converge_basics_test() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        add_fresh_peers(&runtime_services, &mut peer_tracker, NUM_PEERS).await?;
        assert_eq!(NUM_PEERS, peer_tracker.len());

        // Unlimited walk: every peer is visited exactly once.
        {
            let location_id = random_user_location_id(&runtime_services)?;
            let mut peer_iter = peer_tracker.iterate_to_location(location_id, usize::MAX, None).await?;
            let mut visited = 0;
            while peer_iter.next_peer().is_some() {
                visited += 1;
            }
            assert_eq!(NUM_PEERS, visited);
        }

        // Stopping after the first peer is fine.
        {
            let location_id = random_user_location_id(&runtime_services)?;
            let mut peer_iter = peer_tracker.iterate_to_location(location_id, usize::MAX, None).await?;
            let visited = usize::from(peer_iter.next_peer().is_some());
            assert_eq!(1, visited);
        }

        // Removing every second peer mid-walk still visits all of them once.
        {
            let location_id = random_user_location_id(&runtime_services)?;
            let mut peer_iter = peer_tracker.iterate_to_location(location_id, usize::MAX, None).await?;
            let mut visited = 0;
            while let Some((peer, _)) = peer_iter.next_peer() {
                visited += 1;
                if visited % 2 == 0 {
                    peer_iter.remove_peer(&peer);
                }
            }
            assert_eq!(NUM_PEERS, visited);
            assert_eq!(NUM_PEERS / 2, peer_tracker.len());
        }

        Ok(())
    }

    #[tokio::test]
    async fn converge_termination_test() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        add_fresh_peers(&runtime_services, &mut peer_tracker, NUM_PEERS).await?;
        assert_eq!(NUM_PEERS, peer_tracker.len());

        // The watermark-setting peer, then `max_stale` non-improving ones, then the walk stops.
        let max_stale: usize = 3;
        let location_id = random_user_location_id(&runtime_services)?;
        let mut peer_iter = peer_tracker.iterate_to_location(location_id, max_stale, None).await?;
        let mut visited = 0;
        while peer_iter.next_peer().is_some() {
            visited += 1;
        }
        assert_eq!(max_stale + 1, visited);

        Ok(())
    }

    #[tokio::test]
    async fn converge_insertions_test() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        add_fresh_peers(&runtime_services, &mut peer_tracker, NUM_PEERS).await?;
        assert_eq!(NUM_PEERS, peer_tracker.len());

        // Feed one new peer in through the iterator every ten steps, stopping after fifty.
        let location_id = random_user_location_id(&runtime_services)?;
        let mut peer_iter = peer_tracker.iterate_to_location(location_id, usize::MAX, None).await?;
        let mut visited = 0;
        while peer_iter.next_peer().is_some() {
            visited += 1;

            if visited % 10 == 0 {
                let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
                let fresh_peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
                peer_iter.add_peers(vec![fresh_peer]);
            }

            if visited == 50 {
                break;
            }
        }

        assert_eq!(50, visited);
        assert_eq!(NUM_PEERS + 5, peer_tracker.len());

        Ok(())
    }

    #[tokio::test]
    async fn converge_targeting_test() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        add_fresh_peers(&runtime_services, &mut peer_tracker, NUM_PEERS).await?;
        assert_eq!(NUM_PEERS, peer_tracker.len());

        // The peer we are actually targeting; it only becomes known mid-walk.
        let target_server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
        let target_peer = target_server_id.to_peer(runtime_services.time_provider.as_ref())?;

        // Step at which the target is discovered; it must be the very next peer handed back.
        const PEER_DISCOVERY_I: usize = 37;

        // Keep the target's leading bytes so it is far closer than any random peer.
        let mut location_id = target_peer.id;
        location_id.0[10..31].fill(0u8);
        let bucket_location = BucketLocation {
            bucket_type: BucketType::User,
            base_id: location_id,
            duration: DurationMillis::zero(),
            bucket_time_millis: TimeMillis::zero(),
            location_id,
        };

        let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
        let mut visited = 0;
        while let Some((peer, _)) = peer_iter.next_peer() {
            visited += 1;
            if visited == PEER_DISCOVERY_I {
                peer_iter.add_peers(vec![target_peer.clone()]);
            } else if visited == PEER_DISCOVERY_I + 1 {
                if peer.id != target_peer.id {
                    anyhow::bail!("peer is not the one we expected");
                }
                break;
            }
        }

        assert_eq!(PEER_DISCOVERY_I + 1, visited);
        assert_eq!(NUM_PEERS + 1, peer_tracker.len());

        Ok(())
    }

    /// Verify that `cache_radius` starts by skipping peers inside the radius, then opens up
    /// one ring per step so that closer peers are eventually visited too.
    #[tokio::test]
    async fn converge_cache_radius_test() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        let location_id = Id::zero();

        // Record every peer's agreement with the target as it is registered.
        let mut labs_added: Vec<crate::tools::tools::LeadingAgreementBits> = Vec::with_capacity(NUM_PEERS);
        for _ in 0..NUM_PEERS {
            let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
            let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
            labs_added.push(crate::tools::tools::leading_agreement_bits_xor(&location_id.0, &peer.id.0));
            peer_tracker.add_peer(peer)?;
        }
        assert_eq!(NUM_PEERS, peer_tracker.len());

        // Use the median agreement as the starting radius so a sizeable share of peers start inside it.
        let cache_radius = {
            let mut sorted_labs = labs_added.clone();
            sorted_labs.sort();
            sorted_labs[NUM_PEERS / 2]
        };

        let mut peer_iter = peer_tracker.iterate_to_location(location_id, usize::MAX, Some(cache_radius)).await?;
        let labs_visited: Vec<crate::tools::tools::LeadingAgreementBits> = std::iter::from_fn(|| peer_iter.next_peer().map(|(_, lab)| lab)).collect();

        assert_eq!(NUM_PEERS, labs_visited.len(), "all peers should be visited");

        if labs_added.iter().any(|&lab| lab < cache_radius) {
            assert!(labs_visited[0] < cache_radius, "first peer should be outside the initial cache zone, got lab={} cache_radius={}", labs_visited[0], cache_radius);
        }

        Ok(())
    }
}