1use crate::accounts_hash::CalculateHashIntermediate;
3use crate::cache_hash_data_stats::CacheHashDataStats;
4use crate::pubkey_bins::PubkeyBinCalculator16;
5use log::*;
6use memmap2::MmapMut;
7use gemachain_measure::measure::Measure;
8use std::collections::HashSet;
9use std::fs::{self};
10use std::fs::{remove_file, OpenOptions};
11use std::io::Seek;
12use std::io::SeekFrom;
13use std::io::Write;
14use std::path::Path;
15use std::path::PathBuf;
16use std::sync::{Arc, Mutex};
17
/// A single entry stored in / loaded from a cache file.
pub type EntryType = CalculateHashIntermediate;
/// Entries grouped by pubkey bin: one inner `Vec` per bin.
pub type SavedType = Vec<Vec<EntryType>>;
/// Borrowed view of `SavedType`, used when saving.
pub type SavedTypeSlice = [Vec<EntryType>];
21
/// On-disk header stored at offset 0 of each cache file.
/// `#[repr(C)]` pins the layout so the mmap'd bytes can be
/// reinterpreted as this struct.
#[repr(C)]
pub struct Header {
    // number of `EntryType` cells that follow the header
    count: usize,
}
26
/// A memory-mapped cache file: a `Header` followed by `count`
/// fixed-size cells, each holding one `EntryType`.
struct CacheHashDataFile {
    // size in bytes of one cell (size_of::<EntryType>())
    cell_size: u64,
    mmap: MmapMut,
    // total file size in bytes: size_of::<Header>() + count * cell_size
    capacity: u64,
}
32
33impl CacheHashDataFile {
34 fn get_mut<T: Sized>(&mut self, ix: u64) -> &mut T {
35 let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
36 let end = start + std::mem::size_of::<T>();
37 assert!(
38 end <= self.capacity as usize,
39 "end: {}, capacity: {}, ix: {}, cell size: {}",
40 end,
41 self.capacity,
42 ix,
43 self.cell_size
44 );
45 let item_slice: &[u8] = &self.mmap[start..end];
46 unsafe {
47 let item = item_slice.as_ptr() as *mut T;
48 &mut *item
49 }
50 }
51
52 fn get_header_mut(&mut self) -> &mut Header {
53 let start = 0_usize;
54 let end = start + std::mem::size_of::<Header>();
55 let item_slice: &[u8] = &self.mmap[start..end];
56 unsafe {
57 let item = item_slice.as_ptr() as *mut Header;
58 &mut *item
59 }
60 }
61
62 fn new_map(file: &Path, capacity: u64) -> Result<MmapMut, std::io::Error> {
63 let mut data = OpenOptions::new()
64 .read(true)
65 .write(true)
66 .create(true)
67 .open(file)?;
68
69 data.seek(SeekFrom::Start(capacity - 1)).unwrap();
73 data.write_all(&[0]).unwrap();
74 data.seek(SeekFrom::Start(0)).unwrap();
75 data.flush().unwrap();
76 Ok(unsafe { MmapMut::map_mut(&data).unwrap() })
77 }
78
79 fn load_map(file: &Path) -> Result<MmapMut, std::io::Error> {
80 let data = OpenOptions::new()
81 .read(true)
82 .write(true)
83 .create(false)
84 .open(file)?;
85
86 Ok(unsafe { MmapMut::map_mut(&data).unwrap() })
87 }
88}
89
/// File names found in the cache folder at startup; names are removed as
/// files are reused by `load`, and the leftovers are deleted on drop.
pub type PreExistingCacheFiles = HashSet<String>;
/// Manages a folder of on-disk accounts-hash cache files.
pub struct CacheHashData {
    cache_folder: PathBuf,
    // files present on disk before this instance started writing
    pre_existing_cache_files: Arc<Mutex<PreExistingCacheFiles>>,
    pub stats: Arc<Mutex<CacheHashDataStats>>,
}
96
impl Drop for CacheHashData {
    fn drop(&mut self) {
        // delete cache files that were never reused, then emit the
        // accumulated stats
        self.delete_old_cache_files();
        self.stats.lock().unwrap().report();
    }
}
103
104impl CacheHashData {
105 pub fn new<P: AsRef<Path> + std::fmt::Debug>(parent_folder: &P) -> CacheHashData {
106 let cache_folder = Self::get_cache_root_path(parent_folder);
107
108 std::fs::create_dir_all(cache_folder.clone())
109 .unwrap_or_else(|_| panic!("error creating cache dir: {:?}", cache_folder));
110
111 let result = CacheHashData {
112 cache_folder,
113 pre_existing_cache_files: Arc::new(Mutex::new(PreExistingCacheFiles::default())),
114 stats: Arc::new(Mutex::new(CacheHashDataStats::default())),
115 };
116
117 result.get_cache_files();
118 result
119 }
120 fn delete_old_cache_files(&self) {
121 let pre_existing_cache_files = self.pre_existing_cache_files.lock().unwrap();
122 if !pre_existing_cache_files.is_empty() {
123 self.stats.lock().unwrap().unused_cache_files += pre_existing_cache_files.len();
124 for file_name in pre_existing_cache_files.iter() {
125 let result = self.cache_folder.join(file_name);
126 let _ = fs::remove_file(result);
127 }
128 }
129 }
130 fn get_cache_files(&self) {
131 if self.cache_folder.is_dir() {
132 let dir = fs::read_dir(self.cache_folder.clone());
133 if let Ok(dir) = dir {
134 let mut pre_existing = self.pre_existing_cache_files.lock().unwrap();
135 for entry in dir.flatten() {
136 if let Some(name) = entry.path().file_name() {
137 pre_existing.insert(name.to_str().unwrap().to_string());
138 }
139 }
140 self.stats.lock().unwrap().cache_file_count += pre_existing.len();
141 }
142 }
143 }
144
145 fn get_cache_root_path<P: AsRef<Path>>(parent_folder: &P) -> PathBuf {
146 parent_folder.as_ref().join("calculate_accounts_hash_cache")
147 }
148
149 pub fn load<P: AsRef<Path> + std::fmt::Debug>(
150 &self,
151 file_name: &P,
152 accumulator: &mut SavedType,
153 start_bin_index: usize,
154 bin_calculator: &PubkeyBinCalculator16,
155 ) -> Result<(), std::io::Error> {
156 let mut stats = CacheHashDataStats::default();
157 let result = self.load_internal(
158 file_name,
159 accumulator,
160 start_bin_index,
161 bin_calculator,
162 &mut stats,
163 );
164 self.stats.lock().unwrap().merge(&stats);
165 result
166 }
167
168 fn load_internal<P: AsRef<Path> + std::fmt::Debug>(
169 &self,
170 file_name: &P,
171 accumulator: &mut SavedType,
172 start_bin_index: usize,
173 bin_calculator: &PubkeyBinCalculator16,
174 stats: &mut CacheHashDataStats,
175 ) -> Result<(), std::io::Error> {
176 let mut m = Measure::start("overall");
177 let path = self.cache_folder.join(file_name);
178 let file_len = std::fs::metadata(path.clone())?.len();
179 let mut m1 = Measure::start("read_file");
180 let mmap = CacheHashDataFile::load_map(&path)?;
181 m1.stop();
182 stats.read_us = m1.as_us();
183
184 let cell_size = std::mem::size_of::<EntryType>() as u64;
185 let mut cache_file = CacheHashDataFile {
186 mmap,
187 cell_size,
188 capacity: 0,
189 };
190 let header = cache_file.get_header_mut();
191 let entries = header.count;
192
193 let capacity = cell_size * (entries as u64) + std::mem::size_of::<Header>() as u64;
194 cache_file.capacity = capacity;
195 assert_eq!(
196 capacity, file_len,
197 "expected: {}, len on disk: {} {:?}, entries: {}, cell_size: {}",
198 capacity, file_len, path, entries, cell_size
199 );
200
201 stats.total_entries = entries;
202 stats.cache_file_size += capacity as usize;
203
204 let file_name_lookup = file_name.as_ref().to_str().unwrap().to_string();
205 let found = self
206 .pre_existing_cache_files
207 .lock()
208 .unwrap()
209 .remove(&file_name_lookup);
210 if !found {
211 info!(
212 "tried to mark {:?} as used, but it wasn't in the set, one example: {:?}",
213 file_name_lookup,
214 self.pre_existing_cache_files.lock().unwrap().iter().next()
215 );
216 }
217
218 stats.loaded_from_cache += 1;
219 stats.entries_loaded_from_cache += entries;
220 let mut m2 = Measure::start("decode");
221 for i in 0..entries {
222 let d = cache_file.get_mut::<EntryType>(i as u64);
223 let mut pubkey_to_bin_index = bin_calculator.bin_from_pubkey(&d.pubkey);
224 assert!(
225 pubkey_to_bin_index >= start_bin_index,
226 "{}, {}",
227 pubkey_to_bin_index,
228 start_bin_index
229 ); pubkey_to_bin_index -= start_bin_index;
231 accumulator[pubkey_to_bin_index].push(d.clone()); }
233
234 m2.stop();
235 stats.decode_us += m2.as_us();
236 m.stop();
237 stats.load_us += m.as_us();
238 Ok(())
239 }
240
241 pub fn save(&self, file_name: &Path, data: &SavedTypeSlice) -> Result<(), std::io::Error> {
242 let mut stats = CacheHashDataStats::default();
243 let result = self.save_internal(file_name, data, &mut stats);
244 self.stats.lock().unwrap().merge(&stats);
245 result
246 }
247
248 pub fn save_internal(
249 &self,
250 file_name: &Path,
251 data: &SavedTypeSlice,
252 stats: &mut CacheHashDataStats,
253 ) -> Result<(), std::io::Error> {
254 let mut m = Measure::start("save");
255 let cache_path = self.cache_folder.join(file_name);
256 let create = true;
257 if create {
258 let _ignored = remove_file(&cache_path);
259 }
260 let cell_size = std::mem::size_of::<EntryType>() as u64;
261 let mut m1 = Measure::start("create save");
262 let entries = data
263 .iter()
264 .map(|x: &Vec<EntryType>| x.len())
265 .collect::<Vec<_>>();
266 let entries = entries.iter().sum::<usize>();
267 let capacity = cell_size * (entries as u64) + std::mem::size_of::<Header>() as u64;
268
269 let mmap = CacheHashDataFile::new_map(&cache_path, capacity)?;
270 m1.stop();
271 stats.create_save_us += m1.as_us();
272 let mut cache_file = CacheHashDataFile {
273 mmap,
274 cell_size,
275 capacity,
276 };
277
278 let mut header = cache_file.get_header_mut();
279 header.count = entries;
280
281 stats.cache_file_size = capacity as usize;
282 stats.total_entries = entries;
283
284 let mut m2 = Measure::start("write_to_mmap");
285 let mut i = 0;
286 data.iter().for_each(|x| {
287 x.iter().for_each(|item| {
288 let d = cache_file.get_mut::<EntryType>(i as u64);
289 i += 1;
290 *d = item.clone();
291 })
292 });
293 assert_eq!(i, entries);
294 m2.stop();
295 stats.write_to_mmap_us += m2.as_us();
296 m.stop();
297 stats.save_us += m.as_us();
298 stats.saved_to_cache += 1;
299 Ok(())
300 }
301}
302
#[cfg(test)]
pub mod tests {
    use super::*;
    use rand::Rng;

