ant_bootstrap/cache_store/
cache_data_v1.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::Error;
10
11use atomic_write_file::AtomicWriteFile;
12use libp2p::{Multiaddr, PeerId};
13use serde::{Deserialize, Serialize};
14use std::{
15    collections::VecDeque,
16    fs::{self, OpenOptions},
17    io::{Read, Write},
18    path::{Path, PathBuf},
19    time::SystemTime,
20};
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct CacheData {
24    pub peers: VecDeque<(PeerId, VecDeque<Multiaddr>)>,
25    pub last_updated: SystemTime,
26    pub network_version: String,
27    pub cache_version: String,
28}
29
30impl CacheData {
31    /// The version of the cache data format
32    /// This has to be bumped whenever the cache data format changes to ensure compatibility.
33    pub const CACHE_DATA_VERSION: u32 = 1;
34
35    /// Sync the self cache with another cache. Self peers (newer) are preserved.
36    pub fn sync(&mut self, other: &CacheData, max_addrs_per_peer: usize, max_peers: usize) {
37        let old_len = self.peers.len();
38        let other_len = other.peers.len();
39
40        for (other_peer, other_addrs) in other.peers.iter() {
41            if other_addrs.is_empty() {
42                continue;
43            }
44
45            let mut found_existing = false;
46            for (peer, addrs) in self.peers.iter_mut() {
47                if peer == other_peer {
48                    for addr in other_addrs.iter() {
49                        if !addrs.contains(addr) {
50                            addrs.push_back(addr.clone());
51                        }
52                    }
53                    while addrs.len() > max_addrs_per_peer {
54                        addrs.pop_front();
55                    }
56                    found_existing = true;
57                    break;
58                }
59            }
60
61            if !found_existing {
62                self.peers.push_back((*other_peer, other_addrs.clone()));
63            }
64
65            // break if limit reached
66            if self.peers.len() >= max_peers {
67                break;
68            }
69        }
70
71        // Apply max_peers limit by removing from back (oldest from other)
72        while self.peers.len() > max_peers {
73            self.peers.pop_back();
74        }
75
76        let new_len = self.peers.len();
77
78        info!(
79            "Synced {other_len} peers to our current {old_len:?} peers to have a final count of {new_len:?} peers"
80        );
81
82        self.last_updated = SystemTime::now();
83    }
84
85    /// Add a peer to the cachse data
86    pub fn add_peer<'a>(
87        &mut self,
88        peer_id: PeerId,
89        addrs: impl Iterator<Item = &'a Multiaddr>,
90        max_addrs_per_peer: usize,
91        max_peers: usize,
92    ) {
93        if let Some((_, present_addrs)) = self.peers.iter_mut().find(|(id, _)| id == &peer_id) {
94            for addr in addrs {
95                if !present_addrs.contains(addr) {
96                    present_addrs.push_back(addr.clone());
97                }
98            }
99            while present_addrs.len() > max_addrs_per_peer {
100                present_addrs.pop_front();
101            }
102        } else {
103            self.peers.push_back((
104                peer_id,
105                addrs
106                    .into_iter()
107                    .take(max_addrs_per_peer)
108                    .cloned()
109                    .collect(),
110            ));
111        }
112
113        while self.peers.len() > max_peers {
114            self.peers.pop_front();
115        }
116    }
117
118    pub fn get_all_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
119        self.peers
120            .iter()
121            .flat_map(|(_, bootstrap_addresses)| bootstrap_addresses.iter().next())
122    }
123
124    pub fn read_from_file(cache_dir: &Path, file_name: &str) -> Result<Self, Error> {
125        let file_path = Self::cache_file_path(cache_dir, file_name);
126        // Try to open the file with read permissions
127        let mut file = OpenOptions::new()
128            .read(true)
129            .open(&file_path)
130            .inspect_err(|err| warn!("Failed to open cache file at {file_path:?} : {err}",))?;
131
132        // Read the file contents
133        let mut contents = String::new();
134        file.read_to_string(&mut contents).inspect_err(|err| {
135            warn!("Failed to read cache file: {err}");
136        })?;
137
138        // Parse the cache data
139        let data = serde_json::from_str::<Self>(&contents).map_err(|err| {
140            warn!("Failed to parse cache data: {err}");
141            Error::FailedToParseCacheData
142        })?;
143
144        Ok(data)
145    }
146
147    pub fn write_to_file(&self, cache_dir: &Path, file_name: &str) -> Result<(), Error> {
148        let file_path = Self::cache_file_path(cache_dir, file_name);
149
150        // Create parent directory if it doesn't exist
151        if let Some(parent) = file_path.parent() {
152            fs::create_dir_all(parent)?;
153        }
154
155        let mut file = AtomicWriteFile::options()
156            .open(&file_path)
157            .inspect_err(|err| {
158                error!("Failed to open cache file at {file_path:?} using AtomicWriteFile: {err}");
159            })?;
160
161        let data = serde_json::to_string_pretty(&self).inspect_err(|err| {
162            error!("Failed to serialize cache data: {err}");
163        })?;
164        writeln!(file, "{data}")?;
165        file.commit().inspect_err(|err| {
166            error!("Failed to commit atomic write: {err}");
167        })?;
168
169        info!("Cache written to disk: {:?}", file_path);
170
171        Ok(())
172    }
173
174    pub fn cache_file_path(cache_dir: &Path, file_name: &str) -> PathBuf {
175        cache_dir
176            .join(format!("version_{}", Self::CACHE_DATA_VERSION))
177            .join(file_name)
178    }
179}
180
181impl Default for CacheData {
182    fn default() -> Self {
183        Self {
184            peers: Default::default(),
185            last_updated: SystemTime::now(),
186            network_version: crate::get_network_version(),
187            cache_version: Self::CACHE_DATA_VERSION.to_string(),
188        }
189    }
190}