// miraland_bucket_map/bucket_storage.rs

1use {
2    crate::{bucket_stats::BucketStats, MaxSearch},
3    memmap2::MmapMut,
4    miraland_measure::measure::Measure,
5    rand::{thread_rng, Rng},
6    std::{
7        fs::{remove_file, OpenOptions},
8        io::{Seek, SeekFrom, Write},
9        num::NonZeroU64,
10        path::{Path, PathBuf},
11        sync::{
12            atomic::{AtomicU64, Ordering},
13            Arc,
14        },
15    },
16};
17
18/*
191	2
202	4
213	8
224	16
235	32
246	64
257	128
268	256
279	512
2810	1,024
2911	2,048
3012	4,096
3113	8,192
3214	16,384
3323  8,388,608
3424  16,777,216
35*/
/// default initial capacity exponent: new buckets hold 1 << 5 = 32 elements
pub const DEFAULT_CAPACITY_POW2: u8 = 5;
37
/// keep track of an individual element's occupied vs. free state
/// every element must either be occupied or free and should never be double occupied or double freed
/// For parameters below, `element` is used to view/modify header fields or fields within the element data.
pub trait BucketOccupied: BucketCapacity {
    /// set entry at `ix` as occupied (as opposed to free)
    fn occupy(&mut self, element: &mut [u8], ix: usize);
    /// set entry at `ix` as free
    fn free(&mut self, element: &mut [u8], ix: usize);
    /// return true if entry at `ix` is free
    fn is_free(&self, element: &[u8], ix: usize) -> bool;
    /// # of bytes prior to first data held in the element.
    /// This is the header size, if a header exists per element in the data.
    /// This must be a multiple of sizeof(u64)
    fn offset_to_first_data() -> usize;
    /// initialize this struct
    /// `capacity` is the number of elements allocated in the bucket
    fn new(capacity: Capacity) -> Self;
    /// copying entry. Any in-memory (per-bucket) data structures may need to be copied for this `ix_old`.
    /// no-op by default
    fn copying_entry(
        &mut self,
        _element_new: &mut [u8],
        _ix_new: usize,
        _other: &Self,
        _element_old: &[u8],
        _ix_old: usize,
    ) {
    }
}
67
/// how many elements a bucket (or capacity spec) can hold
pub trait BucketCapacity {
    /// total number of elements
    fn capacity(&self) -> u64;
    /// capacity expressed as a power-of-2 exponent.
    /// Default panics: only meaningful for implementations sized as a power of 2.
    fn capacity_pow2(&self) -> u8 {
        unimplemented!();
    }
}
74
/// one memory-mapped file holding the elements of a bucket
pub struct BucketStorage<O: BucketOccupied> {
    /// path of the backing file on disk
    path: PathBuf,
    /// writable memory map over the backing file
    mmap: MmapMut,
    /// bytes per element cell (element data plus per-element header)
    pub cell_size: u64,
    /// number of occupied entries; shared with the owning index
    pub count: Arc<AtomicU64>,
    pub stats: Arc<BucketStats>,
    /// max number of slots to linearly probe on lookup/insert
    pub max_search: MaxSearch,
    /// occupied/free bookkeeping for the elements
    pub contents: O,
    /// true if when this bucket is dropped, the file should be deleted
    pub delete_file_on_drop: bool,
}
86
/// errors returned by bucket storage operations
#[derive(Debug)]
pub enum BucketStorageError {
    /// the requested slot is already occupied
    AlreadyOccupied,
}
91
92impl<O: BucketOccupied> Drop for BucketStorage<O> {
93    fn drop(&mut self) {
94        if self.delete_file_on_drop {
95            self.delete();
96        }
97    }
98}
99
/// whether an accessor should expose the per-element header or skip past it
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub(crate) enum IncludeHeader {
    /// caller wants header included
    Header,
    /// caller wants header skipped
    NoHeader,
}
107
/// 2 common ways of specifying capacity
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum Capacity {
    /// 1 << Pow2 produces # elements
    Pow2(u8),
    /// Actual # elements
    Actual(u64),
}
116
117impl BucketCapacity for Capacity {
118    fn capacity(&self) -> u64 {
119        match self {
120            Capacity::Pow2(pow2) => 1 << *pow2,
121            Capacity::Actual(elements) => *elements,
122        }
123    }
124    fn capacity_pow2(&self) -> u8 {
125        match self {
126            Capacity::Pow2(pow2) => *pow2,
127            Capacity::Actual(_elements) => {
128                panic!("illegal to ask for pow2 from random capacity");
129            }
130        }
131    }
132}
133
impl<O: BucketOccupied> BucketStorage<O> {
    /// allocate a bucket of at least `capacity` elements.
    /// if capacity can be random, more may be allocated to fill the last page.
    /// Returns the new storage and the random u128 used to name the backing file.
    pub fn new_with_capacity(
        drives: Arc<Vec<PathBuf>>,
        num_elems: u64,
        elem_size: u64,
        mut capacity: Capacity,
        max_search: MaxSearch,
        stats: Arc<BucketStats>,
        count: Arc<AtomicU64>,
    ) -> (Self, u128) {
        let offset = Self::get_offset_to_first_data();
        // one cell = `num_elems` elements of `elem_size` bytes, preceded by the header
        let cell_size = elem_size * num_elems + offset;
        let bytes = Self::allocate_to_fill_page(&mut capacity, cell_size);
        let (mmap, path, file_name) = Self::new_map(&drives, bytes, &stats);
        (
            Self {
                path,
                mmap,
                cell_size,
                count,
                stats,
                max_search,
                contents: O::new(capacity),
                // by default, newly created files will get deleted when dropped
                delete_file_on_drop: true,
            },
            file_name,
        )
    }

