1use {
2 crate::{bucket_stats::BucketStats, MaxSearch},
3 memmap2::MmapMut,
4 miraland_measure::measure::Measure,
5 rand::{thread_rng, Rng},
6 std::{
7 fs::{remove_file, OpenOptions},
8 io::{Seek, SeekFrom, Write},
9 num::NonZeroU64,
10 path::{Path, PathBuf},
11 sync::{
12 atomic::{AtomicU64, Ordering},
13 Arc,
14 },
15 },
16};
17
/// Initial capacity, as a power of two, for a brand-new bucket (2^5 = 32 elements).
pub const DEFAULT_CAPACITY_POW2: u8 = 5;
37
/// Bookkeeping for which entries in a bucket's backing storage are in use.
///
/// Implementors decide how occupancy is recorded — e.g. in a per-entry header
/// inside `element`, or in a separate structure indexed by `ix`.
pub trait BucketOccupied: BucketCapacity {
    /// Mark the entry at index `ix` (whose bytes begin at `element`) as occupied.
    fn occupy(&mut self, element: &mut [u8], ix: usize);
    /// Mark the entry at index `ix` as free.
    fn free(&mut self, element: &mut [u8], ix: usize);
    /// Return true if the entry at index `ix` is currently free.
    fn is_free(&self, element: &[u8], ix: usize) -> bool;
    /// Byte offset from the start of an entry to its data, i.e. the size of
    /// any per-entry header. Must be a multiple of `size_of::<u64>()`
    /// (checked by `BucketStorage::get_offset_to_first_data`).
    fn offset_to_first_data() -> usize;
    /// Create an occupancy tracker for a bucket holding `capacity` elements.
    fn new(capacity: Capacity) -> Self;
    /// Hook invoked when an entry is copied from one bucket into another
    /// (e.g. while growing); the default implementation does nothing.
    fn copying_entry(
        &mut self,
        _element_new: &mut [u8],
        _ix_new: usize,
        _other: &Self,
        _element_old: &[u8],
        _ix_old: usize,
    ) {
    }
}
67
/// Number of elements a bucket (or its occupancy tracker) can hold.
pub trait BucketCapacity {
    fn capacity(&self) -> u64;
    /// log2 of the capacity; only meaningful when the capacity is a power of
    /// two. The default panics — override only where pow2 is guaranteed.
    fn capacity_pow2(&self) -> u8 {
        unimplemented!();
    }
}
74
/// A single file-backed bucket of fixed-size cells accessed through a
/// writable memory map. `O` tracks which cells are occupied.
pub struct BucketStorage<O: BucketOccupied> {
    /// path of the backing file on disk
    path: PathBuf,
    /// writable memory map over the backing file
    mmap: MmapMut,
    /// size in bytes of one cell (per-entry header + element data)
    pub cell_size: u64,
    /// number of occupied cells; shared via `Arc` so it carries across resizes
    pub count: Arc<AtomicU64>,
    pub stats: Arc<BucketStats>,
    /// maximum number of probes when searching for a slot
    pub max_search: MaxSearch,
    /// occupancy bookkeeping for the cells in `mmap`
    pub contents: O,
    /// when true, `Drop` removes the backing file; set false to persist it
    pub delete_file_on_drop: bool,
}
86
/// Errors returned by bucket slot operations.
#[derive(Debug)]
pub enum BucketStorageError {
    /// the requested slot is already occupied (see `occupy`)
    AlreadyOccupied,
}
91
92impl<O: BucketOccupied> Drop for BucketStorage<O> {
93 fn drop(&mut self) {
94 if self.delete_file_on_drop {
95 self.delete();
96 }
97 }
98}
99
/// Whether an entry access should start at the per-entry header bytes or at
/// the data that follows them.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub(crate) enum IncludeHeader {
    /// access begins at offset 0 of the entry (the header itself)
    Header,
    /// access begins at `O::offset_to_first_data()` into the entry
    NoHeader,
}
107
/// A bucket's capacity, expressed either as a power of two or as an exact
/// element count.
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum Capacity {
    /// 2^n elements
    Pow2(u8),
    /// exactly this many elements
    Actual(u64),
}
116
117impl BucketCapacity for Capacity {
118 fn capacity(&self) -> u64 {
119 match self {
120 Capacity::Pow2(pow2) => 1 << *pow2,
121 Capacity::Actual(elements) => *elements,
122 }
123 }
124 fn capacity_pow2(&self) -> u8 {
125 match self {
126 Capacity::Pow2(pow2) => *pow2,
127 Capacity::Actual(_elements) => {
128 panic!("illegal to ask for pow2 from random capacity");
129 }
130 }
131 }
132}
133
134impl<O: BucketOccupied> BucketStorage<O> {
    /// Create a new bucket backed by a file on a randomly chosen drive.
    /// One cell holds `num_elems` elements of `elem_size` bytes plus `O`'s
    /// per-entry header; the file is sized for `capacity` cells (possibly
    /// rounded up to fill whole pages). Returns the storage and the random
    /// u128 used as the backing file's name.
    pub fn new_with_capacity(
        drives: Arc<Vec<PathBuf>>,
        num_elems: u64,
        elem_size: u64,
        mut capacity: Capacity,
        max_search: MaxSearch,
        stats: Arc<BucketStats>,
        count: Arc<AtomicU64>,
    ) -> (Self, u128) {
        let offset = Self::get_offset_to_first_data();
        // one cell = header + `num_elems` elements
        let cell_size = elem_size * num_elems + offset;
        // may grow `capacity` so that the allocation fills whole pages
        let bytes = Self::allocate_to_fill_page(&mut capacity, cell_size);
        let (mmap, path, file_name) = Self::new_map(&drives, bytes, &stats);
        (
            Self {
                path,
                mmap,
                cell_size,
                count,
                stats,
                max_search,
                contents: O::new(capacity),
                // freshly created files are temporary by default
                delete_file_on_drop: true,
            },
            file_name,
        )
    }
165
    /// Compute the allocation size in bytes for `capacity` cells of
    /// `cell_size` bytes each. For `Capacity::Actual`, the allocation is
    /// rounded up to the next page boundary and then down to a whole number
    /// of cells, and `capacity` is updated to the (possibly larger) cell
    /// count that fits. `Pow2` capacities are left untouched.
    fn allocate_to_fill_page(capacity: &mut Capacity, cell_size: u64) -> u64 {
        let mut bytes = capacity.capacity() * cell_size;
        if let Capacity::Actual(_) = capacity {
            // NOTE(review): assumed page size; TODO confirm for the target platforms
            const PAGE_SIZE: u64 = 4 * 1024;
            // bytes of the largest whole number of cells fitting in the
            // largest whole number of pages <= the requested size
            let full_page_bytes = bytes / PAGE_SIZE * PAGE_SIZE / cell_size * cell_size;
            if full_page_bytes < bytes {
                // round up to the next page, then down to a whole cell count
                let bytes_new = ((bytes / PAGE_SIZE) + 1) * PAGE_SIZE / cell_size * cell_size;
                assert!(bytes_new >= bytes, "allocating less than requested, capacity: {}, bytes: {}, bytes_new: {}, full_page_bytes: {}", capacity.capacity(), bytes, bytes_new, full_page_bytes);
                assert_eq!(bytes_new % cell_size, 0);
                bytes = bytes_new;
                *capacity = Capacity::Actual(bytes / cell_size);
            }
        }
        bytes
    }
182
183 fn delete(&self) {
185 _ = remove_file(&self.path);
186 }
187
    /// The probe limit, widened to u64 for arithmetic convenience.
    pub fn max_search(&self) -> u64 {
        self.max_search as u64
    }
191
192 pub fn new(
193 drives: Arc<Vec<PathBuf>>,
194 num_elems: u64,
195 elem_size: u64,
196 max_search: MaxSearch,
197 stats: Arc<BucketStats>,
198 count: Arc<AtomicU64>,
199 ) -> (Self, u128) {
200 Self::new_with_capacity(
201 drives,
202 num_elems,
203 elem_size,
204 Capacity::Pow2(DEFAULT_CAPACITY_POW2),
205 max_search,
206 stats,
207 count,
208 )
209 }
210
211 fn get_offset_to_first_data() -> u64 {
212 let offset = O::offset_to_first_data() as u64;
213 let size_of_u64 = std::mem::size_of::<u64>() as u64;
214 assert_eq!(
215 offset / size_of_u64 * size_of_u64,
216 offset,
217 "header size must be a multiple of u64"
218 );
219 offset
220 }
221
    /// Reopen an existing bucket file at `path`, e.g. after a restart.
    /// Returns `None` if the file is missing/unreadable, holds zero whole
    /// elements past the header, or cannot be mmapped. The resulting storage
    /// does NOT delete the backing file on drop.
    pub(crate) fn load_on_restart(
        path: PathBuf,
        elem_size: NonZeroU64,
        max_search: MaxSearch,
        stats: Arc<BucketStats>,
        count: Arc<AtomicU64>,
    ) -> Option<Self> {
        let offset = Self::get_offset_to_first_data();
        // number of whole elements the file holds after the header
        let num_elems = std::fs::metadata(&path)
            .ok()
            .map(|metadata| metadata.len().saturating_sub(offset) / elem_size)?;
        if num_elems == 0 {
            return None;
        }
        let mmap = Self::map_open_file(&path, false, 0, &stats)?;
        Some(Self {
            path,
            mmap,
            cell_size: elem_size.into(),
            count,
            stats,
            max_search,
            contents: O::new(Capacity::Actual(num_elems)),
            // file pre-existed on disk; keep it when this handle drops
            delete_file_on_drop: false,
        })
    }
250
    /// Invoke `O`'s `copying_entry` hook for copying entry `ix_old` of
    /// `other` into entry `ix_new` of self (used when growing a bucket).
    pub(crate) fn copying_entry(&mut self, ix_new: u64, other: &Self, ix_old: u64) {
        let start = self.get_start_offset_with_header(ix_new);
        let start_old = other.get_start_offset_with_header(ix_old);
        self.contents.copying_entry(
            &mut self.mmap[start..],
            ix_new as usize,
            &other.contents,
            &other.mmap[start_old..],
            ix_old as usize,
        );
    }
262
263 pub fn is_free(&self, ix: u64) -> bool {
265 let start = self.get_start_offset_with_header(ix);
266 let entry = &self.mmap[start..];
267 self.contents.is_free(entry, ix as usize)
268 }
269
270 pub(crate) fn try_lock(&mut self, ix: u64) -> bool {
272 let start = self.get_start_offset_with_header(ix);
273 let entry = &mut self.mmap[start..];
274 if self.contents.is_free(entry, ix as usize) {
275 self.contents.occupy(entry, ix as usize);
276 true
277 } else {
278 false
279 }
280 }
281
282 pub fn occupy(&mut self, ix: u64, is_resizing: bool) -> Result<(), BucketStorageError> {
285 debug_assert!(ix < self.capacity(), "occupy: bad index size");
286 let mut e = Err(BucketStorageError::AlreadyOccupied);
287 if self.try_lock(ix) {
289 e = Ok(());
290 if !is_resizing {
291 self.count.fetch_add(1, Ordering::Relaxed);
292 }
293 }
294 e
295 }
296
297 pub fn free(&mut self, ix: u64) {
298 debug_assert!(ix < self.capacity(), "bad index size");
299 let start = self.get_start_offset_with_header(ix);
300 self.contents.free(&mut self.mmap[start..], ix as usize);
301 self.count.fetch_sub(1, Ordering::Relaxed);
302 }
303
304 fn get_start_offset_with_header(&self, ix: u64) -> usize {
305 debug_assert!(ix < self.capacity(), "bad index size");
306 (self.cell_size * ix) as usize
307 }
308
309 fn get_start_offset(&self, ix: u64, header: IncludeHeader) -> usize {
310 self.get_start_offset_with_header(ix)
311 + match header {
312 IncludeHeader::Header => 0,
313 IncludeHeader::NoHeader => O::offset_to_first_data(),
314 }
315 }
316
    /// Borrow the header of entry `ix` as `&T`.
    pub(crate) fn get_header<T>(&self, ix: u64) -> &T {
        let slice = self.get_slice::<T>(ix, 1, IncludeHeader::Header);
        // SAFETY: `get_slice` returned a slice of length exactly 1,
        // so index 0 is in bounds.
        unsafe { slice.get_unchecked(0) }
    }
322
    /// Borrow the header of entry `ix` as `&mut T`.
    pub(crate) fn get_header_mut<T>(&mut self, ix: u64) -> &mut T {
        let slice = self.get_slice_mut::<T>(ix, 1, IncludeHeader::Header);
        // SAFETY: `get_slice_mut` returned a slice of length exactly 1,
        // so index 0 is in bounds.
        unsafe { slice.get_unchecked_mut(0) }
    }
328
    /// Borrow the data of entry `ix` (past the header) as `&T`.
    pub(crate) fn get<T>(&self, ix: u64) -> &T {
        let slice = self.get_slice::<T>(ix, 1, IncludeHeader::NoHeader);
        // SAFETY: `get_slice` returned a slice of length exactly 1,
        // so index 0 is in bounds.
        unsafe { slice.get_unchecked(0) }
    }
334
    /// Borrow the data of entry `ix` (past the header) as `&mut T`.
    pub(crate) fn get_mut<T>(&mut self, ix: u64) -> &mut T {
        let slice = self.get_slice_mut::<T>(ix, 1, IncludeHeader::NoHeader);
        // SAFETY: `get_slice_mut` returned a slice of length exactly 1,
        // so index 0 is in bounds.
        unsafe { slice.get_unchecked_mut(0) }
    }
340
    /// Borrow `len` values of type `T` starting at entry `ix`.
    /// `IncludeHeader::Header` may only be combined with `len == 1`.
    /// NOTE(review): `T` is reinterpreted from raw file bytes — callers must
    /// only use types valid for any bit pattern; alignment is checked in
    /// debug builds only.
    pub(crate) fn get_slice<T>(&self, ix: u64, len: u64, header: IncludeHeader) -> &[T] {
        // a header is a single value; multi-element reads must skip it
        debug_assert!(
            (header == IncludeHeader::NoHeader) || (header == IncludeHeader::Header && len == 1)
        );
        let start = self.get_start_offset(ix, header);
        let slice = {
            let size = std::mem::size_of::<T>() * len as usize;
            let slice = &self.mmap[start..];
            debug_assert!(slice.len() >= size);
            &slice[..size]
        };
        let ptr = {
            let ptr = slice.as_ptr() as *const T;
            debug_assert!(ptr as usize % std::mem::align_of::<T>() == 0);
            ptr
        };
        // SAFETY: `slice` covers `len * size_of::<T>()` in-bounds bytes of
        // the mmap, and alignment is checked above (debug builds).
        unsafe { std::slice::from_raw_parts(ptr, len as usize) }
    }
360
    /// Mutably borrow `len` values of type `T` starting at entry `ix`.
    /// `IncludeHeader::Header` may only be combined with `len == 1`.
    /// NOTE(review): `T` is reinterpreted from raw file bytes — callers must
    /// only use types valid for any bit pattern; alignment is checked in
    /// debug builds only.
    pub(crate) fn get_slice_mut<T>(
        &mut self,
        ix: u64,
        len: u64,
        header: IncludeHeader,
    ) -> &mut [T] {
        // a header is a single value; multi-element accesses must skip it
        debug_assert!(
            (header == IncludeHeader::NoHeader) || (header == IncludeHeader::Header && len == 1)
        );
        let start = self.get_start_offset(ix, header);
        let slice = {
            let size = std::mem::size_of::<T>() * len as usize;
            let slice = &mut self.mmap[start..];
            debug_assert!(slice.len() >= size);
            &mut slice[..size]
        };
        let ptr = {
            let ptr = slice.as_mut_ptr() as *mut T;
            debug_assert!(ptr as usize % std::mem::align_of::<T>() == 0);
            ptr
        };
        // SAFETY: `slice` covers `len * size_of::<T>()` in-bounds bytes of
        // the mmap (exclusively borrowed), and alignment is checked above.
        unsafe { std::slice::from_raw_parts_mut(ptr, len as usize) }
    }
385
386 fn map_open_file(
389 path: impl AsRef<Path> + std::fmt::Debug + Clone,
390 create: bool,
391 create_bytes: u64,
392 stats: &BucketStats,
393 ) -> Option<MmapMut> {
394 let mut measure_new_file = Measure::start("measure_new_file");
395 let data = OpenOptions::new()
396 .read(true)
397 .write(true)
398 .create(create)
399 .open(path.clone());
400 if let Err(e) = data {
401 if !create {
402 return None;
404 }
405 panic!(
406 "Unable to create data file {:?} in current dir({:?}): {:?}",
407 path,
408 std::env::current_dir(),
409 e
410 );
411 }
412 let mut data = data.unwrap();
413
414 if create {
415 data.seek(SeekFrom::Start(create_bytes - 1)).unwrap();
420 data.write_all(&[0]).unwrap();
421 data.rewind().unwrap();
422 measure_new_file.stop();
423 let measure_flush = Measure::start("measure_flush");
424 data.flush().unwrap(); stats
426 .flush_file_us
427 .fetch_add(measure_flush.end_as_us(), Ordering::Relaxed);
428 }
429 let mut measure_mmap = Measure::start("measure_mmap");
430 let res = unsafe { MmapMut::map_mut(&data) };
431 if let Err(e) = res {
432 panic!(
433 "Unable to mmap file {:?} in current dir({:?}): {:?}",
434 path,
435 std::env::current_dir(),
436 e
437 );
438 }
439 measure_mmap.stop();
440 stats
441 .new_file_us
442 .fetch_add(measure_new_file.as_us(), Ordering::Relaxed);
443 stats
444 .mmap_us
445 .fetch_add(measure_mmap.as_us(), Ordering::Relaxed);
446 res.ok()
447 }
448
449 fn new_map(drives: &[PathBuf], bytes: u64, stats: &BucketStats) -> (MmapMut, PathBuf, u128) {
451 let r = thread_rng().gen_range(0..drives.len());
452 let drive = &drives[r];
453 let file_random = thread_rng().gen_range(0..u128::MAX);
454 let pos = format!("{}", file_random,);
455 let file = drive.join(pos);
456 let res = Self::map_open_file(file.clone(), true, bytes, stats).unwrap();
457
458 (res, file, file_random)
459 }
460
    /// Copy all occupied entries from `old_bucket` into self. Both capacities
    /// are powers of two (`capacity_pow2` panics otherwise), and self must be
    /// at least as large; entry `i` of the old bucket lands at index
    /// `i * index_grow` in the new one.
    fn copy_contents(&mut self, old_bucket: &Self) {
        let mut m = Measure::start("grow");
        let old_cap = old_bucket.capacity();
        let old_map = &old_bucket.mmap;

        // index stride between the old and new layouts is a power of two
        let increment = self.contents.capacity_pow2() - old_bucket.contents.capacity_pow2();
        let index_grow = 1 << increment;
        (0..old_cap as usize).for_each(|i| {
            if !old_bucket.is_free(i as u64) {
                // let the occupancy tracker react to the copy first
                self.copying_entry((i * index_grow) as u64, old_bucket, i as u64);

                {
                    // mark the destination slot occupied in self's bookkeeping
                    let start = self.get_start_offset_with_header((i * index_grow) as u64);
                    self.contents
                        .occupy(&mut self.mmap[start..], i * index_grow);
                }
                // raw byte copy of the whole cell (header + data)
                let old_ix = i * old_bucket.cell_size as usize;
                let new_ix = old_ix * index_grow;
                let dst_slice: &[u8] = &self.mmap[new_ix..new_ix + old_bucket.cell_size as usize];
                let src_slice: &[u8] = &old_map[old_ix..old_ix + old_bucket.cell_size as usize];

                // SAFETY: src and dst come from two different mmaps, so the
                // regions cannot overlap. NOTE(review): dst is produced by a
                // const-to-mut pointer cast from a shared borrow of self.mmap;
                // consider borrowing the destination mutably instead.
                unsafe {
                    let dst = dst_slice.as_ptr() as *mut u8;
                    let src = src_slice.as_ptr();
                    std::ptr::copy_nonoverlapping(src, dst, old_bucket.cell_size as usize);
                };
            }
        });
        m.stop();
        self.stats.resizes.fetch_add(1, Ordering::Relaxed);
        self.stats.resize_us.fetch_add(m.as_us(), Ordering::Relaxed);
    }
498
    /// Record this bucket's current capacity in the shared stats' max-size tracker.
    pub fn update_max_size(&self) {
        self.stats.update_max_size(self.capacity());
    }
502
503 pub fn new_resized(
505 drives: &Arc<Vec<PathBuf>>,
506 max_search: MaxSearch,
507 bucket: Option<&Self>,
508 capacity: Capacity,
509 num_elems: u64,
510 elem_size: u64,
511 stats: &Arc<BucketStats>,
512 ) -> (Self, u128) {
513 let (mut new_bucket, file_name) = Self::new_with_capacity(
514 Arc::clone(drives),
515 num_elems,
516 elem_size,
517 capacity,
518 max_search,
519 Arc::clone(stats),
520 bucket
521 .map(|bucket| Arc::clone(&bucket.count))
522 .unwrap_or_default(),
523 );
524 if let Some(bucket) = bucket {
525 new_bucket.copy_contents(bucket);
526 }
527 new_bucket.update_max_size();
528 (new_bucket, file_name)
529 }
530
    /// Total size in bytes of all cells: `capacity() * cell_size`.
    pub(crate) fn capacity_bytes(&self) -> u64 {
        self.capacity() * self.cell_size
    }
535
    /// Number of cells this bucket can hold (delegates to the occupancy tracker).
    pub fn capacity(&self) -> u64 {
        self.contents.capacity()
    }
540}
541
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::{bucket_storage::BucketOccupied, index_entry::IndexBucket},
        tempfile::tempdir,
    };

    /// occupy/free round-trips on a fresh single-element bucket.
    #[test]
    fn test_bucket_storage() {
        let tmpdir = tempdir().unwrap();
        let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
        assert!(!paths.is_empty());

        let drives = Arc::new(paths);
        let num_elems = 1;
        let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
        let max_search = 1;
        let stats = Arc::default();
        let count = Arc::default();
        let mut storage = BucketStorage::<IndexBucket<u64>>::new(
            drives, num_elems, elem_size, max_search, stats, count,
        )
        .0;
        let ix = 0;
        // occupy twice: second attempt must fail until freed
        assert!(storage.is_free(ix));
        assert!(storage.occupy(ix, false).is_ok());
        assert!(storage.occupy(ix, false).is_err());
        assert!(!storage.is_free(ix));
        storage.free(ix);
        assert!(storage.is_free(ix));
        assert!(storage.is_free(ix));
        // and the cycle works a second time
        assert!(storage.occupy(ix, false).is_ok());
        assert!(storage.occupy(ix, false).is_err());
        assert!(!storage.is_free(ix));
        storage.free(ix);
        assert!(storage.is_free(ix));
    }

    /// load_on_restart must reject directories and files too small to hold
    /// one whole element, and size the capacity from the file length.
    #[test]
    fn test_load_on_restart_failures() {
        let tmpdir = tempdir().unwrap();
        let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
        assert!(!paths.is_empty());
        let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
        let max_search = 1;
        let stats = Arc::new(BucketStats::default());
        let count = Arc::new(AtomicU64::default());
        // a directory path is not a loadable bucket file
        assert!(BucketStorage::<IndexBucket<u64>>::load_on_restart(
            PathBuf::from(tmpdir.path()),
            NonZeroU64::new(elem_size).unwrap(),
            max_search,
            stats.clone(),
            count.clone(),
        )
        .is_none());
        miraland_logger::setup();
        // lengths straddle elem_size and a page boundary; ascending so the
        // reused file only ever grows
        for len in [0, 1, 47, 48, 49, 4097] {
            let path = tmpdir.path().join("small");
            let mut file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .open(path.clone())
                .unwrap();
            _ = file.write_all(&vec![1u8; len]);
            drop(file);
            assert_eq!(std::fs::metadata(&path).unwrap().len(), len as u64);
            let result = BucketStorage::<IndexBucket<u64>>::load_on_restart(
                path,
                NonZeroU64::new(elem_size).unwrap(),
                max_search,
                stats.clone(),
                count.clone(),
            );
            if let Some(result) = result.as_ref() {
                // capacity is derived from whole elements that fit in the file
                assert_eq!(result.capacity() as usize, len / elem_size as usize);
                assert_eq!(
                    result.capacity_bytes() as usize,
                    len / elem_size as usize * elem_size as usize
                );
            }
            assert_eq!(result.is_none(), len < elem_size as usize, "{len}");
        }
    }

    /// Persist a bucket (delete_file_on_drop = false), reload it, and verify
    /// capacity, length, raw bytes, and occupancy state survive the reload.
    #[test]
    fn test_load_on_restart() {
        // exercise both Actual and default Pow2 capacities
        for request in [Some(7), None] {
            let tmpdir = tempdir().unwrap();
            let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
            assert!(!paths.is_empty());
            let drives = Arc::new(paths);
            let num_elems = 1;
            let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
            let max_search = 1;
            let stats = Arc::new(BucketStats::default());
            let count = Arc::new(AtomicU64::default());
            let mut storage = if let Some(actual_elems) = request {
                BucketStorage::<IndexBucket<u64>>::new_with_capacity(
                    drives,
                    num_elems,
                    elem_size,
                    Capacity::Actual(actual_elems),
                    max_search,
                    stats.clone(),
                    count.clone(),
                )
                .0
            } else {
                BucketStorage::<IndexBucket<u64>>::new(
                    drives,
                    num_elems,
                    elem_size,
                    max_search,
                    stats.clone(),
                    count.clone(),
                )
                .0
            };
            let expected_capacity = storage.capacity();
            (0..num_elems).for_each(|ix| {
                assert!(storage.is_free(ix));
                assert!(storage.occupy(ix, false).is_ok());
            });
            // keep the backing file so it can be reloaded below
            storage.delete_file_on_drop = false;
            let len = storage.mmap.len();
            // scribble a recognizable pattern directly into the mmap
            (0..expected_capacity as usize).for_each(|i| {
                storage.mmap[i] = (i % 256) as u8;
            });
            let path = storage.path.clone();
            drop(storage);

            let storage = BucketStorage::<IndexBucket<u64>>::load_on_restart(
                path,
                NonZeroU64::new(elem_size).unwrap(),
                max_search,
                stats,
                count,
            )
            .unwrap();
            assert_eq!(storage.capacity(), expected_capacity);
            assert_eq!(len, storage.mmap.len());
            (0..expected_capacity as usize).for_each(|i| {
                assert_eq!(storage.mmap[i], (i % 256) as u8);
            });
            (0..num_elems).for_each(|ix| {
                // in-memory occupancy is NOT persisted; reloaded entries read as free
                assert!(storage.is_free(ix));
            });
        }
    }

    /// A header size that is not a multiple of u64 must panic at construction.
    #[test]
    #[should_panic]
    fn test_header_bad_size() {
        struct BucketBadHeader;
        impl BucketCapacity for BucketBadHeader {
            fn capacity(&self) -> u64 {
                unimplemented!();
            }
        }
        impl BucketOccupied for BucketBadHeader {
            fn occupy(&mut self, _element: &mut [u8], _ix: usize) {
                unimplemented!();
            }
            fn free(&mut self, _element: &mut [u8], _ix: usize) {
                unimplemented!();
            }
            fn is_free(&self, _element: &[u8], _ix: usize) -> bool {
                unimplemented!();
            }
            fn offset_to_first_data() -> usize {
                // deliberately misaligned header size
                std::mem::size_of::<u64>() - 1
            }
            fn new(_num_elements: Capacity) -> Self {
                Self
            }
        }

        BucketStorage::<BucketBadHeader>::new_with_capacity(
            Arc::default(),
            0,
            0,
            Capacity::Pow2(0),
            0,
            Arc::default(),
            Arc::default(),
        );
    }
}