1use {
2 crate::{bucket_stats::BucketStats, MaxSearch},
3 memmap2::MmapMut,
4 rand::{thread_rng, Rng},
5 solana_measure::measure::Measure,
6 std::{
7 fs::{remove_file, OpenOptions},
8 io::{Seek, SeekFrom, Write},
9 num::NonZeroU64,
10 path::{Path, PathBuf},
11 sync::{
12 atomic::{AtomicU64, Ordering},
13 Arc,
14 },
15 },
16};
17
/// Initial capacity of a newly created bucket, as a power-of-two exponent:
/// 2^5 = 32 elements (used by `BucketStorage::new`).
pub const DEFAULT_CAPACITY_POW2: u8 = 5;
37
/// Occupancy bookkeeping for the entries of a `BucketStorage`.
pub trait BucketOccupied: BucketCapacity {
    /// Marks the entry at `ix` as occupied. `element` is the storage's bytes
    /// beginning at that entry (header included).
    fn occupy(&mut self, element: &mut [u8], ix: usize);
    /// Marks the entry at `ix` as free.
    fn free(&mut self, element: &mut [u8], ix: usize);
    /// Returns true if the entry at `ix` is free.
    fn is_free(&self, element: &[u8], ix: usize) -> bool;
    /// Number of bytes of per-entry header preceding the data portion.
    /// Must be a multiple of `size_of::<u64>()`; `BucketStorage` asserts this.
    fn offset_to_first_data() -> usize;
    /// Creates the bookkeeping state for a bucket of the given capacity.
    fn new(capacity: Capacity) -> Self;
    /// Hook invoked when an entry is copied from one bucket to another
    /// (e.g. while resizing). Default is a no-op.
    fn copying_entry(
        &mut self,
        _element_new: &mut [u8],
        _ix_new: usize,
        _other: &Self,
        _element_old: &[u8],
        _ix_old: usize,
    ) {
    }
}
67
/// Exposes how many elements something can hold.
pub trait BucketCapacity {
    /// Total number of elements that fit.
    fn capacity(&self) -> u64;
    /// Capacity as a power-of-two exponent. Only meaningful for
    /// power-of-two sized implementations; the default panics.
    fn capacity_pow2(&self) -> u8 {
        unimplemented!();
    }
}
74
/// mmap-backed storage of fixed-size cells, with occupancy tracked by `O`.
pub struct BucketStorage<O: BucketOccupied> {
    /// Path of the backing file (removed on drop if `delete_file_on_drop`).
    path: PathBuf,
    /// Memory map over the backing file.
    mmap: MmapMut,
    /// Size in bytes of one cell: `elem_size * num_elems` of data plus
    /// `O::offset_to_first_data()` bytes of header (see `new_with_capacity`).
    pub cell_size: u64,
    /// Shared count of occupied entries; not bumped when occupying during a
    /// resize (see `occupy`).
    pub count: Arc<AtomicU64>,
    /// Shared metrics sink.
    pub stats: Arc<BucketStats>,
    /// Maximum search distance; exposed widened via `max_search()`.
    pub max_search: MaxSearch,
    /// Occupancy bookkeeping implementation.
    pub contents: O,
    /// When true, `Drop` deletes the backing file. Cleared by callers that
    /// want the file to survive for `load_on_restart`.
    pub delete_file_on_drop: bool,
}
86
/// Errors returned by `BucketStorage` operations.
#[derive(Debug)]
pub enum BucketStorageError {
    /// `occupy` was called on an index that is already in use.
    AlreadyOccupied,
}
91
92impl<O: BucketOccupied> Drop for BucketStorage<O> {
93 fn drop(&mut self) {
94 if self.delete_file_on_drop {
95 self.delete();
96 }
97 }
98}
99
/// Whether an offset computation should point at the start of an entry's
/// header or at the first byte of its data.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub(crate) enum IncludeHeader {
    /// Offset includes the per-entry header.
    Header,
    /// Offset skips past `O::offset_to_first_data()` to the data portion.
    NoHeader,
}
107
/// Requested size of a bucket: either a power of two or an exact count.
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum Capacity {
    /// Capacity is `1 << n` elements.
    Pow2(u8),
    /// Exact number of elements.
    Actual(u64),
}
116
117impl BucketCapacity for Capacity {
118 fn capacity(&self) -> u64 {
119 match self {
120 Capacity::Pow2(pow2) => 1 << *pow2,
121 Capacity::Actual(elements) => *elements,
122 }
123 }
124 fn capacity_pow2(&self) -> u8 {
125 match self {
126 Capacity::Pow2(pow2) => *pow2,
127 Capacity::Actual(_elements) => {
128 panic!("illegal to ask for pow2 from random capacity");
129 }
130 }
131 }
132}
133
impl<O: BucketOccupied> BucketStorage<O> {
    /// Creates a new file-backed storage with the requested `capacity`.
    ///
    /// Each cell is `elem_size * num_elems` bytes of data preceded by
    /// `O::offset_to_first_data()` bytes of header. Returns the storage and
    /// the random `u128` used as the backing file's name.
    pub fn new_with_capacity(
        drives: Arc<Vec<PathBuf>>,
        num_elems: u64,
        elem_size: u64,
        mut capacity: Capacity,
        max_search: MaxSearch,
        stats: Arc<BucketStats>,
        count: Arc<AtomicU64>,
    ) -> (Self, u128) {
        let offset = Self::get_offset_to_first_data();
        let cell_size = elem_size * num_elems + offset;
        // may round `capacity` up in place to fill whole pages
        let bytes = Self::allocate_to_fill_page(&mut capacity, cell_size);
        let (mmap, path, file_name) = Self::new_map(&drives, bytes, &stats);
        (
            Self {
                path,
                mmap,
                cell_size,
                count,
                stats,
                max_search,
                contents: O::new(capacity),
                // new files are temporary by default; callers opt in to
                // persistence by clearing this flag
                delete_file_on_drop: true,
            },
            file_name,
        )
    }