    /// Round-trips generated data through save/load across bin counts,
    /// pass splits, and flattened vs. pre-binned input shapes.
    #[test]
    fn test_read_write() {
        use tempfile::TempDir;
        let tmpdir = TempDir::new().unwrap();
        std::fs::create_dir_all(&tmpdir).unwrap();

        for bins in [1, 2, 4] {
            let bin_calculator = PubkeyBinCalculator16::new(bins);
            let num_points = 5;
            let (data, _total_points) = generate_test_data(num_points, bins, &bin_calculator);
            for passes in [1, 2] {
                let bins_per_pass = bins / passes;
                if bins_per_pass == 0 {
                    continue; // illegal test case
                }
                for pass in 0..passes {
                    for flatten_data in [true, false] {
                        let mut data_this_pass = if flatten_data {
                            vec![vec![], vec![]]
                        } else {
                            vec![]
                        };
                        let start_bin_this_pass = pass * bins_per_pass;
                        for bin in 0..bins_per_pass {
                            let mut this_bin_data = data[bin + start_bin_this_pass].clone();
                            if flatten_data {
                                data_this_pass[0].append(&mut this_bin_data);
                            } else {
                                data_this_pass.push(this_bin_data);
                            }
                        }
                        let cache = CacheHashData::new(&tmpdir);
                        let file_name = "test";
                        let file = Path::new(file_name).to_path_buf();
                        cache.save(&file, &data_this_pass).unwrap();
                        // the freshly saved file must be visible on disk
                        cache.get_cache_files();
                        assert_eq!(
                            cache
                                .pre_existing_cache_files
                                .lock()
                                .unwrap()
                                .iter()
                                .collect::<Vec<_>>(),
                            vec![file_name]
                        );
                        let mut accum = (0..bins_per_pass).map(|_| vec![]).collect();
                        cache
                            .load(&file, &mut accum, start_bin_this_pass, &bin_calculator)
                            .unwrap();
                        if flatten_data {
                            // loading re-bins, so bin the expected data too
                            bin_data(
                                &mut data_this_pass,
                                &bin_calculator,
                                bins_per_pass,
                                start_bin_this_pass,
                            );
                        }
                        assert_eq!(
                            accum, data_this_pass,
                            "bins: {}, start_bin_this_pass: {}, pass: {}, flatten: {}, passes: {}",
                            bins, start_bin_this_pass, pass, flatten_data, passes
                        );
                    }
                }
            }
        }
    }