    /// Compute the number of bytes to allocate for `capacity` cells of `cell_size` bytes.
    /// For `Capacity::Actual`, the allocation is rounded up so whole 4 KiB pages are
    /// filled with whole cells, and `capacity` is updated to the (possibly larger)
    /// resulting element count. `Capacity::Pow2` is used as-is.
    fn allocate_to_fill_page(capacity: &mut Capacity, cell_size: u64) -> u64 {
        let mut bytes = capacity.capacity() * cell_size;
        if let Capacity::Actual(_) = capacity {
            // maybe bump up allocation to fit a page size
            const PAGE_SIZE: u64 = 4 * 1024;
            // bytes of whole cells that fit in whole pages at the current size
            let full_page_bytes = bytes / PAGE_SIZE * PAGE_SIZE / cell_size * cell_size;
            if full_page_bytes < bytes {
                // round up to the next page boundary, then down to a whole cell count
                let bytes_new = ((bytes / PAGE_SIZE) + 1) * PAGE_SIZE / cell_size * cell_size;
                assert!(bytes_new >= bytes, "allocating less than requested, capacity: {}, bytes: {}, bytes_new: {}, full_page_bytes: {}", capacity.capacity(), bytes, bytes_new, full_page_bytes);
                assert_eq!(bytes_new % cell_size, 0);
                bytes = bytes_new;
                *capacity = Capacity::Actual(bytes / cell_size);
            }
        }
        bytes
    }

    /// delete the backing file on disk
    fn delete(&self) {
        // best effort: errors (e.g. file already removed) are ignored
        _ = remove_file(&self.path);
    }

    /// max number of slots to linearly probe when searching
    pub fn max_search(&self) -> u64 {
        self.max_search as u64
    }

    /// allocate a bucket with the default power-of-2 capacity
    pub fn new(
        drives: Arc<Vec<PathBuf>>,
        num_elems: u64,
        elem_size: u64,
        max_search: MaxSearch,
        stats: Arc<BucketStats>,
        count: Arc<AtomicU64>,
    ) -> (Self, u128) {
        Self::new_with_capacity(
            drives,
            num_elems,
            elem_size,
            Capacity::Pow2(DEFAULT_CAPACITY_POW2),
            max_search,
            stats,
            count,
        )
    }

    /// per-element header size in bytes.
    /// Panics unless the size is a multiple of `size_of::<u64>()`.
    fn get_offset_to_first_data() -> u64 {
        let offset = O::offset_to_first_data() as u64;
        let size_of_u64 = std::mem::size_of::<u64>() as u64;
        assert_eq!(
            offset / size_of_u64 * size_of_u64,
            offset,
            "header size must be a multiple of u64"
        );
        offset
    }

