1#[cfg(test)]
3use crate::pubkey_bins::PubkeyBinCalculator24;
4use {
5 crate::{accounts_hash::CalculateHashIntermediate, cache_hash_data_stats::CacheHashDataStats},
6 bytemuck_derive::{Pod, Zeroable},
7 memmap2::MmapMut,
8 atlas_clock::Slot,
9 atlas_measure::measure::Measure,
10 std::{
11 collections::HashSet,
12 fs::{self, remove_file, File, OpenOptions},
13 io::{Seek, SeekFrom, Write},
14 path::{Path, PathBuf},
15 sync::{atomic::Ordering, Arc, Mutex},
16 },
17};
18
/// Type of each fixed-size entry stored in a cache hash data file.
pub type EntryType = CalculateHashIntermediate;
/// Borrowed view of entries grouped into bins, as passed to `CacheHashData::save`.
pub type SavedTypeSlice = [Vec<EntryType>];

#[cfg(test)]
pub type SavedType = Vec<Vec<EntryType>>;
24
/// On-disk header of a cache hash data file.
///
/// `#[repr(C)]` together with `Pod`/`Zeroable` allows the header to be read
/// and written directly from the mmap'd file bytes via bytemuck.
#[repr(C)]
#[derive(Debug, Clone, Copy, Pod, Zeroable)]
pub struct Header {
    // number of `EntryType` cells that follow the header in the file
    pub count: usize,
}

