1use crate::{
10 craft_valid_multiaddr, multiaddr_get_peer_id, BootstrapAddr, BootstrapAddresses,
11 BootstrapCacheConfig, Error, InitialPeersConfig, Result,
12};
13use atomic_write_file::AtomicWriteFile;
14use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
15use serde::{Deserialize, Serialize};
16use std::{
17 collections::{hash_map::Entry, HashMap},
18 fs::{self, OpenOptions},
19 io::{Read, Write},
20 path::PathBuf,
21 time::{Duration, SystemTime},
22};
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct CacheData {
26 pub peers: std::collections::HashMap<PeerId, BootstrapAddresses>,
27 pub last_updated: SystemTime,
28 pub network_version: String,
29}
30
31impl CacheData {
32 pub fn insert(&mut self, peer_id: PeerId, bootstrap_addr: BootstrapAddr) {
33 match self.peers.entry(peer_id) {
34 Entry::Occupied(mut occupied_entry) => {
35 occupied_entry.get_mut().insert_addr(&bootstrap_addr);
36 }
37 Entry::Vacant(vacant_entry) => {
38 vacant_entry.insert(BootstrapAddresses(vec![bootstrap_addr]));
39 }
40 }
41 }
42
43 pub fn sync(&mut self, other: &CacheData) {
45 for (peer, other_addresses_state) in other.peers.iter() {
46 let bootstrap_addresses = self
47 .peers
48 .entry(*peer)
49 .or_insert(other_addresses_state.clone());
50
51 trace!("Syncing {peer:?} from other with addrs count: {:?}. Our in memory state count: {:?}", other_addresses_state.0.len(), bootstrap_addresses.0.len());
52
53 bootstrap_addresses.sync(other_addresses_state);
54 }
55
56 self.last_updated = SystemTime::now();
57 }
58
59 pub fn perform_cleanup(&mut self, cfg: &BootstrapCacheConfig) {
66 self.peers.values_mut().for_each(|bootstrap_addresses| {
67 bootstrap_addresses.0.retain(|bootstrap_addr| {
68 let now = SystemTime::now();
69 let has_not_expired =
70 if let Ok(duration) = now.duration_since(bootstrap_addr.last_seen) {
71 duration < cfg.addr_expiry_duration
72 } else {
73 false
74 };
75 bootstrap_addr.is_reliable() && has_not_expired
76 })
77 });
78
79 self.peers
80 .retain(|_, bootstrap_addresses| !bootstrap_addresses.0.is_empty());
81
82 self.peers.values_mut().for_each(|bootstrap_addresses| {
83 if bootstrap_addresses.0.len() > cfg.max_addrs_per_peer {
84 bootstrap_addresses
86 .0
87 .sort_by_key(|addr| addr.failure_rate() as u64);
88 bootstrap_addresses.0.truncate(cfg.max_addrs_per_peer);
89 }
90 });
91
92 self.try_remove_oldest_peers(cfg);
93 }
94
95 pub fn try_remove_oldest_peers(&mut self, cfg: &BootstrapCacheConfig) {
97 if self.peers.len() > cfg.max_peers {
98 let mut peer_last_seen_map = HashMap::new();
99 for (peer, addrs) in self.peers.iter() {
100 let mut latest_seen = Duration::from_secs(u64::MAX);
101 for addr in addrs.0.iter() {
102 if let Ok(elapsed) = addr.last_seen.elapsed() {
103 trace!("Time elapsed for {addr:?} is {elapsed:?}");
104 if elapsed < latest_seen {
105 trace!("Updating latest_seen to {elapsed:?}");
106 latest_seen = elapsed;
107 }
108 }
109 }
110 trace!("Last seen for {peer:?} is {latest_seen:?}");
111 peer_last_seen_map.insert(*peer, latest_seen);
112 }
113
114 while self.peers.len() > cfg.max_peers {
115 if let Some((&oldest_peer, last_seen)) = peer_last_seen_map
117 .iter()
118 .max_by_key(|(_, last_seen)| **last_seen)
119 {
120 debug!("Found the oldest peer to remove: {oldest_peer:?} with last_seen of {last_seen:?}");
121 self.peers.remove(&oldest_peer);
122 peer_last_seen_map.remove(&oldest_peer);
123 }
124 }
125 }
126 }
127}
128
129impl Default for CacheData {
130 fn default() -> Self {
131 Self {
132 peers: std::collections::HashMap::new(),
133 last_updated: SystemTime::now(),
134 network_version: crate::get_network_version(),
135 }
136 }
137}
138
139#[derive(Clone, Debug)]
140pub struct BootstrapCacheStore {
141 pub(crate) cache_path: PathBuf,
142 pub(crate) config: BootstrapCacheConfig,
143 pub(crate) data: CacheData,
144}
145
146impl BootstrapCacheStore {
147 pub fn config(&self) -> &BootstrapCacheConfig {
148 &self.config
149 }
150
151 pub fn new(config: BootstrapCacheConfig) -> Result<Self> {
153 info!("Creating new CacheStore with config: {:?}", config);
154 let cache_path = config.cache_file_path.clone();
155
156 if let Some(parent) = cache_path.parent() {
158 if !parent.exists() {
159 info!("Attempting to create cache directory at {parent:?}");
160 fs::create_dir_all(parent).inspect_err(|err| {
161 warn!("Failed to create cache directory at {parent:?}: {err}");
162 })?;
163 }
164 }
165
166 let store = Self {
167 cache_path,
168 config,
169 data: CacheData::default(),
170 };
171
172 Ok(store)
173 }
174
175 pub fn new_from_initial_peers_config(
181 init_peers_config: &InitialPeersConfig,
182 config: Option<BootstrapCacheConfig>,
183 ) -> Result<Self> {
184 let mut config = if let Some(cfg) = config {
185 cfg
186 } else {
187 BootstrapCacheConfig::default_config(init_peers_config.local)?
188 };
189 if let Some(bootstrap_cache_path) = init_peers_config.get_bootstrap_cache_path()? {
190 config.cache_file_path = bootstrap_cache_path;
191 }
192
193 let store = Self::new(config)?;
194
195 if init_peers_config.first {
197 info!("First node in network, writing empty cache to disk");
198 store.write()?;
199 }
200
201 Ok(store)
202 }
203
204 pub fn load_cache_data(cfg: &BootstrapCacheConfig) -> Result<CacheData> {
207 let mut file = OpenOptions::new()
209 .read(true)
210 .open(&cfg.cache_file_path)
211 .inspect_err(|err| warn!("Failed to open cache file: {err}",))?;
212
213 let mut contents = String::new();
215 file.read_to_string(&mut contents).inspect_err(|err| {
216 warn!("Failed to read cache file: {err}");
217 })?;
218
219 let mut data = serde_json::from_str::<CacheData>(&contents).map_err(|err| {
221 warn!("Failed to parse cache data: {err}");
222 Error::FailedToParseCacheData
223 })?;
224
225 data.perform_cleanup(cfg);
226
227 Ok(data)
228 }
229
230 pub fn peer_count(&self) -> usize {
231 self.data.peers.len()
232 }
233
234 pub fn get_all_addrs(&self) -> impl Iterator<Item = &BootstrapAddr> {
235 self.data
236 .peers
237 .values()
238 .flat_map(|bootstrap_addresses| bootstrap_addresses.0.iter())
239 }
240
241 pub fn get_sorted_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
244 let mut addrs = self
245 .data
246 .peers
247 .values()
248 .flat_map(|bootstrap_addresses| bootstrap_addresses.get_least_faulty())
249 .collect::<Vec<_>>();
250
251 addrs.sort_by_key(|addr| addr.failure_rate() as u64);
252
253 addrs.into_iter().map(|addr| &addr.addr)
254 }
255
256 pub fn update_addr_status(&mut self, addr: &Multiaddr, success: bool) {
258 if let Some(peer_id) = multiaddr_get_peer_id(addr) {
259 debug!("Updating addr status: {addr} (success: {success})");
260 if let Some(bootstrap_addresses) = self.data.peers.get_mut(&peer_id) {
261 bootstrap_addresses.update_addr_status(addr, success);
262 } else {
263 debug!("Peer not found in cache to update: {addr}");
264 }
265 }
266 }
267
268 pub fn add_addr(&mut self, addr: Multiaddr) {
270 debug!("Trying to add new addr: {addr}");
271 let Some(addr) = craft_valid_multiaddr(&addr, false) else {
272 return;
273 };
274 let peer_id = match addr.iter().find(|p| matches!(p, Protocol::P2p(_))) {
275 Some(Protocol::P2p(id)) => id,
276 _ => return,
277 };
278
279 if let Some(bootstrap_addrs) = self.data.peers.get_mut(&peer_id) {
281 if let Some(bootstrap_addr) = bootstrap_addrs.get_addr_mut(&addr) {
282 debug!("Updating existing peer's last_seen {addr}");
283 bootstrap_addr.last_seen = SystemTime::now();
284 return;
285 } else {
286 let mut bootstrap_addr = BootstrapAddr::new(addr.clone());
287 bootstrap_addr.success_count = 1;
288 bootstrap_addrs.insert_addr(&bootstrap_addr);
289 }
290 } else {
291 let mut bootstrap_addr = BootstrapAddr::new(addr.clone());
292 bootstrap_addr.success_count = 1;
293 self.data
294 .peers
295 .insert(peer_id, BootstrapAddresses(vec![bootstrap_addr]));
296 }
297
298 debug!("Added new peer {addr:?}, performing cleanup of old addrs");
299 self.perform_cleanup();
300 }
301
302 pub fn remove_addr(&mut self, addr: &Multiaddr) {
304 if let Some(peer_id) = multiaddr_get_peer_id(addr) {
305 if let Some(bootstrap_addresses) = self.data.peers.get_mut(&peer_id) {
306 bootstrap_addresses.remove_addr(addr);
307 } else {
308 debug!("Peer {peer_id:?} not found in the cache. Not removing addr: {addr:?}")
309 }
310 } else {
311 debug!("Could not obtain PeerId for {addr:?}, not removing addr from cache.");
312 }
313 }
314
315 pub fn perform_cleanup(&mut self) {
316 self.data.perform_cleanup(&self.config);
317 }
318
319 pub fn sync_and_flush_to_disk(&mut self, with_cleanup: bool) -> Result<()> {
322 if self.config.disable_cache_writing {
323 info!("Cache writing is disabled, skipping sync to disk");
324 return Ok(());
325 }
326
327 info!(
328 "Flushing cache to disk, with data containing: {} peers",
329 self.data.peers.len(),
330 );
331
332 if let Ok(data_from_file) = Self::load_cache_data(&self.config) {
333 self.data.sync(&data_from_file);
334 } else {
335 warn!("Failed to load cache data from file, overwriting with new data");
336 }
337
338 if with_cleanup {
339 self.data.perform_cleanup(&self.config);
340 self.data.try_remove_oldest_peers(&self.config);
341 }
342
343 self.write().inspect_err(|e| {
344 error!("Failed to save cache to disk: {e}");
345 })?;
346
347 self.data.peers.clear();
349
350 Ok(())
351 }
352
353 pub fn write(&self) -> Result<()> {
356 debug!("Writing cache to disk: {:?}", self.cache_path);
357 if let Some(parent) = self.cache_path.parent() {
359 fs::create_dir_all(parent)?;
360 }
361
362 let mut file = AtomicWriteFile::options()
363 .open(&self.cache_path)
364 .inspect_err(|err| {
365 error!("Failed to open cache file using AtomicWriteFile: {err}");
366 })?;
367
368 let data = serde_json::to_string_pretty(&self.data).inspect_err(|err| {
369 error!("Failed to serialize cache data: {err}");
370 })?;
371 writeln!(file, "{data}")?;
372 file.commit().inspect_err(|err| {
373 error!("Failed to commit atomic write: {err}");
374 })?;
375
376 info!("Cache written to disk: {:?}", self.cache_path);
377
378 Ok(())
379 }
380}
381
382#[cfg(test)]
383mod tests {
384 use super::*;
385 use tempfile::tempdir;
386
387 async fn create_test_store() -> (BootstrapCacheStore, PathBuf) {
388 let temp_dir = tempdir().unwrap();
389 let cache_file = temp_dir.path().join("cache.json");
390
391 let config = crate::BootstrapCacheConfig::empty().with_cache_path(&cache_file);
392
393 let store = BootstrapCacheStore::new(config).unwrap();
394 (store.clone(), store.cache_path.clone())
395 }
396
397 #[tokio::test]
398 async fn test_peer_cleanup() {
399 let (mut store, _) = create_test_store().await;
400 let good_addr: Multiaddr =
401 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
402 .parse()
403 .unwrap();
404 let bad_addr: Multiaddr =
405 "/ip4/127.0.0.1/tcp/8081/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
406 .parse()
407 .unwrap();
408
409 store.add_addr(good_addr.clone());
411 store.add_addr(bad_addr.clone());
412
413 store.update_addr_status(&good_addr, true);
415
416 for _ in 0..5 {
418 store.update_addr_status(&bad_addr, false);
419 }
420
421 store.perform_cleanup();
423
424 let peers = store.get_all_addrs().collect::<Vec<_>>();
426 assert_eq!(peers.len(), 1);
427 assert_eq!(peers[0].addr, good_addr);
428 }
429
430 #[tokio::test]
431 async fn test_peer_not_removed_if_successful() {
432 let (mut store, _) = create_test_store().await;
433 let addr: Multiaddr =
434 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
435 .parse()
436 .unwrap();
437
438 store.add_addr(addr.clone());
440 store.update_addr_status(&addr, true);
441
442 tokio::time::sleep(Duration::from_millis(100)).await;
444
445 store.perform_cleanup();
447
448 let peers = store.get_all_addrs().collect::<Vec<_>>();
450 assert_eq!(peers.len(), 1);
451 assert_eq!(peers[0].addr, addr);
452 }
453}