    /// Returns the number of bytes to allocate. For `Capacity::Actual`
    /// requests, rounds the allocation up to the next 4 KiB page boundary and
    /// back down to a whole number of cells, updating `capacity` in place to
    /// the element count actually allocated.
    fn allocate_to_fill_page(capacity: &mut Capacity, cell_size: u64) -> u64 {
        let mut bytes = capacity.capacity() * cell_size;
        if let Capacity::Actual(_) = capacity {
            const PAGE_SIZE: u64 = 4 * 1024;
            // `bytes` rounded down to whole pages, then down to whole cells
            let full_page_bytes = bytes / PAGE_SIZE * PAGE_SIZE / cell_size * cell_size;
            if full_page_bytes < bytes {
                // take the next page up, then trim to a cell multiple
                let bytes_new = ((bytes / PAGE_SIZE) + 1) * PAGE_SIZE / cell_size * cell_size;
                assert!(
                    bytes_new >= bytes,
                    "allocating less than requested, capacity: {}, bytes: {bytes}, bytes_new: \
                     {bytes_new}, full_page_bytes: {full_page_bytes}",
                    capacity.capacity()
                );
                assert_eq!(bytes_new % cell_size, 0);
                bytes = bytes_new;
                *capacity = Capacity::Actual(bytes / cell_size);
            }
        }
        bytes
    }

    /// Best-effort removal of the backing file; errors are ignored.
    fn delete(&self) {
        _ = remove_file(&self.path);
    }

    /// `max_search` widened to `u64` for arithmetic convenience.
    pub fn max_search(&self) -> u64 {
        self.max_search as u64
    }

    /// Creates a storage with the default power-of-two capacity
    /// (`DEFAULT_CAPACITY_POW2`).
    pub fn new(
        drives: Arc<Vec<PathBuf>>,
        num_elems: u64,
        elem_size: u64,
        max_search: MaxSearch,
        stats: Arc<BucketStats>,
        count: Arc<AtomicU64>,
    ) -> (Self, u128) {
        Self::new_with_capacity(
            drives,
            num_elems,
            elem_size,
            Capacity::Pow2(DEFAULT_CAPACITY_POW2),
            max_search,
            stats,
            count,
        )
    }

    /// Per-entry header size in bytes, asserted to be a multiple of 8 so the
    /// data portion starts on a u64 boundary.
    fn get_offset_to_first_data() -> u64 {
        let offset = O::offset_to_first_data() as u64;
        let size_of_u64 = std::mem::size_of::<u64>() as u64;
        assert_eq!(
            offset / size_of_u64 * size_of_u64,
            offset,
            "header size must be a multiple of u64"
        );
        offset
    }