// Compile-time guard: entry cells start right after the header, so the
// header must be exactly one u64 (no padding) for them to stay aligned.
const _: () = assert!(
    std::mem::size_of::<Header>() == std::mem::size_of::<u64>(),
    "Header cannot have any padding and must be the same size as u64",
);
40
/// An opened cache file plus metadata, to be memory-mapped later via `map()`.
pub(crate) struct CacheHashDataFileReference {
    file: File,
    // length of the file on disk, captured when the reference was created
    file_len: u64,
    // full path of the file (used for error/assert messages)
    path: PathBuf,
    stats: Arc<CacheHashDataStats>,
}
48
/// A memory-mapped cache hash data file: a `Header` followed by
/// `Header::count` fixed-size `EntryType` cells.
pub(crate) struct CacheHashDataFile {
    // size in bytes of one entry cell (size_of::<EntryType>())
    cell_size: u64,
    mmap: MmapMut,
    // total valid bytes: header + count * cell_size
    capacity: u64,
}
55
56impl CacheHashDataFileReference {
57 pub(crate) fn map(&self) -> Result<CacheHashDataFile, std::io::Error> {
59 let file_len = self.file_len;
60 let mut m1 = Measure::start("read_file");
61 let mmap = CacheHashDataFileReference::load_map(&self.file)?;
62 m1.stop();
63 self.stats.read_us.fetch_add(m1.as_us(), Ordering::Relaxed);
64 let header_size = std::mem::size_of::<Header>() as u64;
65 if file_len < header_size {
66 return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof));
67 }
68
69 let cell_size = std::mem::size_of::<EntryType>() as u64;
70 unsafe {
71 assert_eq!(
72 mmap.align_to::<EntryType>().0.len(),
73 0,
74 "mmap is not aligned"
75 );
76 }
77 assert_eq!((cell_size as usize) % std::mem::size_of::<u64>(), 0);
78 let mut cache_file = CacheHashDataFile {
79 mmap,
80 cell_size,
81 capacity: 0,
82 };
83 let header = cache_file.get_header_mut();
84 let entries = header.count;
85
86 let capacity = cell_size * (entries as u64) + header_size;
87 if file_len < capacity {
88 return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof));
89 }
90 cache_file.capacity = capacity;
91 assert_eq!(
92 capacity, file_len,
93 "expected: {capacity}, len on disk: {file_len} {}, entries: {entries}, cell_size: {cell_size}", self.path.display(),
94 );
95
96 self.stats
97 .total_entries
98 .fetch_add(entries, Ordering::Relaxed);
99 self.stats
100 .cache_file_size
101 .fetch_add(capacity as usize, Ordering::Relaxed);
102
103 self.stats.loaded_from_cache.fetch_add(1, Ordering::Relaxed);
104 self.stats
105 .entries_loaded_from_cache
106 .fetch_add(entries, Ordering::Relaxed);
107 Ok(cache_file)
108 }
109
110 fn load_map(file: &File) -> Result<MmapMut, std::io::Error> {
111 Ok(unsafe { MmapMut::map_mut(file).unwrap() })
112 }
113}
114
115impl CacheHashDataFile {
116 pub fn get_cache_hash_data(&self) -> &[EntryType] {
118 self.get_slice(0)
119 }
120
121 #[cfg(test)]
122 pub fn load_all(
124 &self,
125 accumulator: &mut SavedType,
126 start_bin_index: usize,
127 bin_calculator: &PubkeyBinCalculator24,
128 ) {
129 let mut m2 = Measure::start("decode");
130 let slices = self.get_cache_hash_data();
131 for d in slices {
132 let mut pubkey_to_bin_index = bin_calculator.bin_from_pubkey(&d.pubkey);
133 assert!(
134 pubkey_to_bin_index >= start_bin_index,
135 "{pubkey_to_bin_index}, {start_bin_index}"
136 ); pubkey_to_bin_index -= start_bin_index;
138 accumulator[pubkey_to_bin_index].push(*d); }
140
141 m2.stop();
142 }
143
144 fn get_mut(&mut self, ix: u64) -> &mut EntryType {
146 let start = self.get_element_offset_byte(ix);
147 let end = start + std::mem::size_of::<EntryType>();
148 assert!(
149 end <= self.capacity as usize,
150 "end: {end}, capacity: {}, ix: {ix}, cell size: {}",
151 self.capacity,
152 self.cell_size,
153 );
154 let bytes = &mut self.mmap[start..end];
155 bytemuck::from_bytes_mut(bytes)
156 }
157
158 fn get_slice(&self, ix: u64) -> &[EntryType] {
160 let start = self.get_element_offset_byte(ix);
161 let bytes = &self.mmap[start..];
162 debug_assert_eq!(bytes.len() % std::mem::size_of::<EntryType>(), 0);
164 bytemuck::cast_slice(bytes)
165 }
166
167 fn get_element_offset_byte(&self, ix: u64) -> usize {
169 let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
170 debug_assert_eq!(start % std::mem::align_of::<EntryType>(), 0);
171 start
172 }
173
174 fn get_header_mut(&mut self) -> &mut Header {
175 let bytes = &mut self.mmap[..std::mem::size_of::<Header>()];
176 bytemuck::from_bytes_mut(bytes)
177 }
178
179 fn new_map(file: impl AsRef<Path>, capacity: u64) -> Result<MmapMut, std::io::Error> {
180 let mut data = OpenOptions::new()
181 .read(true)
182 .write(true)
183 .create_new(true)
184 .open(file)?;
185
186 data.seek(SeekFrom::Start(capacity - 1)).unwrap();
190 data.write_all(&[0]).unwrap();
191 data.rewind().unwrap();
192 data.flush().unwrap();
193 Ok(unsafe { MmapMut::map_mut(&data).unwrap() })
194 }
195}
196
/// Manages a directory of cache hash data files.
pub(crate) struct CacheHashData {
    cache_dir: PathBuf,
    // file names (not full paths) of cache files present at startup that
    // have not (yet) been referenced this session; unused ones are deleted
    // on drop, subject to `deletion_policy`
    pre_existing_cache_files: Arc<Mutex<HashSet<PathBuf>>>,
    deletion_policy: DeletionPolicy,
    pub stats: Arc<CacheHashDataStats>,
}
203
impl Drop for CacheHashData {
    fn drop(&mut self) {
        // Delete pre-existing cache files that were never referenced during
        // this session (subject to `deletion_policy`), then report stats.
        self.delete_old_cache_files();
        self.stats.report();
    }
}
210
/// Suffix appended to cache files while they are being written; a file is
/// renamed to its final name only after a fully successful save.
const IN_PROGRESS_SUFFIX: &str = ".in-progress";
213
214impl CacheHashData {
215 pub(crate) fn new(cache_dir: PathBuf, deletion_policy: DeletionPolicy) -> CacheHashData {
216 std::fs::create_dir_all(&cache_dir).unwrap_or_else(|err| {
217 panic!("error creating cache dir {}: {err}", cache_dir.display())
218 });
219
220 let result = CacheHashData {
221 cache_dir,
222 pre_existing_cache_files: Arc::new(Mutex::new(HashSet::default())),
223 deletion_policy,
224 stats: Arc::new(CacheHashDataStats::default()),
225 };
226
227 result.get_cache_files();
228 result
229 }
230
231 pub(crate) fn delete_old_cache_files(&self) {
233 let mut old_cache_files =
236 std::mem::take(&mut *self.pre_existing_cache_files.lock().unwrap());
237
238 match self.deletion_policy {
239 DeletionPolicy::AllUnused => {
240 }
242 DeletionPolicy::UnusedAtLeast(storages_start_slot) => {
243 old_cache_files.retain(|old_cache_file| {
246 let Some(parsed_filename) = parse_filename(old_cache_file) else {
247 return true;
249 };
250
251 parsed_filename.slot_range_start >= storages_start_slot
254 });
255 }
256 }
257
258 if !old_cache_files.is_empty() {
259 self.stats
260 .unused_cache_files
261 .fetch_add(old_cache_files.len(), Ordering::Relaxed);
262 for file_name in old_cache_files.iter() {
263 let result = self.cache_dir.join(file_name);
264 let _ = fs::remove_file(result);
265 }
266 }
267 }
268
269 fn get_cache_files(&self) {
270 if self.cache_dir.is_dir() {
271 let dir = fs::read_dir(&self.cache_dir);
272 if let Ok(dir) = dir {
273 let mut pre_existing = self.pre_existing_cache_files.lock().unwrap();
274 for entry in dir.flatten() {
275 if entry.path().ends_with(IN_PROGRESS_SUFFIX) {
276 let _ = fs::remove_file(entry.path());
278 continue;
279 }
280
281 if let Some(name) = entry.path().file_name() {
282 pre_existing.insert(PathBuf::from(name));
283 }
284 }
285 self.stats
286 .cache_file_count
287 .fetch_add(pre_existing.len(), Ordering::Relaxed);
288 }
289 }
290 }
291
292 pub(crate) fn get_file_reference_to_map_later(
295 &self,
296 file_name: impl AsRef<Path>,
297 ) -> Result<CacheHashDataFileReference, std::io::Error> {
298 let path = self.cache_dir.join(&file_name);
299 let file_len = std::fs::metadata(&path)?.len();
300 let mut m1 = Measure::start("read_file");
301
302 let file = OpenOptions::new()
303 .read(true)
304 .write(true)
305 .create(false)
306 .open(&path)?;
307 m1.stop();
308 self.stats.read_us.fetch_add(m1.as_us(), Ordering::Relaxed);
309 self.pre_existing_cache_file_will_be_used(file_name);
310
311 Ok(CacheHashDataFileReference {
312 file,
313 file_len,
314 path,
315 stats: Arc::clone(&self.stats),
316 })
317 }
318
319 fn pre_existing_cache_file_will_be_used(&self, file_name: impl AsRef<Path>) {
320 self.pre_existing_cache_files
321 .lock()
322 .unwrap()
323 .remove(file_name.as_ref());
324 }
325
326 pub(crate) fn save(
328 &self,
329 file_name: impl AsRef<Path>,
330 data: &SavedTypeSlice,
331 ) -> Result<(), std::io::Error> {
332 let cache_path = self.cache_dir.join(file_name.as_ref());
334 let _ignored = remove_file(&cache_path);
335
336 let work_in_progress_file_full_path = self
339 .cache_dir
340 .join(Self::get_work_in_progress_file_name(file_name.as_ref()));
341 Self::save_internal(&work_in_progress_file_full_path, data, &self.stats)?;
342 fs::rename(work_in_progress_file_full_path, cache_path)
348 }
349
350 fn get_work_in_progress_file_name(file_name: impl AsRef<Path>) -> PathBuf {
351 let mut s = PathBuf::from(file_name.as_ref()).into_os_string();
352 s.push(IN_PROGRESS_SUFFIX);
353 s.into()
354 }
355
356 fn save_internal(
357 in_progress_cache_file_full_path: impl AsRef<Path>,
358 data: &SavedTypeSlice,
359 stats: &CacheHashDataStats,
360 ) -> Result<(), std::io::Error> {
361 let mut m = Measure::start("save");
362 let _ignored = remove_file(&in_progress_cache_file_full_path);
363 let cell_size = std::mem::size_of::<EntryType>() as u64;
364 let mut m1 = Measure::start("create save");
365 let entries = data.iter().map(Vec::len).sum::<usize>();
366 let capacity = cell_size * (entries as u64) + std::mem::size_of::<Header>() as u64;
367
368 let mmap = CacheHashDataFile::new_map(&in_progress_cache_file_full_path, capacity)?;
369 m1.stop();
370 stats
371 .create_save_us
372 .fetch_add(m1.as_us(), Ordering::Relaxed);
373 let mut cache_file = CacheHashDataFile {
374 mmap,
375 cell_size,
376 capacity,
377 };
378
379 let header = cache_file.get_header_mut();
380 header.count = entries;
381
382 stats
383 .cache_file_size
384 .fetch_add(capacity as usize, Ordering::Relaxed);
385 stats.total_entries.fetch_add(entries, Ordering::Relaxed);
386
387 let mut m2 = Measure::start("write_to_mmap");
388 let mut i = 0;
389 data.iter().for_each(|x| {
390 x.iter().for_each(|item| {
391 let d = cache_file.get_mut(i as u64);
392 i += 1;
393 *d = *item;
394 })
395 });
396 assert_eq!(i, entries);
397 m2.stop();
398 m.stop();
399 stats
400 .write_to_mmap_us
401 .fetch_add(m2.as_us(), Ordering::Relaxed);
402 stats.save_us.fetch_add(m.as_us(), Ordering::Relaxed);
403 stats.saved_to_cache.fetch_add(1, Ordering::Relaxed);
404 Ok(())
405 }
406}
407
/// Parsed components of a cache hash data filename of the form
/// `{slot_range_start}.{slot_range_end}.{bin_range_start}.{bin_range_end}.{hash}`
/// where `hash` is lowercase hex.
#[derive(Debug)]
pub struct ParsedFilename {
    pub slot_range_start: Slot,
    pub slot_range_end: Slot,
    pub bin_range_start: u64,
    pub bin_range_end: u64,
    pub hash: u64,
}
417
418pub fn parse_filename(cache_filename: impl AsRef<Path>) -> Option<ParsedFilename> {
422 let filename = cache_filename.as_ref().to_string_lossy().to_string();
423 let parts: Vec<_> = filename.split('.').collect(); if parts.len() != 5 {
425 return None;
426 }
427 let slot_range_start = parts.first()?.parse().ok()?;
428 let slot_range_end = parts.get(1)?.parse().ok()?;
429 let bin_range_start = parts.get(2)?.parse().ok()?;
430 let bin_range_end = parts.get(3)?.parse().ok()?;
431 let hash = u64::from_str_radix(parts.get(4)?, 16).ok()?; Some(ParsedFilename {
433 slot_range_start,
434 slot_range_end,
435 bin_range_start,
436 bin_range_end,
437 hash,
438 })
439}
440
/// Policy controlling which unused pre-existing cache files are deleted when
/// `CacheHashData` is dropped.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum DeletionPolicy {
    /// Delete *all* unused cache files
    AllUnused,
    /// Delete only unused cache files whose slot range starts at or after
    /// this slot
    UnusedAtLeast(Slot),
}
453
#[cfg(test)]
mod tests {
    use {super::*, crate::accounts_hash::AccountHash, rand::Rng};

