// atlas_accounts_db/cache_hash_data.rs

1//! Cached data for hashing accounts
2#[cfg(test)]
3use crate::pubkey_bins::PubkeyBinCalculator24;
4use {
5    crate::{accounts_hash::CalculateHashIntermediate, cache_hash_data_stats::CacheHashDataStats},
6    bytemuck_derive::{Pod, Zeroable},
7    memmap2::MmapMut,
8    atlas_clock::Slot,
9    atlas_measure::measure::Measure,
10    std::{
11        collections::HashSet,
12        fs::{self, remove_file, File, OpenOptions},
13        io::{Seek, SeekFrom, Write},
14        path::{Path, PathBuf},
15        sync::{atomic::Ordering, Arc, Mutex},
16    },
17};
18
/// A single entry stored in a cache hash data file.
pub type EntryType = CalculateHashIntermediate;
/// Entries grouped by bin; the unsized slice form accepted by `save`.
pub type SavedTypeSlice = [Vec<EntryType>];

#[cfg(test)]
/// Owned, growable form of [`SavedTypeSlice`], used as a load accumulator in tests.
pub type SavedType = Vec<Vec<EntryType>>;
24
/// Fixed-size header written at the start of every cache hash data file.
#[repr(C)]
#[derive(Debug, Clone, Copy, Pod, Zeroable)]
pub struct Header {
    /// number of `EntryType` cells that follow the header in the file
    pub count: usize,
}