    /// Redistributes `data` into `bins` bins by pubkey, offsetting bin
    /// indexes by `start_bin`.
    fn bin_data(
        data: &mut SavedType,
        bin_calculator: &PubkeyBinCalculator16,
        bins: usize,
        start_bin: usize,
    ) {
        let mut accum: SavedType = (0..bins).map(|_| vec![]).collect();
        data.drain(..).for_each(|mut x| {
            x.drain(..).for_each(|item| {
                let bin = bin_calculator.bin_from_pubkey(&item.pubkey);
                accum[bin - start_bin].push(item);
            })
        });
        *data = accum;
    }

    /// Generates random entries per bin (some bins are randomly left
    /// empty); returns the binned data and the total entry count.
    fn generate_test_data(
        count: usize,
        bins: usize,
        binner: &PubkeyBinCalculator16,
    ) -> (SavedType, usize) {
        let mut rng = rand::thread_rng();
        let mut ct = 0;
        (
            (0..bins)
                .map(|bin| {
                    let rnd = rng.gen::<u64>() % (bins as u64);
                    if rnd < count as u64 {
                        (0..std::cmp::max(1, count / bins))
                            .map(|_| {
                                ct += 1;
                                let mut pk;
                                // rejection-sample a pubkey that lands in `bin`
                                loop {
                                    pk = gemachain_sdk::pubkey::new_rand();
                                    if binner.bin_from_pubkey(&pk) == bin {
                                        break;
                                    }
                                }

                                CalculateHashIntermediate::new(
                                    gemachain_sdk::hash::new_rand(&mut rng),
                                    ct as u64,
                                    pk,
                                )
                            })
                            .collect::<Vec<_>>()
                    } else {
                        vec![]
                    }
                })
                .collect::<Vec<_>>(),
            ct,
        )
    }
}