hashiverse_lib/client/peer_tracker/
peer_iterator.rs1use crate::client::peer_tracker::peer_tracker::PeerTracker;
20use crate::protocol::peer::Peer;
21use crate::tools::tools;
22use crate::tools::tools::LeadingAgreementBits;
23use crate::tools::types::Id;
24use log::warn;
25use std::collections::HashSet;
26
27pub struct PeerIterator<'a> {
28 tracker: &'a mut PeerTracker,
29 bucket_location_id: Id,
30 max_iterations_since_high_watermark: usize,
31 peers_already_queried: HashSet<Id>,
32 high_watermark: LeadingAgreementBits,
33 iterations_since_high_watermark: usize,
34 cache_radius: Option<LeadingAgreementBits>,
35}
36
37impl<'a> PeerIterator<'a> {
38 pub fn new(tracker: &'a mut PeerTracker, bucket_location_id: Id, max_iterations_since_high_watermark: usize, cache_radius: Option<LeadingAgreementBits>) -> Self {
39 Self {
40 tracker,
41 bucket_location_id,
42 max_iterations_since_high_watermark,
43 peers_already_queried: HashSet::new(),
44 high_watermark: 0,
45 iterations_since_high_watermark: 0,
46 cache_radius,
47 }
48 }
49 pub fn next_peer(&mut self) -> Option<(Peer, LeadingAgreementBits)> {
50 loop {
51 let nearest_peer = self
52 .tracker
53 .peers()
54 .iter()
55 .filter(|peer| !self.peers_already_queried.contains(&peer.id))
56 .map(|peer| (peer, tools::leading_agreement_bits_xor(&self.bucket_location_id.0, &peer.id.0)))
57 .filter(|(_, lab)| self.cache_radius.is_none_or(|r| *lab < r))
58 .max_by_key(|peer| peer.1);
59
60 match nearest_peer {
61 Some(nearest_peer) => {
62 self.peers_already_queried.insert(nearest_peer.0.id);
63
64 if nearest_peer.1 > self.high_watermark {
65 self.high_watermark = nearest_peer.1;
66 self.iterations_since_high_watermark = 0;
67 }
68 else {
69 self.iterations_since_high_watermark += 1;
70 if self.iterations_since_high_watermark > self.max_iterations_since_high_watermark {
71 return None;
72 }
73 }
74
75 if let Some(r) = &mut self.cache_radius {
77 *r = (*r + 1).min(256);
78 }
79
80 return Some((nearest_peer.0.clone(), nearest_peer.1));
81 }
82 None => {
83 let any_unvisited = self.tracker.peers().iter().any(|p| !self.peers_already_queried.contains(&p.id));
86 if !any_unvisited {
87 return None;
88 }
89 {
91 let r = &mut self.cache_radius?;
92 *r = (*r + 1).min(256)
93 }
94 }
95 }
96 }
97 }
98
99 pub fn iterations_since_high_watermark(&self) -> usize {
100 self.iterations_since_high_watermark
101 }
102
103 pub fn add_peers(&mut self, peers: Vec<Peer>) {
104 for peer in peers {
105 if let Err(e) = self.tracker.add_peer(peer) {
106 warn!("not adding invalid peer: {}", e);
107 }
108 }
109 }
110 pub fn remove_peer(&mut self, peer: &Peer) {
111 self.tracker.remove_peer(peer);
112 }
113}
114
115pub struct ConvergeToLocationVisitResult {
116 pub done: bool, pub peer_unavailable: bool, pub peers_discovered: Vec<Peer>, }
120
121#[async_trait::async_trait]
122pub trait ConvergeToLocationVisitor: Send + Sync {
123 async fn on_peer(&mut self, peer: &Peer) -> anyhow::Result<ConvergeToLocationVisitResult>;
124}
125
126#[cfg(test)]
127mod tests {
128 use crate::client::client_storage::mem_client_storage::MemClientStorage;
129 use crate::client::peer_tracker::peer_tracker::PeerTracker;
130 use crate::tools::buckets::{BUCKET_DURATIONS, BucketLocation, BucketType, generate_bucket_location};
131 use crate::tools::config;
132 use crate::tools::pow_generator::single_threaded_pow_generator::SingleThreadedPowGenerator;
133 use crate::tools::runtime_services::RuntimeServices;
134 use crate::tools::server_id::ServerId;
135 use crate::tools::time::{DurationMillis, TimeMillis};
136 use crate::tools::time_provider::time_provider::RealTimeProvider;
137 use crate::tools::types::{Id, Pow};
138 use crate::transport::mem_transport::MemTransportFactory;
139 use std::sync::Arc;
140
141 fn get_test_runtime_services() -> Arc<RuntimeServices> {
142 Arc::new(RuntimeServices {
143 time_provider: Arc::new(RealTimeProvider),
144 transport_factory: MemTransportFactory::default(),
145 pow_generator: Arc::new(SingleThreadedPowGenerator::new()),
146 })
147 }
148
149 #[tokio::test]
150 async fn general_tests() -> anyhow::Result<()> {
151 let runtime_services = RuntimeServices::default_for_testing();
152 let client_storage = MemClientStorage::new().await?;
153 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
154
155 assert!(peer_tracker.is_empty());
156 assert_eq!(0, peer_tracker.len());
157
158 {
160 loop {
161 let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), Pow(config::SERVER_KEY_POW_MIN.0 / 2), true, runtime_services.pow_generator.as_ref()).await?;
162 if server_id.pow >= config::SERVER_KEY_POW_MIN {
163 continue;
164 }
165 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
166 let result = peer_tracker.add_peer(peer);
167 assert!(result.is_err());
168 assert_eq!(0, peer_tracker.len());
169 break;
170 }
171 }
172
173 {
175 let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
176 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
177 let result = peer_tracker.add_peer(peer);
178 assert!(result.is_ok());
179 assert_eq!(1, peer_tracker.len());
180 }
181
182 {
184 let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
185 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
186 let result = peer_tracker.add_peer(peer.clone());
187 assert!(result.is_ok());
188 assert_eq!(2, peer_tracker.len());
189 let result = peer_tracker.add_peer(peer.clone());
190 assert!(result.is_ok());
191 assert_eq!(2, peer_tracker.len());
192 }
193
194 {
196 let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
197 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
198 let result = peer_tracker.add_peer(peer.clone());
199 assert!(result.is_ok());
200 assert_eq!(3, peer_tracker.len());
201 peer_tracker.remove_peer(&peer);
202 assert_eq!(2, peer_tracker.len());
203 }
204
205 {
207 let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
208 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
209 peer_tracker.remove_peer(&peer);
210 assert_eq!(2, peer_tracker.len());
211 }
212
213 Ok(())
214 }
215
216 #[tokio::test]
217 async fn converge_basics_test() -> anyhow::Result<()> {
218 let runtime_services = get_test_runtime_services();
219 let client_storage = MemClientStorage::new().await?;
220 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
221
222 const NUM_PEERS: usize = 100;
223
224 {
225 for _ in 0..NUM_PEERS {
226 let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
227 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
228 peer_tracker.add_peer(peer)?;
229 }
230 assert_eq!(NUM_PEERS, peer_tracker.len());
231 }
232
233 {
235 let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
236 let mut count = 0;
237 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
238 while let Some(_peer) = peer_iter.next_peer() {
239 count += 1;
240 }
241 assert_eq!(NUM_PEERS, count);
242 };
243
244 {
246 let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
247 let mut count = 0;
248 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
249 if peer_iter.next_peer().is_some() {
250 count += 1;
251 }
252 assert_eq!(1, count);
253 };
254
255 {
257 let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
258 let mut count = 0;
259 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
260 while let Some((peer, _)) = peer_iter.next_peer() {
261 count += 1;
262 if 0 == count % 2 {
263 peer_iter.remove_peer(&peer);
264 }
265 }
266 assert_eq!(NUM_PEERS, count);
267 assert_eq!(NUM_PEERS / 2, peer_tracker.len());
268 }
269
270 Ok(())
271 }
272
273 #[tokio::test]
274 async fn converge_termination_test() -> anyhow::Result<()> {
275 let runtime_services = get_test_runtime_services();
276 let client_storage = MemClientStorage::new().await?;
277 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
278
279 const NUM_PEERS: usize = 100;
280
281 {
282 for _ in 0..NUM_PEERS {
283 let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
284 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
285 peer_tracker.add_peer(peer)?;
286 }
287 assert_eq!(NUM_PEERS, peer_tracker.len());
288 }
289
290 {
291 let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
292 let mut count = 0;
293 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, 3, None).await?;
294 while let Some(_peer) = peer_iter.next_peer() {
295 count += 1;
296 }
297 assert_eq!(3 + 1, count);
298 }
299
300 Ok(())
301 }
302
303 #[tokio::test]
304 async fn converge_insertions_test() -> anyhow::Result<()> {
305 let runtime_services = get_test_runtime_services();
306 let client_storage = MemClientStorage::new().await?;
307 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
308
309 const NUM_PEERS: usize = 100;
310
311 {
312 for _ in 0..NUM_PEERS {
313 let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
314 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
315 peer_tracker.add_peer(peer)?;
316 }
317 assert_eq!(NUM_PEERS, peer_tracker.len());
318 }
319
320 {
322 let bucket_location = generate_bucket_location(BucketType::User, Id::random(), BUCKET_DURATIONS[0], runtime_services.time_provider.current_time_millis())?;
323 let mut count = 0;
324 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
325 while let Some(_peer) = peer_iter.next_peer() {
326 count += 1;
327
328 if 0 == count % 10 {
329 let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
330 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
331 peer_iter.add_peers(vec![peer]);
332 }
333
334 if 50 == count {
335 break;
336 }
337 }
338
339 assert_eq!(50, count);
340 assert_eq!(NUM_PEERS + 5, peer_tracker.len());
341 }
342
343 Ok(())
344 }
345
346 #[tokio::test]
347 async fn converge_targeting_test() -> anyhow::Result<()> {
348 let runtime_services = get_test_runtime_services();
349 let client_storage = MemClientStorage::new().await?;
350 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
351
352 const NUM_PEERS: usize = 100;
353
354 {
355 for _ in 0..NUM_PEERS {
356 let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
357 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
358 peer_tracker.add_peer(peer)?;
359 }
360 assert_eq!(NUM_PEERS, peer_tracker.len());
361 }
362
363 let target_server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
365 let target_peer = target_server_id.to_peer(runtime_services.time_provider.as_ref())?;
366
367 {
368 const PEER_DISCOVERY_I: usize = 37usize;
369 const PEER_DISCOVERY_I_PLUS_1: usize = PEER_DISCOVERY_I + 1;
370
371 let bucket_location = {
372 let mut location_id = target_peer.id;
373 for i in 10..31 {
374 location_id.0[i] = 0u8;
375 }
376 BucketLocation {
377 bucket_type: BucketType::User,
378 base_id: location_id,
379 duration: DurationMillis::zero(),
380 bucket_time_millis: TimeMillis::zero(),
381 location_id,
382 }
383 };
384
385 let mut count = 0;
386 let mut peer_iter = peer_tracker.iterate_to_location(bucket_location.location_id, usize::MAX, None).await?;
387 while let Some((peer, _)) = peer_iter.next_peer() {
388 count += 1;
389 match count {
390 PEER_DISCOVERY_I => {
391 peer_iter.add_peers(vec![target_peer.clone()]);
392 }
393 PEER_DISCOVERY_I_PLUS_1 => {
394 if peer.id != target_peer.id {
395 anyhow::bail!("peer is not the one we expected");
396 }
397 break;
398 }
399 _ => {}
400 }
401 }
402
403 assert_eq!(PEER_DISCOVERY_I_PLUS_1, count);
404 assert_eq!(NUM_PEERS + 1, peer_tracker.len());
405 }
406
407 Ok(())
408 }
409
410 #[tokio::test]
413 async fn converge_cache_radius_test() -> anyhow::Result<()> {
414 let runtime_services = get_test_runtime_services();
415 let client_storage = MemClientStorage::new().await?;
416 let mut peer_tracker = PeerTracker::new(runtime_services.clone(), client_storage.clone()).await?;
417
418 let location_id = Id::zero();
419
420 let make_peer_with_lab = |lab_bits: usize| -> anyhow::Result<crate::protocol::peer::Peer> {
421 let mut id_bytes = [0u8; 32];
422 let byte_idx = lab_bits / 8;
423 let bit_idx = 7 - (lab_bits % 8);
424 id_bytes[byte_idx] = 1u8 << bit_idx;
425 let id = Id(id_bytes);
426 let _ = id;
427 anyhow::bail!("use direct ServerId below")
428 };
429 let _ = make_peer_with_lab;
430
431 const NUM_PEERS: usize = 100;
432 let mut labs_added: Vec<crate::tools::tools::LeadingAgreementBits> = Vec::new();
433 for _ in 0..NUM_PEERS {
434 let server_id = ServerId::new("own_pow", runtime_services.time_provider.as_ref(), config::SERVER_KEY_POW_MIN, true, runtime_services.pow_generator.as_ref()).await?;
435 let peer = server_id.to_peer(runtime_services.time_provider.as_ref())?;
436 let lab = crate::tools::tools::leading_agreement_bits_xor(&location_id.0, &peer.id.0);
437 labs_added.push(lab);
438 peer_tracker.add_peer(peer)?;
439 }
440 assert_eq!(NUM_PEERS, peer_tracker.len());
441
442 let mut sorted_labs = labs_added.clone();
443 sorted_labs.sort();
444 let cache_radius = sorted_labs[NUM_PEERS / 2];
445
446 let mut labs_visited: Vec<crate::tools::tools::LeadingAgreementBits> = Vec::new();
447 let mut peer_iter = peer_tracker.iterate_to_location(location_id, usize::MAX, Some(cache_radius)).await?;
448 while let Some((_, lab)) = peer_iter.next_peer() {
449 labs_visited.push(lab);
450 }
451
452 assert_eq!(NUM_PEERS, labs_visited.len(), "all peers should be visited");
453
454 let has_outside_peers = labs_added.iter().any(|&lab| lab < cache_radius);
455 if has_outside_peers {
456 assert!(labs_visited[0] < cache_radius, "first peer should be outside the initial cache zone, got lab={} cache_radius={}", labs_visited[0], cache_radius);
457 }
458
459 Ok(())
460 }
461}