ant_bootstrap/
cache_store.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::{
10    craft_valid_multiaddr, multiaddr_get_peer_id, BootstrapAddr, BootstrapAddresses,
11    BootstrapCacheConfig, Error, InitialPeersConfig, Result,
12};
13use atomic_write_file::AtomicWriteFile;
14use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
15use serde::{Deserialize, Serialize};
16use std::{
17    collections::{hash_map::Entry, HashMap},
18    fs::{self, OpenOptions},
19    io::{Read, Write},
20    path::PathBuf,
21    time::{Duration, SystemTime},
22};
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct CacheData {
26    pub peers: std::collections::HashMap<PeerId, BootstrapAddresses>,
27    pub last_updated: SystemTime,
28    pub network_version: String,
29}
30
31impl CacheData {
32    pub fn insert(&mut self, peer_id: PeerId, bootstrap_addr: BootstrapAddr) {
33        match self.peers.entry(peer_id) {
34            Entry::Occupied(mut occupied_entry) => {
35                occupied_entry.get_mut().insert_addr(&bootstrap_addr);
36            }
37            Entry::Vacant(vacant_entry) => {
38                vacant_entry.insert(BootstrapAddresses(vec![bootstrap_addr]));
39            }
40        }
41    }
42
43    /// Sync the self cache with another cache. This would just add the 'other' state to self.
44    pub fn sync(&mut self, other: &CacheData) {
45        for (peer, other_addresses_state) in other.peers.iter() {
46            let bootstrap_addresses = self
47                .peers
48                .entry(*peer)
49                .or_insert(other_addresses_state.clone());
50
51            trace!("Syncing {peer:?} from other with addrs count: {:?}. Our in memory state count: {:?}", other_addresses_state.0.len(), bootstrap_addresses.0.len());
52
53            bootstrap_addresses.sync(other_addresses_state);
54        }
55
56        self.last_updated = SystemTime::now();
57    }
58
59    /// Remove the oldest peers until we're under the max_peers limit
60    pub fn try_remove_oldest_peers(&mut self, cfg: &BootstrapCacheConfig) {
61        if self.peers.len() > cfg.max_peers {
62            let mut peer_last_seen_map = HashMap::new();
63            for (peer, addrs) in self.peers.iter() {
64                let mut latest_seen = Duration::from_secs(u64::MAX);
65                for addr in addrs.0.iter() {
66                    if let Ok(elapsed) = addr.last_seen.elapsed() {
67                        trace!("Time elapsed for {addr:?} is {elapsed:?}");
68                        if elapsed < latest_seen {
69                            trace!("Updating latest_seen to {elapsed:?}");
70                            latest_seen = elapsed;
71                        }
72                    }
73                }
74                trace!("Last seen for {peer:?} is {latest_seen:?}");
75                peer_last_seen_map.insert(*peer, latest_seen);
76            }
77
78            while self.peers.len() > cfg.max_peers {
79                // find the peer with the largest last_seen
80                if let Some((&oldest_peer, last_seen)) = peer_last_seen_map
81                    .iter()
82                    .max_by_key(|(_, last_seen)| **last_seen)
83                {
84                    debug!("Found the oldest peer to remove: {oldest_peer:?} with last_seen of {last_seen:?}");
85                    self.peers.remove(&oldest_peer);
86                    peer_last_seen_map.remove(&oldest_peer);
87                }
88            }
89        }
90    }
91}
92
93impl Default for CacheData {
94    fn default() -> Self {
95        Self {
96            peers: std::collections::HashMap::new(),
97            last_updated: SystemTime::now(),
98            network_version: crate::get_network_version(),
99        }
100    }
101}
102
103#[derive(Clone, Debug)]
104pub struct BootstrapCacheStore {
105    pub(crate) cache_path: PathBuf,
106    pub(crate) config: BootstrapCacheConfig,
107    pub(crate) data: CacheData,
108}
109
110impl BootstrapCacheStore {
111    pub fn config(&self) -> &BootstrapCacheConfig {
112        &self.config
113    }
114
115    /// Create an empty CacheStore with the given configuration
116    pub fn new(config: BootstrapCacheConfig) -> Result<Self> {
117        info!("Creating new CacheStore with config: {:?}", config);
118        let cache_path = config.cache_file_path.clone();
119
120        // Create cache directory if it doesn't exist
121        if let Some(parent) = cache_path.parent() {
122            if !parent.exists() {
123                info!("Attempting to create cache directory at {parent:?}");
124                fs::create_dir_all(parent).inspect_err(|err| {
125                    warn!("Failed to create cache directory at {parent:?}: {err}");
126                })?;
127            }
128        }
129
130        let store = Self {
131            cache_path,
132            config,
133            data: CacheData::default(),
134        };
135
136        Ok(store)
137    }
138
139    /// Create an empty CacheStore from the given Initial Peers Configuration.
140    /// This also modifies the `BootstrapCacheConfig` if provided based on the `InitialPeersConfig`.
141    /// And also performs some actions based on the `InitialPeersConfig`.
142    ///
143    /// `InitialPeersConfig::bootstrap_cache_dir` will take precedence over the path provided inside `config`.
144    pub fn new_from_initial_peers_config(
145        init_peers_config: &InitialPeersConfig,
146        config: Option<BootstrapCacheConfig>,
147    ) -> Result<Self> {
148        let mut config = if let Some(cfg) = config {
149            cfg
150        } else {
151            BootstrapCacheConfig::default_config(init_peers_config.local)?
152        };
153        if let Some(bootstrap_cache_path) = init_peers_config.get_bootstrap_cache_path()? {
154            config.cache_file_path = bootstrap_cache_path;
155        }
156
157        let mut store = Self::new(config)?;
158
159        // If it is the first node, clear the cache.
160        if init_peers_config.first {
161            info!("First node in network, writing empty cache to disk");
162            store.write()?;
163        } else {
164            info!("Flushing cache to disk on init.");
165            store.sync_and_flush_to_disk()?;
166        }
167
168        Ok(store)
169    }
170
171    /// Load cache data from disk
172    /// Make sure to have clean addrs inside the cache as we don't call craft_valid_multiaddr
173    pub fn load_cache_data(cfg: &BootstrapCacheConfig) -> Result<CacheData> {
174        // Try to open the file with read permissions
175        let mut file = OpenOptions::new()
176            .read(true)
177            .open(&cfg.cache_file_path)
178            .inspect_err(|err| warn!("Failed to open cache file: {err}",))?;
179
180        // Read the file contents
181        let mut contents = String::new();
182        file.read_to_string(&mut contents).inspect_err(|err| {
183            warn!("Failed to read cache file: {err}");
184        })?;
185
186        // Parse the cache data
187        let mut data = serde_json::from_str::<CacheData>(&contents).map_err(|err| {
188            warn!("Failed to parse cache data: {err}");
189            Error::FailedToParseCacheData
190        })?;
191
192        data.try_remove_oldest_peers(cfg);
193
194        Ok(data)
195    }
196
197    pub fn peer_count(&self) -> usize {
198        self.data.peers.len()
199    }
200
201    pub fn get_all_addrs(&self) -> impl Iterator<Item = &BootstrapAddr> {
202        self.data
203            .peers
204            .values()
205            .flat_map(|bootstrap_addresses| bootstrap_addresses.0.iter())
206    }
207
208    /// Get a list containing single addr per peer. We use the least faulty addr for each peer.
209    /// This list is sorted by the failure rate of the addr.
210    pub fn get_sorted_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
211        let mut addrs = self
212            .data
213            .peers
214            .values()
215            .flat_map(|bootstrap_addresses| bootstrap_addresses.get_least_faulty())
216            .collect::<Vec<_>>();
217
218        addrs.sort_by_key(|addr| addr.failure_rate() as u64);
219
220        addrs.into_iter().map(|addr| &addr.addr)
221    }
222
223    /// Update the status of an addr in the cache. The peer must be added to the cache first.
224    pub fn update_addr_status(&mut self, addr: &Multiaddr, success: bool) {
225        if let Some(peer_id) = multiaddr_get_peer_id(addr) {
226            debug!("Updating addr status: {addr} (success: {success})");
227            if let Some(bootstrap_addresses) = self.data.peers.get_mut(&peer_id) {
228                bootstrap_addresses.update_addr_status(addr, success);
229            } else {
230                debug!("Peer not found in cache to update: {addr}");
231            }
232        }
233    }
234
235    /// Add a set of addresses to the cache.
236    pub fn add_addr(&mut self, addr: Multiaddr) {
237        debug!("Trying to add new addr: {addr}");
238        let Some(addr) = craft_valid_multiaddr(&addr, false) else {
239            return;
240        };
241        let peer_id = match addr.iter().find(|p| matches!(p, Protocol::P2p(_))) {
242            Some(Protocol::P2p(id)) => id,
243            _ => return,
244        };
245
246        if addr.iter().any(|p| matches!(p, Protocol::P2pCircuit)) {
247            debug!("Not adding relay address to the cache: {addr}");
248            return;
249        }
250
251        // Check if we already have this peer
252        if let Some(bootstrap_addrs) = self.data.peers.get_mut(&peer_id) {
253            if let Some(bootstrap_addr) = bootstrap_addrs.get_addr_mut(&addr) {
254                debug!("Updating existing peer's last_seen {addr}");
255                bootstrap_addr.last_seen = SystemTime::now();
256                return;
257            } else {
258                let mut bootstrap_addr = BootstrapAddr::new(addr.clone());
259                bootstrap_addr.success_count = 1;
260                bootstrap_addrs.insert_addr(&bootstrap_addr);
261            }
262        } else {
263            let mut bootstrap_addr = BootstrapAddr::new(addr.clone());
264            bootstrap_addr.success_count = 1;
265            self.data
266                .peers
267                .insert(peer_id, BootstrapAddresses(vec![bootstrap_addr]));
268        }
269
270        debug!("Added new peer {addr:?}, performing cleanup of old addrs");
271        self.try_remove_oldest_peers();
272    }
273
274    /// Remove a single address for a peer.
275    pub fn remove_addr(&mut self, addr: &Multiaddr) {
276        if let Some(peer_id) = multiaddr_get_peer_id(addr) {
277            if let Some(bootstrap_addresses) = self.data.peers.get_mut(&peer_id) {
278                bootstrap_addresses.remove_addr(addr);
279            } else {
280                debug!("Peer {peer_id:?} not found in the cache. Not removing addr: {addr:?}")
281            }
282        } else {
283            debug!("Could not obtain PeerId for {addr:?}, not removing addr from cache.");
284        }
285    }
286
287    pub fn try_remove_oldest_peers(&mut self) {
288        self.data.try_remove_oldest_peers(&self.config);
289    }
290
291    /// Flush the cache to disk after syncing with the CacheData from the file.
292    pub fn sync_and_flush_to_disk(&mut self) -> Result<()> {
293        if self.config.disable_cache_writing {
294            info!("Cache writing is disabled, skipping sync to disk");
295            return Ok(());
296        }
297
298        info!(
299            "Flushing cache to disk, with data containing: {} peers",
300            self.data.peers.len(),
301        );
302
303        if let Ok(data_from_file) = Self::load_cache_data(&self.config) {
304            self.data.sync(&data_from_file);
305        } else {
306            warn!("Failed to load cache data from file, overwriting with new data");
307        }
308
309        self.data.try_remove_oldest_peers(&self.config);
310
311        self.write().inspect_err(|e| {
312            error!("Failed to save cache to disk: {e}");
313        })?;
314
315        // Flush after writing
316        self.data.peers.clear();
317
318        Ok(())
319    }
320
321    /// Write the cache to disk atomically. This will overwrite the existing cache file, use sync_and_flush_to_disk to
322    /// sync with the file first.
323    pub fn write(&self) -> Result<()> {
324        debug!("Writing cache to disk: {:?}", self.cache_path);
325        // Create parent directory if it doesn't exist
326        if let Some(parent) = self.cache_path.parent() {
327            fs::create_dir_all(parent)?;
328        }
329
330        let mut file = AtomicWriteFile::options()
331            .open(&self.cache_path)
332            .inspect_err(|err| {
333                error!("Failed to open cache file using AtomicWriteFile: {err}");
334            })?;
335
336        let data = serde_json::to_string_pretty(&self.data).inspect_err(|err| {
337            error!("Failed to serialize cache data: {err}");
338        })?;
339        writeln!(file, "{data}")?;
340        file.commit().inspect_err(|err| {
341            error!("Failed to commit atomic write: {err}");
342        })?;
343
344        info!("Cache written to disk: {:?}", self.cache_path);
345
346        Ok(())
347    }
348}