ant_bootstrap/
cache_store.rs1use crate::{
10 craft_valid_multiaddr, multiaddr_get_peer_id, BootstrapAddr, BootstrapAddresses,
11 BootstrapCacheConfig, Error, InitialPeersConfig, Result,
12};
13use atomic_write_file::AtomicWriteFile;
14use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
15use serde::{Deserialize, Serialize};
16use std::{
17 collections::{hash_map::Entry, HashMap},
18 fs::{self, OpenOptions},
19 io::{Read, Write},
20 path::PathBuf,
21 time::{Duration, SystemTime},
22};
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct CacheData {
26 pub peers: std::collections::HashMap<PeerId, BootstrapAddresses>,
27 pub last_updated: SystemTime,
28 pub network_version: String,
29}
30
31impl CacheData {
32 pub fn insert(&mut self, peer_id: PeerId, bootstrap_addr: BootstrapAddr) {
33 match self.peers.entry(peer_id) {
34 Entry::Occupied(mut occupied_entry) => {
35 occupied_entry.get_mut().insert_addr(&bootstrap_addr);
36 }
37 Entry::Vacant(vacant_entry) => {
38 vacant_entry.insert(BootstrapAddresses(vec![bootstrap_addr]));
39 }
40 }
41 }
42
43 pub fn sync(&mut self, other: &CacheData) {
45 for (peer, other_addresses_state) in other.peers.iter() {
46 let bootstrap_addresses = self
47 .peers
48 .entry(*peer)
49 .or_insert(other_addresses_state.clone());
50
51 trace!("Syncing {peer:?} from other with addrs count: {:?}. Our in memory state count: {:?}", other_addresses_state.0.len(), bootstrap_addresses.0.len());
52
53 bootstrap_addresses.sync(other_addresses_state);
54 }
55
56 self.last_updated = SystemTime::now();
57 }
58
59 pub fn try_remove_oldest_peers(&mut self, cfg: &BootstrapCacheConfig) {
61 if self.peers.len() > cfg.max_peers {
62 let mut peer_last_seen_map = HashMap::new();
63 for (peer, addrs) in self.peers.iter() {
64 let mut latest_seen = Duration::from_secs(u64::MAX);
65 for addr in addrs.0.iter() {
66 if let Ok(elapsed) = addr.last_seen.elapsed() {
67 trace!("Time elapsed for {addr:?} is {elapsed:?}");
68 if elapsed < latest_seen {
69 trace!("Updating latest_seen to {elapsed:?}");
70 latest_seen = elapsed;
71 }
72 }
73 }
74 trace!("Last seen for {peer:?} is {latest_seen:?}");
75 peer_last_seen_map.insert(*peer, latest_seen);
76 }
77
78 while self.peers.len() > cfg.max_peers {
79 if let Some((&oldest_peer, last_seen)) = peer_last_seen_map
81 .iter()
82 .max_by_key(|(_, last_seen)| **last_seen)
83 {
84 debug!("Found the oldest peer to remove: {oldest_peer:?} with last_seen of {last_seen:?}");
85 self.peers.remove(&oldest_peer);
86 peer_last_seen_map.remove(&oldest_peer);
87 }
88 }
89 }
90 }
91}
92
93impl Default for CacheData {
94 fn default() -> Self {
95 Self {
96 peers: std::collections::HashMap::new(),
97 last_updated: SystemTime::now(),
98 network_version: crate::get_network_version(),
99 }
100 }
101}
102
103#[derive(Clone, Debug)]
104pub struct BootstrapCacheStore {
105 pub(crate) cache_path: PathBuf,
106 pub(crate) config: BootstrapCacheConfig,
107 pub(crate) data: CacheData,
108}
109
110impl BootstrapCacheStore {
111 pub fn config(&self) -> &BootstrapCacheConfig {
112 &self.config
113 }
114
115 pub fn new(config: BootstrapCacheConfig) -> Result<Self> {
117 info!("Creating new CacheStore with config: {:?}", config);
118 let cache_path = config.cache_file_path.clone();
119
120 if let Some(parent) = cache_path.parent() {
122 if !parent.exists() {
123 info!("Attempting to create cache directory at {parent:?}");
124 fs::create_dir_all(parent).inspect_err(|err| {
125 warn!("Failed to create cache directory at {parent:?}: {err}");
126 })?;
127 }
128 }
129
130 let store = Self {
131 cache_path,
132 config,
133 data: CacheData::default(),
134 };
135
136 Ok(store)
137 }
138
139 pub fn new_from_initial_peers_config(
145 init_peers_config: &InitialPeersConfig,
146 config: Option<BootstrapCacheConfig>,
147 ) -> Result<Self> {
148 let mut config = if let Some(cfg) = config {
149 cfg
150 } else {
151 BootstrapCacheConfig::default_config(init_peers_config.local)?
152 };
153 if let Some(bootstrap_cache_path) = init_peers_config.get_bootstrap_cache_path()? {
154 config.cache_file_path = bootstrap_cache_path;
155 }
156
157 let mut store = Self::new(config)?;
158
159 if init_peers_config.first {
161 info!("First node in network, writing empty cache to disk");
162 store.write()?;
163 } else {
164 info!("Flushing cache to disk on init.");
165 store.sync_and_flush_to_disk()?;
166 }
167
168 Ok(store)
169 }
170
171 pub fn load_cache_data(cfg: &BootstrapCacheConfig) -> Result<CacheData> {
174 let mut file = OpenOptions::new()
176 .read(true)
177 .open(&cfg.cache_file_path)
178 .inspect_err(|err| warn!("Failed to open cache file: {err}",))?;
179
180 let mut contents = String::new();
182 file.read_to_string(&mut contents).inspect_err(|err| {
183 warn!("Failed to read cache file: {err}");
184 })?;
185
186 let mut data = serde_json::from_str::<CacheData>(&contents).map_err(|err| {
188 warn!("Failed to parse cache data: {err}");
189 Error::FailedToParseCacheData
190 })?;
191
192 data.try_remove_oldest_peers(cfg);
193
194 Ok(data)
195 }
196
197 pub fn peer_count(&self) -> usize {
198 self.data.peers.len()
199 }
200
201 pub fn get_all_addrs(&self) -> impl Iterator<Item = &BootstrapAddr> {
202 self.data
203 .peers
204 .values()
205 .flat_map(|bootstrap_addresses| bootstrap_addresses.0.iter())
206 }
207
208 pub fn get_sorted_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
211 let mut addrs = self
212 .data
213 .peers
214 .values()
215 .flat_map(|bootstrap_addresses| bootstrap_addresses.get_least_faulty())
216 .collect::<Vec<_>>();
217
218 addrs.sort_by_key(|addr| addr.failure_rate() as u64);
219
220 addrs.into_iter().map(|addr| &addr.addr)
221 }
222
223 pub fn update_addr_status(&mut self, addr: &Multiaddr, success: bool) {
225 if let Some(peer_id) = multiaddr_get_peer_id(addr) {
226 debug!("Updating addr status: {addr} (success: {success})");
227 if let Some(bootstrap_addresses) = self.data.peers.get_mut(&peer_id) {
228 bootstrap_addresses.update_addr_status(addr, success);
229 } else {
230 debug!("Peer not found in cache to update: {addr}");
231 }
232 }
233 }
234
235 pub fn add_addr(&mut self, addr: Multiaddr) {
237 debug!("Trying to add new addr: {addr}");
238 let Some(addr) = craft_valid_multiaddr(&addr, false) else {
239 return;
240 };
241 let peer_id = match addr.iter().find(|p| matches!(p, Protocol::P2p(_))) {
242 Some(Protocol::P2p(id)) => id,
243 _ => return,
244 };
245
246 if addr.iter().any(|p| matches!(p, Protocol::P2pCircuit)) {
247 debug!("Not adding relay address to the cache: {addr}");
248 return;
249 }
250
251 if let Some(bootstrap_addrs) = self.data.peers.get_mut(&peer_id) {
253 if let Some(bootstrap_addr) = bootstrap_addrs.get_addr_mut(&addr) {
254 debug!("Updating existing peer's last_seen {addr}");
255 bootstrap_addr.last_seen = SystemTime::now();
256 return;
257 } else {
258 let mut bootstrap_addr = BootstrapAddr::new(addr.clone());
259 bootstrap_addr.success_count = 1;
260 bootstrap_addrs.insert_addr(&bootstrap_addr);
261 }
262 } else {
263 let mut bootstrap_addr = BootstrapAddr::new(addr.clone());
264 bootstrap_addr.success_count = 1;
265 self.data
266 .peers
267 .insert(peer_id, BootstrapAddresses(vec![bootstrap_addr]));
268 }
269
270 debug!("Added new peer {addr:?}, performing cleanup of old addrs");
271 self.try_remove_oldest_peers();
272 }
273
274 pub fn remove_addr(&mut self, addr: &Multiaddr) {
276 if let Some(peer_id) = multiaddr_get_peer_id(addr) {
277 if let Some(bootstrap_addresses) = self.data.peers.get_mut(&peer_id) {
278 bootstrap_addresses.remove_addr(addr);
279 } else {
280 debug!("Peer {peer_id:?} not found in the cache. Not removing addr: {addr:?}")
281 }
282 } else {
283 debug!("Could not obtain PeerId for {addr:?}, not removing addr from cache.");
284 }
285 }
286
287 pub fn try_remove_oldest_peers(&mut self) {
288 self.data.try_remove_oldest_peers(&self.config);
289 }
290
291 pub fn sync_and_flush_to_disk(&mut self) -> Result<()> {
293 if self.config.disable_cache_writing {
294 info!("Cache writing is disabled, skipping sync to disk");
295 return Ok(());
296 }
297
298 info!(
299 "Flushing cache to disk, with data containing: {} peers",
300 self.data.peers.len(),
301 );
302
303 if let Ok(data_from_file) = Self::load_cache_data(&self.config) {
304 self.data.sync(&data_from_file);
305 } else {
306 warn!("Failed to load cache data from file, overwriting with new data");
307 }
308
309 self.data.try_remove_oldest_peers(&self.config);
310
311 self.write().inspect_err(|e| {
312 error!("Failed to save cache to disk: {e}");
313 })?;
314
315 self.data.peers.clear();
317
318 Ok(())
319 }
320
321 pub fn write(&self) -> Result<()> {
324 debug!("Writing cache to disk: {:?}", self.cache_path);
325 if let Some(parent) = self.cache_path.parent() {
327 fs::create_dir_all(parent)?;
328 }
329
330 let mut file = AtomicWriteFile::options()
331 .open(&self.cache_path)
332 .inspect_err(|err| {
333 error!("Failed to open cache file using AtomicWriteFile: {err}");
334 })?;
335
336 let data = serde_json::to_string_pretty(&self.data).inspect_err(|err| {
337 error!("Failed to serialize cache data: {err}");
338 })?;
339 writeln!(file, "{data}")?;
340 file.commit().inspect_err(|err| {
341 error!("Failed to commit atomic write: {err}");
342 })?;
343
344 info!("Cache written to disk: {:?}", self.cache_path);
345
346 Ok(())
347 }
348}