    /// load and mmap the file that is this disk bucket if possible
    /// Returns `None` if the file is missing, cannot be opened, or is too small
    /// to hold even one element.
    pub(crate) fn load_on_restart(
        path: PathBuf,
        elem_size: NonZeroU64,
        max_search: MaxSearch,
        stats: Arc<BucketStats>,
        count: Arc<AtomicU64>,
    ) -> Option<Self> {
        let offset = Self::get_offset_to_first_data();
        // number of whole elements the file can hold after the header
        let num_elems = std::fs::metadata(&path)
            .ok()
            .map(|metadata| metadata.len().saturating_sub(offset) / elem_size)?;
        if num_elems == 0 {
            return None;
        }
        let mmap = Self::map_open_file(&path, false, 0, &stats)?;
        Some(Self {
            path,
            mmap,
            cell_size: elem_size.into(),
            count,
            stats,
            max_search,
            contents: O::new(Capacity::Actual(num_elems)),
            // since we loaded it, it persisted from last time, so we obviously want to keep it present disk.
            delete_file_on_drop: false,
        })
    }

    /// copy the per-bucket (`contents`) bookkeeping for entry `ix_old` of `other`
    /// into entry `ix_new` of `self`
    pub(crate) fn copying_entry(&mut self, ix_new: u64, other: &Self, ix_old: u64) {
        let start = self.get_start_offset_with_header(ix_new);
        let start_old = other.get_start_offset_with_header(ix_old);
        self.contents.copying_entry(
            &mut self.mmap[start..],
            ix_new as usize,
            &other.contents,
            &other.mmap[start_old..],
            ix_old as usize,
        );
    }

    /// true if the entry at index 'ix' is free (as opposed to being occupied)
    pub fn is_free(&self, ix: u64) -> bool {
        let start = self.get_start_offset_with_header(ix);
        let entry = &self.mmap[start..];
        self.contents.is_free(entry, ix as usize)
    }

    /// try to occupy `ix`. return true if successful
    pub(crate) fn try_lock(&mut self, ix: u64) -> bool {
        let start = self.get_start_offset_with_header(ix);
        let entry = &mut self.mmap[start..];
        if self.contents.is_free(entry, ix as usize) {
            self.contents.occupy(entry, ix as usize);
            true
        } else {
            false
        }
    }

    /// 'is_resizing' true if caller is resizing the index (so don't increment count)
    /// 'is_resizing' false if caller is adding an item to the index (so increment count)
    /// Returns `Err(AlreadyOccupied)` if the slot is already in use.
    pub fn occupy(&mut self, ix: u64, is_resizing: bool) -> Result<(), BucketStorageError> {
        debug_assert!(ix < self.capacity(), "occupy: bad index size");
        let mut e = Err(BucketStorageError::AlreadyOccupied);
        if self.try_lock(ix) {
            e = Ok(());
            if !is_resizing {
                self.count.fetch_add(1, Ordering::Relaxed);
            }
        }
        e
    }

    /// mark entry `ix` free and decrement the shared element count
    pub fn free(&mut self, ix: u64) {
        debug_assert!(ix < self.capacity(), "bad index size");
        let start = self.get_start_offset_with_header(ix);
        self.contents.free(&mut self.mmap[start..], ix as usize);
        self.count.fetch_sub(1, Ordering::Relaxed);
    }

    /// byte offset within the mmap where entry `ix` begins (header included)
    fn get_start_offset_with_header(&self, ix: u64) -> usize {
        debug_assert!(ix < self.capacity(), "bad index size");
        (self.cell_size * ix) as usize
    }

    /// byte offset within the mmap of entry `ix`, optionally skipping the per-element header
    fn get_start_offset(&self, ix: u64, header: IncludeHeader) -> usize {
        self.get_start_offset_with_header(ix)
            + match header {
                IncludeHeader::Header => 0,
                IncludeHeader::NoHeader => O::offset_to_first_data(),
            }
    }