    impl CacheHashData {
        /// Test helper: loads the named cache file into `accumulator`.
        fn load(
            &self,
            file_name: impl AsRef<Path>,
            accumulator: &mut SavedType,
            start_bin_index: usize,
            bin_calculator: &PubkeyBinCalculator24,
        ) -> Result<(), std::io::Error> {
            let mut m = Measure::start("overall");
            let cache_file = self.load_map(file_name)?;
            cache_file.load_all(accumulator, start_bin_index, bin_calculator);
            m.stop();
            self.stats.load_us.fetch_add(m.as_us(), Ordering::Relaxed);
            Ok(())
        }

        /// Test helper: opens and memory-maps the named cache file.
        fn load_map(
            &self,
            file_name: impl AsRef<Path>,
        ) -> Result<CacheHashDataFile, std::io::Error> {
            let reference = self.get_file_reference_to_map_later(file_name)?;
            reference.map()
        }
    }

    #[test]
    fn test_read_write() {
        use tempfile::TempDir;
        let tmpdir = TempDir::new().unwrap();
        let cache_dir = tmpdir.path().to_path_buf();
        std::fs::create_dir_all(&cache_dir).unwrap();

        // exercise save/load round-trips across bin counts, pass counts,
        // and both flattened and pre-binned input layouts
        for bins in [1, 2, 4] {
            let bin_calculator = PubkeyBinCalculator24::new(bins);
            let num_points = 5;
            let (data, _total_points) = generate_test_data(num_points, bins, &bin_calculator);
            for passes in [1, 2] {
                let bins_per_pass = bins / passes;
                if bins_per_pass == 0 {
                    continue;
                }
                for pass in 0..passes {
                    for flatten_data in [true, false] {
                        let mut data_this_pass = if flatten_data {
                            vec![vec![], vec![]]
                        } else {
                            vec![]
                        };
                        let start_bin_this_pass = pass * bins_per_pass;
                        for bin in 0..bins_per_pass {
                            let mut this_bin_data = data[bin + start_bin_this_pass].clone();
                            if flatten_data {
                                data_this_pass[0].append(&mut this_bin_data);
                            } else {
                                data_this_pass.push(this_bin_data);
                            }
                        }
                        let cache =
                            CacheHashData::new(cache_dir.clone(), DeletionPolicy::AllUnused);
                        let file_name = PathBuf::from("test");
                        cache.save(&file_name, &data_this_pass).unwrap();
                        cache.get_cache_files();
                        assert_eq!(
                            cache
                                .pre_existing_cache_files
                                .lock()
                                .unwrap()
                                .iter()
                                .collect::<Vec<_>>(),
                            vec![&file_name],
                        );
                        let mut accum = (0..bins_per_pass).map(|_| vec![]).collect();
                        cache
                            .load(&file_name, &mut accum, start_bin_this_pass, &bin_calculator)
                            .unwrap();
                        if flatten_data {
                            // re-bin the flattened expected data so it can be
                            // compared against the binned `accum`
                            bin_data(
                                &mut data_this_pass,
                                &bin_calculator,
                                bins_per_pass,
                                start_bin_this_pass,
                            );
                        }
                        assert_eq!(
                            accum, data_this_pass,
                            "bins: {bins}, start_bin_this_pass: {start_bin_this_pass}, pass: {pass}, flatten: {flatten_data}, passes: {passes}"
                        );
                    }
                }
            }
        }
    }

