hashiverse_lib/client/peer_tracker/
peer_iterator.rs1use crate::client::peer_tracker::peer_tracker::PeerTracker;
20use crate::protocol::peer::Peer;
21use crate::tools::tools;
22use crate::tools::tools::LeadingAgreementBits;
23use crate::tools::types::Id;
24use log::warn;
25use std::collections::HashSet;
26
27pub struct PeerIterator<'a> {
28 tracker: &'a mut PeerTracker,
29 bucket_location_id: Id,
30 max_iterations_since_high_watermark: usize,
31 peers_already_queried: HashSet<Id>,
32 high_watermark: LeadingAgreementBits,
33 iterations_since_high_watermark: usize,
34 cache_radius: Option<LeadingAgreementBits>,
35}
36
37impl<'a> PeerIterator<'a> {
38 pub fn new(tracker: &'a mut PeerTracker, bucket_location_id: Id, max_iterations_since_high_watermark: usize, cache_radius: Option<LeadingAgreementBits>) -> Self {
39 Self {
40 tracker,
41 bucket_location_id,
42 max_iterations_since_high_watermark,
43 peers_already_queried: HashSet::new(),
44 high_watermark: 0,
45 iterations_since_high_watermark: 0,
46 cache_radius,
47 }
48 }
49 pub fn next_peer(&mut self) -> Option<(Peer, LeadingAgreementBits)> {
50 loop {
51 let nearest_peer = self
52 .tracker
53 .peers()
54 .iter()
55 .filter(|peer| !self.peers_already_queried.contains(&peer.id))
56 .map(|peer| (peer, tools::leading_agreement_bits_xor(&self.bucket_location_id.0, &peer.id.0)))
57 .filter(|(_, lab)| self.cache_radius.is_none_or(|r| *lab < r))
58 .max_by_key(|peer| peer.1);
59
60 match nearest_peer {
61 Some(nearest_peer) => {
62 self.peers_already_queried.insert(nearest_peer.0.id);
63
64 if nearest_peer.1 > self.high_watermark {
65 self.high_watermark = nearest_peer.1;
66 self.iterations_since_high_watermark = 0;
67 }
68 else {
69 self.iterations_since_high_watermark += 1;
70 if self.iterations_since_high_watermark > self.max_iterations_since_high_watermark {
71 return None;
72 }
73 }
74
75 if let Some(r) = &mut self.cache_radius {
77 *r = (*r + 1).min(256);
78 }
79
80 return Some((nearest_peer.0.clone(), nearest_peer.1));
81 }
82 None => {
83 let any_unvisited = self.tracker.peers().iter().any(|p| !self.peers_already_queried.contains(&p.id));
86 if !any_unvisited {
87 return None;
88 }
89 let r = self.cache_radius.as_mut()?;
98 *r = (*r + 1).min(256);
99 }
100 }
101 }
102 }
103
104 pub fn iterations_since_high_watermark(&self) -> usize {
105 self.iterations_since_high_watermark
106 }
107
108 pub fn add_peers(&mut self, peers: Vec<Peer>) {
109 for peer in peers {
110 if let Err(e) = self.tracker.add_peer(peer) {
111 warn!("not adding invalid peer: {}", e);
112 }
113 }
114 }
115 pub fn remove_peer(&mut self, peer: &Peer) {
116 self.tracker.remove_peer(peer);
117 }
118}
119
120pub struct ConvergeToLocationVisitResult {
121 pub done: bool, pub peer_unavailable: bool, pub peers_discovered: Vec<Peer>, }
125
126#[async_trait::async_trait]
127pub trait ConvergeToLocationVisitor: Send + Sync {
128 async fn on_peer(&mut self, peer: &Peer) -> anyhow::Result<ConvergeToLocationVisitResult>;
129}
130
#[cfg(test)]
mod tests {
    use crate::client::client_storage::mem_client_storage::MemClientStorage;
    use crate::client::peer_tracker::peer_tracker::PeerTracker;
    use crate::protocol::peer::Peer;
    use crate::tools::buckets::{BUCKET_DURATIONS, BucketLocation, BucketType, generate_bucket_location};
    use crate::tools::config;
    use crate::tools::runtime_services::RuntimeServices;
    use crate::tools::server_id::ServerId;
    use crate::tools::time::{DurationMillis, TimeMillis};
    use crate::tools::tools::{LeadingAgreementBits, leading_agreement_bits_xor};
    use crate::tools::types::Id;