// In order to safely guarantee Header is Pod, it cannot have any padding
// This is obvious by inspection, but this will also catch any inadvertent
// changes in the future (i.e. it is a test).
// Additionally, we compare the header size with `u64` instead of `usize`
// to ensure binary compatibility doesn't break.
const _: () = assert!(
    std::mem::size_of::<Header>() == std::mem::size_of::<u64>(),
    "Header cannot have any padding and must be the same size as u64",
);
40
/// cache hash data file to be mmapped later
pub(crate) struct CacheHashDataFileReference {
    /// open handle; kept un-mmapped until `map()` is called
    file: File,
    /// length of the file on disk, captured when the reference was created
    file_len: u64,
    /// full path, used only for diagnostics in error/assert messages
    path: PathBuf,
    /// shared stats sink for read/load timing and counters
    stats: Arc<CacheHashDataStats>,
}
48
/// mmapped cache hash data file
pub(crate) struct CacheHashDataFile {
    /// size in bytes of one `EntryType` cell
    cell_size: u64,
    /// the mapped file contents: `Header` followed by `count` cells
    mmap: MmapMut,
    /// total valid bytes: header + count * cell_size
    capacity: u64,
}
55
56impl CacheHashDataFileReference {
57    /// convert the open file reference to a mmapped file that can be returned as a slice
58    pub(crate) fn map(&self) -> Result<CacheHashDataFile, std::io::Error> {
59        let file_len = self.file_len;
60        let mut m1 = Measure::start("read_file");
61        let mmap = CacheHashDataFileReference::load_map(&self.file)?;
62        m1.stop();
63        self.stats.read_us.fetch_add(m1.as_us(), Ordering::Relaxed);
64        let header_size = std::mem::size_of::<Header>() as u64;
65        if file_len < header_size {
66            return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof));
67        }
68
69        let cell_size = std::mem::size_of::<EntryType>() as u64;
70        unsafe {
71            assert_eq!(
72                mmap.align_to::<EntryType>().0.len(),
73                0,
74                "mmap is not aligned"
75            );
76        }
77        assert_eq!((cell_size as usize) % std::mem::size_of::<u64>(), 0);
78        let mut cache_file = CacheHashDataFile {
79            mmap,
80            cell_size,
81            capacity: 0,
82        };
83        let header = cache_file.get_header_mut();
84        let entries = header.count;
85
86        let capacity = cell_size * (entries as u64) + header_size;
87        if file_len < capacity {
88            return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof));
89        }
90        cache_file.capacity = capacity;
91        assert_eq!(
92            capacity, file_len,
93            "expected: {capacity}, len on disk: {file_len} {}, entries: {entries}, cell_size: {cell_size}", self.path.display(),
94        );
95
96        self.stats
97            .total_entries
98            .fetch_add(entries, Ordering::Relaxed);
99        self.stats
100            .cache_file_size
101            .fetch_add(capacity as usize, Ordering::Relaxed);
102
103        self.stats.loaded_from_cache.fetch_add(1, Ordering::Relaxed);
104        self.stats
105            .entries_loaded_from_cache
106            .fetch_add(entries, Ordering::Relaxed);
107        Ok(cache_file)
108    }
109
110    fn load_map(file: &File) -> Result<MmapMut, std::io::Error> {
111        Ok(unsafe { MmapMut::map_mut(file).unwrap() })
112    }
113}
114
115impl CacheHashDataFile {
116    /// return a slice of a reference to all the cache hash data from the mmapped file
117    pub fn get_cache_hash_data(&self) -> &[EntryType] {
118        self.get_slice(0)
119    }
120
121    #[cfg(test)]
122    /// Populate 'accumulator' from entire contents of the cache file.
123    pub fn load_all(
124        &self,
125        accumulator: &mut SavedType,
126        start_bin_index: usize,
127        bin_calculator: &PubkeyBinCalculator24,
128    ) {
129        let mut m2 = Measure::start("decode");
130        let slices = self.get_cache_hash_data();
131        for d in slices {
132            let mut pubkey_to_bin_index = bin_calculator.bin_from_pubkey(&d.pubkey);
133            assert!(
134                pubkey_to_bin_index >= start_bin_index,
135                "{pubkey_to_bin_index}, {start_bin_index}"
136            ); // this would indicate we put a pubkey in too high of a bin
137            pubkey_to_bin_index -= start_bin_index;
138            accumulator[pubkey_to_bin_index].push(*d); // may want to avoid copy here
139        }
140
141        m2.stop();
142    }
143
144    /// get '&mut EntryType' from cache file [ix]
145    fn get_mut(&mut self, ix: u64) -> &mut EntryType {
146        let start = self.get_element_offset_byte(ix);
147        let end = start + std::mem::size_of::<EntryType>();
148        assert!(
149            end <= self.capacity as usize,
150            "end: {end}, capacity: {}, ix: {ix}, cell size: {}",
151            self.capacity,
152            self.cell_size,
153        );
154        let bytes = &mut self.mmap[start..end];
155        bytemuck::from_bytes_mut(bytes)
156    }
157
158    /// get '&[EntryType]' from cache file [ix..]
159    fn get_slice(&self, ix: u64) -> &[EntryType] {
160        let start = self.get_element_offset_byte(ix);
161        let bytes = &self.mmap[start..];
162        // the `bytes` slice *must* contain whole `EntryType`s
163        debug_assert_eq!(bytes.len() % std::mem::size_of::<EntryType>(), 0);
164        bytemuck::cast_slice(bytes)
165    }
166
167    /// return byte offset of entry 'ix' into a slice which contains a header and at least ix elements
168    fn get_element_offset_byte(&self, ix: u64) -> usize {
169        let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
170        debug_assert_eq!(start % std::mem::align_of::<EntryType>(), 0);
171        start
172    }
173
174    fn get_header_mut(&mut self) -> &mut Header {
175        let bytes = &mut self.mmap[..std::mem::size_of::<Header>()];
176        bytemuck::from_bytes_mut(bytes)
177    }
178
179    fn new_map(file: impl AsRef<Path>, capacity: u64) -> Result<MmapMut, std::io::Error> {
180        let mut data = OpenOptions::new()
181            .read(true)
182            .write(true)
183            .create_new(true)
184            .open(file)?;
185
186        // Theoretical performance optimization: write a zero to the end of
187        // the file so that we won't have to resize it later, which may be
188        // expensive.
189        data.seek(SeekFrom::Start(capacity - 1)).unwrap();
190        data.write_all(&[0]).unwrap();
191        data.rewind().unwrap();
192        data.flush().unwrap();
193        Ok(unsafe { MmapMut::map_mut(&data).unwrap() })
194    }
195}
196
/// Manages the directory of cache hash data files: tracks which pre-existing
/// files get reused, and deletes unused ones per the deletion policy on drop.
pub(crate) struct CacheHashData {
    /// directory holding all cache hash data files
    cache_dir: PathBuf,
    /// filenames found in `cache_dir` at startup that have not (yet) been reused
    pre_existing_cache_files: Arc<Mutex<HashSet<PathBuf>>>,
    /// which unused pre-existing files to delete (see `DeletionPolicy`)
    deletion_policy: DeletionPolicy,
    /// shared counters/timers for cache activity
    pub stats: Arc<CacheHashDataStats>,
}
203
impl Drop for CacheHashData {
    fn drop(&mut self) {
        // any pre-existing cache files that were never reused this round are
        // deleted now, per the deletion policy
        self.delete_old_cache_files();
        self.stats.report();
    }
}