    /// Buckets flattened `data` into `bins` bins by pubkey, with indexes
    /// relative to `start_bin`.
    fn bin_data(
        data: &mut SavedType,
        bin_calculator: &PubkeyBinCalculator24,
        bins: usize,
        start_bin: usize,
    ) {
        let mut accum: SavedType = (0..bins).map(|_| vec![]).collect();
        data.drain(..).for_each(|mut x| {
            x.drain(..).for_each(|item| {
                let bin = bin_calculator.bin_from_pubkey(&item.pubkey);
                accum[bin - start_bin].push(item);
            })
        });
        *data = accum;
    }

    /// Generates random entries spread across `bins` bins; each populated bin
    /// gets `max(1, count / bins)` entries whose pubkeys hash into that bin.
    /// Returns the binned data and the total number of entries generated.
    fn generate_test_data(
        count: usize,
        bins: usize,
        binner: &PubkeyBinCalculator24,
    ) -> (SavedType, usize) {
        let mut rng = rand::thread_rng();
        let mut ct = 0;
        (
            (0..bins)
                .map(|bin| {
                    // randomly leave some bins empty
                    let rnd = rng.gen::<u64>() % (bins as u64);
                    if rnd < count as u64 {
                        (0..std::cmp::max(1, count / bins))
                            .map(|_| {
                                ct += 1;
                                let mut pk;
                                // rejection-sample a pubkey that lands in `bin`
                                loop {
                                    pk = atlas_pubkey::new_rand();
                                    if binner.bin_from_pubkey(&pk) == bin {
                                        break;
                                    }
                                }

                                CalculateHashIntermediate {
                                    hash: AccountHash(atlas_hash::Hash::new_unique()),
                                    lamports: ct as u64,
                                    pubkey: pk,
                                }
                            })
                            .collect::<Vec<_>>()
                    } else {
                        vec![]
                    }
                })
                .collect::<Vec<_>>(),
            ct,
        )
    }

