// gemachain_runtime/cache_hash_data.rs

1//! Cached data for hashing accounts
2use crate::accounts_hash::CalculateHashIntermediate;
3use crate::cache_hash_data_stats::CacheHashDataStats;
4use crate::pubkey_bins::PubkeyBinCalculator16;
5use log::*;
6use memmap2::MmapMut;
7use gemachain_measure::measure::Measure;
8use std::collections::HashSet;
9use std::fs::{self};
10use std::fs::{remove_file, OpenOptions};
11use std::io::Seek;
12use std::io::SeekFrom;
13use std::io::Write;
14use std::path::Path;
15use std::path::PathBuf;
16use std::sync::{Arc, Mutex};
17
/// A single cached hash entry (pubkey, hash, carats triple).
pub type EntryType = CalculateHashIntermediate;
/// Entries grouped into per-bin vectors (outer index = pubkey bin).
pub type SavedType = Vec<Vec<EntryType>>;
/// Borrowed slice form of [`SavedType`], for APIs that only read the data.
pub type SavedTypeSlice = [Vec<EntryType>];

/// Fixed-layout header at the start of every cache file.
/// `#[repr(C)]` so the on-disk byte layout is stable across builds.
#[repr(C)]
pub struct Header {
    // number of `EntryType` cells that follow the header in the file
    count: usize,
}
26
/// A single memory-mapped cache file: a `Header` followed by `count` cells of
/// `cell_size` bytes each.
struct CacheHashDataFile {
    // size in bytes of one entry cell (size_of::<EntryType>())
    cell_size: u64,
    // writable memory map over the whole file
    mmap: MmapMut,
    // total file size in bytes: header + count * cell_size
    capacity: u64,
}
32
33impl CacheHashDataFile {
34    fn get_mut<T: Sized>(&mut self, ix: u64) -> &mut T {
35        let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
36        let end = start + std::mem::size_of::<T>();
37        assert!(
38            end <= self.capacity as usize,
39            "end: {}, capacity: {}, ix: {}, cell size: {}",
40            end,
41            self.capacity,
42            ix,
43            self.cell_size
44        );
45        let item_slice: &[u8] = &self.mmap[start..end];
46        unsafe {
47            let item = item_slice.as_ptr() as *mut T;
48            &mut *item
49        }
50    }
51
52    fn get_header_mut(&mut self) -> &mut Header {
53        let start = 0_usize;
54        let end = start + std::mem::size_of::<Header>();
55        let item_slice: &[u8] = &self.mmap[start..end];
56        unsafe {
57            let item = item_slice.as_ptr() as *mut Header;
58            &mut *item
59        }
60    }
61
62    fn new_map(file: &Path, capacity: u64) -> Result<MmapMut, std::io::Error> {
63        let mut data = OpenOptions::new()
64            .read(true)
65            .write(true)
66            .create(true)
67            .open(file)?;
68
69        // Theoretical performance optimization: write a zero to the end of
70        // the file so that we won't have to resize it later, which may be
71        // expensive.
72        data.seek(SeekFrom::Start(capacity - 1)).unwrap();
73        data.write_all(&[0]).unwrap();
74        data.seek(SeekFrom::Start(0)).unwrap();
75        data.flush().unwrap();
76        Ok(unsafe { MmapMut::map_mut(&data).unwrap() })
77    }
78
79    fn load_map(file: &Path) -> Result<MmapMut, std::io::Error> {
80        let data = OpenOptions::new()
81            .read(true)
82            .write(true)
83            .create(false)
84            .open(file)?;
85
86        Ok(unsafe { MmapMut::map_mut(&data).unwrap() })
87    }
88}
89
/// File names (not paths) of cache files found on disk before this run began.
pub type PreExistingCacheFiles = HashSet<String>;
/// Manages the on-disk cache of accounts-hash intermediate data: saving new
/// files, loading existing ones, and deleting files that were never reused.
pub struct CacheHashData {
    // directory that holds all cache files
    cache_folder: PathBuf,
    // files present at startup; entries are removed as files are loaded, so
    // whatever remains at drop time is stale and gets deleted
    pre_existing_cache_files: Arc<Mutex<PreExistingCacheFiles>>,
    // shared counters/timings, reported on drop
    pub stats: Arc<Mutex<CacheHashDataStats>>,
}
96
impl Drop for CacheHashData {
    fn drop(&mut self) {
        // Any file still listed in pre_existing_cache_files was never loaded
        // this run, so it is stale — delete it, then report accumulated stats.
        self.delete_old_cache_files();
        self.stats.lock().unwrap().report();
    }
}
103
104impl CacheHashData {
105    pub fn new<P: AsRef<Path> + std::fmt::Debug>(parent_folder: &P) -> CacheHashData {
106        let cache_folder = Self::get_cache_root_path(parent_folder);
107
108        std::fs::create_dir_all(cache_folder.clone())
109            .unwrap_or_else(|_| panic!("error creating cache dir: {:?}", cache_folder));
110
111        let result = CacheHashData {
112            cache_folder,
113            pre_existing_cache_files: Arc::new(Mutex::new(PreExistingCacheFiles::default())),
114            stats: Arc::new(Mutex::new(CacheHashDataStats::default())),
115        };
116
117        result.get_cache_files();
118        result
119    }
120    fn delete_old_cache_files(&self) {
121        let pre_existing_cache_files = self.pre_existing_cache_files.lock().unwrap();
122        if !pre_existing_cache_files.is_empty() {
123            self.stats.lock().unwrap().unused_cache_files += pre_existing_cache_files.len();
124            for file_name in pre_existing_cache_files.iter() {
125                let result = self.cache_folder.join(file_name);
126                let _ = fs::remove_file(result);
127            }
128        }
129    }
130    fn get_cache_files(&self) {
131        if self.cache_folder.is_dir() {
132            let dir = fs::read_dir(self.cache_folder.clone());
133            if let Ok(dir) = dir {
134                let mut pre_existing = self.pre_existing_cache_files.lock().unwrap();
135                for entry in dir.flatten() {
136                    if let Some(name) = entry.path().file_name() {
137                        pre_existing.insert(name.to_str().unwrap().to_string());
138                    }
139                }
140                self.stats.lock().unwrap().cache_file_count += pre_existing.len();
141            }
142        }
143    }
144
145    fn get_cache_root_path<P: AsRef<Path>>(parent_folder: &P) -> PathBuf {
146        parent_folder.as_ref().join("calculate_accounts_hash_cache")
147    }
148
149    pub fn load<P: AsRef<Path> + std::fmt::Debug>(
150        &self,
151        file_name: &P,
152        accumulator: &mut SavedType,
153        start_bin_index: usize,
154        bin_calculator: &PubkeyBinCalculator16,
155    ) -> Result<(), std::io::Error> {
156        let mut stats = CacheHashDataStats::default();
157        let result = self.load_internal(
158            file_name,
159            accumulator,
160            start_bin_index,
161            bin_calculator,
162            &mut stats,
163        );
164        self.stats.lock().unwrap().merge(&stats);
165        result
166    }
167
168    fn load_internal<P: AsRef<Path> + std::fmt::Debug>(
169        &self,
170        file_name: &P,
171        accumulator: &mut SavedType,
172        start_bin_index: usize,
173        bin_calculator: &PubkeyBinCalculator16,
174        stats: &mut CacheHashDataStats,
175    ) -> Result<(), std::io::Error> {
176        let mut m = Measure::start("overall");
177        let path = self.cache_folder.join(file_name);
178        let file_len = std::fs::metadata(path.clone())?.len();
179        let mut m1 = Measure::start("read_file");
180        let mmap = CacheHashDataFile::load_map(&path)?;
181        m1.stop();
182        stats.read_us = m1.as_us();
183
184        let cell_size = std::mem::size_of::<EntryType>() as u64;
185        let mut cache_file = CacheHashDataFile {
186            mmap,
187            cell_size,
188            capacity: 0,
189        };
190        let header = cache_file.get_header_mut();
191        let entries = header.count;
192
193        let capacity = cell_size * (entries as u64) + std::mem::size_of::<Header>() as u64;
194        cache_file.capacity = capacity;
195        assert_eq!(
196            capacity, file_len,
197            "expected: {}, len on disk: {} {:?}, entries: {}, cell_size: {}",
198            capacity, file_len, path, entries, cell_size
199        );
200
201        stats.total_entries = entries;
202        stats.cache_file_size += capacity as usize;
203
204        let file_name_lookup = file_name.as_ref().to_str().unwrap().to_string();
205        let found = self
206            .pre_existing_cache_files
207            .lock()
208            .unwrap()
209            .remove(&file_name_lookup);
210        if !found {
211            info!(
212                "tried to mark {:?} as used, but it wasn't in the set, one example: {:?}",
213                file_name_lookup,
214                self.pre_existing_cache_files.lock().unwrap().iter().next()
215            );
216        }
217
218        stats.loaded_from_cache += 1;
219        stats.entries_loaded_from_cache += entries;
220        let mut m2 = Measure::start("decode");
221        for i in 0..entries {
222            let d = cache_file.get_mut::<EntryType>(i as u64);
223            let mut pubkey_to_bin_index = bin_calculator.bin_from_pubkey(&d.pubkey);
224            assert!(
225                pubkey_to_bin_index >= start_bin_index,
226                "{}, {}",
227                pubkey_to_bin_index,
228                start_bin_index
229            ); // this would indicate we put a pubkey in too high of a bin
230            pubkey_to_bin_index -= start_bin_index;
231            accumulator[pubkey_to_bin_index].push(d.clone()); // may want to avoid clone here
232        }
233
234        m2.stop();
235        stats.decode_us += m2.as_us();
236        m.stop();
237        stats.load_us += m.as_us();
238        Ok(())
239    }
240
241    pub fn save(&self, file_name: &Path, data: &SavedTypeSlice) -> Result<(), std::io::Error> {
242        let mut stats = CacheHashDataStats::default();
243        let result = self.save_internal(file_name, data, &mut stats);
244        self.stats.lock().unwrap().merge(&stats);
245        result
246    }
247
248    pub fn save_internal(
249        &self,
250        file_name: &Path,
251        data: &SavedTypeSlice,
252        stats: &mut CacheHashDataStats,
253    ) -> Result<(), std::io::Error> {
254        let mut m = Measure::start("save");
255        let cache_path = self.cache_folder.join(file_name);
256        let create = true;
257        if create {
258            let _ignored = remove_file(&cache_path);
259        }
260        let cell_size = std::mem::size_of::<EntryType>() as u64;
261        let mut m1 = Measure::start("create save");
262        let entries = data
263            .iter()
264            .map(|x: &Vec<EntryType>| x.len())
265            .collect::<Vec<_>>();
266        let entries = entries.iter().sum::<usize>();
267        let capacity = cell_size * (entries as u64) + std::mem::size_of::<Header>() as u64;
268
269        let mmap = CacheHashDataFile::new_map(&cache_path, capacity)?;
270        m1.stop();
271        stats.create_save_us += m1.as_us();
272        let mut cache_file = CacheHashDataFile {
273            mmap,
274            cell_size,
275            capacity,
276        };
277
278        let mut header = cache_file.get_header_mut();
279        header.count = entries;
280
281        stats.cache_file_size = capacity as usize;
282        stats.total_entries = entries;
283
284        let mut m2 = Measure::start("write_to_mmap");
285        let mut i = 0;
286        data.iter().for_each(|x| {
287            x.iter().for_each(|item| {
288                let d = cache_file.get_mut::<EntryType>(i as u64);
289                i += 1;
290                *d = item.clone();
291            })
292        });
293        assert_eq!(i, entries);
294        m2.stop();
295        stats.write_to_mmap_us += m2.as_us();
296        m.stop();
297        stats.save_us += m.as_us();
298        stats.saved_to_cache += 1;
299        Ok(())
300    }
301}
302
303#[cfg(test)]
304pub mod tests {
305    use super::*;
306    use rand::Rng;
307
    #[test]
    fn test_read_write() {
        // generate sample data
        // write to file
        // read
        // compare
        use tempfile::TempDir;
        let tmpdir = TempDir::new().unwrap();
        std::fs::create_dir_all(&tmpdir).unwrap();

        // Sweep bin counts, pass counts, and flattened vs pre-binned input to
        // cover the save/load round trip from multiple shapes.
        for bins in [1, 2, 4] {
            let bin_calculator = PubkeyBinCalculator16::new(bins);
            let num_points = 5;
            let (data, _total_points) = generate_test_data(num_points, bins, &bin_calculator);
            for passes in [1, 2] {
                let bins_per_pass = bins / passes;
                if bins_per_pass == 0 {
                    continue; // illegal test case
                }
                for pass in 0..passes {
                    for flatten_data in [true, false] {
                        let mut data_this_pass = if flatten_data {
                            vec![vec![], vec![]]
                        } else {
                            vec![]
                        };
                        let start_bin_this_pass = pass * bins_per_pass;
                        // Build this pass's input: either everything appended
                        // into bucket 0 (flattened) or one bucket per bin.
                        for bin in 0..bins_per_pass {
                            let mut this_bin_data = data[bin + start_bin_this_pass].clone();
                            if flatten_data {
                                data_this_pass[0].append(&mut this_bin_data);
                            } else {
                                data_this_pass.push(this_bin_data);
                            }
                        }
                        let cache = CacheHashData::new(&tmpdir);
                        let file_name = "test";
                        let file = Path::new(file_name).to_path_buf();
                        cache.save(&file, &data_this_pass).unwrap();
                        // Rescan the folder: the file we just saved should be
                        // the only cache file present.
                        cache.get_cache_files();
                        assert_eq!(
                            cache
                                .pre_existing_cache_files
                                .lock()
                                .unwrap()
                                .iter()
                                .collect::<Vec<_>>(),
                            vec![file_name]
                        );
                        let mut accum = (0..bins_per_pass).into_iter().map(|_| vec![]).collect();
                        cache
                            .load(&file, &mut accum, start_bin_this_pass, &bin_calculator)
                            .unwrap();
                        // load() always returns binned output, so re-bin the
                        // flattened expectation before comparing.
                        if flatten_data {
                            bin_data(
                                &mut data_this_pass,
                                &bin_calculator,
                                bins_per_pass,
                                start_bin_this_pass,
                            );
                        }
                        assert_eq!(
                            accum, data_this_pass,
                            "bins: {}, start_bin_this_pass: {}, pass: {}, flatten: {}, passes: {}",
                            bins, start_bin_this_pass, pass, flatten_data, passes
                        );
                    }
                }
            }
        }
    }
379
380    fn bin_data(
381        data: &mut SavedType,
382        bin_calculator: &PubkeyBinCalculator16,
383        bins: usize,
384        start_bin: usize,
385    ) {
386        let mut accum: SavedType = (0..bins).into_iter().map(|_| vec![]).collect();
387        data.drain(..).into_iter().for_each(|mut x| {
388            x.drain(..).into_iter().for_each(|item| {
389                let bin = bin_calculator.bin_from_pubkey(&item.pubkey);
390                accum[bin - start_bin].push(item);
391            })
392        });
393        *data = accum;
394    }
395
396    fn generate_test_data(
397        count: usize,
398        bins: usize,
399        binner: &PubkeyBinCalculator16,
400    ) -> (SavedType, usize) {
401        let mut rng = rand::thread_rng();
402        let mut ct = 0;
403        (
404            (0..bins)
405                .into_iter()
406                .map(|bin| {
407                    let rnd = rng.gen::<u64>() % (bins as u64);
408                    if rnd < count as u64 {
409                        (0..std::cmp::max(1, count / bins))
410                            .into_iter()
411                            .map(|_| {
412                                ct += 1;
413                                let mut pk;
414                                loop {
415                                    // expensive, but small numbers and for tests, so ok
416                                    pk = gemachain_sdk::pubkey::new_rand();
417                                    if binner.bin_from_pubkey(&pk) == bin {
418                                        break;
419                                    }
420                                }
421
422                                CalculateHashIntermediate::new(
423                                    gemachain_sdk::hash::new_rand(&mut rng),
424                                    ct as u64,
425                                    pk,
426                                )
427                            })
428                            .collect::<Vec<_>>()
429                    } else {
430                        vec![]
431                    }
432                })
433                .collect::<Vec<_>>(),
434            ct,
435        )
436    }
437}