    /// Reopens an existing backing file (e.g. after a restart).
    ///
    /// Returns `None` if the file cannot be stat'ed/opened or holds zero
    /// whole elements. The resulting storage keeps its file on drop.
    /// NOTE(review): file contents are trusted as-is; nothing validates that
    /// the file was written by a compatible layout — confirm at call sites.
    pub(crate) fn load_on_restart(
        path: PathBuf,
        elem_size: NonZeroU64,
        max_search: MaxSearch,
        stats: Arc<BucketStats>,
        count: Arc<AtomicU64>,
    ) -> Option<Self> {
        let offset = Self::get_offset_to_first_data();
        // number of whole elements the file can hold after the header
        let num_elems = std::fs::metadata(&path)
            .ok()
            .map(|metadata| metadata.len().saturating_sub(offset) / elem_size)?;
        if num_elems == 0 {
            return None;
        }
        let mmap = Self::map_open_file(&path, false, 0, &stats)?;
        Some(Self {
            path,
            mmap,
            cell_size: elem_size.into(),
            count,
            stats,
            max_search,
            contents: O::new(Capacity::Actual(num_elems)),
            // the file pre-existed; do not delete it when this drops
            delete_file_on_drop: false,
        })
    }

    /// Notifies the occupancy impl that entry `ix_old` of `other` is being
    /// copied to entry `ix_new` of `self` (raw bytes are copied separately).
    pub(crate) fn copying_entry(&mut self, ix_new: u64, other: &Self, ix_old: u64) {
        let start = self.get_start_offset_with_header(ix_new);
        let start_old = other.get_start_offset_with_header(ix_old);
        self.contents.copying_entry(
            &mut self.mmap[start..],
            ix_new as usize,
            &other.contents,
            &other.mmap[start_old..],
            ix_old as usize,
        );
    }

    /// True if entry `ix` is not currently occupied.
    pub fn is_free(&self, ix: u64) -> bool {
        let start = self.get_start_offset_with_header(ix);
        let entry = &self.mmap[start..];
        self.contents.is_free(entry, ix as usize)
    }

    /// Occupies entry `ix` if it is free; returns whether it was newly
    /// occupied. Does not touch `count`.
    pub(crate) fn try_lock(&mut self, ix: u64) -> bool {
        let start = self.get_start_offset_with_header(ix);
        let entry = &mut self.mmap[start..];
        if self.contents.is_free(entry, ix as usize) {
            self.contents.occupy(entry, ix as usize);
            true
        } else {
            false
        }
    }

    /// Marks entry `ix` occupied, bumping `count` unless `is_resizing`.
    ///
    /// # Errors
    /// `BucketStorageError::AlreadyOccupied` if the entry was already in use.
    pub fn occupy(&mut self, ix: u64, is_resizing: bool) -> Result<(), BucketStorageError> {
        debug_assert!(ix < self.capacity(), "occupy: bad index size");
        if self.try_lock(ix) {
            if !is_resizing {
                self.count.fetch_add(1, Ordering::Relaxed);
            }
            Ok(())
        } else {
            Err(BucketStorageError::AlreadyOccupied)
        }
    }

    /// Marks entry `ix` free and decrements `count`.
    pub fn free(&mut self, ix: u64) {
        debug_assert!(ix < self.capacity(), "bad index size");
        let start = self.get_start_offset_with_header(ix);
        self.contents.free(&mut self.mmap[start..], ix as usize);
        self.count.fetch_sub(1, Ordering::Relaxed);
    }

    /// Byte offset in the mmap where entry `ix` begins (header included).
    fn get_start_offset_with_header(&self, ix: u64) -> usize {
        debug_assert!(ix < self.capacity(), "bad index size");
        (self.cell_size * ix) as usize
    }

    /// Byte offset of entry `ix`, optionally skipping the per-entry header.
    fn get_start_offset(&self, ix: u64, header: IncludeHeader) -> usize {
        self.get_start_offset_with_header(ix)
            + match header {
                IncludeHeader::Header => 0,
                IncludeHeader::NoHeader => O::offset_to_first_data(),
            }
    }

    /// Borrows the header of entry `ix` reinterpreted as `&T`.
    pub(crate) fn get_header<T>(&self, ix: u64) -> &T {
        let slice = self.get_slice::<T>(ix, 1, IncludeHeader::Header);
        // SAFETY: `get_slice` returned a slice of length 1
        unsafe { slice.get_unchecked(0) }
    }