    #[test]
    #[allow(clippy::used_underscore_binding)]
    fn test_parse_filename() {
        let good_filename = "123.456.0.65536.537d65697d9b2baa";
        let parsed_filename = parse_filename(good_filename).unwrap();
        assert_eq!(parsed_filename.slot_range_start, 123);
        assert_eq!(parsed_filename.slot_range_end, 456);
        assert_eq!(parsed_filename.bin_range_start, 0);
        assert_eq!(parsed_filename.bin_range_end, 65536);
        assert_eq!(parsed_filename.hash, 0x537d65697d9b2baa);

        let bad_filenames = [
            "123-456-0-65536.537d65697d9b2baa",
            "abc.456.0.65536.537d65697d9b2baa",
            "123.xyz.0.65536.537d65697d9b2baa",
            "123.456.?.65536.537d65697d9b2baa",
            "123.456.0.@#$%^.537d65697d9b2baa",
            "123.456.0.65536.base19shouldfail",
            "123.456.0.65536.123456789012345678901234567890",
            "123.456.0.65536.",
            "123.456.0.65536",
            "123.456.0.65536.537d65697d9b2baa.42",
            "123.456.0.65536.537d65697d9b2baa.",
            "123.456.0.65536.537d65697d9b2baa/",
            ".123.456.0.65536.537d65697d9b2baa",
            "/123.456.0.65536.537d65697d9b2baa",
        ];
        for bad_filename in bad_filenames {
            assert!(parse_filename(bad_filename).is_none());
        }
    }

    // renamed from `tet_get_work_in_progress_file_name` (typo)
    #[test]
    fn test_get_work_in_progress_file_name() {
        let filename = "test";
        let work_in_progress_filename = CacheHashData::get_work_in_progress_file_name(filename);
        assert_eq!(work_in_progress_filename.as_os_str(), "test.in-progress");
    }
}