    /// view the header of entry `ix` as a `&T`
    pub(crate) fn get_header<T>(&self, ix: u64) -> &T {
        let slice = self.get_slice::<T>(ix, 1, IncludeHeader::Header);
        // SAFETY: `get_slice` ensures there's at least one element in the slice
        unsafe { slice.get_unchecked(0) }
    }

    /// view the header of entry `ix` as a `&mut T`
    pub(crate) fn get_header_mut<T>(&mut self, ix: u64) -> &mut T {
        let slice = self.get_slice_mut::<T>(ix, 1, IncludeHeader::Header);
        // SAFETY: `get_slice_mut` ensures there's at least one element in the slice
        unsafe { slice.get_unchecked_mut(0) }
    }

    /// view the data (past the header) of entry `ix` as a `&T`
    pub(crate) fn get<T>(&self, ix: u64) -> &T {
        let slice = self.get_slice::<T>(ix, 1, IncludeHeader::NoHeader);
        // SAFETY: `get_slice` ensures there's at least one element in the slice
        unsafe { slice.get_unchecked(0) }
    }

    /// view the data (past the header) of entry `ix` as a `&mut T`
    pub(crate) fn get_mut<T>(&mut self, ix: u64) -> &mut T {
        let slice = self.get_slice_mut::<T>(ix, 1, IncludeHeader::NoHeader);
        // SAFETY: `get_slice_mut` ensures there's at least one element in the slice
        unsafe { slice.get_unchecked_mut(0) }
    }

    /// reinterpret `len` elements of `T` starting at entry `ix` as a `&[T]`.
    /// NOTE(review): in-bounds and alignment are only `debug_assert`ed; release
    /// builds rely on callers passing valid `ix`/`len` — confirm call sites.
    pub(crate) fn get_slice<T>(&self, ix: u64, len: u64, header: IncludeHeader) -> &[T] {
        // If the caller is including the header, then `len` *must* be 1
        debug_assert!(
            (header == IncludeHeader::NoHeader) || (header == IncludeHeader::Header && len == 1)
        );
        let start = self.get_start_offset(ix, header);
        let slice = {
            let size = std::mem::size_of::<T>() * len as usize;
            let slice = &self.mmap[start..];
            debug_assert!(slice.len() >= size);
            &slice[..size]
        };
        let ptr = {
            let ptr = slice.as_ptr() as *const T;
            debug_assert!(ptr as usize % std::mem::align_of::<T>() == 0);
            ptr
        };
        unsafe { std::slice::from_raw_parts(ptr, len as usize) }
    }

    /// reinterpret `len` elements of `T` starting at entry `ix` as a `&mut [T]`.
    /// Same caveats as `get_slice`.
    pub(crate) fn get_slice_mut<T>(
        &mut self,
        ix: u64,
        len: u64,
        header: IncludeHeader,
    ) -> &mut [T] {
        // If the caller is including the header, then `len` *must* be 1
        debug_assert!(
            (header == IncludeHeader::NoHeader) || (header == IncludeHeader::Header && len == 1)
        );
        let start = self.get_start_offset(ix, header);
        let slice = {
            let size = std::mem::size_of::<T>() * len as usize;
            let slice = &mut self.mmap[start..];
            debug_assert!(slice.len() >= size);
            &mut slice[..size]
        };
        let ptr = {
            let ptr = slice.as_mut_ptr() as *mut T;
            debug_assert!(ptr as usize % std::mem::align_of::<T>() == 0);
            ptr
        };
        unsafe { std::slice::from_raw_parts_mut(ptr, len as usize) }
    }