    /// Mutably borrows the header of entry `ix` reinterpreted as `&mut T`.
    pub(crate) fn get_header_mut<T>(&mut self, ix: u64) -> &mut T {
        let slice = self.get_slice_mut::<T>(ix, 1, IncludeHeader::Header);
        // SAFETY: `get_slice_mut` returned a slice of length 1
        unsafe { slice.get_unchecked_mut(0) }
    }

    /// Borrows the data portion of entry `ix` reinterpreted as `&T`.
    pub(crate) fn get<T>(&self, ix: u64) -> &T {
        let slice = self.get_slice::<T>(ix, 1, IncludeHeader::NoHeader);
        // SAFETY: `get_slice` returned a slice of length 1
        unsafe { slice.get_unchecked(0) }
    }

    /// Mutably borrows the data portion of entry `ix` as `&mut T`.
    pub(crate) fn get_mut<T>(&mut self, ix: u64) -> &mut T {
        let slice = self.get_slice_mut::<T>(ix, 1, IncludeHeader::NoHeader);
        // SAFETY: `get_slice_mut` returned a slice of length 1
        unsafe { slice.get_unchecked_mut(0) }
    }

    /// Reinterprets `len * size_of::<T>()` bytes starting at entry `ix` as
    /// `&[T]`. Size and alignment are checked only by debug assertions.
    pub(crate) fn get_slice<T>(&self, ix: u64, len: u64, header: IncludeHeader) -> &[T] {
        // a header is always a single item
        debug_assert!(
            (header == IncludeHeader::NoHeader) || (header == IncludeHeader::Header && len == 1)
        );
        let start = self.get_start_offset(ix, header);
        let slice = {
            let size = std::mem::size_of::<T>() * len as usize;
            let slice = &self.mmap[start..];
            debug_assert!(slice.len() >= size);
            &slice[..size]
        };
        let ptr = {
            let ptr = slice.as_ptr().cast();
            debug_assert!(ptr as usize % std::mem::align_of::<T>() == 0);
            ptr
        };
        // SAFETY: `ptr` points at `len * size_of::<T>()` mapped bytes
        // (length checked above, in debug builds) that live as long as
        // the `&self` borrow
        unsafe { std::slice::from_raw_parts(ptr, len as usize) }
    }

    /// Mutable counterpart of `get_slice`.
    pub(crate) fn get_slice_mut<T>(
        &mut self,
        ix: u64,
        len: u64,
        header: IncludeHeader,
    ) -> &mut [T] {
        // a header is always a single item
        debug_assert!(
            (header == IncludeHeader::NoHeader) || (header == IncludeHeader::Header && len == 1)
        );
        let start = self.get_start_offset(ix, header);
        let slice = {
            let size = std::mem::size_of::<T>() * len as usize;
            let slice = &mut self.mmap[start..];
            debug_assert!(slice.len() >= size);
            &mut slice[..size]
        };
        let ptr = {
            let ptr = slice.as_mut_ptr().cast();
            debug_assert!(ptr as usize % std::mem::align_of::<T>() == 0);
            ptr
        };
        // SAFETY: `ptr` points at `len * size_of::<T>()` mapped bytes
        // (length checked above, in debug builds) that live as long as
        // the `&mut self` borrow
        unsafe { std::slice::from_raw_parts_mut(ptr, len as usize) }
    }

