ant_bootstrap/cache_store/
cache_data_v1.rs1use crate::Error;
10
11use atomic_write_file::AtomicWriteFile;
12use libp2p::{Multiaddr, PeerId};
13use serde::{Deserialize, Serialize};
14use std::{
15 collections::VecDeque,
16 fs::{self, OpenOptions},
17 io::{Read, Write},
18 path::{Path, PathBuf},
19 time::SystemTime,
20};
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct CacheData {
24 pub peers: VecDeque<(PeerId, VecDeque<Multiaddr>)>,
25 pub last_updated: SystemTime,
26 pub network_version: String,
27 pub cache_version: String,
28}
29
30impl CacheData {
31 pub const CACHE_DATA_VERSION: u32 = 1;
34
35 pub fn sync(&mut self, other: &CacheData, max_addrs_per_peer: usize, max_peers: usize) {
37 let old_len = self.peers.len();
38 let other_len = other.peers.len();
39
40 for (other_peer, other_addrs) in other.peers.iter() {
41 if other_addrs.is_empty() {
42 continue;
43 }
44
45 let mut found_existing = false;
46 for (peer, addrs) in self.peers.iter_mut() {
47 if peer == other_peer {
48 for addr in other_addrs.iter() {
49 if !addrs.contains(addr) {
50 addrs.push_back(addr.clone());
51 }
52 }
53 while addrs.len() > max_addrs_per_peer {
54 addrs.pop_front();
55 }
56 found_existing = true;
57 break;
58 }
59 }
60
61 if !found_existing {
62 self.peers.push_back((*other_peer, other_addrs.clone()));
63 }
64
65 if self.peers.len() >= max_peers {
67 break;
68 }
69 }
70
71 while self.peers.len() > max_peers {
73 self.peers.pop_back();
74 }
75
76 let new_len = self.peers.len();
77
78 info!(
79 "Synced {other_len} peers to our current {old_len:?} peers to have a final count of {new_len:?} peers"
80 );
81
82 self.last_updated = SystemTime::now();
83 }
84
85 pub fn add_peer<'a>(
87 &mut self,
88 peer_id: PeerId,
89 addrs: impl Iterator<Item = &'a Multiaddr>,
90 max_addrs_per_peer: usize,
91 max_peers: usize,
92 ) {
93 if let Some((_, present_addrs)) = self.peers.iter_mut().find(|(id, _)| id == &peer_id) {
94 for addr in addrs {
95 if !present_addrs.contains(addr) {
96 present_addrs.push_back(addr.clone());
97 }
98 }
99 while present_addrs.len() > max_addrs_per_peer {
100 present_addrs.pop_front();
101 }
102 } else {
103 self.peers.push_back((
104 peer_id,
105 addrs
106 .into_iter()
107 .take(max_addrs_per_peer)
108 .cloned()
109 .collect(),
110 ));
111 }
112
113 while self.peers.len() > max_peers {
114 self.peers.pop_front();
115 }
116 }
117
118 pub fn get_all_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
119 self.peers
120 .iter()
121 .flat_map(|(_, bootstrap_addresses)| bootstrap_addresses.iter().next())
122 }
123
124 pub fn read_from_file(cache_dir: &Path, file_name: &str) -> Result<Self, Error> {
125 let file_path = Self::cache_file_path(cache_dir, file_name);
126 let mut file = OpenOptions::new()
128 .read(true)
129 .open(&file_path)
130 .inspect_err(|err| warn!("Failed to open cache file at {file_path:?} : {err}",))?;
131
132 let mut contents = String::new();
134 file.read_to_string(&mut contents).inspect_err(|err| {
135 warn!("Failed to read cache file: {err}");
136 })?;
137
138 let data = serde_json::from_str::<Self>(&contents).map_err(|err| {
140 warn!("Failed to parse cache data: {err}");
141 Error::FailedToParseCacheData
142 })?;
143
144 Ok(data)
145 }
146
147 pub fn write_to_file(&self, cache_dir: &Path, file_name: &str) -> Result<(), Error> {
148 let file_path = Self::cache_file_path(cache_dir, file_name);
149
150 if let Some(parent) = file_path.parent() {
152 fs::create_dir_all(parent)?;
153 }
154
155 let mut file = AtomicWriteFile::options()
156 .open(&file_path)
157 .inspect_err(|err| {
158 error!("Failed to open cache file at {file_path:?} using AtomicWriteFile: {err}");
159 })?;
160
161 let data = serde_json::to_string_pretty(&self).inspect_err(|err| {
162 error!("Failed to serialize cache data: {err}");
163 })?;
164 writeln!(file, "{data}")?;
165 file.commit().inspect_err(|err| {
166 error!("Failed to commit atomic write: {err}");
167 })?;
168
169 info!("Cache written to disk: {:?}", file_path);
170
171 Ok(())
172 }
173
174 pub fn cache_file_path(cache_dir: &Path, file_name: &str) -> PathBuf {
175 cache_dir
176 .join(format!("version_{}", Self::CACHE_DATA_VERSION))
177 .join(file_name)
178 }
179}
180
181impl Default for CacheData {
182 fn default() -> Self {
183 Self {
184 peers: Default::default(),
185 last_updated: SystemTime::now(),
186 network_version: crate::get_network_version(),
187 cache_version: Self::CACHE_DATA_VERSION.to_string(),
188 }
189 }
190}