ant_bootstrap/cache_store/
cache_data_v1.rs1use crate::Error;
10
11use libp2p::{Multiaddr, PeerId};
12use serde::{Deserialize, Serialize};
13use std::{
14 collections::{HashMap, VecDeque},
15 fs::{self, OpenOptions},
16 io::{Read, Seek, SeekFrom, Write},
17 path::{Path, PathBuf},
18 thread,
19 time::{Duration, SystemTime},
20};
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct CacheData {
24 pub peers: VecDeque<(PeerId, VecDeque<Multiaddr>)>,
25 pub last_updated: SystemTime,
26 pub network_version: String,
27 pub cache_version: String,
28}
29
30impl CacheData {
31 pub const CACHE_DATA_VERSION: u32 = 1;
34
35 pub fn sync(&mut self, other: &CacheData, max_addrs_per_peer: usize, max_peers: usize) {
38 let old_len = self.peers.len();
39 let mut other_peers = other.peers.iter().cloned().collect::<HashMap<_, _>>();
40
41 for (peer, addrs) in self.peers.iter_mut() {
42 if let Some(other_addrs) = other_peers.get(peer) {
43 for other_addr in other_addrs.iter() {
44 if !addrs.contains(other_addr) {
46 addrs.push_back(other_addr.clone());
47 }
48 }
49 while addrs.len() > max_addrs_per_peer {
51 addrs.pop_back();
52 }
53 }
54 other_peers.remove(peer);
56 }
57
58 while self.peers.len() > max_peers {
60 self.peers.pop_back();
61 }
62
63 let required_len = max_peers.saturating_sub(self.peers.len());
65 let other_len = other_peers.len();
66 let other_peers = other_peers.into_iter().take(required_len);
67 self.peers.extend(other_peers);
68
69 let new_len = self.peers.len();
70
71 info!(
72 "Synced {other_len} peers to our current {old_len:?} peers to have a final count of {new_len:?} peers"
73 );
74
75 self.last_updated = SystemTime::now();
76 }
77
78 pub fn add_peer<'a>(
80 &mut self,
81 peer_id: PeerId,
82 addrs: impl Iterator<Item = &'a Multiaddr>,
83 max_addrs_per_peer: usize,
84 max_peers: usize,
85 ) {
86 if let Some((_, present_addrs)) = self.peers.iter_mut().find(|(id, _)| id == &peer_id) {
87 for addr in addrs {
88 if !present_addrs.contains(addr) {
89 present_addrs.push_front(addr.clone());
90 }
91 }
92 while present_addrs.len() > max_addrs_per_peer {
93 present_addrs.pop_back();
94 }
95 } else {
96 self.peers.push_front((
97 peer_id,
98 addrs
99 .into_iter()
100 .take(max_addrs_per_peer)
101 .cloned()
102 .collect(),
103 ));
104 }
105
106 while self.peers.len() > max_peers {
107 self.peers.pop_back();
108 }
109 }
110
111 pub fn remove_peer(&mut self, peer_id: &PeerId) {
113 self.peers.retain(|(id, _)| id != peer_id);
114 }
115
116 pub fn get_all_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
117 self.peers
118 .iter()
119 .flat_map(|(_, bootstrap_addresses)| bootstrap_addresses.iter().next())
120 }
121
122 fn lock_with_retry<F, L>(mut operation: F, mut log_failure: L) -> std::io::Result<()>
123 where
124 F: FnMut() -> std::io::Result<()>,
125 L: FnMut(&std::io::Error, usize, usize),
126 {
127 const MAX_ATTEMPTS: usize = 3;
128 const RETRY_DELAY_MS: u64 = 50;
129
130 for attempt in 1..=MAX_ATTEMPTS {
131 match operation() {
132 Ok(()) => return Ok(()),
133 Err(err) => {
134 log_failure(&err, attempt, MAX_ATTEMPTS);
135 if attempt == MAX_ATTEMPTS {
136 return Err(err);
137 }
138
139 thread::sleep(Duration::from_millis(RETRY_DELAY_MS));
140 }
141 }
142 }
143
144 Ok(())
145 }
146
147 pub fn read_from_file(cache_dir: &Path, file_name: &str) -> Result<Self, Error> {
148 let file_path = Self::cache_file_path(cache_dir, file_name);
149
150 let contents = {
151 let mut file = OpenOptions::new()
152 .read(true)
153 .open(&file_path)
154 .inspect_err(|err| warn!("Failed to open cache file at {file_path:?} : {err}",))?;
155
156 debug!("Attempting to lock cache file for reading: {file_path:?}");
157 Self::lock_with_retry(
158 || file.lock_shared(),
159 |err, attempt, max_attempts| {
160 warn!(
161 "Failed to acquire shared lock on cache file {file_path:?} (attempt {attempt}/{max_attempts}): {err}"
162 );
163 },
164 )?;
165
166 let mut contents = String::new();
167 file.read_to_string(&mut contents).inspect_err(|err| {
168 warn!("Failed to read cache file: {err}");
169 })?;
170
171 contents
172 };
173
174 let data = serde_json::from_str::<Self>(&contents).map_err(|err| {
175 warn!("Failed to parse cache data: {err}");
176 Error::FailedToParseCacheData
177 })?;
178
179 Ok(data)
180 }
181
182 pub fn write_to_file(&self, cache_dir: &Path, file_name: &str) -> Result<(), Error> {
183 let file_path = Self::cache_file_path(cache_dir, file_name);
184
185 if let Some(parent) = file_path.parent() {
187 fs::create_dir_all(parent)?;
188 }
189
190 #[allow(clippy::suspicious_open_options)]
194 let mut file = OpenOptions::new()
195 .create(true)
196 .read(true)
197 .write(true)
198 .open(&file_path)
199 .inspect_err(|err| {
200 error!("Failed to open cache file at {file_path:?}: {err}");
201 })?;
202
203 debug!("Attempting to lock cache file for writing: {file_path:?}");
204 Self::lock_with_retry(
205 || file.lock(),
206 |err, attempt, max_attempts| {
207 error!(
208 "Failed to acquire exclusive lock on cache file {file_path:?} (attempt {attempt}/{max_attempts}): {err}"
209 );
210 },
211 )?;
212
213 let data = serde_json::to_string_pretty(&self).inspect_err(|err| {
214 error!("Failed to serialize cache data: {err}");
215 })?;
216
217 file.set_len(0).inspect_err(|err| {
218 error!("Failed to truncate cache file {file_path:?} before writing: {err}");
219 })?;
220
221 file.seek(SeekFrom::Start(0)).inspect_err(|err| {
222 error!("Failed to seek cache file {file_path:?} before writing: {err}");
223 })?;
224
225 file.write_all(data.as_bytes()).inspect_err(|err| {
226 error!("Failed to write cache file {file_path:?}: {err}");
227 })?;
228
229 file.write_all(b"\n").inspect_err(|err| {
230 error!("Failed to write newline to cache file {file_path:?}: {err}");
231 })?;
232
233 file.flush().inspect_err(|err| {
234 error!("Failed to flush cache file {file_path:?}: {err}");
235 })?;
236
237 file.sync_all().inspect_err(|err| {
238 error!("Failed to sync cache file {file_path:?}: {err}");
239 })?;
240
241 info!(
242 "Cache with {} peers written to disk: {file_path:?}",
243 self.peers.len()
244 );
245
246 Ok(())
247 }
248
249 pub fn cache_file_path(cache_dir: &Path, file_name: &str) -> PathBuf {
250 cache_dir
251 .join(format!("version_{}", Self::CACHE_DATA_VERSION))
252 .join(file_name)
253 }
254}
255
256impl Default for CacheData {
257 fn default() -> Self {
258 Self {
259 peers: Default::default(),
260 last_updated: SystemTime::now(),
261 network_version: crate::get_network_version(),
262 cache_version: Self::CACHE_DATA_VERSION.to_string(),
263 }
264 }
265}
266
267#[cfg(test)]
268mod tests {
269 use super::*;
270 use rand::{Rng, SeedableRng, rngs::SmallRng};
271 use serde_json::Value;
272 use std::{
273 fs,
274 str::FromStr,
275 sync::{Arc, Barrier},
276 thread,
277 };
278
279 const THREAD_COUNT: usize = 100;
280 const ITERATIONS_PER_THREAD: usize = 25;
281 const READ_PROBABILITY: f64 = 0.4;
282
283 fn create_test_peer_data(rng: &mut SmallRng) -> CacheData {
284 let mut data = CacheData::default();
285 let peer = PeerId::random();
286 let port = rng.gen_range(1000..2000);
287 let addr = Multiaddr::from_str(&format!("/ip4/192.168.1.3/udp/{port}/quic-v1/p2p/{peer}"))
288 .expect("valid multiaddr");
289
290 data.add_peer(peer, [addr].iter(), 5, 10);
291 data
292 }
293
294 fn perform_random_cache_operation(cache_dir: &Path, file_name: &str, rng: &mut SmallRng) {
295 if rng.gen_bool(READ_PROBABILITY) {
296 CacheData::read_from_file(cache_dir, file_name)
297 .expect("concurrent read should succeed");
298 } else {
299 let data = create_test_peer_data(rng);
300 data.write_to_file(cache_dir, file_name)
301 .expect("concurrent write should succeed");
302 }
303 }
304
305 #[test]
316 fn cache_file_remains_valid_under_concurrent_access() {
317 let _ = tracing_subscriber::fmt::try_init();
318
319 let temp_dir = tempfile::tempdir().expect("create temp dir");
320 let cache_dir = Arc::new(temp_dir.path().to_path_buf());
321 let file_name = "cache.json";
322
323 CacheData::default()
324 .write_to_file(cache_dir.as_path(), file_name)
325 .expect("initial cache write");
326
327 let start_barrier = Arc::new(Barrier::new(THREAD_COUNT + 1));
328 let mut handles = Vec::with_capacity(THREAD_COUNT);
329
330 for thread_seed in 0..THREAD_COUNT {
331 let cache_dir = Arc::clone(&cache_dir);
332 let barrier = Arc::clone(&start_barrier);
333
334 handles.push(thread::spawn(move || {
335 let mut rng = SmallRng::seed_from_u64(thread_seed as u64 + 1);
336
337 barrier.wait();
338
339 for _ in 0..ITERATIONS_PER_THREAD {
340 perform_random_cache_operation(cache_dir.as_path(), file_name, &mut rng);
341 }
342 }));
343 }
344
345 start_barrier.wait();
346
347 for handle in handles {
348 handle
349 .join()
350 .expect("all threads should complete successfully");
351 }
352
353 let final_data = CacheData::read_from_file(cache_dir.as_path(), file_name)
354 .expect("should read final cache state");
355
356 assert_eq!(
357 final_data.cache_version,
358 CacheData::CACHE_DATA_VERSION.to_string(),
359 "cache version should remain consistent after concurrent access"
360 );
361
362 let cache_file = CacheData::cache_file_path(cache_dir.as_path(), file_name);
363 let contents =
364 fs::read_to_string(&cache_file).expect("should read final cache file contents");
365
366 serde_json::from_str::<Value>(&contents)
367 .expect("final cache file should contain valid, parseable JSON");
368 }
369}