    /// Opens (and optionally creates) the file at `path` and memory-maps it.
    ///
    /// When `create` is true, extends the file to `create_bytes` by seeking
    /// to the last byte and writing a single zero, then flushes it.
    /// Returns `None` only when `create` is false and the open fails;
    /// panics on any failure while creating or mapping.
    /// NOTE(review): when creating, `create_bytes` must be > 0 or the
    /// `create_bytes - 1` seek target underflows — confirm callers.
    fn map_open_file(
        path: impl AsRef<Path> + std::fmt::Debug + Clone,
        create: bool,
        create_bytes: u64,
        stats: &BucketStats,
    ) -> Option<MmapMut> {
        let mut measure_new_file = Measure::start("measure_new_file");
        let data = OpenOptions::new()
            .read(true)
            .write(true)
            .create(create)
            .open(&path);
        if let Err(err) = data {
            if !create {
                // not an error when we were only probing for an existing file
                return None;
            }
            panic!(
                "Unable to create data file '{}' in current dir ({:?}): {err}",
                path.as_ref().display(),
                std::env::current_dir(),
            );
        }
        let mut data = data.unwrap();

        if create {
            // extend the file to `create_bytes` by writing its last byte
            data.seek(SeekFrom::Start(create_bytes - 1)).unwrap();
            data.write_all(&[0]).unwrap();
            data.rewind().unwrap();
            measure_new_file.stop();
            let measure_flush = Measure::start("measure_flush");
            data.flush().unwrap();
            stats
                .flush_file_us
                .fetch_add(measure_flush.end_as_us(), Ordering::Relaxed);
        }
        let mut measure_mmap = Measure::start("measure_mmap");
        let mmap = unsafe { MmapMut::map_mut(&data) }.unwrap_or_else(|err| {
            panic!(
                "Unable to mmap file '{}' in current dir ({:?}): {err}",
                path.as_ref().display(),
                std::env::current_dir(),
            );
        });
        // access pattern is random, so hint the kernel accordingly
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::Random).unwrap();
        measure_mmap.stop();
        stats
            .new_file_us
            .fetch_add(measure_new_file.as_us(), Ordering::Relaxed);
        stats
            .mmap_us
            .fetch_add(measure_mmap.as_us(), Ordering::Relaxed);
        Some(mmap)
    }

    /// Creates a `bytes`-long backing file on a randomly chosen drive, named
    /// with a random `u128`. Returns the map, the full path, and the file id.
    /// Panics if `drives` is empty (empty range for `gen_range`).
    fn new_map(drives: &[PathBuf], bytes: u64, stats: &BucketStats) -> (MmapMut, PathBuf, u128) {
        let r = thread_rng().gen_range(0..drives.len());
        let drive = &drives[r];
        let file_random = thread_rng().gen_range(0..u128::MAX);
        let pos = format!("{file_random}");
        let file = drive.join(pos);
        let res = Self::map_open_file(file.clone(), true, bytes, stats).unwrap();

        (res, file, file_random)
    }

    /// Copies every occupied entry of `old_bucket` into this (larger)
    /// storage, spreading old index `i` to `i * index_grow`.
    ///
    /// Both buckets must have power-of-two capacities (`capacity_pow2` is
    /// used to derive the growth factor).
    /// NOTE(review): the byte-offset math (`new_ix = old_ix * index_grow`)
    /// assumes both buckets share the same `cell_size` — confirm callers.
    fn copy_contents(&mut self, old_bucket: &Self) {
        let mut m = Measure::start("grow");
        let old_cap = old_bucket.capacity();
        let old_map = &old_bucket.mmap;

        let increment = self.contents.capacity_pow2() - old_bucket.contents.capacity_pow2();
        let index_grow = 1 << increment;
        (0..old_cap as usize).for_each(|i| {
            if !old_bucket.is_free(i as u64) {
                self.copying_entry((i * index_grow) as u64, old_bucket, i as u64);

                {
                    // mark the destination slot occupied in our bookkeeping
                    let start = self.get_start_offset_with_header((i * index_grow) as u64);
                    self.contents
                        .occupy(&mut self.mmap[start..], i * index_grow);
                }
                let old_ix = i * old_bucket.cell_size as usize;
                let new_ix = old_ix * index_grow;
                let dst_slice: &[u8] = &self.mmap[new_ix..new_ix + old_bucket.cell_size as usize];
                let src_slice: &[u8] = &old_map[old_ix..old_ix + old_bucket.cell_size as usize];

                // SAFETY: src and dst live in two distinct mmaps, and each
                // range is `cell_size` bytes inside its map (bounds were
                // checked by the slice indexing above).
                // NOTE(review): `dst` is derived from a shared `&[u8]` and
                // cast to a mut pointer; deriving it from `self.mmap`'s
                // `as_mut_ptr` would be sounder — flagging, not changing.
                unsafe {
                    let dst = dst_slice.as_ptr() as *mut _;
                    let src = src_slice.as_ptr();
                    std::ptr::copy_nonoverlapping(src, dst, old_bucket.cell_size as usize);
                };
            }
        });
        m.stop();
        self.stats.resizes.fetch_add(1, Ordering::Relaxed);
        self.stats.resize_us.fetch_add(m.as_us(), Ordering::Relaxed);
    }

    /// Records the current capacity in the shared stats high-water mark.
    pub fn update_max_size(&self) {
        self.stats.update_max_size(self.capacity());
    }

    /// Creates a new storage of `capacity`, optionally migrating all
    /// occupied entries (and the shared `count`) from `bucket`.
    pub fn new_resized(
        drives: &Arc<Vec<PathBuf>>,
        max_search: MaxSearch,
        bucket: Option<&Self>,
        capacity: Capacity,
        num_elems: u64,
        elem_size: u64,
        stats: &Arc<BucketStats>,
    ) -> (Self, u128) {
        let (mut new_bucket, file_name) = Self::new_with_capacity(
            Arc::clone(drives),
            num_elems,
            elem_size,
            capacity,
            max_search,
            Arc::clone(stats),
            // reuse the old bucket's count so the occupied total carries over
            #[allow(clippy::map_clone)]
            bucket
                .map(|bucket| Arc::clone(&bucket.count))
                .unwrap_or_default(),
        );
        if let Some(bucket) = bucket {
            new_bucket.copy_contents(bucket);
        }
        new_bucket.update_max_size();
        (new_bucket, file_name)
    }

    /// Total size of the storage in bytes.
    pub(crate) fn capacity_bytes(&self) -> u64 {
        self.capacity() * self.cell_size
    }

    /// Number of entries this storage can hold.
    pub fn capacity(&self) -> u64 {
        self.contents.capacity()
    }
}
548
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::{
            bucket_storage::BucketOccupied,
            index_entry::{BucketWithHeader, IndexBucket},
        },
        tempfile::tempdir,
    };

    /// Basic free/occupy flow with the header-less `IndexBucket` layout.
    #[test]
    fn test_bucket_storage_index_bucket() {
        let tmpdir = tempdir().unwrap();
        let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
        assert!(!paths.is_empty());

        let drives = Arc::new(paths);
        let num_elems = 1;
        let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
        let max_search = 1;
        let stats = Arc::default();
        let count = Arc::default();
        let mut storage = BucketStorage::<IndexBucket<u64>>::new(
            drives, num_elems, elem_size, max_search, stats, count,
        )
        .0;
        let ix = 0;
        assert!(storage.is_free(ix));
        assert!(storage.occupy(ix, false).is_ok());
    }

    /// Occupy/free round-trips with the header-carrying layout; a second
    /// `occupy` on the same index must fail until the entry is freed.
    #[test]
    fn test_bucket_storage_using_header() {
        let tmpdir = tempdir().unwrap();
        let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
        assert!(!paths.is_empty());

        let drives = Arc::new(paths);
        let num_elems = 1;
        let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
        let max_search = 1;
        let stats = Arc::default();
        let count = Arc::default();
        let mut storage = BucketStorage::<BucketWithHeader>::new(
            drives, num_elems, elem_size, max_search, stats, count,
        )
        .0;
        let ix = 0;
        assert!(storage.is_free(ix));
        assert!(storage.occupy(ix, false).is_ok());
        assert!(storage.occupy(ix, false).is_err());
        assert!(!storage.is_free(ix));
        storage.free(ix);
        assert!(storage.is_free(ix));
        assert!(storage.is_free(ix));
        assert!(storage.occupy(ix, false).is_ok());
        assert!(storage.occupy(ix, false).is_err());
        assert!(!storage.is_free(ix));
        storage.free(ix);
        assert!(storage.is_free(ix));
    }

    /// `load_on_restart` must return `None` for a directory path and for
    /// files smaller than one element; larger files load with capacity
    /// rounded down to whole elements.
    #[test]
    fn test_load_on_restart_failures() {
        let tmpdir = tempdir().unwrap();
        let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
        assert!(!paths.is_empty());
        let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
        let max_search = 1;
        let stats = Arc::new(BucketStats::default());
        let count = Arc::new(AtomicU64::default());
        // a directory is not a loadable bucket file
        assert!(BucketStorage::<IndexBucket<u64>>::load_on_restart(
            PathBuf::from(tmpdir.path()),
            NonZeroU64::new(elem_size).unwrap(),
            max_search,
            stats.clone(),
            count.clone(),
        )
        .is_none());
        solana_logger::setup();
        // lengths straddling element-size boundaries
        for len in [0, 1, 47, 48, 49, 4097] {
            let path = tmpdir.path().join("small");
            let mut file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(true)
                .open(path.clone())
                .unwrap();
            _ = file.write_all(&vec![1u8; len]);
            drop(file);
            assert_eq!(std::fs::metadata(&path).unwrap().len(), len as u64);
            let result = BucketStorage::<IndexBucket<u64>>::load_on_restart(
                path,
                NonZeroU64::new(elem_size).unwrap(),
                max_search,
                stats.clone(),
                count.clone(),
            );
            if let Some(result) = result.as_ref() {
                // capacity is the file length divided down to whole elements
                assert_eq!(result.capacity() as usize, len / elem_size as usize);
                assert_eq!(
                    result.capacity_bytes() as usize,
                    len / elem_size as usize * elem_size as usize
                );
            }
            assert_eq!(result.is_none(), len < elem_size as usize, "{len}");
        }
    }

    /// Create, scribble, drop (keeping the file), then reload and verify
    /// capacity and raw bytes survive the round trip.
    #[test]
    fn test_load_on_restart() {
        // exercise both an exact (`Actual`) and default (`Pow2`) capacity
        for request in [Some(7), None] {
            let tmpdir = tempdir().unwrap();
            let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
            assert!(!paths.is_empty());
            let drives = Arc::new(paths);
            let num_elems = 1;
            let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
            let max_search = 1;
            let stats = Arc::new(BucketStats::default());
            let count = Arc::new(AtomicU64::default());
            let mut storage = if let Some(actual_elems) = request {
                BucketStorage::<IndexBucket<u64>>::new_with_capacity(
                    drives,
                    num_elems,
                    elem_size,
                    Capacity::Actual(actual_elems),
                    max_search,
                    stats.clone(),
                    count.clone(),
                )
                .0
            } else {
                BucketStorage::<IndexBucket<u64>>::new(
                    drives,
                    num_elems,
                    elem_size,
                    max_search,
                    stats.clone(),
                    count.clone(),
                )
                .0
            };
            let expected_capacity = storage.capacity();
            (0..num_elems).for_each(|ix| {
                assert!(storage.is_free(ix));
                assert!(storage.occupy(ix, false).is_ok());
            });
            // keep the backing file so it can be reloaded below
            storage.delete_file_on_drop = false;
            let len = storage.mmap.len();
            // scribble a recognizable pattern so we can verify the raw file
            // contents survive the reload
            (0..expected_capacity as usize).for_each(|i| {
                storage.mmap[i] = (i % 256) as u8;
            });
            let path = storage.path.clone();
            drop(storage);

            let storage = BucketStorage::<IndexBucket<u64>>::load_on_restart(
                path,
                NonZeroU64::new(elem_size).unwrap(),
                max_search,
                stats,
                count,
            )
            .unwrap();
            assert_eq!(storage.capacity(), expected_capacity);
            assert_eq!(len, storage.mmap.len());
            (0..expected_capacity as usize).for_each(|i| {
                assert_eq!(storage.mmap[i], (i % 256) as u8);
            });
            (0..num_elems).for_each(|ix| {
                // entries read as free after reload — presumably because
                // `load_on_restart` recreates the occupancy bookkeeping
                // fresh via `O::new`; verify against `IndexBucket::is_free`
                assert!(storage.is_free(ix));
            });
        }
    }

    /// A header size that is not a multiple of `size_of::<u64>()` must make
    /// `new_with_capacity` panic (via `get_offset_to_first_data`).
    #[test]
    #[should_panic]
    fn test_header_bad_size() {
        struct BucketBadHeader;
        impl BucketCapacity for BucketBadHeader {
            fn capacity(&self) -> u64 {
                unimplemented!();
            }
        }
        impl BucketOccupied for BucketBadHeader {
            fn occupy(&mut self, _element: &mut [u8], _ix: usize) {
                unimplemented!();
            }
            fn free(&mut self, _element: &mut [u8], _ix: usize) {
                unimplemented!();
            }
            fn is_free(&self, _element: &[u8], _ix: usize) -> bool {
                unimplemented!();
            }
            fn offset_to_first_data() -> usize {
                // deliberately misaligned header size
                std::mem::size_of::<u64>() - 1
            }
            fn new(_num_elements: Capacity) -> Self {
                Self
            }
        }

        BucketStorage::<BucketBadHeader>::new_with_capacity(
            Arc::default(),
            0,
            0,
            Capacity::Pow2(0),
            0,
            Arc::default(),
            Arc::default(),
        );
    }
}