ant_bootstrap/cache_store/
mod.rs1pub mod cache_data_v0;
10pub mod cache_data_v1;
11
12use crate::{BootstrapConfig, Error, Result, craft_valid_multiaddr};
13use libp2p::{Multiaddr, PeerId, multiaddr::Protocol};
14use rand::Rng;
15use std::{collections::HashSet, fs, sync::Arc, time::Duration};
16use tokio::sync::RwLock;
17use tracing::Instrument;
18
/// The cache data format used in memory by the store; currently the v1 `CacheData`.
pub type CacheDataLatest = cache_data_v1::CacheData;
/// Version number of the latest cache data format, as declared by the v1 `CacheData`.
pub const CACHE_DATA_VERSION_LATEST: u32 = cache_data_v1::CacheData::CACHE_DATA_VERSION;
21
/// Handle to the bootstrap peer cache.
///
/// All fields are behind `Arc`, so cloning the store yields another handle to the same
/// configuration, cache data and removal queue.
#[derive(Clone, Debug)]
pub struct BootstrapCacheStore {
    /// Configuration this store was created with.
    pub(crate) config: Arc<BootstrapConfig>,
    /// In-memory cache data in the latest format.
    pub(crate) data: Arc<RwLock<CacheDataLatest>>,
    /// Peers queued for removal; drained and applied by `sync_and_flush_to_disk`.
    pub(crate) to_remove: Arc<RwLock<HashSet<PeerId>>>,
}
31
32impl BootstrapCacheStore {
33 pub fn config(&self) -> &BootstrapConfig {
34 &self.config
35 }
36
37 pub fn new(config: BootstrapConfig) -> Result<Self> {
39 info!("Creating new CacheStore with config: {:?}", config);
40
41 if !config.cache_dir.exists() {
43 info!(
44 "Attempting to create cache directory at {:?}",
45 config.cache_dir
46 );
47 fs::create_dir_all(&config.cache_dir).inspect_err(|err| {
48 warn!(
49 "Failed to create cache directory at {:?}: {err}",
50 config.cache_dir
51 );
52 })?;
53 }
54
55 let store = Self {
56 config: Arc::new(config),
57 data: Arc::new(RwLock::new(CacheDataLatest::default())),
58 to_remove: Arc::new(RwLock::new(HashSet::new())),
59 };
60
61 Ok(store)
62 }
63
64 pub async fn peer_count(&self) -> usize {
65 self.data.read().await.peers.len()
66 }
67
68 pub async fn get_all_addrs(&self) -> Vec<Multiaddr> {
69 self.data.read().await.get_all_addrs().cloned().collect()
70 }
71
72 pub async fn queue_remove_peer(&self, peer_id: &PeerId) {
74 self.to_remove.write().await.insert(*peer_id);
75 }
76
77 pub async fn add_addr(&self, addr: Multiaddr) {
81 if addr.iter().any(|p| matches!(p, Protocol::P2pCircuit)) {
82 return;
83 }
84 let Some(addr) = craft_valid_multiaddr(&addr, false) else {
85 return;
86 };
87 let peer_id = match addr.iter().find(|p| matches!(p, Protocol::P2p(_))) {
88 Some(Protocol::P2p(id)) => id,
89 _ => return,
90 };
91
92 debug!("Adding addr to bootstrap cache: {addr}");
93
94 self.data.write().await.add_peer(
95 peer_id,
96 [addr].iter(),
97 self.config.max_addrs_per_cached_peer,
98 self.config.max_cached_peers,
99 );
100 }
101
102 pub fn load_cache_data(cfg: &BootstrapConfig) -> Result<CacheDataLatest> {
105 match cache_data_v1::CacheData::read_from_file(
107 &cfg.cache_dir,
108 &Self::cache_file_name(cfg.local),
109 ) {
110 Ok(mut data) => {
111 while data.peers.len() > cfg.max_cached_peers {
112 data.peers.pop_back();
113 }
114 return Ok(data);
115 }
116 Err(err) => {
117 warn!("Failed to load cache data from latest version: {err}");
118 }
119 }
120
121 match cache_data_v0::CacheData::read_from_file(
123 &cfg.cache_dir,
124 &Self::cache_file_name(cfg.local),
125 ) {
126 Ok(data) => {
127 warn!("Loaded cache data from older version, upgrading to latest version");
128 let mut data: CacheDataLatest = data.into();
129 while data.peers.len() > cfg.max_cached_peers {
130 data.peers.pop_back();
131 }
132
133 Ok(data)
134 }
135 Err(err) => {
136 warn!("Failed to load cache data from older version: {err}");
137 Err(Error::FailedToParseCacheData)
138 }
139 }
140 }
141
142 pub async fn sync_and_flush_to_disk(&self) -> Result<()> {
146 if self.config.disable_cache_writing {
147 info!("Cache writing is disabled, skipping sync to disk");
148 return Ok(());
149 }
150
151 if self.data.read().await.peers.is_empty() {
152 info!("Cache is empty, skipping sync and flush to disk");
153 return Ok(());
154 }
155
156 info!(
157 "Flushing cache to disk, with data containing: {} peers",
158 self.data.read().await.peers.len(),
159 );
160
161 if let Ok(data_from_file) = Self::load_cache_data(&self.config) {
162 self.data.write().await.sync(
163 &data_from_file,
164 self.config.max_addrs_per_cached_peer,
165 self.config.max_cached_peers,
166 );
167 } else {
168 warn!("Failed to load cache data from file, overwriting with new data");
169 }
170
171 let to_remove: Vec<PeerId> = self.to_remove.write().await.drain().collect();
173 if !to_remove.is_empty() {
174 info!("Removing {} peers from cache", to_remove.len());
175 for peer_id in to_remove {
176 self.data.write().await.remove_peer(&peer_id);
177 }
178 } else {
179 debug!("No peers queued for removal from cache");
180 }
181
182 self.write().await.inspect_err(|e| {
183 error!("Failed to save cache to disk: {e}");
184 })?;
185
186 debug!("Clearing in-memory cache data after flush to disk");
188 self.data.write().await.peers.clear();
189
190 Ok(())
191 }
192
193 pub async fn write(&self) -> Result<()> {
196 if self.config.disable_cache_writing {
197 info!("Cache writing is disabled, skipping sync to disk");
198 return Ok(());
199 }
200
201 let filename = Self::cache_file_name(self.config.local);
202
203 self.data
204 .write()
205 .await
206 .write_to_file(&self.config.cache_dir, &filename)?;
207
208 if self.config.backwards_compatible_writes {
209 let data = self.data.read().await;
210 cache_data_v0::CacheData::from(&*data)
211 .write_to_file(&self.config.cache_dir, &filename)?;
212 }
213
214 Ok(())
215 }
216
217 pub fn cache_file_name(local: bool) -> String {
219 if local {
220 format!(
221 "bootstrap_cache_local_{}.json",
222 crate::get_network_version()
223 )
224 } else {
225 format!("bootstrap_cache_{}.json", crate::get_network_version())
226 }
227 }
228
229 pub(crate) fn sync_and_flush_periodically(&self) -> tokio::task::JoinHandle<()> {
232 let store = self.clone();
233
234 let current_span = tracing::Span::current();
235 tokio::spawn(async move {
236 let mut sleep_interval =
238 duration_with_variance(store.config.min_cache_save_duration, 10);
239 if store.config.disable_cache_writing {
240 info!("Cache writing is disabled, skipping periodic sync and flush task");
241 return;
242 }
243 info!("Starting periodic cache sync and flush task, first sync in {sleep_interval:?}");
244
245 loop {
246 tokio::time::sleep(sleep_interval).await;
247 if let Err(e) = store.sync_and_flush_to_disk().await {
248 error!("Failed to sync and flush cache to disk: {e}");
249 }
250 let max_cache_save_duration =
252 duration_with_variance(store.config.max_cache_save_duration, 1);
253
254 let new_interval = sleep_interval
255 .checked_mul(store.config.cache_save_scaling_factor)
256 .unwrap_or(max_cache_save_duration);
257 sleep_interval = new_interval.min(max_cache_save_duration);
258 info!("Cache synced and flushed to disk successfully - next sync in {sleep_interval:?}");
259 }
260 }.instrument(current_span))
261 }
262}
263
264fn duration_with_variance(duration: Duration, variance: u32) -> Duration {
266 let variance = duration.as_secs() as f64 * (variance as f64 / 100.0);
267
268 let random_adjustment = Duration::from_secs(rand::thread_rng().gen_range(0..variance as u64));
269 if random_adjustment.as_secs().is_multiple_of(2) {
270 duration.saturating_sub(random_adjustment)
271 } else {
272 duration.saturating_add(random_adjustment)
273 }
274}
275
276#[cfg(test)]
277mod tests {
278 use super::*;
279 use crate::{
280 cache_store::{cache_data_v0, cache_data_v1},
281 multiaddr_get_peer_id,
282 };
283 use libp2p::{Multiaddr, PeerId};
284 use std::{collections::HashSet, time::SystemTime};
285 use tempfile::TempDir;
286 use tokio::{
287 task,
288 time::{Duration, sleep},
289 };
290
291 #[tokio::test]
292 async fn test_duration_variance_fn() {
293 let duration = Duration::from_secs(150);
294 let variance = 10;
295 let expected_variance = Duration::from_secs(15); for _ in 0..10000 {
297 let new_duration = duration_with_variance(duration, variance);
298 println!("new_duration: {new_duration:?}");
299 if new_duration < duration - expected_variance
300 || new_duration > duration + expected_variance
301 {
302 panic!("new_duration: {new_duration:?} is not within the expected range",);
303 }
304 }
305 }
306
307 fn temp_config(dir: &TempDir) -> BootstrapConfig {
308 BootstrapConfig::default().with_cache_dir(dir.path())
309 }
310
311 #[tokio::test]
312 async fn test_empty_cache() {
313 let dir = TempDir::new().expect("temp dir");
314 let config = temp_config(&dir);
315 let cache = BootstrapCacheStore::new(config.clone()).expect("create cache");
316
317 cache.write().await.expect("write empty cache");
318 let loaded = BootstrapCacheStore::load_cache_data(&config).expect("load cache");
319 assert!(loaded.peers.is_empty());
320 }
321
322 #[tokio::test]
323 async fn test_max_peer_limit_enforcement() {
324 let dir = TempDir::new().expect("temp dir");
325 let config = BootstrapConfig::default()
326 .with_cache_dir(dir.path())
327 .with_max_cached_peers(3);
328 let cache = BootstrapCacheStore::new(config.clone()).expect("create cache");
329
330 let samples = [
331 "/ip4/127.0.0.1/udp/1200/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
332 "/ip4/127.0.0.2/udp/1201/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
333 "/ip4/127.0.0.3/udp/1202/quic-v1/p2p/12D3KooWHehYgXKLxsXjzFzDqMLKhcAVc4LaktnT7Zei1G2zcpJB",
334 "/ip4/127.0.0.4/udp/1203/quic-v1/p2p/12D3KooWQF3NMWHRmMQBY8GVdpQh1V6TFYuQqZkKKvYE7yCS6fYK",
335 "/ip4/127.0.0.5/udp/1204/quic-v1/p2p/12D3KooWRi6wF7yxWLuPSNskXc6kQ5cJ6eaymeMbCRdTnMesPgFx",
336 ];
337
338 let mut recorded = Vec::new();
339 for addr_str in samples {
340 let addr: Multiaddr = addr_str.parse().unwrap();
341 recorded.push(addr.clone());
342 cache.add_addr(addr).await;
343 sleep(Duration::from_millis(5)).await;
344 }
345
346 let current = cache.get_all_addrs().await;
347 assert_eq!(current.len(), 3);
348 assert!(current.iter().all(|addr| recorded[2..].contains(addr)));
349
350 cache.write().await.expect("persist cache");
351 let persisted = BootstrapCacheStore::load_cache_data(&config)
352 .expect("load persisted")
353 .get_all_addrs()
354 .cloned()
355 .collect::<Vec<_>>();
356 assert_eq!(persisted.len(), 3);
357 assert!(persisted.iter().all(|addr| recorded[2..].contains(addr)));
358 }
359
360 #[tokio::test]
361 async fn test_queued_peer_not_removed_until_flush() {
362 let dir = TempDir::new().expect("temp dir");
363 let config = temp_config(&dir);
364 let cache = BootstrapCacheStore::new(config.clone()).expect("create cache");
365
366 let addr: Multiaddr = "/ip4/127.0.0.6/udp/1205/quic-v1/p2p/12D3KooWQnE7zXkVUEGBnJtNfR88Ujz4ezgm6bVnkvxHCzhF7S5S"
367 .parse()
368 .unwrap();
369 cache.add_addr(addr.clone()).await;
370 cache.write().await.expect("persist initial cache state");
371 let peer_id = multiaddr_get_peer_id(&addr).expect("peer id");
372 cache.queue_remove_peer(&peer_id).await;
373 let addrs_before_flush = cache.get_all_addrs().await;
374 assert_eq!(
375 addrs_before_flush.len(),
376 1,
377 "peer should remain available before flush"
378 );
379 assert_eq!(
380 addrs_before_flush[0], addr,
381 "cached address should match the inserted peer"
382 );
383
384 let persisted_before_flush =
385 BootstrapCacheStore::load_cache_data(&config).expect("load persisted cache");
386 let persisted_before_flush: Vec<_> =
387 persisted_before_flush.get_all_addrs().cloned().collect();
388 assert!(
389 persisted_before_flush.iter().any(|stored| stored == &addr),
390 "queued removal must not affect persisted cache before flush"
391 );
392
393 cache
394 .sync_and_flush_to_disk()
395 .await
396 .expect("flush cache to disk");
397
398 let persisted_after_flush =
399 BootstrapCacheStore::load_cache_data(&config).expect("load persisted cache");
400 let persisted_after_flush: Vec<_> =
401 persisted_after_flush.get_all_addrs().cloned().collect();
402 assert!(
403 persisted_after_flush.iter().all(|stored| stored != &addr),
404 "peer should be absent from persisted cache after flush"
405 );
406 }
407
408 #[tokio::test]
409 async fn test_queued_peer_removal_queue_drained_after_flush() {
410 let dir = TempDir::new().expect("temp dir");
411 let config = temp_config(&dir);
412 let cache = BootstrapCacheStore::new(config.clone()).expect("create cache");
413
414 let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
415 .parse()
416 .unwrap();
417 cache.add_addr(addr.clone()).await;
418 cache.write().await.expect("persist initial cache state");
419
420 let peer_id = multiaddr_get_peer_id(&addr).expect("peer id");
421 cache.queue_remove_peer(&peer_id).await;
422 cache
423 .sync_and_flush_to_disk()
424 .await
425 .expect("flush queued removals");
426 let persisted_after_removal =
427 BootstrapCacheStore::load_cache_data(&config).expect("load cache data");
428 assert!(
429 persisted_after_removal.get_all_addrs().next().is_none(),
430 "peer should be removed from persisted cache after flush"
431 );
432
433 cache.add_addr(addr.clone()).await;
434
435 cache
436 .sync_and_flush_to_disk()
437 .await
438 .expect("flush cache after re-adding peer");
439 let persisted_after_re_add =
440 BootstrapCacheStore::load_cache_data(&config).expect("load cache data");
441 let persisted_after_re_add: Vec<_> =
442 persisted_after_re_add.get_all_addrs().cloned().collect();
443 assert_eq!(
444 persisted_after_re_add.len(),
445 1,
446 "re-added peer should persist after subsequent flush"
447 );
448 assert_eq!(
449 persisted_after_re_add[0], addr,
450 "persisted address should match the re-added peer"
451 );
452 }
453
454 #[tokio::test]
455 async fn test_cache_file_corruption() {
456 let dir = TempDir::new().expect("temp dir");
457 let cache_dir = dir.path();
458 let config = BootstrapConfig::default().with_cache_dir(cache_dir);
459 let cache = BootstrapCacheStore::new(config.clone()).expect("create cache");
460
461 let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
462 .parse()
463 .unwrap();
464 cache.add_addr(addr).await;
465 cache.write().await.expect("write cache");
466
467 let corrupted_path = cache_dir
468 .join(format!(
469 "version_{}",
470 cache_data_v1::CacheData::CACHE_DATA_VERSION
471 ))
472 .join(BootstrapCacheStore::cache_file_name(false));
473 std::fs::write(&corrupted_path, "{not valid json}").expect("corrupt file");
474
475 let load_err = BootstrapCacheStore::load_cache_data(&config);
476 assert!(load_err.is_err(), "loading corrupted cache should error");
477
478 let new_store =
479 BootstrapCacheStore::new(config.clone()).expect("create store after corruption");
480 assert_eq!(
481 new_store.peer_count().await,
482 0,
483 "new cache should start empty after corruption"
484 );
485 new_store.write().await.expect("write clean cache");
486
487 let reloaded = BootstrapCacheStore::load_cache_data(&config).expect("reload cache");
488 assert!(
489 reloaded.peers.is_empty(),
490 "cache data should be empty after regeneration"
491 );
492 }
493
494 #[tokio::test]
495 async fn test_max_addrs_per_peer() {
496 let dir = TempDir::new().expect("temp dir");
497 let config = BootstrapConfig::default()
498 .with_cache_dir(dir.path())
499 .with_max_addrs_per_cached_peer(2);
500 let cache = BootstrapCacheStore::new(config.clone()).expect("create cache");
501
502 let peer_id = "12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE";
503 for octet in 1..=4 {
504 let addr: Multiaddr = format!("/ip4/127.0.0.{octet}/udp/8080/quic-v1/p2p/{peer_id}")
505 .parse()
506 .unwrap();
507 cache.add_addr(addr).await;
508 }
509
510 cache.write().await.expect("write cache");
511 let reloaded = BootstrapCacheStore::load_cache_data(&config).expect("load cache");
512 let collected: Vec<_> = reloaded.get_all_addrs().cloned().collect();
513 assert!(
514 collected.len() <= 2,
515 "should honor max_addrs_per_peer limit"
516 );
517 }
518
519 #[tokio::test]
520 async fn test_concurrent_cache_access() {
521 let dir = TempDir::new().expect("temp dir");
522 let cache_dir = dir.path().to_path_buf();
523 let config = BootstrapConfig::default().with_cache_dir(cache_dir.clone());
524
525 let mut handles = Vec::new();
526 for idx in 0..5 {
527 let config_clone = config.clone();
528 handles.push(task::spawn(async move {
529 let store = BootstrapCacheStore::new(config_clone)?;
530 let addr: Multiaddr = format!(
531 "/ip4/127.0.0.{}/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UER{}",
532 idx + 1,
533 idx + 1
534 )
535 .parse()
536 .unwrap();
537 store.add_addr(addr).await;
538 sleep(Duration::from_millis(10)).await;
539 store.sync_and_flush_to_disk().await
540 }));
541 }
542
543 for handle in handles {
544 let result = handle.await.expect("task join");
545 result.expect("task result");
546 }
547
548 let final_store = BootstrapCacheStore::new(config).expect("create final store");
549 let loaded = BootstrapCacheStore::load_cache_data(final_store.config()).expect("load");
550 assert_eq!(loaded.peers.len(), 5, "should persist peers from all tasks");
551 }
552
553 #[tokio::test]
554 async fn test_cache_sync_functionality() {
555 let dir = TempDir::new().expect("temp dir");
556 let cache_dir = dir.path();
557
558 let config = BootstrapConfig::default().with_cache_dir(cache_dir);
559 let first_store = BootstrapCacheStore::new(config.clone()).expect("create cache");
560 let addr1: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
561 .parse()
562 .unwrap();
563 first_store.add_addr(addr1.clone()).await;
564 first_store.write().await.expect("write first cache");
565
566 let second_store = BootstrapCacheStore::new(config.clone()).expect("create cache");
567 let addr2: Multiaddr = "/ip4/127.0.0.2/udp/8080/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
568 .parse()
569 .unwrap();
570 second_store.add_addr(addr2.clone()).await;
571 second_store
572 .sync_and_flush_to_disk()
573 .await
574 .expect("sync cache");
575
576 let file_name = BootstrapCacheStore::cache_file_name(false);
577 let cache_path = cache_data_v1::CacheData::cache_file_path(cache_dir, &file_name);
578 let cache_content = std::fs::read_to_string(&cache_path).expect("read cache file");
579 assert!(
580 cache_content.contains(&addr1.to_string())
581 && cache_content.contains(&addr2.to_string()),
582 "cache content should include both addresses"
583 );
584
585 let check_store = BootstrapCacheStore::new(config).expect("create verifying store");
586 let loaded = BootstrapCacheStore::load_cache_data(check_store.config()).expect("load");
587 let addrs: Vec<_> = loaded.get_all_addrs().cloned().collect();
588 assert!(
589 addrs
590 .iter()
591 .any(|addr| addr.to_string() == addr1.to_string())
592 && addrs
593 .iter()
594 .any(|addr| addr.to_string() == addr2.to_string()),
595 "both addresses should be present after sync"
596 );
597 }
598
599 #[tokio::test]
600 async fn test_sync_duplicates_overlapping_peers() {
601 let mut cache1 = CacheDataLatest::default();
602 let mut cache2 = CacheDataLatest::default();
603
604 let peers: Vec<PeerId> = (0..3).map(|_| PeerId::random()).collect();
605 let addr1: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
606 .parse()
607 .unwrap();
608 let addr2: Multiaddr = "/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
609 .parse()
610 .unwrap();
611 let addr3: Multiaddr = "/ip4/127.0.0.3/udp/8082/quic-v1/p2p/12D3KooWCKCeqLPSgMnDjyFsJuWqREDtKNHx1JEBiwxME7Zdw68n"
612 .parse()
613 .unwrap();
614
615 cache1.add_peer(peers[0], [addr1.clone()].iter(), 10, 10);
616 cache1.add_peer(peers[1], [addr2.clone()].iter(), 10, 10);
617 cache2.add_peer(peers[1], [addr2.clone()].iter(), 10, 10);
618 cache2.add_peer(peers[2], [addr3.clone()].iter(), 10, 10);
619
620 cache1.sync(&cache2, 10, 10);
621 let result: HashSet<_> = cache1
622 .get_all_addrs()
623 .cloned()
624 .map(|addr| addr.to_string())
625 .collect();
626 assert_eq!(result.len(), 3, "should merge and deduplicate addresses");
627 assert!(result.contains(&addr1.to_string()));
628 assert!(result.contains(&addr2.to_string()));
629 assert!(result.contains(&addr3.to_string()));
630 }
631
632 #[tokio::test]
633 async fn test_sync_at_limit_overwrites_unique_peers() {
634 let mut cache1 = CacheDataLatest::default();
635 let mut cache2 = CacheDataLatest::default();
636
637 let addrs: Vec<Multiaddr> = (1..=7)
638 .map(|i| {
639 format!(
640 "/ip4/127.0.0.1/udp/808{i}/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UER{i}"
641 )
642 .parse()
643 .unwrap()
644 })
645 .collect();
646 let peers: Vec<_> = addrs
647 .iter()
648 .map(|addr| match multiaddr_get_peer_id(addr) {
649 Some(peer) => peer,
650 None => panic!("address missing peer id"),
651 })
652 .collect();
653
654 for idx in 0..5 {
655 cache1.add_peer(peers[idx], [addrs[idx].clone()].iter(), 10, 5);
656 }
657 for idx in 2..7 {
658 cache2.add_peer(peers[idx], [addrs[idx].clone()].iter(), 10, 5);
659 }
660
661 cache1.sync(&cache2, 10, 5);
662 let after: HashSet<_> = cache1.peers.iter().map(|(peer_id, _)| *peer_id).collect();
663 assert_eq!(cache1.peers.len(), 5, "should respect max peers");
664 assert!(after.contains(&peers[0]));
665 assert!(after.contains(&peers[1]));
666 }
667
668 #[tokio::test]
669 async fn test_sync_other_at_limit_self_below_limit() {
670 let mut cache1 = CacheDataLatest::default();
671 let mut cache2 = CacheDataLatest::default();
672
673 let addrs: Vec<Multiaddr> = (1..=7)
674 .map(|i| {
675 format!(
676 "/ip4/127.0.0.1/udp/908{i}/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UER{i}"
677 )
678 .parse()
679 .unwrap()
680 })
681 .collect();
682 let peers: Vec<_> = addrs
683 .iter()
684 .map(|addr| multiaddr_get_peer_id(addr).expect("peer id"))
685 .collect();
686
687 for idx in 0..2 {
688 cache1.add_peer(peers[idx], [addrs[idx].clone()].iter(), 10, 5);
689 }
690 for idx in 2..7 {
691 cache2.add_peer(peers[idx], [addrs[idx].clone()].iter(), 10, 5);
692 }
693
694 cache1.sync(&cache2, 10, 5);
695 let after: HashSet<_> = cache1.peers.iter().map(|(peer_id, _)| *peer_id).collect();
696 assert_eq!(cache1.peers.len(), 5);
697 assert!(after.contains(&peers[0]));
698 assert!(after.contains(&peers[1]));
699 }
700
701 #[tokio::test]
702 async fn test_cache_version_upgrade() {
703 let dir = TempDir::new().expect("temp dir");
704 let cache_dir = dir.path();
705
706 let mut v0_data = cache_data_v0::CacheData {
707 peers: Default::default(),
708 last_updated: SystemTime::now(),
709 network_version: crate::get_network_version(),
710 };
711 let peer_id = PeerId::random();
712 let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1"
713 .parse()
714 .expect("parse addr");
715 let boot_addr = cache_data_v0::BootstrapAddr {
716 addr: addr.clone(),
717 success_count: 1,
718 failure_count: 0,
719 last_seen: SystemTime::now(),
720 };
721 v0_data
722 .peers
723 .insert(peer_id, cache_data_v0::BootstrapAddresses(vec![boot_addr]));
724
725 let config = BootstrapConfig::default().with_cache_dir(cache_dir);
726 let filename = BootstrapCacheStore::cache_file_name(false);
727 v0_data
728 .write_to_file(cache_dir, &filename)
729 .expect("write v0 cache");
730
731 let upgraded = BootstrapCacheStore::load_cache_data(&config).expect("load cache");
732 assert!(
733 !upgraded.peers.is_empty(),
734 "peers should carry over after upgrade"
735 );
736 assert!(
737 upgraded.get_all_addrs().next().is_some(),
738 "addresses should be preserved after upgrade"
739 );
740 assert_eq!(
741 upgraded.cache_version,
742 cache_data_v1::CacheData::CACHE_DATA_VERSION.to_string()
743 );
744 }
745
746 #[tokio::test]
747 async fn test_backwards_compatible_writes() {
748 let dir = TempDir::new().expect("temp dir");
749 let cache_dir = dir.path();
750
751 let config = BootstrapConfig::default()
752 .with_cache_dir(cache_dir)
753 .with_backwards_compatible_writes(true);
754 let cache = BootstrapCacheStore::new(config.clone()).expect("create cache");
755 let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
756 .parse()
757 .unwrap();
758 cache.add_addr(addr).await;
759 cache.write().await.expect("write cache");
760
761 let filename = BootstrapCacheStore::cache_file_name(false);
762 let v0_data =
763 cache_data_v0::CacheData::read_from_file(cache_dir, &filename).expect("read v0");
764 let v1_data =
765 cache_data_v1::CacheData::read_from_file(cache_dir, &filename).expect("read v1");
766 assert!(!v0_data.peers.is_empty(), "v0 data should be populated");
767 assert!(!v1_data.peers.is_empty(), "v1 data should be populated");
768 }
769
770 #[tokio::test]
771 async fn test_version_specific_file_paths() {
772 let dir = TempDir::new().expect("temp dir");
773 let cache_dir = dir.path();
774
775 let filename = BootstrapCacheStore::cache_file_name(false);
776 let v0_path = cache_data_v0::CacheData::cache_file_path(cache_dir, &filename);
777 let v1_path = cache_data_v1::CacheData::cache_file_path(cache_dir, &filename);
778
779 assert!(
780 v1_path.to_string_lossy().contains(&format!(
781 "version_{}",
782 cache_data_v1::CacheData::CACHE_DATA_VERSION
783 )),
784 "v1 path should include version directory"
785 );
786 assert!(
787 !v0_path.to_string_lossy().contains("version_"),
788 "v0 path should not include version segment"
789 );
790 }
791}