    /// open a disk bucket file and mmap it
    /// optionally creates it.
    /// Returns `None` if `create` is false and the file cannot be opened;
    /// panics on create/mmap failure.
    /// NOTE(review): when `create` is true, `create_bytes` must be > 0 or
    /// `create_bytes - 1` below underflows — confirm all callers.
    fn map_open_file(
        path: impl AsRef<Path> + std::fmt::Debug + Clone,
        create: bool,
        create_bytes: u64,
        stats: &BucketStats,
    ) -> Option<MmapMut> {
        let mut measure_new_file = Measure::start("measure_new_file");
        let data = OpenOptions::new()
            .read(true)
            .write(true)
            .create(create)
            .open(path.clone());
        if let Err(e) = data {
            if !create {
                // we can't load this file, so bail without error
                return None;
            }
            panic!(
                "Unable to create data file {:?} in current dir({:?}): {:?}",
                path,
                std::env::current_dir(),
                e
            );
        }
        let mut data = data.unwrap();

        if create {
            // Theoretical performance optimization: write a zero to the end of
            // the file so that we won't have to resize it later, which may be
            // expensive.
            data.seek(SeekFrom::Start(create_bytes - 1)).unwrap();
            data.write_all(&[0]).unwrap();
            data.rewind().unwrap();
            measure_new_file.stop();
            let measure_flush = Measure::start("measure_flush");
            data.flush().unwrap(); // can we skip this?
            stats
                .flush_file_us
                .fetch_add(measure_flush.end_as_us(), Ordering::Relaxed);
        }
        let mut measure_mmap = Measure::start("measure_mmap");
        // SAFETY-adjacent: mapping is sound only while no other process truncates
        // or modifies the file underneath us — presumed by this crate's design.
        let res = unsafe { MmapMut::map_mut(&data) };
        if let Err(e) = res {
            panic!(
                "Unable to mmap file {:?} in current dir({:?}): {:?}",
                path,
                std::env::current_dir(),
                e
            );
        }
        measure_mmap.stop();
        stats
            .new_file_us
            .fetch_add(measure_new_file.as_us(), Ordering::Relaxed);
        stats
            .mmap_us
            .fetch_add(measure_mmap.as_us(), Ordering::Relaxed);
        res.ok()
    }

    /// allocate a new memory mapped file of size `bytes` on one of `drives`
    /// (drive and file name are both chosen at random)
    fn new_map(drives: &[PathBuf], bytes: u64, stats: &BucketStats) -> (MmapMut, PathBuf, u128) {
        let r = thread_rng().gen_range(0..drives.len());
        let drive = &drives[r];
        let file_random = thread_rng().gen_range(0..u128::MAX);
        let pos = format!("{}", file_random,);
        let file = drive.join(pos);
        let res = Self::map_open_file(file.clone(), true, bytes, stats).unwrap();

        (res, file, file_random)
    }

    /// copy contents from 'old_bucket' to 'self'
    /// This is used by data buckets
    /// `self` must have a pow2 capacity >= `old_bucket`'s; element `i` of the old
    /// bucket lands at index `i * index_grow` in the new one.
    fn copy_contents(&mut self, old_bucket: &Self) {
        let mut m = Measure::start("grow");
        let old_cap = old_bucket.capacity();
        let old_map = &old_bucket.mmap;

        // how many new slots each old slot fans out to
        let increment = self.contents.capacity_pow2() - old_bucket.contents.capacity_pow2();
        let index_grow = 1 << increment;
        (0..old_cap as usize).for_each(|i| {
            if !old_bucket.is_free(i as u64) {
                self.copying_entry((i * index_grow) as u64, old_bucket, i as u64);

                {
                    // copying from old to new. If 'occupied' bit is stored outside the data, then
                    // occupied has to be set on the new entry in the new bucket.
                    let start = self.get_start_offset_with_header((i * index_grow) as u64);
                    self.contents
                        .occupy(&mut self.mmap[start..], i * index_grow);
                }
                let old_ix = i * old_bucket.cell_size as usize;
                let new_ix = old_ix * index_grow;
                let dst_slice: &[u8] = &self.mmap[new_ix..new_ix + old_bucket.cell_size as usize];
                let src_slice: &[u8] = &old_map[old_ix..old_ix + old_bucket.cell_size as usize];

                // NOTE(review): writing through a pointer derived from a shared
                // `&[u8]` borrow of `self.mmap` is dubious under Rust's aliasing
                // rules; consider obtaining `dst` via a `&mut` borrow instead.
                unsafe {
                    let dst = dst_slice.as_ptr() as *mut u8;
                    let src = src_slice.as_ptr();
                    std::ptr::copy_nonoverlapping(src, dst, old_bucket.cell_size as usize);
                };
            }
        });
        m.stop();
        // resized so update total file size
        self.stats.resizes.fetch_add(1, Ordering::Relaxed);
        self.stats.resize_us.fetch_add(m.as_us(), Ordering::Relaxed);
    }

