ant_bootstrap/
cache_store.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::{
10    craft_valid_multiaddr, multiaddr_get_peer_id, BootstrapAddr, BootstrapAddresses,
11    BootstrapCacheConfig, Error, InitialPeersConfig, Result,
12};
13use atomic_write_file::AtomicWriteFile;
14use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
15use serde::{Deserialize, Serialize};
16use std::{
17    collections::{hash_map::Entry, HashMap},
18    fs::{self, OpenOptions},
19    io::{Read, Write},
20    path::PathBuf,
21    time::{Duration, SystemTime},
22};
23
/// Serializable snapshot of the bootstrap cache; persisted to disk as JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheData {
    // Every known peer mapped to the set of addresses recorded for it.
    pub peers: std::collections::HashMap<PeerId, BootstrapAddresses>,
    // Timestamp of the last modification (refreshed by `sync` and on default construction).
    pub last_updated: SystemTime,
    // Network version string this cache was written for.
    // NOTE(review): populated from `crate::get_network_version()`; presumably used to
    // avoid mixing caches from incompatible networks — confirm against callers.
    pub network_version: String,
}
30
31impl CacheData {
32    pub fn insert(&mut self, peer_id: PeerId, bootstrap_addr: BootstrapAddr) {
33        match self.peers.entry(peer_id) {
34            Entry::Occupied(mut occupied_entry) => {
35                occupied_entry.get_mut().insert_addr(&bootstrap_addr);
36            }
37            Entry::Vacant(vacant_entry) => {
38                vacant_entry.insert(BootstrapAddresses(vec![bootstrap_addr]));
39            }
40        }
41    }
42
43    /// Sync the self cache with another cache. This would just add the 'other' state to self.
44    pub fn sync(&mut self, other: &CacheData) {
45        for (peer, other_addresses_state) in other.peers.iter() {
46            let bootstrap_addresses = self
47                .peers
48                .entry(*peer)
49                .or_insert(other_addresses_state.clone());
50
51            trace!("Syncing {peer:?} from other with addrs count: {:?}. Our in memory state count: {:?}", other_addresses_state.0.len(), bootstrap_addresses.0.len());
52
53            bootstrap_addresses.sync(other_addresses_state);
54        }
55
56        self.last_updated = SystemTime::now();
57    }
58
59    /// Perform cleanup on the Peers
60    /// - Removes all the unreliable addrs for a peer
61    /// - Removes all the expired addrs for a peer
62    /// - Removes all peers with empty addrs set
63    /// - Maintains `max_addr` per peer by removing the addr with the lowest success rate
64    /// - Maintains `max_peers` in the list by removing the peer with the oldest last_seen
65    pub fn perform_cleanup(&mut self, cfg: &BootstrapCacheConfig) {
66        self.peers.values_mut().for_each(|bootstrap_addresses| {
67            bootstrap_addresses.0.retain(|bootstrap_addr| {
68                let now = SystemTime::now();
69                let has_not_expired =
70                    if let Ok(duration) = now.duration_since(bootstrap_addr.last_seen) {
71                        duration < cfg.addr_expiry_duration
72                    } else {
73                        false
74                    };
75                bootstrap_addr.is_reliable() && has_not_expired
76            })
77        });
78
79        self.peers
80            .retain(|_, bootstrap_addresses| !bootstrap_addresses.0.is_empty());
81
82        self.peers.values_mut().for_each(|bootstrap_addresses| {
83            if bootstrap_addresses.0.len() > cfg.max_addrs_per_peer {
84                // sort by lowest failure rate first
85                bootstrap_addresses
86                    .0
87                    .sort_by_key(|addr| addr.failure_rate() as u64);
88                bootstrap_addresses.0.truncate(cfg.max_addrs_per_peer);
89            }
90        });
91
92        self.try_remove_oldest_peers(cfg);
93    }
94
95    /// Remove the oldest peers until we're under the max_peers limit
96    pub fn try_remove_oldest_peers(&mut self, cfg: &BootstrapCacheConfig) {
97        if self.peers.len() > cfg.max_peers {
98            let mut peer_last_seen_map = HashMap::new();
99            for (peer, addrs) in self.peers.iter() {
100                let mut latest_seen = Duration::from_secs(u64::MAX);
101                for addr in addrs.0.iter() {
102                    if let Ok(elapsed) = addr.last_seen.elapsed() {
103                        trace!("Time elapsed for {addr:?} is {elapsed:?}");
104                        if elapsed < latest_seen {
105                            trace!("Updating latest_seen to {elapsed:?}");
106                            latest_seen = elapsed;
107                        }
108                    }
109                }
110                trace!("Last seen for {peer:?} is {latest_seen:?}");
111                peer_last_seen_map.insert(*peer, latest_seen);
112            }
113
114            while self.peers.len() > cfg.max_peers {
115                // find the peer with the largest last_seen
116                if let Some((&oldest_peer, last_seen)) = peer_last_seen_map
117                    .iter()
118                    .max_by_key(|(_, last_seen)| **last_seen)
119                {
120                    debug!("Found the oldest peer to remove: {oldest_peer:?} with last_seen of {last_seen:?}");
121                    self.peers.remove(&oldest_peer);
122                    peer_last_seen_map.remove(&oldest_peer);
123                }
124            }
125        }
126    }
127}
128
129impl Default for CacheData {
130    fn default() -> Self {
131        Self {
132            peers: std::collections::HashMap::new(),
133            last_updated: SystemTime::now(),
134            network_version: crate::get_network_version(),
135        }
136    }
137}
138
/// In-memory bootstrap cache bound to an on-disk JSON file, with helpers to
/// load, merge, clean up, and atomically persist peer addresses.
#[derive(Clone, Debug)]
pub struct BootstrapCacheStore {
    // Path of the JSON cache file on disk (copied from `config.cache_file_path`).
    pub(crate) cache_path: PathBuf,
    // Tunables used here: `max_peers`, `max_addrs_per_peer`, `addr_expiry_duration`,
    // `disable_cache_writing`, `cache_file_path`.
    pub(crate) config: BootstrapCacheConfig,
    // Current in-memory cache state; written to `cache_path` by `write()`.
    pub(crate) data: CacheData,
}
145
146impl BootstrapCacheStore {
147    pub fn config(&self) -> &BootstrapCacheConfig {
148        &self.config
149    }
150
151    /// Create an empty CacheStore with the given configuration
152    pub fn new(config: BootstrapCacheConfig) -> Result<Self> {
153        info!("Creating new CacheStore with config: {:?}", config);
154        let cache_path = config.cache_file_path.clone();
155
156        // Create cache directory if it doesn't exist
157        if let Some(parent) = cache_path.parent() {
158            if !parent.exists() {
159                info!("Attempting to create cache directory at {parent:?}");
160                fs::create_dir_all(parent).inspect_err(|err| {
161                    warn!("Failed to create cache directory at {parent:?}: {err}");
162                })?;
163            }
164        }
165
166        let store = Self {
167            cache_path,
168            config,
169            data: CacheData::default(),
170        };
171
172        Ok(store)
173    }
174
175    /// Create an empty CacheStore from the given Initial Peers Configuration.
176    /// This also modifies the `BootstrapCacheConfig` if provided based on the `InitialPeersConfig`.
177    /// And also performs some actions based on the `InitialPeersConfig`.
178    ///
179    /// `InitialPeersConfig::bootstrap_cache_dir` will take precedence over the path provided inside `config`.
180    pub fn new_from_initial_peers_config(
181        init_peers_config: &InitialPeersConfig,
182        config: Option<BootstrapCacheConfig>,
183    ) -> Result<Self> {
184        let mut config = if let Some(cfg) = config {
185            cfg
186        } else {
187            BootstrapCacheConfig::default_config(init_peers_config.local)?
188        };
189        if let Some(bootstrap_cache_path) = init_peers_config.get_bootstrap_cache_path()? {
190            config.cache_file_path = bootstrap_cache_path;
191        }
192
193        let store = Self::new(config)?;
194
195        // If it is the first node, clear the cache.
196        if init_peers_config.first {
197            info!("First node in network, writing empty cache to disk");
198            store.write()?;
199        }
200
201        Ok(store)
202    }
203
204    /// Load cache data from disk
205    /// Make sure to have clean addrs inside the cache as we don't call craft_valid_multiaddr
206    pub fn load_cache_data(cfg: &BootstrapCacheConfig) -> Result<CacheData> {
207        // Try to open the file with read permissions
208        let mut file = OpenOptions::new()
209            .read(true)
210            .open(&cfg.cache_file_path)
211            .inspect_err(|err| warn!("Failed to open cache file: {err}",))?;
212
213        // Read the file contents
214        let mut contents = String::new();
215        file.read_to_string(&mut contents).inspect_err(|err| {
216            warn!("Failed to read cache file: {err}");
217        })?;
218
219        // Parse the cache data
220        let mut data = serde_json::from_str::<CacheData>(&contents).map_err(|err| {
221            warn!("Failed to parse cache data: {err}");
222            Error::FailedToParseCacheData
223        })?;
224
225        data.perform_cleanup(cfg);
226
227        Ok(data)
228    }
229
230    pub fn peer_count(&self) -> usize {
231        self.data.peers.len()
232    }
233
234    pub fn get_all_addrs(&self) -> impl Iterator<Item = &BootstrapAddr> {
235        self.data
236            .peers
237            .values()
238            .flat_map(|bootstrap_addresses| bootstrap_addresses.0.iter())
239    }
240
241    /// Get a list containing single addr per peer. We use the least faulty addr for each peer.
242    /// This list is sorted by the failure rate of the addr.
243    pub fn get_sorted_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
244        let mut addrs = self
245            .data
246            .peers
247            .values()
248            .flat_map(|bootstrap_addresses| bootstrap_addresses.get_least_faulty())
249            .collect::<Vec<_>>();
250
251        addrs.sort_by_key(|addr| addr.failure_rate() as u64);
252
253        addrs.into_iter().map(|addr| &addr.addr)
254    }
255
256    /// Update the status of an addr in the cache. The peer must be added to the cache first.
257    pub fn update_addr_status(&mut self, addr: &Multiaddr, success: bool) {
258        if let Some(peer_id) = multiaddr_get_peer_id(addr) {
259            debug!("Updating addr status: {addr} (success: {success})");
260            if let Some(bootstrap_addresses) = self.data.peers.get_mut(&peer_id) {
261                bootstrap_addresses.update_addr_status(addr, success);
262            } else {
263                debug!("Peer not found in cache to update: {addr}");
264            }
265        }
266    }
267
268    /// Add a set of addresses to the cache.
269    pub fn add_addr(&mut self, addr: Multiaddr) {
270        debug!("Trying to add new addr: {addr}");
271        let Some(addr) = craft_valid_multiaddr(&addr, false) else {
272            return;
273        };
274        let peer_id = match addr.iter().find(|p| matches!(p, Protocol::P2p(_))) {
275            Some(Protocol::P2p(id)) => id,
276            _ => return,
277        };
278
279        // Check if we already have this peer
280        if let Some(bootstrap_addrs) = self.data.peers.get_mut(&peer_id) {
281            if let Some(bootstrap_addr) = bootstrap_addrs.get_addr_mut(&addr) {
282                debug!("Updating existing peer's last_seen {addr}");
283                bootstrap_addr.last_seen = SystemTime::now();
284                return;
285            } else {
286                let mut bootstrap_addr = BootstrapAddr::new(addr.clone());
287                bootstrap_addr.success_count = 1;
288                bootstrap_addrs.insert_addr(&bootstrap_addr);
289            }
290        } else {
291            let mut bootstrap_addr = BootstrapAddr::new(addr.clone());
292            bootstrap_addr.success_count = 1;
293            self.data
294                .peers
295                .insert(peer_id, BootstrapAddresses(vec![bootstrap_addr]));
296        }
297
298        debug!("Added new peer {addr:?}, performing cleanup of old addrs");
299        self.perform_cleanup();
300    }
301
302    /// Remove a single address for a peer.
303    pub fn remove_addr(&mut self, addr: &Multiaddr) {
304        if let Some(peer_id) = multiaddr_get_peer_id(addr) {
305            if let Some(bootstrap_addresses) = self.data.peers.get_mut(&peer_id) {
306                bootstrap_addresses.remove_addr(addr);
307            } else {
308                debug!("Peer {peer_id:?} not found in the cache. Not removing addr: {addr:?}")
309            }
310        } else {
311            debug!("Could not obtain PeerId for {addr:?}, not removing addr from cache.");
312        }
313    }
314
315    pub fn perform_cleanup(&mut self) {
316        self.data.perform_cleanup(&self.config);
317    }
318
319    /// Flush the cache to disk after syncing with the CacheData from the file.
320    /// Do not perform cleanup when `data` is fetched from the network. The SystemTime might not be accurate.
321    pub fn sync_and_flush_to_disk(&mut self, with_cleanup: bool) -> Result<()> {
322        if self.config.disable_cache_writing {
323            info!("Cache writing is disabled, skipping sync to disk");
324            return Ok(());
325        }
326
327        info!(
328            "Flushing cache to disk, with data containing: {} peers",
329            self.data.peers.len(),
330        );
331
332        if let Ok(data_from_file) = Self::load_cache_data(&self.config) {
333            self.data.sync(&data_from_file);
334        } else {
335            warn!("Failed to load cache data from file, overwriting with new data");
336        }
337
338        if with_cleanup {
339            self.data.perform_cleanup(&self.config);
340            self.data.try_remove_oldest_peers(&self.config);
341        }
342
343        self.write().inspect_err(|e| {
344            error!("Failed to save cache to disk: {e}");
345        })?;
346
347        // Flush after writing
348        self.data.peers.clear();
349
350        Ok(())
351    }
352
353    /// Write the cache to disk atomically. This will overwrite the existing cache file, use sync_and_flush_to_disk to
354    /// sync with the file first.
355    pub fn write(&self) -> Result<()> {
356        debug!("Writing cache to disk: {:?}", self.cache_path);
357        // Create parent directory if it doesn't exist
358        if let Some(parent) = self.cache_path.parent() {
359            fs::create_dir_all(parent)?;
360        }
361
362        let mut file = AtomicWriteFile::options()
363            .open(&self.cache_path)
364            .inspect_err(|err| {
365                error!("Failed to open cache file using AtomicWriteFile: {err}");
366            })?;
367
368        let data = serde_json::to_string_pretty(&self.data).inspect_err(|err| {
369            error!("Failed to serialize cache data: {err}");
370        })?;
371        writeln!(file, "{data}")?;
372        file.commit().inspect_err(|err| {
373            error!("Failed to commit atomic write: {err}");
374        })?;
375
376        info!("Cache written to disk: {:?}", self.cache_path);
377
378        Ok(())
379    }
380}
381
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Build a store backed by a cache file inside a fresh temp directory.
    ///
    /// Returns the `TempDir` guard alongside the store: dropping the guard deletes
    /// the directory, so the previous version (which returned only the path) left
    /// the store pointing at an already-removed directory and only worked because
    /// `write()` re-creates missing parents.
    fn create_test_store() -> (BootstrapCacheStore, tempfile::TempDir) {
        let temp_dir = tempdir().unwrap();
        let cache_file = temp_dir.path().join("cache.json");

        let config = crate::BootstrapCacheConfig::empty().with_cache_path(&cache_file);

        let store = BootstrapCacheStore::new(config).unwrap();
        (store, temp_dir)
    }

    #[tokio::test]
    async fn test_peer_cleanup() {
        // Bind the guard to a named `_guard` (not `_`) so the temp dir lives
        // until the end of the test.
        let (mut store, _guard) = create_test_store();
        let good_addr: Multiaddr =
            "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
                .parse()
                .unwrap();
        let bad_addr: Multiaddr =
            "/ip4/127.0.0.1/tcp/8081/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
                .parse()
                .unwrap();

        // Add peers
        store.add_addr(good_addr.clone());
        store.add_addr(bad_addr.clone());

        // Make one peer reliable and one unreliable
        store.update_addr_status(&good_addr, true);

        // Fail the bad peer more times than max_retries
        for _ in 0..5 {
            store.update_addr_status(&bad_addr, false);
        }

        // Clean up unreliable peers
        store.perform_cleanup();

        // Get all peers (not just reliable ones)
        let peers = store.get_all_addrs().collect::<Vec<_>>();
        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0].addr, good_addr);
    }

    #[tokio::test]
    async fn test_peer_not_removed_if_successful() {
        let (mut store, _guard) = create_test_store();
        let addr: Multiaddr =
            "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
                .parse()
                .unwrap();

        // Add a peer and make it successful
        store.add_addr(addr.clone());
        store.update_addr_status(&addr, true);

        // Wait a bit
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Run cleanup
        store.perform_cleanup();

        // Verify peer is still there
        let peers = store.get_all_addrs().collect::<Vec<_>>();
        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0].addr, addr);
    }
}