ant_bootstrap/cache_store/mod.rs

// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

pub mod cache_data_v0;
pub mod cache_data_v1;

use crate::{BootstrapConfig, Error, Result, craft_valid_multiaddr};
use libp2p::{Multiaddr, PeerId, multiaddr::Protocol};
use rand::Rng;
use std::{collections::HashSet, fs, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use tracing::Instrument;

pub type CacheDataLatest = cache_data_v1::CacheData;
pub const CACHE_DATA_VERSION_LATEST: u32 = cache_data_v1::CacheData::CACHE_DATA_VERSION;

#[derive(Clone, Debug)]
pub struct BootstrapCacheStore {
    /// Configuration for the cache store
    pub(crate) config: Arc<BootstrapConfig>,
    /// In-memory cache data
    pub(crate) data: Arc<RwLock<CacheDataLatest>>,
    /// Peers queued for removal from the on-disk cache; they are removed during the next
    /// `sync_and_flush_to_disk` call.
    pub(crate) to_remove: Arc<RwLock<HashSet<PeerId>>>,
}

impl BootstrapCacheStore {
    pub fn config(&self) -> &BootstrapConfig {
        &self.config
    }

    /// Create an empty CacheStore with the given configuration
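    ///
    /// A minimal construction sketch (not compiled as a doc-test; assumes the default config
    /// points at a writable cache directory):
    ///
    /// ```ignore
    /// let config = BootstrapConfig::default();
    /// let store = BootstrapCacheStore::new(config)?;
    /// assert_eq!(store.peer_count().await, 0);
    /// ```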
    pub fn new(config: BootstrapConfig) -> Result<Self> {
        info!("Creating new CacheStore with config: {:?}", config);

        // Create cache directory if it doesn't exist
        if !config.cache_dir.exists() {
            info!(
                "Attempting to create cache directory at {:?}",
                config.cache_dir
            );
            fs::create_dir_all(&config.cache_dir).inspect_err(|err| {
                warn!(
                    "Failed to create cache directory at {:?}: {err}",
                    config.cache_dir
                );
            })?;
        }

        let store = Self {
            config: Arc::new(config),
            data: Arc::new(RwLock::new(CacheDataLatest::default())),
            to_remove: Arc::new(RwLock::new(HashSet::new())),
        };

        Ok(store)
    }

    pub async fn peer_count(&self) -> usize {
        self.data.read().await.peers.len()
    }

    pub async fn get_all_addrs(&self) -> Vec<Multiaddr> {
        self.data.read().await.get_all_addrs().cloned().collect()
    }

    /// Queue a peer for removal from the cache. The actual removal will happen during the next
    /// `sync_and_flush_to_disk` call.
    pub async fn queue_remove_peer(&self, peer_id: &PeerId) {
        self.to_remove.write().await.insert(*peer_id);
    }

    /// Add an address to the cache. Note that the address must have a valid peer ID.
    ///
    /// We do not write P2pCircuit addresses to the cache.
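    ///
    /// A usage sketch (not compiled as a doc-test; the address below is illustrative):
    ///
    /// ```ignore
    /// // Relay (`/p2p-circuit`) addresses and addresses without a `/p2p/<peer id>`
    /// // component are silently dropped.
    /// let addr: Multiaddr =
    ///     "/ip4/127.0.0.1/udp/1200/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
    ///         .parse()?;
    /// store.add_addr(addr).await;
    /// ```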
    pub async fn add_addr(&self, addr: Multiaddr) {
        if addr.iter().any(|p| matches!(p, Protocol::P2pCircuit)) {
            return;
        }
        let Some(addr) = craft_valid_multiaddr(&addr, false) else {
            return;
        };
        let peer_id = match addr.iter().find(|p| matches!(p, Protocol::P2p(_))) {
            Some(Protocol::P2p(id)) => id,
            _ => return,
        };

        debug!("Adding addr to bootstrap cache: {addr}");

        self.data.write().await.add_peer(
            peer_id,
            [addr].iter(),
            self.config.max_addrs_per_cached_peer,
            self.config.max_cached_peers,
        );
    }

    /// Load cache data from disk.
    /// The cache file must already contain clean addresses, as `craft_valid_multiaddr` is not
    /// called here.
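    ///
    /// The latest (v1) format is tried first, then the older v0 format (which is upgraded on the
    /// fly); the peer list is truncated to `max_cached_peers` in both cases. Sketch (not compiled
    /// as a doc-test):
    ///
    /// ```ignore
    /// let cfg = BootstrapConfig::default();
    /// let data = BootstrapCacheStore::load_cache_data(&cfg)?;
    /// assert!(data.peers.len() <= cfg.max_cached_peers);
    /// ```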
    pub fn load_cache_data(cfg: &BootstrapConfig) -> Result<CacheDataLatest> {
        // Try loading the latest version first
        match cache_data_v1::CacheData::read_from_file(
            &cfg.cache_dir,
            &Self::cache_file_name(cfg.local),
        ) {
            Ok(mut data) => {
                while data.peers.len() > cfg.max_cached_peers {
                    data.peers.pop_back();
                }
                return Ok(data);
            }
            Err(err) => {
                warn!("Failed to load cache data from latest version: {err}");
            }
        }

        // Try loading the older version
        match cache_data_v0::CacheData::read_from_file(
            &cfg.cache_dir,
            &Self::cache_file_name(cfg.local),
        ) {
            Ok(data) => {
                warn!("Loaded cache data from older version, upgrading to latest version");
                let mut data: CacheDataLatest = data.into();
                while data.peers.len() > cfg.max_cached_peers {
                    data.peers.pop_back();
                }

                Ok(data)
            }
            Err(err) => {
                warn!("Failed to load cache data from older version: {err}");
                Err(Error::FailedToParseCacheData)
            }
        }
    }

    /// Flush the cache to disk after syncing with the CacheData from the file.
    ///
    /// Note: This clears the data in memory after writing to disk.
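    ///
    /// Sketch of the intended call pattern (not compiled as a doc-test):
    ///
    /// ```ignore
    /// store.add_addr(addr).await;
    /// // Merges with the on-disk cache, applies queued removals, writes the result,
    /// // then clears the in-memory peer list.
    /// store.sync_and_flush_to_disk().await?;
    /// assert_eq!(store.peer_count().await, 0);
    /// ```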
    pub async fn sync_and_flush_to_disk(&self) -> Result<()> {
        if self.config.disable_cache_writing {
            info!("Cache writing is disabled, skipping sync to disk");
            return Ok(());
        }

        if self.data.read().await.peers.is_empty() {
            info!("Cache is empty, skipping sync and flush to disk");
            return Ok(());
        }

        info!(
            "Flushing cache to disk with {} peers",
            self.data.read().await.peers.len(),
        );

        if let Ok(data_from_file) = Self::load_cache_data(&self.config) {
            self.data.write().await.sync(
                &data_from_file,
                self.config.max_addrs_per_cached_peer,
                self.config.max_cached_peers,
            );
        } else {
            warn!("Failed to load cache data from file, overwriting with new data");
        }

        // Remove queued peers
        let to_remove: Vec<PeerId> = self.to_remove.write().await.drain().collect();
        if !to_remove.is_empty() {
            info!("Removing {} peers from cache", to_remove.len());
            for peer_id in to_remove {
                self.data.write().await.remove_peer(&peer_id);
            }
        } else {
            debug!("No peers queued for removal from cache");
        }

        self.write().await.inspect_err(|e| {
            error!("Failed to save cache to disk: {e}");
        })?;

        // Flush after writing
        debug!("Clearing in-memory cache data after flush to disk");
        self.data.write().await.peers.clear();

        Ok(())
    }

    /// Write the cache to disk atomically. This will overwrite the existing cache file; use
    /// `sync_and_flush_to_disk` to sync with the file first.
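    ///
    /// Bare-write sketch (not compiled as a doc-test): persists the in-memory peers as-is,
    /// without merging with the file:
    ///
    /// ```ignore
    /// store.add_addr(addr).await;
    /// store.write().await?;
    /// ```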
    pub async fn write(&self) -> Result<()> {
        if self.config.disable_cache_writing {
            info!("Cache writing is disabled, skipping write to disk");
            return Ok(());
        }

        let filename = Self::cache_file_name(self.config.local);

        self.data
            .write()
            .await
            .write_to_file(&self.config.cache_dir, &filename)?;

        if self.config.backwards_compatible_writes {
            let data = self.data.read().await;
            cache_data_v0::CacheData::from(&*data)
                .write_to_file(&self.config.cache_dir, &filename)?;
        }

        Ok(())
    }

    /// Returns the cache file name based on the `local` flag
    pub fn cache_file_name(local: bool) -> String {
        if local {
            format!(
                "bootstrap_cache_local_{}.json",
                crate::get_network_version()
            )
        } else {
            format!("bootstrap_cache_{}.json", crate::get_network_version())
        }
    }

    /// Runs the `sync_and_flush_to_disk` method periodically.
    /// This is useful for keeping the cache up-to-date without blocking the main thread.
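    ///
    /// For example, with `min_cache_save_duration = 60s`, `cache_save_scaling_factor = 2`, and
    /// `max_cache_save_duration = 3600s`, successive sleeps are roughly 60s, 120s, 240s, ...,
    /// capped near 3600s, with each value jittered by `duration_with_variance` so that nodes do
    /// not all write to disk at the same time. (The concrete numbers here are illustrative, not
    /// config defaults.)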
    pub(crate) fn sync_and_flush_periodically(&self) -> tokio::task::JoinHandle<()> {
        let store = self.clone();

        let current_span = tracing::Span::current();
        tokio::spawn(
            async move {
                // Add a variance of 10% to the interval, to avoid all nodes writing to disk
                // at the same time.
                let mut sleep_interval =
                    duration_with_variance(store.config.min_cache_save_duration, 10);
                if store.config.disable_cache_writing {
                    info!("Cache writing is disabled, skipping periodic sync and flush task");
                    return;
                }
                info!(
                    "Starting periodic cache sync and flush task, first sync in {sleep_interval:?}"
                );

                loop {
                    tokio::time::sleep(sleep_interval).await;
                    if let Err(e) = store.sync_and_flush_to_disk().await {
                        error!("Failed to sync and flush cache to disk: {e}");
                    }
                    // Add a variance of 1% to the max interval to avoid all nodes writing to
                    // disk at the same time.
                    let max_cache_save_duration =
                        duration_with_variance(store.config.max_cache_save_duration, 1);

                    let new_interval = sleep_interval
                        .checked_mul(store.config.cache_save_scaling_factor)
                        .unwrap_or(max_cache_save_duration);
                    sleep_interval = new_interval.min(max_cache_save_duration);
                    info!(
                        "Cache synced and flushed to disk successfully - next sync in {sleep_interval:?}"
                    );
                }
            }
            .instrument(current_span),
        )
    }
}

/// Returns a new duration that is within +/- variance of the provided duration.
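///
/// For example, `duration_with_variance(Duration::from_secs(100), 10)` returns a value in the
/// `90..=110` second range: a random adjustment of up to 10% of the duration is drawn, then
/// added or subtracted depending on the parity of its seconds value.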
fn duration_with_variance(duration: Duration, variance: u32) -> Duration {
    let variance = duration.as_secs() as f64 * (variance as f64 / 100.0);

    // Use an inclusive range: `gen_range(0..0)` would panic when the computed variance
    // rounds down to zero seconds.
    let random_adjustment =
        Duration::from_secs(rand::thread_rng().gen_range(0..=variance as u64));
    if random_adjustment.as_secs().is_multiple_of(2) {
        duration.saturating_sub(random_adjustment)
    } else {
        duration.saturating_add(random_adjustment)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        cache_store::{cache_data_v0, cache_data_v1},
        multiaddr_get_peer_id,
    };
    use libp2p::{Multiaddr, PeerId};
    use std::{collections::HashSet, time::SystemTime};
    use tempfile::TempDir;
    use tokio::{
        task,
        time::{Duration, sleep},
    };

    #[tokio::test]
    async fn test_duration_variance_fn() {
        let duration = Duration::from_secs(150);
        let variance = 10;
        let expected_variance = Duration::from_secs(15); // 10% of 150
        for _ in 0..10000 {
            let new_duration = duration_with_variance(duration, variance);
            println!("new_duration: {new_duration:?}");
            if new_duration < duration - expected_variance
                || new_duration > duration + expected_variance
            {
                panic!("new_duration: {new_duration:?} is not within the expected range");
            }
        }
    }

    fn temp_config(dir: &TempDir) -> BootstrapConfig {
        BootstrapConfig::default().with_cache_dir(dir.path())
    }

    #[tokio::test]
    async fn test_empty_cache() {
        let dir = TempDir::new().expect("temp dir");
        let config = temp_config(&dir);
        let cache = BootstrapCacheStore::new(config.clone()).expect("create cache");

        cache.write().await.expect("write empty cache");
        let loaded = BootstrapCacheStore::load_cache_data(&config).expect("load cache");
        assert!(loaded.peers.is_empty());
    }

    #[tokio::test]
    async fn test_max_peer_limit_enforcement() {
        let dir = TempDir::new().expect("temp dir");
        let config = BootstrapConfig::default()
            .with_cache_dir(dir.path())
            .with_max_cached_peers(3);
        let cache = BootstrapCacheStore::new(config.clone()).expect("create cache");

        let samples = [
            "/ip4/127.0.0.1/udp/1200/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
            "/ip4/127.0.0.2/udp/1201/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
            "/ip4/127.0.0.3/udp/1202/quic-v1/p2p/12D3KooWHehYgXKLxsXjzFzDqMLKhcAVc4LaktnT7Zei1G2zcpJB",
            "/ip4/127.0.0.4/udp/1203/quic-v1/p2p/12D3KooWQF3NMWHRmMQBY8GVdpQh1V6TFYuQqZkKKvYE7yCS6fYK",
            "/ip4/127.0.0.5/udp/1204/quic-v1/p2p/12D3KooWRi6wF7yxWLuPSNskXc6kQ5cJ6eaymeMbCRdTnMesPgFx",
        ];

        let mut recorded = Vec::new();
        for addr_str in samples {
            let addr: Multiaddr = addr_str.parse().unwrap();
            recorded.push(addr.clone());
            cache.add_addr(addr).await;
            sleep(Duration::from_millis(5)).await;
        }

        let current = cache.get_all_addrs().await;
        assert_eq!(current.len(), 3);
        assert!(current.iter().all(|addr| recorded[2..].contains(addr)));

        cache.write().await.expect("persist cache");
        let persisted = BootstrapCacheStore::load_cache_data(&config)
            .expect("load persisted")
            .get_all_addrs()
            .cloned()
            .collect::<Vec<_>>();
        assert_eq!(persisted.len(), 3);
        assert!(persisted.iter().all(|addr| recorded[2..].contains(addr)));
    }

    #[tokio::test]
    async fn test_queued_peer_not_removed_until_flush() {
        let dir = TempDir::new().expect("temp dir");
        let config = temp_config(&dir);
        let cache = BootstrapCacheStore::new(config.clone()).expect("create cache");

        let addr: Multiaddr = "/ip4/127.0.0.6/udp/1205/quic-v1/p2p/12D3KooWQnE7zXkVUEGBnJtNfR88Ujz4ezgm6bVnkvxHCzhF7S5S"
            .parse()
            .unwrap();
        cache.add_addr(addr.clone()).await;
        cache.write().await.expect("persist initial cache state");
        let peer_id = multiaddr_get_peer_id(&addr).expect("peer id");
        cache.queue_remove_peer(&peer_id).await;
        let addrs_before_flush = cache.get_all_addrs().await;
        assert_eq!(
            addrs_before_flush.len(),
            1,
            "peer should remain available before flush"
        );
        assert_eq!(
            addrs_before_flush[0], addr,
            "cached address should match the inserted peer"
        );

        let persisted_before_flush =
            BootstrapCacheStore::load_cache_data(&config).expect("load persisted cache");
        let persisted_before_flush: Vec<_> =
            persisted_before_flush.get_all_addrs().cloned().collect();
        assert!(
            persisted_before_flush.iter().any(|stored| stored == &addr),
            "queued removal must not affect persisted cache before flush"
        );

        cache
            .sync_and_flush_to_disk()
            .await
            .expect("flush cache to disk");

        let persisted_after_flush =
            BootstrapCacheStore::load_cache_data(&config).expect("load persisted cache");
        let persisted_after_flush: Vec<_> =
            persisted_after_flush.get_all_addrs().cloned().collect();
        assert!(
            persisted_after_flush.iter().all(|stored| stored != &addr),
            "peer should be absent from persisted cache after flush"
        );
    }

    #[tokio::test]
    async fn test_queued_peer_removal_queue_drained_after_flush() {
        let dir = TempDir::new().expect("temp dir");
        let config = temp_config(&dir);
        let cache = BootstrapCacheStore::new(config.clone()).expect("create cache");

        let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
            .parse()
            .unwrap();
        cache.add_addr(addr.clone()).await;
        cache.write().await.expect("persist initial cache state");

        let peer_id = multiaddr_get_peer_id(&addr).expect("peer id");
        cache.queue_remove_peer(&peer_id).await;
        cache
            .sync_and_flush_to_disk()
            .await
            .expect("flush queued removals");
        let persisted_after_removal =
            BootstrapCacheStore::load_cache_data(&config).expect("load cache data");
        assert!(
            persisted_after_removal.get_all_addrs().next().is_none(),
            "peer should be removed from persisted cache after flush"
        );

        cache.add_addr(addr.clone()).await;

        cache
            .sync_and_flush_to_disk()
            .await
            .expect("flush cache after re-adding peer");
        let persisted_after_re_add =
            BootstrapCacheStore::load_cache_data(&config).expect("load cache data");
        let persisted_after_re_add: Vec<_> =
            persisted_after_re_add.get_all_addrs().cloned().collect();
        assert_eq!(
            persisted_after_re_add.len(),
            1,
            "re-added peer should persist after subsequent flush"
        );
        assert_eq!(
            persisted_after_re_add[0], addr,
            "persisted address should match the re-added peer"
        );
    }

    #[tokio::test]
    async fn test_cache_file_corruption() {
        let dir = TempDir::new().expect("temp dir");
        let cache_dir = dir.path();
        let config = BootstrapConfig::default().with_cache_dir(cache_dir);
        let cache = BootstrapCacheStore::new(config.clone()).expect("create cache");

        let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
            .parse()
            .unwrap();
        cache.add_addr(addr).await;
        cache.write().await.expect("write cache");

        let corrupted_path = cache_dir
            .join(format!(
                "version_{}",
                cache_data_v1::CacheData::CACHE_DATA_VERSION
            ))
            .join(BootstrapCacheStore::cache_file_name(false));
        std::fs::write(&corrupted_path, "{not valid json}").expect("corrupt file");

        let load_err = BootstrapCacheStore::load_cache_data(&config);
        assert!(load_err.is_err(), "loading corrupted cache should error");

        let new_store =
            BootstrapCacheStore::new(config.clone()).expect("create store after corruption");
        assert_eq!(
            new_store.peer_count().await,
            0,
            "new cache should start empty after corruption"
        );
        new_store.write().await.expect("write clean cache");

        let reloaded = BootstrapCacheStore::load_cache_data(&config).expect("reload cache");
        assert!(
            reloaded.peers.is_empty(),
            "cache data should be empty after regeneration"
        );
    }

    #[tokio::test]
    async fn test_max_addrs_per_peer() {
        let dir = TempDir::new().expect("temp dir");
        let config = BootstrapConfig::default()
            .with_cache_dir(dir.path())
            .with_max_addrs_per_cached_peer(2);
        let cache = BootstrapCacheStore::new(config.clone()).expect("create cache");

        let peer_id = "12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE";
        for octet in 1..=4 {
            let addr: Multiaddr = format!("/ip4/127.0.0.{octet}/udp/8080/quic-v1/p2p/{peer_id}")
                .parse()
                .unwrap();
            cache.add_addr(addr).await;
        }

        cache.write().await.expect("write cache");
        let reloaded = BootstrapCacheStore::load_cache_data(&config).expect("load cache");
        let collected: Vec<_> = reloaded.get_all_addrs().cloned().collect();
        assert!(
            collected.len() <= 2,
            "should honor the max_addrs_per_cached_peer limit"
        );
    }

    #[tokio::test]
    async fn test_concurrent_cache_access() {
        let dir = TempDir::new().expect("temp dir");
        let cache_dir = dir.path().to_path_buf();
        let config = BootstrapConfig::default().with_cache_dir(cache_dir.clone());

        let mut handles = Vec::new();
        for idx in 0..5 {
            let config_clone = config.clone();
            handles.push(task::spawn(async move {
                let store = BootstrapCacheStore::new(config_clone)?;
                let addr: Multiaddr = format!(
                    "/ip4/127.0.0.{}/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UER{}",
                    idx + 1,
                    idx + 1
                )
                .parse()
                .unwrap();
                store.add_addr(addr).await;
                sleep(Duration::from_millis(10)).await;
                store.sync_and_flush_to_disk().await
            }));
        }

        for handle in handles {
            let result = handle.await.expect("task join");
            result.expect("task result");
        }

        let final_store = BootstrapCacheStore::new(config).expect("create final store");
        let loaded = BootstrapCacheStore::load_cache_data(final_store.config()).expect("load");
        assert_eq!(loaded.peers.len(), 5, "should persist peers from all tasks");
    }

    #[tokio::test]
    async fn test_cache_sync_functionality() {
        let dir = TempDir::new().expect("temp dir");
        let cache_dir = dir.path();

        let config = BootstrapConfig::default().with_cache_dir(cache_dir);
        let first_store = BootstrapCacheStore::new(config.clone()).expect("create cache");
        let addr1: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
            .parse()
            .unwrap();
        first_store.add_addr(addr1.clone()).await;
        first_store.write().await.expect("write first cache");

        let second_store = BootstrapCacheStore::new(config.clone()).expect("create cache");
        let addr2: Multiaddr = "/ip4/127.0.0.2/udp/8080/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
            .parse()
            .unwrap();
        second_store.add_addr(addr2.clone()).await;
        second_store
            .sync_and_flush_to_disk()
            .await
            .expect("sync cache");

        let file_name = BootstrapCacheStore::cache_file_name(false);
        let cache_path = cache_data_v1::CacheData::cache_file_path(cache_dir, &file_name);
        let cache_content = std::fs::read_to_string(&cache_path).expect("read cache file");
        assert!(
            cache_content.contains(&addr1.to_string())
                && cache_content.contains(&addr2.to_string()),
            "cache content should include both addresses"
        );

        let check_store = BootstrapCacheStore::new(config).expect("create verifying store");
        let loaded = BootstrapCacheStore::load_cache_data(check_store.config()).expect("load");
        let addrs: Vec<_> = loaded.get_all_addrs().cloned().collect();
        assert!(
            addrs
                .iter()
                .any(|addr| addr.to_string() == addr1.to_string())
                && addrs
                    .iter()
                    .any(|addr| addr.to_string() == addr2.to_string()),
            "both addresses should be present after sync"
        );
    }

    #[tokio::test]
    async fn test_sync_duplicates_overlapping_peers() {
        let mut cache1 = CacheDataLatest::default();
        let mut cache2 = CacheDataLatest::default();

        let peers: Vec<PeerId> = (0..3).map(|_| PeerId::random()).collect();
        let addr1: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
            .parse()
            .unwrap();
        let addr2: Multiaddr = "/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
            .parse()
            .unwrap();
        let addr3: Multiaddr = "/ip4/127.0.0.3/udp/8082/quic-v1/p2p/12D3KooWCKCeqLPSgMnDjyFsJuWqREDtKNHx1JEBiwxME7Zdw68n"
            .parse()
            .unwrap();

        cache1.add_peer(peers[0], [addr1.clone()].iter(), 10, 10);
        cache1.add_peer(peers[1], [addr2.clone()].iter(), 10, 10);
        cache2.add_peer(peers[1], [addr2.clone()].iter(), 10, 10);
        cache2.add_peer(peers[2], [addr3.clone()].iter(), 10, 10);

        cache1.sync(&cache2, 10, 10);
        let result: HashSet<_> = cache1
            .get_all_addrs()
            .cloned()
            .map(|addr| addr.to_string())
            .collect();
        assert_eq!(result.len(), 3, "should merge and deduplicate addresses");
        assert!(result.contains(&addr1.to_string()));
        assert!(result.contains(&addr2.to_string()));
        assert!(result.contains(&addr3.to_string()));
    }

    #[tokio::test]
    async fn test_sync_at_limit_overwrites_unique_peers() {
        let mut cache1 = CacheDataLatest::default();
        let mut cache2 = CacheDataLatest::default();

        let addrs: Vec<Multiaddr> = (1..=7)
            .map(|i| {
                format!(
                    "/ip4/127.0.0.1/udp/808{i}/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UER{i}"
                )
                .parse()
                .unwrap()
            })
            .collect();
        let peers: Vec<_> = addrs
            .iter()
            .map(|addr| multiaddr_get_peer_id(addr).expect("address missing peer id"))
            .collect();

        for idx in 0..5 {
            cache1.add_peer(peers[idx], [addrs[idx].clone()].iter(), 10, 5);
        }
        for idx in 2..7 {
            cache2.add_peer(peers[idx], [addrs[idx].clone()].iter(), 10, 5);
        }

        cache1.sync(&cache2, 10, 5);
        let after: HashSet<_> = cache1.peers.iter().map(|(peer_id, _)| *peer_id).collect();
        assert_eq!(cache1.peers.len(), 5, "should respect max peers");
        assert!(after.contains(&peers[0]));
        assert!(after.contains(&peers[1]));
    }

    #[tokio::test]
    async fn test_sync_other_at_limit_self_below_limit() {
        let mut cache1 = CacheDataLatest::default();
        let mut cache2 = CacheDataLatest::default();

        let addrs: Vec<Multiaddr> = (1..=7)
            .map(|i| {
                format!(
                    "/ip4/127.0.0.1/udp/908{i}/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UER{i}"
                )
                .parse()
                .unwrap()
            })
            .collect();
        let peers: Vec<_> = addrs
            .iter()
            .map(|addr| multiaddr_get_peer_id(addr).expect("peer id"))
            .collect();

        for idx in 0..2 {
            cache1.add_peer(peers[idx], [addrs[idx].clone()].iter(), 10, 5);
        }
        for idx in 2..7 {
            cache2.add_peer(peers[idx], [addrs[idx].clone()].iter(), 10, 5);
        }

        cache1.sync(&cache2, 10, 5);
        let after: HashSet<_> = cache1.peers.iter().map(|(peer_id, _)| *peer_id).collect();
        assert_eq!(cache1.peers.len(), 5);
        assert!(after.contains(&peers[0]));
        assert!(after.contains(&peers[1]));
    }

    #[tokio::test]
    async fn test_cache_version_upgrade() {
        let dir = TempDir::new().expect("temp dir");
        let cache_dir = dir.path();

        let mut v0_data = cache_data_v0::CacheData {
            peers: Default::default(),
            last_updated: SystemTime::now(),
            network_version: crate::get_network_version(),
        };
        let peer_id = PeerId::random();
        let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1"
            .parse()
            .expect("parse addr");
        let boot_addr = cache_data_v0::BootstrapAddr {
            addr: addr.clone(),
            success_count: 1,
            failure_count: 0,
            last_seen: SystemTime::now(),
        };
        v0_data
            .peers
            .insert(peer_id, cache_data_v0::BootstrapAddresses(vec![boot_addr]));

        let config = BootstrapConfig::default().with_cache_dir(cache_dir);
        let filename = BootstrapCacheStore::cache_file_name(false);
        v0_data
            .write_to_file(cache_dir, &filename)
            .expect("write v0 cache");

        let upgraded = BootstrapCacheStore::load_cache_data(&config).expect("load cache");
        assert!(
            !upgraded.peers.is_empty(),
            "peers should carry over after upgrade"
        );
        assert!(
            upgraded.get_all_addrs().next().is_some(),
            "addresses should be preserved after upgrade"
        );
        assert_eq!(
            upgraded.cache_version,
            cache_data_v1::CacheData::CACHE_DATA_VERSION.to_string()
        );
    }

    #[tokio::test]
    async fn test_backwards_compatible_writes() {
        let dir = TempDir::new().expect("temp dir");
        let cache_dir = dir.path();

        let config = BootstrapConfig::default()
            .with_cache_dir(cache_dir)
            .with_backwards_compatible_writes(true);
        let cache = BootstrapCacheStore::new(config.clone()).expect("create cache");
        let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
            .parse()
            .unwrap();
        cache.add_addr(addr).await;
        cache.write().await.expect("write cache");

        let filename = BootstrapCacheStore::cache_file_name(false);
        let v0_data =
            cache_data_v0::CacheData::read_from_file(cache_dir, &filename).expect("read v0");
        let v1_data =
            cache_data_v1::CacheData::read_from_file(cache_dir, &filename).expect("read v1");
        assert!(!v0_data.peers.is_empty(), "v0 data should be populated");
        assert!(!v1_data.peers.is_empty(), "v1 data should be populated");
    }

    #[tokio::test]
    async fn test_version_specific_file_paths() {
        let dir = TempDir::new().expect("temp dir");
        let cache_dir = dir.path();

        let filename = BootstrapCacheStore::cache_file_name(false);
        let v0_path = cache_data_v0::CacheData::cache_file_path(cache_dir, &filename);
        let v1_path = cache_data_v1::CacheData::cache_file_path(cache_dir, &filename);

        assert!(
            v1_path.to_string_lossy().contains(&format!(
                "version_{}",
                cache_data_v1::CacheData::CACHE_DATA_VERSION
            )),
            "v1 path should include version directory"
        );
        assert!(
            !v0_path.to_string_lossy().contains("version_"),
            "v0 path should not include version segment"
        );
    }
}