    /// record the current capacity in the shared stats high-water mark
    pub fn update_max_size(&self) {
        self.stats.update_max_size(self.capacity());
    }

    /// allocate a new bucket, copying data from 'bucket'
    /// (if `bucket` is `None`, just allocates fresh storage)
    pub fn new_resized(
        drives: &Arc<Vec<PathBuf>>,
        max_search: MaxSearch,
        bucket: Option<&Self>,
        capacity: Capacity,
        num_elems: u64,
        elem_size: u64,
        stats: &Arc<BucketStats>,
    ) -> (Self, u128) {
        let (mut new_bucket, file_name) = Self::new_with_capacity(
            Arc::clone(drives),
            num_elems,
            elem_size,
            capacity,
            max_search,
            Arc::clone(stats),
            // reuse the old bucket's shared count so occupancy carries over
            bucket
                .map(|bucket| Arc::clone(&bucket.count))
                .unwrap_or_default(),
        );
        if let Some(bucket) = bucket {
            new_bucket.copy_contents(bucket);
        }
        new_bucket.update_max_size();
        (new_bucket, file_name)
    }

    /// Return the number of bytes currently allocated
    pub(crate) fn capacity_bytes(&self) -> u64 {
        self.capacity() * self.cell_size
    }

    /// Return the number of cells currently allocated
    pub fn capacity(&self) -> u64 {
        self.contents.capacity()
    }
}
541
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::{bucket_storage::BucketOccupied, index_entry::IndexBucket},
        tempfile::tempdir,
    };

    /// exercise the occupy/free lifecycle on a freshly created bucket
    #[test]
    fn test_bucket_storage() {
        let tmpdir = tempdir().unwrap();
        let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
        assert!(!paths.is_empty());

        let drives = Arc::new(paths);
        let num_elems = 1;
        let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
        let max_search = 1;
        let stats = Arc::default();
        let count = Arc::default();
        let mut storage = BucketStorage::<IndexBucket<u64>>::new(
            drives, num_elems, elem_size, max_search, stats, count,
        )
        .0;
        let ix = 0;
        // slot starts free; occupy succeeds once, fails while occupied
        assert!(storage.is_free(ix));
        assert!(storage.occupy(ix, false).is_ok());
        assert!(storage.occupy(ix, false).is_err());
        assert!(!storage.is_free(ix));
        storage.free(ix);
        assert!(storage.is_free(ix));
        assert!(storage.is_free(ix));
        // the cycle repeats cleanly after a free
        assert!(storage.occupy(ix, false).is_ok());
        assert!(storage.occupy(ix, false).is_err());
        assert!(!storage.is_free(ix));
        storage.free(ix);
        assert!(storage.is_free(ix));
    }

    /// `load_on_restart` must reject missing files and files too small to hold
    /// a single element, and size itself from the file length otherwise
    #[test]
    fn test_load_on_restart_failures() {
        let tmpdir = tempdir().unwrap();
        let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
        assert!(!paths.is_empty());
        let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
        let max_search = 1;
        let stats = Arc::new(BucketStats::default());
        let count = Arc::new(AtomicU64::default());
        // file doesn't exist
        assert!(BucketStorage::<IndexBucket<u64>>::load_on_restart(
            PathBuf::from(tmpdir.path()),
            NonZeroU64::new(elem_size).unwrap(),
            max_search,
            stats.clone(),
            count.clone(),
        )
        .is_none());
        miraland_logger::setup();
        for len in [0, 1, 47, 48, 49, 4097] {
            // create a file of `len` bytes; lengths below `elem_size` must fail to load.
            // NOTE(review): the same path is reused without truncate, so this
            // relies on `len` increasing each iteration — confirm if extending.
            let path = tmpdir.path().join("small");
            let mut file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .open(path.clone())
                .unwrap();
            _ = file.write_all(&vec![1u8; len]);
            drop(file);
            assert_eq!(std::fs::metadata(&path).unwrap().len(), len as u64);
            let result = BucketStorage::<IndexBucket<u64>>::load_on_restart(
                path,
                NonZeroU64::new(elem_size).unwrap(),
                max_search,
                stats.clone(),
                count.clone(),
            );
            if let Some(result) = result.as_ref() {
                // capacity is derived from file length, rounded down to whole elements
                assert_eq!(result.capacity() as usize, len / elem_size as usize);
                assert_eq!(
                    result.capacity_bytes() as usize,
                    len / elem_size as usize * elem_size as usize
                );
            }
            assert_eq!(result.is_none(), len < elem_size as usize, "{len}");
        }
    }

    /// round-trip: create a bucket (pow2 and actual capacities), persist it,
    /// reload with `load_on_restart`, and verify capacity and raw bytes survive
    #[test]
    fn test_load_on_restart() {
        for request in [Some(7), None] {
            let tmpdir = tempdir().unwrap();
            let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
            assert!(!paths.is_empty());
            let drives = Arc::new(paths);
            let num_elems = 1;
            let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
            let max_search = 1;
            let stats = Arc::new(BucketStats::default());
            let count = Arc::new(AtomicU64::default());
            let mut storage = if let Some(actual_elems) = request {
                BucketStorage::<IndexBucket<u64>>::new_with_capacity(
                    drives,
                    num_elems,
                    elem_size,
                    Capacity::Actual(actual_elems),
                    max_search,
                    stats.clone(),
                    count.clone(),
                )
                .0
            } else {
                BucketStorage::<IndexBucket<u64>>::new(
                    drives,
                    num_elems,
                    elem_size,
                    max_search,
                    stats.clone(),
                    count.clone(),
                )
                .0
            };
            let expected_capacity = storage.capacity();
            (0..num_elems).for_each(|ix| {
                assert!(storage.is_free(ix));
                assert!(storage.occupy(ix, false).is_ok());
            });
            // keep the file so it can be reloaded after drop
            storage.delete_file_on_drop = false;
            let len = storage.mmap.len();
            // scribble a recognizable pattern over the mapped bytes
            (0..expected_capacity as usize).for_each(|i| {
                storage.mmap[i] = (i % 256) as u8;
            });
            // close storage
            let path = storage.path.clone();
            drop(storage);

            // re load and remap storage file
            let storage = BucketStorage::<IndexBucket<u64>>::load_on_restart(
                path,
                NonZeroU64::new(elem_size).unwrap(),
                max_search,
                stats,
                count,
            )
            .unwrap();
            assert_eq!(storage.capacity(), expected_capacity);
            assert_eq!(len, storage.mmap.len());
            // the scribbled pattern survived the round trip
            (0..expected_capacity as usize).for_each(|i| {
                assert_eq!(storage.mmap[i], (i % 256) as u8);
            });
            (0..num_elems).for_each(|ix| {
                // all should be marked as free
                assert!(storage.is_free(ix));
            });
        }
    }

    /// a header size that is not u64-aligned must panic at construction
    #[test]
    #[should_panic]
    fn test_header_bad_size() {
        struct BucketBadHeader;
        impl BucketCapacity for BucketBadHeader {
            fn capacity(&self) -> u64 {
                unimplemented!();
            }
        }
        impl BucketOccupied for BucketBadHeader {
            fn occupy(&mut self, _element: &mut [u8], _ix: usize) {
                unimplemented!();
            }
            fn free(&mut self, _element: &mut [u8], _ix: usize) {
                unimplemented!();
            }
            fn is_free(&self, _element: &[u8], _ix: usize) -> bool {
                unimplemented!();
            }
            fn offset_to_first_data() -> usize {
                // not multiple of u64
                std::mem::size_of::<u64>() - 1
            }
            fn new(_num_elements: Capacity) -> Self {
                Self
            }
        }

        // ensure we panic if the header size (i.e. offset to first data) is not aligned to eight bytes
        BucketStorage::<BucketBadHeader>::new_with_capacity(
            Arc::default(),
            0,
            0,
            Capacity::Pow2(0),
            0,
            Arc::default(),
            Arc::default(),
        );
    }
}