/// The suffix to append to a cache hash data filename to indicate that the file is being written.
/// A file still carrying this suffix is incomplete and is removed during startup scanning.
const IN_PROGRESS_SUFFIX: &str = ".in-progress";
213
214impl CacheHashData {
215    pub(crate) fn new(cache_dir: PathBuf, deletion_policy: DeletionPolicy) -> CacheHashData {
216        std::fs::create_dir_all(&cache_dir).unwrap_or_else(|err| {
217            panic!("error creating cache dir {}: {err}", cache_dir.display())
218        });
219
220        let result = CacheHashData {
221            cache_dir,
222            pre_existing_cache_files: Arc::new(Mutex::new(HashSet::default())),
223            deletion_policy,
224            stats: Arc::new(CacheHashDataStats::default()),
225        };
226
227        result.get_cache_files();
228        result
229    }
230
231    /// delete all pre-existing files that will not be used
232    pub(crate) fn delete_old_cache_files(&self) {
233        // all the renaming files in `pre_existing_cache_files` were *not* used for this
234        // accounts hash calculation
235        let mut old_cache_files =
236            std::mem::take(&mut *self.pre_existing_cache_files.lock().unwrap());
237
238        match self.deletion_policy {
239            DeletionPolicy::AllUnused => {
240                // no additional work to do here; we will delete everything in `old_cache_files`
241            }
242            DeletionPolicy::UnusedAtLeast(storages_start_slot) => {
243                // when calculating an incremental accounts hash, we only want to delete the unused
244                // cache files *that IAH considered*
245                old_cache_files.retain(|old_cache_file| {
246                    let Some(parsed_filename) = parse_filename(old_cache_file) else {
247                        // if parsing the cache filename fails, we *do* want to delete it
248                        return true;
249                    };
250
251                    // if the old cache file is in the incremental accounts hash calculation range,
252                    // then delete it
253                    parsed_filename.slot_range_start >= storages_start_slot
254                });
255            }
256        }
257
258        if !old_cache_files.is_empty() {
259            self.stats
260                .unused_cache_files
261                .fetch_add(old_cache_files.len(), Ordering::Relaxed);
262            for file_name in old_cache_files.iter() {
263                let result = self.cache_dir.join(file_name);
264                let _ = fs::remove_file(result);
265            }
266        }
267    }
268
269    fn get_cache_files(&self) {
270        if self.cache_dir.is_dir() {
271            let dir = fs::read_dir(&self.cache_dir);
272            if let Ok(dir) = dir {
273                let mut pre_existing = self.pre_existing_cache_files.lock().unwrap();
274                for entry in dir.flatten() {
275                    if entry.path().ends_with(IN_PROGRESS_SUFFIX) {
276                        // ignore in-progress files and delete them
277                        let _ = fs::remove_file(entry.path());
278                        continue;
279                    }
280
281                    if let Some(name) = entry.path().file_name() {
282                        pre_existing.insert(PathBuf::from(name));
283                    }
284                }
285                self.stats
286                    .cache_file_count
287                    .fetch_add(pre_existing.len(), Ordering::Relaxed);
288            }
289        }
290    }
291
292    /// open a cache hash file, but don't map it.
293    /// This allows callers to know a file exists, but preserves the # mmapped files.
294    pub(crate) fn get_file_reference_to_map_later(
295        &self,
296        file_name: impl AsRef<Path>,
297    ) -> Result<CacheHashDataFileReference, std::io::Error> {
298        let path = self.cache_dir.join(&file_name);
299        let file_len = std::fs::metadata(&path)?.len();
300        let mut m1 = Measure::start("read_file");
301
302        let file = OpenOptions::new()
303            .read(true)
304            .write(true)
305            .create(false)
306            .open(&path)?;
307        m1.stop();
308        self.stats.read_us.fetch_add(m1.as_us(), Ordering::Relaxed);
309        self.pre_existing_cache_file_will_be_used(file_name);
310
311        Ok(CacheHashDataFileReference {
312            file,
313            file_len,
314            path,
315            stats: Arc::clone(&self.stats),
316        })
317    }
318
319    fn pre_existing_cache_file_will_be_used(&self, file_name: impl AsRef<Path>) {
320        self.pre_existing_cache_files
321            .lock()
322            .unwrap()
323            .remove(file_name.as_ref());
324    }
325
326    /// save 'data' to 'file_name'
327    pub(crate) fn save(
328        &self,
329        file_name: impl AsRef<Path>,
330        data: &SavedTypeSlice,
331    ) -> Result<(), std::io::Error> {
332        // delete any existing file at this path
333        let cache_path = self.cache_dir.join(file_name.as_ref());
334        let _ignored = remove_file(&cache_path);
335
336        // Append ".in-progress" to the filename to indicate that the file is
337        // being written
338        let work_in_progress_file_full_path = self
339            .cache_dir
340            .join(Self::get_work_in_progress_file_name(file_name.as_ref()));
341        Self::save_internal(&work_in_progress_file_full_path, data, &self.stats)?;
342        // Rename the file to remove the ".in-progress" suffix after the file
343        // has been successfully written. This is done to ensure that the file is
344        // not read before it has been completely written. For example, if the
345        // validator was stopped or crashed in the middle of writing the file, the file
346        // would be incomplete and would not be read by the validator on next restart.
347        fs::rename(work_in_progress_file_full_path, cache_path)
348    }
349
350    fn get_work_in_progress_file_name(file_name: impl AsRef<Path>) -> PathBuf {
351        let mut s = PathBuf::from(file_name.as_ref()).into_os_string();
352        s.push(IN_PROGRESS_SUFFIX);
353        s.into()
354    }
355
356    fn save_internal(
357        in_progress_cache_file_full_path: impl AsRef<Path>,
358        data: &SavedTypeSlice,
359        stats: &CacheHashDataStats,
360    ) -> Result<(), std::io::Error> {
361        let mut m = Measure::start("save");
362        let _ignored = remove_file(&in_progress_cache_file_full_path);
363        let cell_size = std::mem::size_of::<EntryType>() as u64;
364        let mut m1 = Measure::start("create save");
365        let entries = data.iter().map(Vec::len).sum::<usize>();
366        let capacity = cell_size * (entries as u64) + std::mem::size_of::<Header>() as u64;
367
368        let mmap = CacheHashDataFile::new_map(&in_progress_cache_file_full_path, capacity)?;
369        m1.stop();
370        stats
371            .create_save_us
372            .fetch_add(m1.as_us(), Ordering::Relaxed);
373        let mut cache_file = CacheHashDataFile {
374            mmap,
375            cell_size,
376            capacity,
377        };
378
379        let header = cache_file.get_header_mut();
380        header.count = entries;
381
382        stats
383            .cache_file_size
384            .fetch_add(capacity as usize, Ordering::Relaxed);
385        stats.total_entries.fetch_add(entries, Ordering::Relaxed);
386
387        let mut m2 = Measure::start("write_to_mmap");
388        let mut i = 0;
389        data.iter().for_each(|x| {
390            x.iter().for_each(|item| {
391                let d = cache_file.get_mut(i as u64);
392                i += 1;
393                *d = *item;
394            })
395        });
396        assert_eq!(i, entries);
397        m2.stop();
398        m.stop();
399        stats
400            .write_to_mmap_us
401            .fetch_add(m2.as_us(), Ordering::Relaxed);
402        stats.save_us.fetch_add(m.as_us(), Ordering::Relaxed);
403        stats.saved_to_cache.fetch_add(1, Ordering::Relaxed);
404        Ok(())
405    }
406}
407
/// The values of each part of a cache hash data filename
#[derive(Debug)]
pub struct ParsedFilename {
    /// first slot covered by the file (first filename field)
    pub slot_range_start: Slot,
    /// last slot covered by the file (second filename field)
    pub slot_range_end: Slot,
    /// first pubkey bin covered (third filename field)
    pub bin_range_start: u64,
    /// last pubkey bin covered (fourth filename field)
    pub bin_range_end: u64,
    /// hash field, parsed from hex (fifth filename field)
    pub hash: u64,
}
417
418/// Parses a cache hash data filename into its parts
419///
420/// Returns None if the filename is invalid
421pub fn parse_filename(cache_filename: impl AsRef<Path>) -> Option<ParsedFilename> {
422    let filename = cache_filename.as_ref().to_string_lossy().to_string();
423    let parts: Vec<_> = filename.split('.').collect(); // The parts are separated by a `.`
424    if parts.len() != 5 {
425        return None;
426    }
427    let slot_range_start = parts.first()?.parse().ok()?;
428    let slot_range_end = parts.get(1)?.parse().ok()?;
429    let bin_range_start = parts.get(2)?.parse().ok()?;
430    let bin_range_end = parts.get(3)?.parse().ok()?;
431    let hash = u64::from_str_radix(parts.get(4)?, 16).ok()?; // the hash is in hex
432    Some(ParsedFilename {
433        slot_range_start,
434        slot_range_end,
435        bin_range_start,
436        bin_range_end,
437        hash,
438    })
439}
440
/// Decides which old cache files to delete
///
/// Applied when `CacheHashData` is dropped (via `delete_old_cache_files()`).
/// See `delete_old_cache_files()` for more info.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum DeletionPolicy {
    /// Delete *all* the unused cache files
    /// Should be used when calculating full accounts hash
    AllUnused,
    /// Delete *only* the unused cache files with starting slot range *at least* this slot
    /// Should be used when calculating incremental accounts hash
    UnusedAtLeast(Slot),
}
453
454#[cfg(test)]
455mod tests {
456    use {super::*, crate::accounts_hash::AccountHash, rand::Rng};
457
458    impl CacheHashData {
459        /// load from 'file_name' into 'accumulator'
460        fn load(
461            &self,
462            file_name: impl AsRef<Path>,
463            accumulator: &mut SavedType,
464            start_bin_index: usize,
465            bin_calculator: &PubkeyBinCalculator24,
466        ) -> Result<(), std::io::Error> {
467            let mut m = Measure::start("overall");
468            let cache_file = self.load_map(file_name)?;
469            cache_file.load_all(accumulator, start_bin_index, bin_calculator);
470            m.stop();
471            self.stats.load_us.fetch_add(m.as_us(), Ordering::Relaxed);
472            Ok(())
473        }
474
475        /// map 'file_name' into memory
476        fn load_map(
477            &self,
478            file_name: impl AsRef<Path>,
479        ) -> Result<CacheHashDataFile, std::io::Error> {
480            let reference = self.get_file_reference_to_map_later(file_name)?;
481            reference.map()
482        }
483    }
484
    #[test]
    fn test_read_write() {
        // generate sample data
        // write to file
        // read
        // compare
        use tempfile::TempDir;
        let tmpdir = TempDir::new().unwrap();
        let cache_dir = tmpdir.path().to_path_buf();
        std::fs::create_dir_all(&cache_dir).unwrap();

        // exercise several bin counts
        for bins in [1, 2, 4] {
            let bin_calculator = PubkeyBinCalculator24::new(bins);
            let num_points = 5;
            let (data, _total_points) = generate_test_data(num_points, bins, &bin_calculator);
            // split the bins over 1 or 2 passes
            for passes in [1, 2] {
                let bins_per_pass = bins / passes;
                if bins_per_pass == 0 {
                    continue; // illegal test case
                }
                for pass in 0..passes {
                    // also test saving all data flattened into a single bin vs binned
                    for flatten_data in [true, false] {
                        let mut data_this_pass = if flatten_data {
                            vec![vec![], vec![]]
                        } else {
                            vec![]
                        };
                        let start_bin_this_pass = pass * bins_per_pass;
                        // collect this pass's slice of the generated data
                        for bin in 0..bins_per_pass {
                            let mut this_bin_data = data[bin + start_bin_this_pass].clone();
                            if flatten_data {
                                data_this_pass[0].append(&mut this_bin_data);
                            } else {
                                data_this_pass.push(this_bin_data);
                            }
                        }
                        let cache =
                            CacheHashData::new(cache_dir.clone(), DeletionPolicy::AllUnused);
                        let file_name = PathBuf::from("test");
                        cache.save(&file_name, &data_this_pass).unwrap();
                        // a fresh scan should find exactly the file we just saved
                        cache.get_cache_files();
                        assert_eq!(
                            cache
                                .pre_existing_cache_files
                                .lock()
                                .unwrap()
                                .iter()
                                .collect::<Vec<_>>(),
                            vec![&file_name],
                        );
                        let mut accum = (0..bins_per_pass).map(|_| vec![]).collect();
                        cache
                            .load(&file_name, &mut accum, start_bin_this_pass, &bin_calculator)
                            .unwrap();
                        if flatten_data {
                            // re-bin the flattened expectation so it is comparable
                            // to the binned `accum` produced by `load`
                            bin_data(
                                &mut data_this_pass,
                                &bin_calculator,
                                bins_per_pass,
                                start_bin_this_pass,
                            );
                        }
                        assert_eq!(
                            accum, data_this_pass,
                            "bins: {bins}, start_bin_this_pass: {start_bin_this_pass}, pass: {pass}, flatten: {flatten_data}, passes: {passes}"
                        );
                    }
                }
            }
        }
    }
556
557    fn bin_data(
558        data: &mut SavedType,
559        bin_calculator: &PubkeyBinCalculator24,
560        bins: usize,
561        start_bin: usize,
562    ) {
563        let mut accum: SavedType = (0..bins).map(|_| vec![]).collect();
564        data.drain(..).for_each(|mut x| {
565            x.drain(..).for_each(|item| {
566                let bin = bin_calculator.bin_from_pubkey(&item.pubkey);
567                accum[bin - start_bin].push(item);
568            })
569        });
570        *data = accum;
571    }
572
    /// Generate random test data: `bins` outer vecs, each randomly either empty
    /// or holding `max(1, count / bins)` entries whose pubkeys map to that bin.
    /// Returns `(data, total number of entries generated)`.
    fn generate_test_data(
        count: usize,
        bins: usize,
        binner: &PubkeyBinCalculator24,
    ) -> (SavedType, usize) {
        let mut rng = rand::thread_rng();
        let mut ct = 0;
        (
            (0..bins)
                .map(|bin| {
                    // NOTE(review): `rnd` is drawn modulo `bins` but compared
                    // against `count` — presumably this is intended sparsity
                    // (skip some bins when bins > count), not a probability;
                    // with count=5 and bins<=4 every bin is populated.
                    let rnd = rng.gen::<u64>() % (bins as u64);
                    if rnd < count as u64 {
                        (0..std::cmp::max(1, count / bins))
                            .map(|_| {
                                ct += 1;
                                let mut pk;
                                loop {
                                    // expensive, but small numbers and for tests, so ok
                                    // (rejection-sample until the pubkey lands in `bin`)
                                    pk = atlas_pubkey::new_rand();
                                    if binner.bin_from_pubkey(&pk) == bin {
                                        break;
                                    }
                                }

                                CalculateHashIntermediate {
                                    hash: AccountHash(atlas_hash::Hash::new_unique()),
                                    // lamports double as a unique per-entry marker
                                    lamports: ct as u64,
                                    pubkey: pk,
                                }
                            })
                            .collect::<Vec<_>>()
                    } else {
                        vec![]
                    }
                })
                .collect::<Vec<_>>(),
            ct,
        )
    }
612
613    #[test]
614    #[allow(clippy::used_underscore_binding)]
615    fn test_parse_filename() {
616        let good_filename = "123.456.0.65536.537d65697d9b2baa";
617        let parsed_filename = parse_filename(good_filename).unwrap();
618        assert_eq!(parsed_filename.slot_range_start, 123);
619        assert_eq!(parsed_filename.slot_range_end, 456);
620        assert_eq!(parsed_filename.bin_range_start, 0);
621        assert_eq!(parsed_filename.bin_range_end, 65536);
622        assert_eq!(parsed_filename.hash, 0x537d65697d9b2baa);
623
624        let bad_filenames = [
625            // bad separator
626            "123-456-0-65536.537d65697d9b2baa",
627            // bad values
628            "abc.456.0.65536.537d65697d9b2baa",
629            "123.xyz.0.65536.537d65697d9b2baa",
630            "123.456.?.65536.537d65697d9b2baa",
631            "123.456.0.@#$%^.537d65697d9b2baa",
632            "123.456.0.65536.base19shouldfail",
633            "123.456.0.65536.123456789012345678901234567890",
634            // missing values
635            "123.456.0.65536.",
636            "123.456.0.65536",
637            // extra junk
638            "123.456.0.65536.537d65697d9b2baa.42",
639            "123.456.0.65536.537d65697d9b2baa.",
640            "123.456.0.65536.537d65697d9b2baa/",
641            ".123.456.0.65536.537d65697d9b2baa",
642            "/123.456.0.65536.537d65697d9b2baa",
643        ];
644        for bad_filename in bad_filenames {
645            assert!(parse_filename(bad_filename).is_none());
646        }
647    }
648
649    #[test]
650    fn tet_get_work_in_progress_file_name() {
651        let filename = "test";
652        let work_in_progress_filename = CacheHashData::get_work_in_progress_file_name(filename);
653        assert_eq!(work_in_progress_filename.as_os_str(), "test.in-progress");
654    }
655}