ant_bootstrap/cache_store/cache_data_v1.rs

// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::Error;

use libp2p::{Multiaddr, PeerId};
use serde::{Deserialize, Serialize};
use std::{
    collections::{HashMap, VecDeque},
    fs::{self, OpenOptions},
    io::{Read, Seek, SeekFrom, Write},
    path::{Path, PathBuf},
    thread,
    time::{Duration, SystemTime},
};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheData {
    pub peers: VecDeque<(PeerId, VecDeque<Multiaddr>)>,
    pub last_updated: SystemTime,
    pub network_version: String,
    pub cache_version: String,
}

impl CacheData {
    /// The version of the cache data format.
    /// This has to be bumped whenever the cache data format changes to ensure compatibility.
    pub const CACHE_DATA_VERSION: u32 = 1;

    /// Sync this cache with another cache. Peers already in `self` are treated as newer and are
    /// preserved; peers from `other` are pushed to the back.
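    ///
    /// A minimal usage sketch (the directories, file name, limits, and error handling below are
    /// illustrative only, not values defined by this module):
    ///
    /// ```ignore
    /// let mut ours = CacheData::read_from_file(cache_dir, "cache.json")?;
    /// let theirs = CacheData::read_from_file(other_cache_dir, "cache.json")?;
    /// // Keep at most 5 addresses per peer and 100 peers overall after the merge.
    /// ours.sync(&theirs, 5, 100);
    /// ours.write_to_file(cache_dir, "cache.json")?;
    /// ```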
    pub fn sync(&mut self, other: &CacheData, max_addrs_per_peer: usize, max_peers: usize) {
        let old_len = self.peers.len();
        let mut other_peers = other.peers.iter().cloned().collect::<HashMap<_, _>>();

        for (peer, addrs) in self.peers.iter_mut() {
            if let Some(other_addrs) = other_peers.get(peer) {
                for other_addr in other_addrs.iter() {
                    // push the other addr to the back if we don't already have it
                    if !addrs.contains(other_addr) {
                        addrs.push_back(other_addr.clone());
                    }
                }
                // remove excess addrs from the back (oldest) if we exceed max_addrs_per_peer
                while addrs.len() > max_addrs_per_peer {
                    addrs.pop_back();
                }
            }
            // remove from other_peers to avoid re-processing
            other_peers.remove(peer);
        }

        // Apply max_peers limit by removing from back (oldest from other)
        while self.peers.len() > max_peers {
            self.peers.pop_back();
        }

        // Now add any remaining peers from other_peers
        let required_len = max_peers.saturating_sub(self.peers.len());
        let other_len = other_peers.len();
        let other_peers = other_peers.into_iter().take(required_len);
        self.peers.extend(other_peers);

        let new_len = self.peers.len();

        info!(
            "Synced {other_len} peers from the other cache into our {old_len} existing peers, for a final count of {new_len} peers"
        );

        self.last_updated = SystemTime::now();
    }

    /// Add a peer to the front of the cache as the newest entry, pruning the oldest entries from
    /// the tail when the limits are exceeded.
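    ///
    /// A minimal usage sketch (the address and limits are illustrative only):
    ///
    /// ```ignore
    /// let mut cache = CacheData::default();
    /// let peer_id = PeerId::random();
    /// let addr: Multiaddr = "/ip4/127.0.0.1/udp/4001/quic-v1".parse()?;
    /// // Keep at most 5 addresses for this peer and 100 peers in total.
    /// cache.add_peer(peer_id, [addr].iter(), 5, 100);
    /// ```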
    pub fn add_peer<'a>(
        &mut self,
        peer_id: PeerId,
        addrs: impl Iterator<Item = &'a Multiaddr>,
        max_addrs_per_peer: usize,
        max_peers: usize,
    ) {
        if let Some((_, present_addrs)) = self.peers.iter_mut().find(|(id, _)| id == &peer_id) {
            for addr in addrs {
                if !present_addrs.contains(addr) {
                    present_addrs.push_front(addr.clone());
                }
            }
            while present_addrs.len() > max_addrs_per_peer {
                present_addrs.pop_back();
            }
        } else {
            self.peers.push_front((
                peer_id,
                addrs
                    .into_iter()
                    .take(max_addrs_per_peer)
                    .cloned()
                    .collect(),
            ));
        }

        while self.peers.len() > max_peers {
            self.peers.pop_back();
        }
    }

    /// Remove a peer from the cache. This does not update the cache on disk.
    pub fn remove_peer(&mut self, peer_id: &PeerId) {
        self.peers.retain(|(id, _)| id != peer_id);
    }

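    /// Returns an iterator over the cached peers' addresses. Note that this yields at most one
    /// address per peer: the front (most recently added) entry of each peer's address list.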
    pub fn get_all_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
        self.peers
            .iter()
            .flat_map(|(_, bootstrap_addresses)| bootstrap_addresses.iter().next())
    }

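    /// Runs the given file-locking operation, retrying up to three times with a 50 ms pause
    /// between attempts. `log_failure` is called on every failed attempt, and the last error is
    /// returned if all attempts fail.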
    fn lock_with_retry<F, L>(mut operation: F, mut log_failure: L) -> std::io::Result<()>
    where
        F: FnMut() -> std::io::Result<()>,
        L: FnMut(&std::io::Error, usize, usize),
    {
        const MAX_ATTEMPTS: usize = 3;
        const RETRY_DELAY_MS: u64 = 50;

        for attempt in 1..=MAX_ATTEMPTS {
            match operation() {
                Ok(()) => return Ok(()),
                Err(err) => {
                    log_failure(&err, attempt, MAX_ATTEMPTS);
                    if attempt == MAX_ATTEMPTS {
                        return Err(err);
                    }

                    thread::sleep(Duration::from_millis(RETRY_DELAY_MS));
                }
            }
        }

        Ok(())
    }

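    /// Reads and parses the cache data from disk, taking a shared (read) lock on the file first
    /// so that a concurrent writer cannot be observed mid-write.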
    pub fn read_from_file(cache_dir: &Path, file_name: &str) -> Result<Self, Error> {
        let file_path = Self::cache_file_path(cache_dir, file_name);

        let contents = {
            let mut file = OpenOptions::new()
                .read(true)
                .open(&file_path)
                .inspect_err(|err| warn!("Failed to open cache file at {file_path:?}: {err}"))?;

            debug!("Attempting to lock cache file for reading: {file_path:?}");
            Self::lock_with_retry(
                || file.lock_shared(),
                |err, attempt, max_attempts| {
                    warn!(
                        "Failed to acquire shared lock on cache file {file_path:?} (attempt {attempt}/{max_attempts}): {err}"
                    );
                },
            )?;

            let mut contents = String::new();
            file.read_to_string(&mut contents).inspect_err(|err| {
                warn!("Failed to read cache file: {err}");
            })?;

            contents
        };

        let data = serde_json::from_str::<Self>(&contents).map_err(|err| {
            warn!("Failed to parse cache data: {err}");
            Error::FailedToParseCacheData
        })?;

        Ok(data)
    }

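    /// Serializes the cache to pretty-printed JSON and writes it to disk, creating the parent
    /// directory if needed. An exclusive lock is acquired before the file is truncated and
    /// rewritten, and the contents are flushed and synced before returning.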
    pub fn write_to_file(&self, cache_dir: &Path, file_name: &str) -> Result<(), Error> {
        let file_path = Self::cache_file_path(cache_dir, file_name);

        // Create parent directory if it doesn't exist
        if let Some(parent) = file_path.parent() {
            fs::create_dir_all(parent)?;
        }

        // Manual truncation after lock acquisition is required for thread-safety.
        // Using .truncate(true) would truncate during open() before the lock is acquired,
        // creating a race window where readers can see empty/partial file content.
        #[allow(clippy::suspicious_open_options)]
        let mut file = OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .open(&file_path)
            .inspect_err(|err| {
                error!("Failed to open cache file at {file_path:?}: {err}");
            })?;

        debug!("Attempting to lock cache file for writing: {file_path:?}");
        Self::lock_with_retry(
            || file.lock(),
            |err, attempt, max_attempts| {
                error!(
                    "Failed to acquire exclusive lock on cache file {file_path:?} (attempt {attempt}/{max_attempts}): {err}"
                );
            },
        )?;

        let data = serde_json::to_string_pretty(&self).inspect_err(|err| {
            error!("Failed to serialize cache data: {err}");
        })?;

        file.set_len(0).inspect_err(|err| {
            error!("Failed to truncate cache file {file_path:?} before writing: {err}");
        })?;

        file.seek(SeekFrom::Start(0)).inspect_err(|err| {
            error!("Failed to seek cache file {file_path:?} before writing: {err}");
        })?;

        file.write_all(data.as_bytes()).inspect_err(|err| {
            error!("Failed to write cache file {file_path:?}: {err}");
        })?;

        file.write_all(b"\n").inspect_err(|err| {
            error!("Failed to write newline to cache file {file_path:?}: {err}");
        })?;

        file.flush().inspect_err(|err| {
            error!("Failed to flush cache file {file_path:?}: {err}");
        })?;

        file.sync_all().inspect_err(|err| {
            error!("Failed to sync cache file {file_path:?}: {err}");
        })?;

        info!(
            "Cache with {} peers written to disk: {file_path:?}",
            self.peers.len()
        );

        Ok(())
    }

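    /// Builds the full path of the cache file, namespaced by the cache data version, i.e.
    /// `<cache_dir>/version_<CACHE_DATA_VERSION>/<file_name>`.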
    pub fn cache_file_path(cache_dir: &Path, file_name: &str) -> PathBuf {
        cache_dir
            .join(format!("version_{}", Self::CACHE_DATA_VERSION))
            .join(file_name)
    }
}

impl Default for CacheData {
    fn default() -> Self {
        Self {
            peers: Default::default(),
            last_updated: SystemTime::now(),
            network_version: crate::get_network_version(),
            cache_version: Self::CACHE_DATA_VERSION.to_string(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use rand::{Rng, SeedableRng, rngs::SmallRng};
    use serde_json::Value;
    use std::{
        fs,
        str::FromStr,
        sync::{Arc, Barrier},
        thread,
    };

    const THREAD_COUNT: usize = 100;
    const ITERATIONS_PER_THREAD: usize = 25;
    const READ_PROBABILITY: f64 = 0.4;

    fn create_test_peer_data(rng: &mut SmallRng) -> CacheData {
        let mut data = CacheData::default();
        let peer = PeerId::random();
        let port = rng.gen_range(1000..2000);
        let addr = Multiaddr::from_str(&format!("/ip4/192.168.1.3/udp/{port}/quic-v1/p2p/{peer}"))
            .expect("valid multiaddr");

        data.add_peer(peer, [addr].iter(), 5, 10);
        data
    }

    fn perform_random_cache_operation(cache_dir: &Path, file_name: &str, rng: &mut SmallRng) {
        if rng.gen_bool(READ_PROBABILITY) {
            CacheData::read_from_file(cache_dir, file_name)
                .expect("concurrent read should succeed");
        } else {
            let data = create_test_peer_data(rng);
            data.write_to_file(cache_dir, file_name)
                .expect("concurrent write should succeed");
        }
    }

    /// Validates that concurrent reads and writes maintain file integrity.
    ///
    /// This test ensures that the lock-then-truncate approach prevents readers
    /// from seeing partial or corrupted JSON data during concurrent write operations.
    /// It spawns 100 threads performing 25 random read/write operations each.
    ///
    /// The test verifies:
    /// - No parse errors occur during concurrent access
    /// - Final cache file contains valid JSON
    /// - Cache version remains consistent
    #[test]
    fn cache_file_remains_valid_under_concurrent_access() {
        let _ = tracing_subscriber::fmt::try_init();

        let temp_dir = tempfile::tempdir().expect("create temp dir");
        let cache_dir = Arc::new(temp_dir.path().to_path_buf());
        let file_name = "cache.json";

        CacheData::default()
            .write_to_file(cache_dir.as_path(), file_name)
            .expect("initial cache write");

        let start_barrier = Arc::new(Barrier::new(THREAD_COUNT + 1));
        let mut handles = Vec::with_capacity(THREAD_COUNT);

        for thread_seed in 0..THREAD_COUNT {
            let cache_dir = Arc::clone(&cache_dir);
            let barrier = Arc::clone(&start_barrier);

            handles.push(thread::spawn(move || {
                let mut rng = SmallRng::seed_from_u64(thread_seed as u64 + 1);

                barrier.wait();

                for _ in 0..ITERATIONS_PER_THREAD {
                    perform_random_cache_operation(cache_dir.as_path(), file_name, &mut rng);
                }
            }));
        }

        start_barrier.wait();

        for handle in handles {
            handle
                .join()
                .expect("all threads should complete successfully");
        }

        let final_data = CacheData::read_from_file(cache_dir.as_path(), file_name)
            .expect("should read final cache state");

        assert_eq!(
            final_data.cache_version,
            CacheData::CACHE_DATA_VERSION.to_string(),
            "cache version should remain consistent after concurrent access"
        );

        let cache_file = CacheData::cache_file_path(cache_dir.as_path(), file_name);
        let contents =
            fs::read_to_string(&cache_file).expect("should read final cache file contents");

        serde_json::from_str::<Value>(&contents)
            .expect("final cache file should contain valid, parseable JSON");
    }
369}