//! bucket_storage.rs — part of the `solana_bucket_map` crate

1use {
2    crate::{bucket_stats::BucketStats, MaxSearch},
3    memmap2::MmapMut,
4    rand::{thread_rng, Rng},
5    solana_measure::measure::Measure,
6    std::{
7        fs::{remove_file, OpenOptions},
8        io::{Seek, SeekFrom, Write},
9        num::NonZeroU64,
10        path::{Path, PathBuf},
11        sync::{
12            atomic::{AtomicU64, Ordering},
13            Arc,
14        },
15    },
16};
17
/*
Capacity lookup table for `Capacity::Pow2`: a pow2 value of `n` yields `1 << n` elements.

pow2    elements
1       2
2       4
3       8
4       16
5       32
6       64
7       128
8       256
9       512
10      1,024
11      2,048
12      4,096
13      8,192
14      16,384
...
23      8,388,608
24      16,777,216
*/
/// default pow2 capacity for a newly created bucket: 1 << 5 == 32 elements (see table above)
pub const DEFAULT_CAPACITY_POW2: u8 = 5;
37
/// keep track of an individual element's occupied vs. free state
/// every element must either be occupied or free and should never be double occupied or double freed
/// For parameters below, `element` is used to view/modify header fields or fields within the element data.
/// Callers pass `element` as a byte slice that begins at the entry's cell start (header
/// included) and runs to the end of the mmap; `ix` identifies the entry within the bucket.
pub trait BucketOccupied: BucketCapacity {
    /// set entry at `ix` as occupied (as opposed to free)
    fn occupy(&mut self, element: &mut [u8], ix: usize);
    /// set entry at `ix` as free
    fn free(&mut self, element: &mut [u8], ix: usize);
    /// return true if entry at `ix` is free
    fn is_free(&self, element: &[u8], ix: usize) -> bool;
    /// # of bytes prior to first data held in the element.
    /// This is the header size, if a header exists per element in the data.
    /// This must be a multiple of sizeof(u64)
    fn offset_to_first_data() -> usize;
    /// initialize this struct
    /// `capacity` is the number of elements allocated in the bucket
    fn new(capacity: Capacity) -> Self;
    /// copying entry. Any in-memory (per-bucket) data structures may need to be copied for this `ix_old`.
    /// no-op by default
    fn copying_entry(
        &mut self,
        _element_new: &mut [u8],
        _ix_new: usize,
        _other: &Self,
        _element_old: &[u8],
        _ix_old: usize,
    ) {
    }
}
67
/// how many elements a bucket (or a capacity specification) can hold
pub trait BucketCapacity {
    /// current capacity, in elements
    fn capacity(&self) -> u64;
    /// capacity expressed as a power-of-two exponent.
    /// Panics by default; implementors whose capacity is not a power of two keep the default.
    fn capacity_pow2(&self) -> u8 {
        unimplemented!();
    }
}
74
/// one memory-mapped file backing a single disk bucket
pub struct BucketStorage<O: BucketOccupied> {
    /// path of the backing file on disk
    path: PathBuf,
    /// memory map of the contents of `path`
    mmap: MmapMut,
    /// stride in bytes of one cell (per-element header plus element data)
    pub cell_size: u64,
    /// number of occupied entries; shared (via `Arc`) across resizes
    pub count: Arc<AtomicU64>,
    pub stats: Arc<BucketStats>,
    /// max number of slots to probe linearly on collision
    pub max_search: MaxSearch,
    /// occupied/free bookkeeping (and header layout) for the entries
    pub contents: O,
    /// true if when this bucket is dropped, the file should be deleted
    pub delete_file_on_drop: bool,
}
86
/// errors returned by bucket storage operations
#[derive(Debug)]
pub enum BucketStorageError {
    /// attempted to occupy an entry that is already occupied
    AlreadyOccupied,
}
91
impl<O: BucketOccupied> Drop for BucketStorage<O> {
    fn drop(&mut self) {
        // newly created buckets own their backing file; buckets restored via
        // `load_on_restart` set this flag false and keep the file on disk
        if self.delete_file_on_drop {
            self.delete();
        }
    }
}
99
/// whether an accessor should view an entry starting at its per-element header
/// or at its first data byte
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub(crate) enum IncludeHeader {
    /// caller wants header included
    Header,
    /// caller wants header skipped
    NoHeader,
}
107
/// 2 common ways of specifying capacity (in elements)
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum Capacity {
    /// 1 << Pow2 produces # elements
    Pow2(u8),
    /// Actual # elements
    Actual(u64),
}
116
117impl BucketCapacity for Capacity {
118    fn capacity(&self) -> u64 {
119        match self {
120            Capacity::Pow2(pow2) => 1 << *pow2,
121            Capacity::Actual(elements) => *elements,
122        }
123    }
124    fn capacity_pow2(&self) -> u8 {
125        match self {
126            Capacity::Pow2(pow2) => *pow2,
127            Capacity::Actual(_elements) => {
128                panic!("illegal to ask for pow2 from random capacity");
129            }
130        }
131    }
132}
133
134impl<O: BucketOccupied> BucketStorage<O> {
135    /// allocate a bucket of at least `capacity` elements.
136    /// if capacity can be random, more may be allocated to fill the last page.
    /// allocate a bucket of at least `capacity` elements.
    /// if capacity can be random, more may be allocated to fill the last page.
    /// Returns the storage plus the random u128 used as the backing file's name.
    pub fn new_with_capacity(
        drives: Arc<Vec<PathBuf>>,
        num_elems: u64,
        elem_size: u64,
        mut capacity: Capacity,
        max_search: MaxSearch,
        stats: Arc<BucketStats>,
        count: Arc<AtomicU64>,
    ) -> (Self, u128) {
        let offset = Self::get_offset_to_first_data();
        // a cell holds `num_elems` elements of `elem_size` bytes, preceded by the
        // (possibly zero-length) per-element header
        let cell_size = elem_size * num_elems + offset;
        // may round `capacity` up (only for `Capacity::Actual`) to fill the last page
        let bytes = Self::allocate_to_fill_page(&mut capacity, cell_size);
        let (mmap, path, file_name) = Self::new_map(&drives, bytes, &stats);
        (
            Self {
                path,
                mmap,
                cell_size,
                count,
                stats,
                max_search,
                contents: O::new(capacity),
                // by default, newly created files will get deleted when dropped
                delete_file_on_drop: true,
            },
            file_name,
        )
    }
165
    /// given the desired `capacity` and per-cell byte size, return the number of bytes
    /// to allocate for the backing file.
    /// For `Capacity::Actual`, the allocation may be rounded up to the next 4k page
    /// boundary (in whole cells) and `capacity` is updated to include the extra
    /// elements gained. `Capacity::Pow2` is left untouched.
    fn allocate_to_fill_page(capacity: &mut Capacity, cell_size: u64) -> u64 {
        let mut bytes = capacity.capacity() * cell_size;
        if let Capacity::Actual(_) = capacity {
            // maybe bump up allocation to fit a page size
            const PAGE_SIZE: u64 = 4 * 1024;
            // bytes of whole cells that fit within the whole pages covered by `bytes`
            let full_page_bytes = bytes / PAGE_SIZE * PAGE_SIZE / cell_size * cell_size;
            if full_page_bytes < bytes {
                // round up to the next page, then down to a whole number of cells
                let bytes_new = ((bytes / PAGE_SIZE) + 1) * PAGE_SIZE / cell_size * cell_size;
                assert!(
                    bytes_new >= bytes,
                    "allocating less than requested, capacity: {}, bytes: {bytes}, bytes_new: \
                     {bytes_new}, full_page_bytes: {full_page_bytes}",
                    capacity.capacity()
                );
                assert_eq!(bytes_new % cell_size, 0);
                bytes = bytes_new;
                *capacity = Capacity::Actual(bytes / cell_size);
            }
        }
        bytes
    }
187
    /// delete the backing file on disk, ignoring any error (e.g. file already gone)
    fn delete(&self) {
        _ = remove_file(&self.path);
    }
192
    /// max number of slots a linear probe may search, widened to u64
    pub fn max_search(&self) -> u64 {
        self.max_search as u64
    }
196
    /// allocate a bucket with the default capacity (`1 << DEFAULT_CAPACITY_POW2` elements).
    /// Returns the storage plus the random u128 used as the backing file's name.
    pub fn new(
        drives: Arc<Vec<PathBuf>>,
        num_elems: u64,
        elem_size: u64,
        max_search: MaxSearch,
        stats: Arc<BucketStats>,
        count: Arc<AtomicU64>,
    ) -> (Self, u128) {
        Self::new_with_capacity(
            drives,
            num_elems,
            elem_size,
            Capacity::Pow2(DEFAULT_CAPACITY_POW2),
            max_search,
            stats,
            count,
        )
    }
215
216    fn get_offset_to_first_data() -> u64 {
217        let offset = O::offset_to_first_data() as u64;
218        let size_of_u64 = std::mem::size_of::<u64>() as u64;
219        assert_eq!(
220            offset / size_of_u64 * size_of_u64,
221            offset,
222            "header size must be a multiple of u64"
223        );
224        offset
225    }
226
    /// load and mmap the file that is this disk bucket if possible
    /// Returns `None` if the file is missing/unreadable or too small to hold a
    /// single element. The restored bucket keeps its file on drop.
    pub(crate) fn load_on_restart(
        path: PathBuf,
        elem_size: NonZeroU64,
        max_search: MaxSearch,
        stats: Arc<BucketStats>,
        count: Arc<AtomicU64>,
    ) -> Option<Self> {
        let offset = Self::get_offset_to_first_data();
        // infer the element count from the file size (whole elements only)
        let num_elems = std::fs::metadata(&path)
            .ok()
            .map(|metadata| metadata.len().saturating_sub(offset) / elem_size)?;
        if num_elems == 0 {
            return None;
        }
        // `create: false` — only map a file that already exists
        let mmap = Self::map_open_file(&path, false, 0, &stats)?;
        Some(Self {
            path,
            mmap,
            cell_size: elem_size.into(),
            count,
            stats,
            max_search,
            contents: O::new(Capacity::Actual(num_elems)),
            // since we loaded it, it persisted from last time, so we obviously want to keep it present disk.
            delete_file_on_drop: false,
        })
    }
255
    /// copy per-bucket bookkeeping for entry `ix_old` of `other` into entry `ix_new`
    /// of `self` (delegates to `O::copying_entry`, which is a no-op by default)
    pub(crate) fn copying_entry(&mut self, ix_new: u64, other: &Self, ix_old: u64) {
        let start = self.get_start_offset_with_header(ix_new);
        let start_old = other.get_start_offset_with_header(ix_old);
        self.contents.copying_entry(
            &mut self.mmap[start..],
            ix_new as usize,
            &other.contents,
            &other.mmap[start_old..],
            ix_old as usize,
        );
    }
267
268    /// true if the entry at index 'ix' is free (as opposed to being occupied)
269    pub fn is_free(&self, ix: u64) -> bool {
270        let start = self.get_start_offset_with_header(ix);
271        let entry = &self.mmap[start..];
272        self.contents.is_free(entry, ix as usize)
273    }
274
275    /// try to occupy `ix`. return true if successful
276    pub(crate) fn try_lock(&mut self, ix: u64) -> bool {
277        let start = self.get_start_offset_with_header(ix);
278        let entry = &mut self.mmap[start..];
279        if self.contents.is_free(entry, ix as usize) {
280            self.contents.occupy(entry, ix as usize);
281            true
282        } else {
283            false
284        }
285    }
286
287    /// 'is_resizing' true if caller is resizing the index (so don't increment count)
288    /// 'is_resizing' false if caller is adding an item to the index (so increment count)
289    pub fn occupy(&mut self, ix: u64, is_resizing: bool) -> Result<(), BucketStorageError> {
290        debug_assert!(ix < self.capacity(), "occupy: bad index size");
291        //debug!("ALLOC {} {}", ix, uid);
292        if self.try_lock(ix) {
293            if !is_resizing {
294                self.count.fetch_add(1, Ordering::Relaxed);
295            }
296            Ok(())
297        } else {
298            Err(BucketStorageError::AlreadyOccupied)
299        }
300    }
301
    /// mark entry `ix` free and decrement the shared occupied-entry count.
    /// NOTE(review): nothing verifies `ix` was occupied; freeing an already-free
    /// entry would still decrement `count` — callers must uphold the
    /// no-double-free contract stated on `BucketOccupied`.
    pub fn free(&mut self, ix: u64) {
        debug_assert!(ix < self.capacity(), "bad index size");
        let start = self.get_start_offset_with_header(ix);
        self.contents.free(&mut self.mmap[start..], ix as usize);
        self.count.fetch_sub(1, Ordering::Relaxed);
    }
308
    /// byte offset within the mmap of the start of cell `ix` (header included)
    fn get_start_offset_with_header(&self, ix: u64) -> usize {
        debug_assert!(ix < self.capacity(), "bad index size");
        (self.cell_size * ix) as usize
    }
313
314    fn get_start_offset(&self, ix: u64, header: IncludeHeader) -> usize {
315        self.get_start_offset_with_header(ix)
316            + match header {
317                IncludeHeader::Header => 0,
318                IncludeHeader::NoHeader => O::offset_to_first_data(),
319            }
320    }
321
    /// reference to the header of entry `ix`, reinterpreted as `T`
    pub(crate) fn get_header<T>(&self, ix: u64) -> &T {
        let slice = self.get_slice::<T>(ix, 1, IncludeHeader::Header);
        // SAFETY: `get_slice` ensures there's at least one element in the slice
        unsafe { slice.get_unchecked(0) }
    }
327
    /// mutable reference to the header of entry `ix`, reinterpreted as `T`
    pub(crate) fn get_header_mut<T>(&mut self, ix: u64) -> &mut T {
        let slice = self.get_slice_mut::<T>(ix, 1, IncludeHeader::Header);
        // SAFETY: `get_slice_mut` ensures there's at least one element in the slice
        unsafe { slice.get_unchecked_mut(0) }
    }
333
    /// reference to the data (header skipped) of entry `ix`, reinterpreted as `T`
    pub(crate) fn get<T>(&self, ix: u64) -> &T {
        let slice = self.get_slice::<T>(ix, 1, IncludeHeader::NoHeader);
        // SAFETY: `get_slice` ensures there's at least one element in the slice
        unsafe { slice.get_unchecked(0) }
    }
339
    /// mutable reference to the data (header skipped) of entry `ix`, reinterpreted as `T`
    pub(crate) fn get_mut<T>(&mut self, ix: u64) -> &mut T {
        let slice = self.get_slice_mut::<T>(ix, 1, IncludeHeader::NoHeader);
        // SAFETY: `get_slice_mut` ensures there's at least one element in the slice
        unsafe { slice.get_unchecked_mut(0) }
    }
345
    /// view `len` elements starting at entry `ix` as `&[T]`.
    /// With `IncludeHeader::Header`, `len` must be 1 and the slice begins at the
    /// entry's header; otherwise it begins at the entry's first data byte.
    pub(crate) fn get_slice<T>(&self, ix: u64, len: u64, header: IncludeHeader) -> &[T] {
        // If the caller is including the header, then `len` *must* be 1
        debug_assert!(
            (header == IncludeHeader::NoHeader) || (header == IncludeHeader::Header && len == 1)
        );
        let start = self.get_start_offset(ix, header);
        let slice = {
            let size = std::mem::size_of::<T>() * len as usize;
            let slice = &self.mmap[start..];
            debug_assert!(slice.len() >= size);
            &slice[..size]
        };
        let ptr = {
            let ptr = slice.as_ptr().cast();
            // alignment of the mapped bytes for `T` is only checked in debug builds
            debug_assert!(ptr as usize % std::mem::align_of::<T>() == 0);
            ptr
        };
        // SAFETY: the bytes `[start, start + size)` are in-bounds of the mmap (checked
        // above) and live as long as `&self`; alignment is debug-asserted above
        unsafe { std::slice::from_raw_parts(ptr, len as usize) }
    }
365
    /// mutable view of `len` elements starting at entry `ix` as `&mut [T]`.
    /// With `IncludeHeader::Header`, `len` must be 1 and the slice begins at the
    /// entry's header; otherwise it begins at the entry's first data byte.
    pub(crate) fn get_slice_mut<T>(
        &mut self,
        ix: u64,
        len: u64,
        header: IncludeHeader,
    ) -> &mut [T] {
        // If the caller is including the header, then `len` *must* be 1
        debug_assert!(
            (header == IncludeHeader::NoHeader) || (header == IncludeHeader::Header && len == 1)
        );
        let start = self.get_start_offset(ix, header);
        let slice = {
            let size = std::mem::size_of::<T>() * len as usize;
            let slice = &mut self.mmap[start..];
            debug_assert!(slice.len() >= size);
            &mut slice[..size]
        };
        let ptr = {
            let ptr = slice.as_mut_ptr().cast();
            // alignment of the mapped bytes for `T` is only checked in debug builds
            debug_assert!(ptr as usize % std::mem::align_of::<T>() == 0);
            ptr
        };
        // SAFETY: the bytes `[start, start + size)` are in-bounds of the mmap (checked
        // above), uniquely borrowed via `&mut self`; alignment is debug-asserted above
        unsafe { std::slice::from_raw_parts_mut(ptr, len as usize) }
    }
390
    /// open a disk bucket file and mmap it
    /// optionally creates it.
    /// Returns `None` only when `create` is false and the file cannot be opened;
    /// failures while creating or mmapping panic.
    fn map_open_file(
        path: impl AsRef<Path> + std::fmt::Debug + Clone,
        create: bool,
        create_bytes: u64,
        stats: &BucketStats,
    ) -> Option<MmapMut> {
        let mut measure_new_file = Measure::start("measure_new_file");
        let data = OpenOptions::new()
            .read(true)
            .write(true)
            .create(create)
            .open(&path);
        if let Err(err) = data {
            if !create {
                // we can't load this file, so bail without error
                return None;
            }
            panic!(
                "Unable to create data file '{}' in current dir ({:?}): {err}",
                path.as_ref().display(),
                std::env::current_dir(),
            );
        }
        let mut data = data.unwrap();

        if create {
            // Theoretical performance optimization: write a zero to the end of
            // the file so that we won't have to resize it later, which may be
            // expensive.
            // NOTE(review): assumes `create_bytes >= 1` when `create` is true;
            // a value of 0 would underflow here — confirm callers never request 0.
            data.seek(SeekFrom::Start(create_bytes - 1)).unwrap();
            data.write_all(&[0]).unwrap();
            data.rewind().unwrap();
            measure_new_file.stop();
            let measure_flush = Measure::start("measure_flush");
            data.flush().unwrap(); // can we skip this?
            stats
                .flush_file_us
                .fetch_add(measure_flush.end_as_us(), Ordering::Relaxed);
        }
        let mut measure_mmap = Measure::start("measure_mmap");
        // SAFETY: `map_mut` is unsafe because the underlying file could be modified
        // out from under the map; these bucket files are presumed private to this
        // process — TODO confirm no external writers.
        let mmap = unsafe { MmapMut::map_mut(&data) }.unwrap_or_else(|err| {
            panic!(
                "Unable to mmap file '{}' in current dir ({:?}): {err}",
                path.as_ref().display(),
                std::env::current_dir(),
            );
        });
        // Access to the disk bucket files are random (excluding the linear search on collisions),
        // so advise the kernel to treat the mmaps as such.
        #[cfg(unix)]
        mmap.advise(memmap2::Advice::Random).unwrap();
        measure_mmap.stop();
        stats
            .new_file_us
            .fetch_add(measure_new_file.as_us(), Ordering::Relaxed);
        stats
            .mmap_us
            .fetch_add(measure_mmap.as_us(), Ordering::Relaxed);
        Some(mmap)
    }
454
455    /// allocate a new memory mapped file of size `bytes` on one of `drives`
456    fn new_map(drives: &[PathBuf], bytes: u64, stats: &BucketStats) -> (MmapMut, PathBuf, u128) {
457        let r = thread_rng().gen_range(0..drives.len());
458        let drive = &drives[r];
459        let file_random = thread_rng().gen_range(0..u128::MAX);
460        let pos = format!("{file_random}");
461        let file = drive.join(pos);
462        let res = Self::map_open_file(file.clone(), true, bytes, stats).unwrap();
463
464        (res, file, file_random)
465    }
466
467    /// copy contents from 'old_bucket' to 'self'
468    /// This is used by data buckets
469    fn copy_contents(&mut self, old_bucket: &Self) {
470        let mut m = Measure::start("grow");
471        let old_cap = old_bucket.capacity();
472        let old_map = &old_bucket.mmap;
473
474        let increment = self.contents.capacity_pow2() - old_bucket.contents.capacity_pow2();
475        let index_grow = 1 << increment;
476        (0..old_cap as usize).for_each(|i| {
477            if !old_bucket.is_free(i as u64) {
478                self.copying_entry((i * index_grow) as u64, old_bucket, i as u64);
479
480                {
481                    // copying from old to new. If 'occupied' bit is stored outside the data, then
482                    // occupied has to be set on the new entry in the new bucket.
483                    let start = self.get_start_offset_with_header((i * index_grow) as u64);
484                    self.contents
485                        .occupy(&mut self.mmap[start..], i * index_grow);
486                }
487                let old_ix = i * old_bucket.cell_size as usize;
488                let new_ix = old_ix * index_grow;
489                let dst_slice: &[u8] = &self.mmap[new_ix..new_ix + old_bucket.cell_size as usize];
490                let src_slice: &[u8] = &old_map[old_ix..old_ix + old_bucket.cell_size as usize];
491
492                unsafe {
493                    let dst = dst_slice.as_ptr() as *mut _;
494                    let src = src_slice.as_ptr();
495                    std::ptr::copy_nonoverlapping(src, dst, old_bucket.cell_size as usize);
496                };
497            }
498        });
499        m.stop();
500        // resized so update total file size
501        self.stats.resizes.fetch_add(1, Ordering::Relaxed);
502        self.stats.resize_us.fetch_add(m.as_us(), Ordering::Relaxed);
503    }
504
    /// report the current capacity to stats so the max observed bucket size is tracked
    pub fn update_max_size(&self) {
        self.stats.update_max_size(self.capacity());
    }
508
    /// allocate a new bucket, copying data from 'bucket'
    /// The new bucket shares `bucket`'s occupied count (or starts a fresh one when
    /// `bucket` is `None`) and records its capacity in stats.
    pub fn new_resized(
        drives: &Arc<Vec<PathBuf>>,
        max_search: MaxSearch,
        bucket: Option<&Self>,
        capacity: Capacity,
        num_elems: u64,
        elem_size: u64,
        stats: &Arc<BucketStats>,
    ) -> (Self, u128) {
        let (mut new_bucket, file_name) = Self::new_with_capacity(
            Arc::clone(drives),
            num_elems,
            elem_size,
            capacity,
            max_search,
            Arc::clone(stats),
            #[allow(clippy::map_clone)] // https://github.com/rust-lang/rust-clippy/issues/12560
            bucket
                .map(|bucket| Arc::clone(&bucket.count))
                .unwrap_or_default(),
        );
        if let Some(bucket) = bucket {
            new_bucket.copy_contents(bucket);
        }
        new_bucket.update_max_size();
        (new_bucket, file_name)
    }
537
    /// Return the number of bytes currently allocated (cells × bytes per cell)
    pub(crate) fn capacity_bytes(&self) -> u64 {
        self.capacity() * self.cell_size
    }
542
    /// Return the number of cells currently allocated (delegates to `O`)
    pub fn capacity(&self) -> u64 {
        self.contents.capacity()
    }
547}
548
549#[cfg(test)]
550mod test {
551    use {
552        super::*,
553        crate::{
554            bucket_storage::BucketOccupied,
555            index_entry::{BucketWithHeader, IndexBucket},
556        },
557        tempfile::tempdir,
558    };
559
    /// occupy the single entry of a fresh `IndexBucket`-backed storage
    #[test]
    fn test_bucket_storage_index_bucket() {
        let tmpdir = tempdir().unwrap();
        let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
        assert!(!paths.is_empty());

        let drives = Arc::new(paths);
        let num_elems = 1;
        let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
        let max_search = 1;
        let stats = Arc::default();
        let count = Arc::default();
        // this uses `IndexBucket`. `IndexBucket` doesn't change state on `occupy()`
        let mut storage = BucketStorage::<IndexBucket<u64>>::new(
            drives, num_elems, elem_size, max_search, stats, count,
        )
        .0;
        let ix = 0;
        // entry starts free and can be occupied exactly once
        assert!(storage.is_free(ix));
        assert!(storage.occupy(ix, false).is_ok());
    }
581
    /// exercise the full occupy/double-occupy/free cycle twice on a
    /// `BucketWithHeader`-backed storage (occupied state lives in the header)
    #[test]
    fn test_bucket_storage_using_header() {
        let tmpdir = tempdir().unwrap();
        let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
        assert!(!paths.is_empty());

        let drives = Arc::new(paths);
        let num_elems = 1;
        let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
        let max_search = 1;
        let stats = Arc::default();
        let count = Arc::default();
        let mut storage = BucketStorage::<BucketWithHeader>::new(
            drives, num_elems, elem_size, max_search, stats, count,
        )
        .0;
        let ix = 0;
        // first cycle: occupy succeeds once, second occupy fails, free restores
        assert!(storage.is_free(ix));
        assert!(storage.occupy(ix, false).is_ok());
        assert!(storage.occupy(ix, false).is_err());
        assert!(!storage.is_free(ix));
        storage.free(ix);
        assert!(storage.is_free(ix));
        assert!(storage.is_free(ix));
        // second cycle: behavior is identical after a free
        assert!(storage.occupy(ix, false).is_ok());
        assert!(storage.occupy(ix, false).is_err());
        assert!(!storage.is_free(ix));
        storage.free(ix);
        assert!(storage.is_free(ix));
    }
612
    /// `load_on_restart` must return `None` for paths that are not loadable bucket
    /// files (a directory, or a file too small to hold one element)
    #[test]
    fn test_load_on_restart_failures() {
        let tmpdir = tempdir().unwrap();
        let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
        assert!(!paths.is_empty());
        let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
        let max_search = 1;
        let stats = Arc::new(BucketStats::default());
        let count = Arc::new(AtomicU64::default());
        // the path is a directory, not a bucket file, so the load must fail
        assert!(BucketStorage::<IndexBucket<u64>>::load_on_restart(
            PathBuf::from(tmpdir.path()),
            NonZeroU64::new(elem_size).unwrap(),
            max_search,
            stats.clone(),
            count.clone(),
        )
        .is_none());
        agave_logger::setup();
        for len in [0, 1, 47, 48, 49, 4097] {
            // create a file of `len` bytes; loading fails iff `len` is too small for one element
            let path = tmpdir.path().join("small");
            let mut file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(true)
                .open(path.clone())
                .unwrap();
            _ = file.write_all(&vec![1u8; len]);
            drop(file);
            assert_eq!(std::fs::metadata(&path).unwrap().len(), len as u64);
            let result = BucketStorage::<IndexBucket<u64>>::load_on_restart(
                path,
                NonZeroU64::new(elem_size).unwrap(),
                max_search,
                stats.clone(),
                count.clone(),
            );
            if let Some(result) = result.as_ref() {
                // capacity is the number of whole elements the file holds
                assert_eq!(result.capacity() as usize, len / elem_size as usize);
                assert_eq!(
                    result.capacity_bytes() as usize,
                    len / elem_size as usize * elem_size as usize
                );
            }
            assert_eq!(result.is_none(), len < elem_size as usize, "{len}");
        }
    }
662
    /// round-trip: create a bucket (both `Actual` and default `Pow2` capacities),
    /// write recognizable bytes, drop it with `delete_file_on_drop = false`, then
    /// reload the file and verify capacity, contents, and free state
    #[test]
    fn test_load_on_restart() {
        for request in [Some(7), None] {
            let tmpdir = tempdir().unwrap();
            let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
            assert!(!paths.is_empty());
            let drives = Arc::new(paths);
            let num_elems = 1;
            let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
            let max_search = 1;
            let stats = Arc::new(BucketStats::default());
            let count = Arc::new(AtomicU64::default());
            let mut storage = if let Some(actual_elems) = request {
                BucketStorage::<IndexBucket<u64>>::new_with_capacity(
                    drives,
                    num_elems,
                    elem_size,
                    Capacity::Actual(actual_elems),
                    max_search,
                    stats.clone(),
                    count.clone(),
                )
                .0
            } else {
                BucketStorage::<IndexBucket<u64>>::new(
                    drives,
                    num_elems,
                    elem_size,
                    max_search,
                    stats.clone(),
                    count.clone(),
                )
                .0
            };
            let expected_capacity = storage.capacity();
            (0..num_elems).for_each(|ix| {
                assert!(storage.is_free(ix));
                assert!(storage.occupy(ix, false).is_ok());
            });
            // keep the file on disk so it can be reloaded below
            storage.delete_file_on_drop = false;
            let len = storage.mmap.len();
            // stamp a recognizable byte pattern to verify persistence
            (0..expected_capacity as usize).for_each(|i| {
                storage.mmap[i] = (i % 256) as u8;
            });
            // close storage
            let path = storage.path.clone();
            drop(storage);

            // re load and remap storage file
            let storage = BucketStorage::<IndexBucket<u64>>::load_on_restart(
                path,
                NonZeroU64::new(elem_size).unwrap(),
                max_search,
                stats,
                count,
            )
            .unwrap();
            assert_eq!(storage.capacity(), expected_capacity);
            assert_eq!(len, storage.mmap.len());
            (0..expected_capacity as usize).for_each(|i| {
                assert_eq!(storage.mmap[i], (i % 256) as u8);
            });
            (0..num_elems).for_each(|ix| {
                // all should be marked as free
                assert!(storage.is_free(ix));
            });
        }
    }
731
    /// a `BucketOccupied` impl whose header size is not u64-aligned must panic at
    /// bucket creation time (in `get_offset_to_first_data`)
    #[test]
    #[should_panic]
    fn test_header_bad_size() {
        struct BucketBadHeader;
        impl BucketCapacity for BucketBadHeader {
            fn capacity(&self) -> u64 {
                unimplemented!();
            }
        }
        impl BucketOccupied for BucketBadHeader {
            fn occupy(&mut self, _element: &mut [u8], _ix: usize) {
                unimplemented!();
            }
            fn free(&mut self, _element: &mut [u8], _ix: usize) {
                unimplemented!();
            }
            fn is_free(&self, _element: &[u8], _ix: usize) -> bool {
                unimplemented!();
            }
            fn offset_to_first_data() -> usize {
                // not multiple of u64
                std::mem::size_of::<u64>() - 1
            }
            fn new(_num_elements: Capacity) -> Self {
                Self
            }
        }

        // ensure we panic if the header size (i.e. offset to first data) is not aligned to eight bytes
        BucketStorage::<BucketBadHeader>::new_with_capacity(
            Arc::default(),
            0,
            0,
            Capacity::Pow2(0),
            0,
            Arc::default(),
            Arc::default(),
        );
    }
771}