    /// Number of peers every test seeds its tracker with.
    const NUM_PEERS: usize = 100;

    /// Builds a peer from a freshly generated server identity.
    async fn random_peer(services: &RuntimeServices) -> anyhow::Result<Peer> {
        let server_id = ServerId::new("own_pow", services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, services.pow_generator.as_ref()).await?;
        let peer = server_id.to_peer(services.time_provider.as_ref())?;
        Ok(peer)
    }

    /// Adds `NUM_PEERS` fresh peers to `tracker` and checks that all of them were stored.
    async fn seed_tracker(tracker: &mut PeerTracker, services: &RuntimeServices) -> anyhow::Result<()> {
        for _ in 0..NUM_PEERS {
            tracker.add_peer(random_peer(services).await?)?;
        }
        assert_eq!(NUM_PEERS, tracker.len());
        Ok(())
    }

    /// Location id of a user bucket generated for a random base id at the current time.
    fn random_user_location_id(services: &RuntimeServices) -> anyhow::Result<Id> {
        let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], services.time_provider.current_time_millis())?;
        Ok(bucket_location.location_id)
    }

    /// Full walk, single step, and walk-with-removals over a seeded tracker.
    #[tokio::test]
    async fn converge_basics_test() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
        seed_tracker(&mut peer_tracker, &runtime_services).await?;

        // With no iteration limit, every peer is yielded exactly once.
        {
            let location_id = random_user_location_id(&runtime_services)?;
            let mut peer_iter = peer_tracker.iterate_to_location(location_id, usize::MAX, None).await?;
            let visited = std::iter::from_fn(|| peer_iter.next_peer()).count();
            assert_eq!(NUM_PEERS, visited);
        }

        // A single step yields a peer.
        {
            let location_id = random_user_location_id(&runtime_services)?;
            let mut peer_iter = peer_tracker.iterate_to_location(location_id, usize::MAX, None).await?;
            let visited = usize::from(peer_iter.next_peer().is_some());
            assert_eq!(1, visited);
        }

        // Removing every second visited peer does not disturb the walk.
        {
            let location_id = random_user_location_id(&runtime_services)?;
            let mut peer_iter = peer_tracker.iterate_to_location(location_id, usize::MAX, None).await?;
            let mut visited = 0;
            while let Some((peer, _)) = peer_iter.next_peer() {
                visited += 1;
                if visited % 2 == 0 {
                    peer_iter.remove_peer(&peer);
                }
            }
            assert_eq!(NUM_PEERS, visited);
            assert_eq!(NUM_PEERS / 2, peer_tracker.len());
        }

        Ok(())
    }

    /// With a limit of 3 non-improving steps, exactly 3 + 1 peers are yielded.
    #[tokio::test]
    async fn converge_termination_test() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
        seed_tracker(&mut peer_tracker, &runtime_services).await?;

        let location_id = random_user_location_id(&runtime_services)?;
        let mut peer_iter = peer_tracker.iterate_to_location(location_id, 3, None).await?;
        let visited = std::iter::from_fn(|| peer_iter.next_peer()).count();
        assert_eq!(3 + 1, visited);

        Ok(())
    }

    /// Peers added mid-walk end up in the tracker.
    #[tokio::test]
    async fn converge_insertions_test() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
        seed_tracker(&mut peer_tracker, &runtime_services).await?;

        let location_id = random_user_location_id(&runtime_services)?;
        let mut peer_iter = peer_tracker.iterate_to_location(location_id, usize::MAX, None).await?;
        let mut visited = 0;
        while let Some(_peer) = peer_iter.next_peer() {
            visited += 1;

            // Every tenth step introduces one new peer.
            if visited % 10 == 0 {
                let peer = random_peer(&runtime_services).await?;
                peer_iter.add_peers(vec![peer]);
            }

            if visited == 50 {
                break;
            }
        }

        assert_eq!(50, visited);
        assert_eq!(NUM_PEERS + 5, peer_tracker.len());

        Ok(())
    }

    /// A peer injected mid-walk that is nearest to the target is yielded on the next step.
    #[tokio::test]
    async fn converge_targeting_test() -> anyhow::Result<()> {
        const PEER_DISCOVERY_I: usize = 37usize;
        const PEER_DISCOVERY_I_PLUS_1: usize = PEER_DISCOVERY_I + 1;

        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
        seed_tracker(&mut peer_tracker, &runtime_services).await?;

        let target_peer = random_peer(&runtime_services).await?;

        // The target location keeps the leading bytes of the target peer's id and zeroes
        // bytes 10..31.
        let mut location_id = target_peer.id;
        (10..31).for_each(|i| location_id.0[i] = 0u8);
        let bucket_location = BucketLocation {
            bucket_type: BucketType::User,
            base_id: location_id,
            duration: DurationMillis::zero(),
            bucket_time_millis: TimeMillis::zero(),
            location_id,
        };

        let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
        let mut visited = 0;
        while let Some((peer, _)) = peer_iter.next_peer() {
            visited += 1;
            if visited == PEER_DISCOVERY_I {
                peer_iter.add_peers(vec![target_peer.clone()]);
            } else if visited == PEER_DISCOVERY_I_PLUS_1 {
                if peer.id != target_peer.id {
                    anyhow::bail!("peer is not the one we expected");
                }
                break;
            }
        }

        assert_eq!(PEER_DISCOVERY_I_PLUS_1, visited);
        assert_eq!(NUM_PEERS + 1, peer_tracker.len());

        Ok(())
    }

    /// With a cache radius, all peers are still visited and the first one lies below it.
    #[tokio::test]
    async fn converge_cache_radius_test() -> anyhow::Result<()> {
        let runtime_services = RuntimeServices::default_for_testing();
        let client_storage = MemClientStorage::new().await?;
        let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;

        let location_id = Id::zero();

        // Seed the tracker, remembering each peer's agreement with the target.
        let mut labs_added: Vec<LeadingAgreementBits> = Vec::new();
        for _ in 0..NUM_PEERS {
            let peer = random_peer(&runtime_services).await?;
            labs_added.push(leading_agreement_bits_xor(&location_id.0, &peer.id.0));
            peer_tracker.add_peer(peer)?;
        }
        assert_eq!(NUM_PEERS, peer_tracker.len());

        // Use the median agreement value as the initial cache radius.
        let cache_radius = {
            let mut sorted_labs = labs_added.clone();
            sorted_labs.sort();
            sorted_labs[NUM_PEERS / 2]
        };

        let mut peer_iter = peer_tracker.iterate_to_location(location_id, usize::MAX, Some(cache_radius)).await?;
        let labs_visited: Vec<LeadingAgreementBits> = std::iter::from_fn(|| peer_iter.next_peer()).map(|(_, lab)| lab).collect();

        assert_eq!(NUM_PEERS, labs_visited.len(), "all peers should be visited");

        let has_outside_peers = labs_added.iter().any(|&lab| lab < cache_radius);
        if has_outside_peers {
            assert!(labs_visited[0] < cache_radius, "first peer should be outside the initial cache zone, got lab={} cache_radius={}", labs_visited[0], cache_radius);
        }

        Ok(())
    }
}