use crate::arch::size::MAX_MAP_SIZE;
use crate::bucket::BucketRwIApi;
use crate::common::bucket::BucketHeader;
use crate::common::bump::PinBump;
use crate::common::defaults::{
  DEFAULT_ALLOC_SIZE, DEFAULT_MAX_BATCH_DELAY, DEFAULT_MAX_BATCH_SIZE, DEFAULT_PAGE_SIZE, MAGIC,
  MAX_MMAP_STEP, PGID_NO_FREE_LIST, VERSION,
};
use crate::common::lock::LockGuard;
use crate::common::page::freelist::{Freelist, MappedFreeListPage};
use crate::common::page::meta::{MappedMetaPage, Meta};
use crate::common::page::tree::leaf::MappedLeafPage;
use crate::common::page::{CoerciblePage, MutPage, PageHeader, RefPage};
use crate::common::pool::{SyncPool, SyncReusable};
use crate::common::self_owned::SelfOwned;
use crate::common::{BVec, PgId, SplitRef, TxId};
use crate::tx::{
  TxCell, TxClosingState, TxIApi, TxImpl, TxRef, TxRwApi, TxRwImpl, TxRwRef, TxStats,
};
use crate::{Error, TxApi};
use aligners::{alignment, AlignedBytes};
use anyhow::anyhow;
use fs4::fs_std::FileExt;
use memmap2::{Advice, MmapOptions, MmapRaw};
use monotonic_timer::{Guard, Timer};
use parking_lot::{Mutex, MutexGuard, RwLock};
#[cfg(feature = "try-begin")]
use parking_lot::{RwLockReadGuard, RwLockUpgradableReadGuard};
use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::mpsc::{Receiver, SyncSender};
use std::sync::{mpsc, Arc, OnceLock, Weak};
use std::time::Duration;
#[cfg(feature = "try-begin")]
use std::time::Instant;
use std::{fs, io, mem, thread};
use typed_builder::TypedBuilder;

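/// Read-only operations on a Bolt database handle.
///
/// A minimal usage sketch (the `bbolt_rs` import path is an assumption;
/// error handling elided):
///
/// ```ignore
/// use bbolt_rs::{Bolt, DbApi};
///
/// let db = Bolt::open_mem()?;
/// db.view(|tx| {
///   // read buckets and keys here
///   Ok(())
/// })?;
/// db.close();
/// # Ok::<(), bbolt_rs::Error>(())
/// ```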
pub trait DbApi: Clone + Send + Sync
where
  Self: Sized,
{
  fn begin(&self) -> crate::Result<impl TxApi>;

  #[cfg(feature = "try-begin")]
  fn try_begin(&self) -> crate::Result<Option<impl TxApi>>;

  #[cfg(feature = "try-begin")]
  fn try_begin_for(&self, duration: Duration) -> crate::Result<Option<impl TxApi>>;

  #[cfg(feature = "try-begin")]
  fn try_begin_until(&self, instant: Instant) -> crate::Result<Option<impl TxApi>>;

  fn view<'tx, F: FnMut(TxRef<'tx>) -> crate::Result<()>>(&'tx self, f: F) -> crate::Result<()>;

  fn stats(&self) -> Arc<DbStats>;

  fn path(&self) -> &DbPath;

  fn info(&self) -> DbInfo;

  fn close(self);
}

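/// Read-write operations on top of [`DbApi`].
///
/// A minimal sketch of a write transaction via `update` (bucket and key
/// names are illustrative; the `bbolt_rs` import path is an assumption):
///
/// ```ignore
/// use bbolt_rs::{Bolt, BucketRwApi, DbApi, DbRwAPI, TxRwRefApi};
///
/// let mut db = Bolt::open_mem()?;
/// db.update(|mut tx| {
///   let mut b = tx.create_bucket_if_not_exists("widgets")?;
///   b.put("foo", "bar")?;
///   Ok(())
/// })?;
/// db.sync()?;
/// # Ok::<(), bbolt_rs::Error>(())
/// ```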
pub trait DbRwAPI: DbApi {
  fn begin_rw(&mut self) -> crate::Result<impl TxRwApi>;

  #[cfg(feature = "try-begin")]
  fn try_begin_rw(&self) -> crate::Result<Option<impl TxRwApi>>;

  #[cfg(feature = "try-begin")]
  fn try_begin_rw_for(&self, duration: Duration) -> crate::Result<Option<impl TxRwApi>>;

  #[cfg(feature = "try-begin")]
  fn try_begin_rw_until(&self, instant: Instant) -> crate::Result<Option<impl TxRwApi>>;

  fn update<'tx, F: FnMut(TxRwRef<'tx>) -> crate::Result<()>>(
    &'tx mut self, f: F,
  ) -> crate::Result<()>;

  fn batch<F>(&mut self, f: F) -> crate::Result<()>
  where
    F: FnMut(&mut TxRwRef) -> crate::Result<()> + Send + Sync + Clone + 'static;

  fn sync(&mut self) -> crate::Result<()>;
}

#[derive(Default)]
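/// Point-in-time statistics for the database, readable while transactions
/// are running. A small polling sketch (the `bbolt_rs` import path is an
/// assumption):
///
/// ```ignore
/// use bbolt_rs::{Bolt, DbApi};
///
/// let db = Bolt::open_mem()?;
/// let stats = db.stats();
/// println!("open txs: {}", stats.open_tx_n());
/// println!("free pages: {}", stats.free_page_n());
/// # Ok::<(), bbolt_rs::Error>(())
/// ```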
pub struct DbStats {
  tx_stats: TxStats,

  free_page_n: AtomicI64,
  pending_page_n: AtomicI64,
  free_alloc: AtomicI64,
  free_list_in_use: AtomicI64,

  tx_n: AtomicI64,
  open_tx_n: AtomicI64,
}

impl DbStats {
  pub fn tx_stats(&self) -> &TxStats {
    &self.tx_stats
  }

  pub fn free_page_n(&self) -> i64 {
    self.free_page_n.load(Ordering::Acquire)
  }

  pub(crate) fn set_free_page_n(&self, value: i64) {
    self.free_page_n.store(value, Ordering::Release);
  }

  pub fn pending_page_n(&self) -> i64 {
    self.pending_page_n.load(Ordering::Acquire)
  }

  pub(crate) fn set_pending_page_n(&self, value: i64) {
    self.pending_page_n.store(value, Ordering::Release);
  }

  pub fn free_alloc(&self) -> i64 {
    self.free_alloc.load(Ordering::Acquire)
  }

  pub(crate) fn set_free_alloc(&self, value: i64) {
    self.free_alloc.store(value, Ordering::Release);
  }

  pub fn free_list_in_use(&self) -> i64 {
    self.free_list_in_use.load(Ordering::Acquire)
  }

  pub(crate) fn set_free_list_in_use(&self, value: i64) {
    self.free_list_in_use.store(value, Ordering::Release);
  }

  pub fn tx_n(&self) -> i64 {
    self.tx_n.load(Ordering::Acquire)
  }

  pub(crate) fn inc_tx_n(&self, delta: i64) {
    self.tx_n.fetch_add(delta, Ordering::Relaxed);
  }

  pub fn open_tx_n(&self) -> i64 {
    self.open_tx_n.load(Ordering::Acquire)
  }

  pub(crate) fn sub(&self, rhs: &DbStats) -> DbStats {
    let diff = self.clone();
    diff.inc_tx_n(-rhs.tx_n());
    diff.tx_stats.sub_assign(&rhs.tx_stats);
    diff
  }
}

impl Clone for DbStats {
  fn clone(&self) -> Self {
    DbStats {
      tx_stats: self.tx_stats.clone(),
      free_page_n: self.free_page_n().into(),
      pending_page_n: self.pending_page_n().into(),
      free_alloc: self.free_alloc().into(),
      free_list_in_use: self.free_list_in_use().into(),
      tx_n: self.tx_n().into(),
      open_tx_n: self.open_tx_n().into(),
    }
  }
}

pub struct DbState {
  txs: Vec<TxId>,
  rwtx: Option<TxId>,
  is_open: bool,
  current_meta: Meta,
}

impl DbState {
  fn new(current_meta: Meta) -> DbState {
    DbState {
      txs: vec![],
      rwtx: None,
      is_open: true,
      current_meta,
    }
  }
}

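/// Determines an appropriate mmap size for the database: the size doubles
/// from 32 KiB (`1 << 15`) up to 1 GiB (`1 << 30`); beyond that it grows in
/// MAX_MMAP_STEP increments, rounded up to a multiple of the page size and
/// capped at MAX_MAP_SIZE.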
fn mmap_size(page_size: usize, size: u64) -> crate::Result<u64> {
  for i in 15..=30usize {
    if size <= 1 << i {
      return Ok(1 << i);
    }
  }
  if size > MAX_MAP_SIZE.bytes() as u64 {
    return Err(Error::MMapTooLarge);
  }

  let mut sz = size;
  let remainder = sz % MAX_MMAP_STEP.bytes() as u64;
  if remainder > 0 {
    sz += MAX_MMAP_STEP.bytes() as u64 - remainder;
  }

  let ps = page_size as u64;
  if sz % ps != 0 {
    sz = ((sz / ps) + 1) * ps;
  }

  if sz > MAX_MAP_SIZE.bytes() as u64 {
    sz = MAX_MAP_SIZE.bytes() as u64;
  }

  Ok(sz)
}

#[derive(Clone, PartialOrd, PartialEq, Ord, Eq, Debug)]
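/// The location of a database: purely in memory, or backed by a file on disk.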
pub enum DbPath {
  Memory,
  FilePath(PathBuf),
}

impl DbPath {
  pub fn file_path(&self) -> Option<&Path> {
    match self {
      DbPath::Memory => None,
      DbPath::FilePath(p) => Some(p),
    }
  }
}

#[derive(Clone, Debug)]
pub struct DbInfo {
  pub page_size: usize,
}

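/// The storage backend shared by the file-backed and in-memory databases:
/// page access, meta page selection, growth, mmap sizing, fsync, and the
/// freelist.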
pub(crate) trait DBBackend: Send + Sync {
  fn page_size(&self) -> usize;
  fn data_size(&self) -> u64;

  fn validate_meta(&self) -> crate::Result<()> {
    let meta0 = self.meta0();
    let meta1 = self.meta1();
    if let (Err(error), Err(_)) = (meta0.meta.validate(), meta1.meta.validate()) {
      return Err(error);
    }
    Ok(())
  }

  fn meta(&self) -> Meta {
    let meta0 = self.meta0();
    let meta1 = self.meta1();
    let (meta_a, meta_b) = {
      if meta1.meta.txid() > meta0.meta.txid() {
        (meta1.meta, meta0.meta)
      } else {
        (meta0.meta, meta1.meta)
      }
    };
    if meta_a.validate().is_ok() {
      return meta_a;
    } else if meta_b.validate().is_ok() {
      return meta_b;
    }
    panic!("bolt.db.meta: invalid meta page")
  }

  fn meta0(&self) -> MappedMetaPage;

  fn meta1(&self) -> MappedMetaPage;

  fn page<'tx>(&self, pg_id: PgId) -> RefPage<'tx>;

  fn grow(&self, size: u64) -> crate::Result<()>;

  fn mmap(&mut self, min_size: u64, tx: TxCell) -> crate::Result<()>;

  fn fsync(&self) -> crate::Result<()>;
  fn write_all_at(&self, buffer: &[u8], offset: u64) -> crate::Result<usize>;

  fn freelist(&self) -> MutexGuard<Freelist>;
}

struct ClosedBackend;

impl DBBackend for ClosedBackend {
  fn page_size(&self) -> usize {
    unreachable!()
  }

  fn data_size(&self) -> u64 {
    unreachable!()
  }

  fn meta0(&self) -> MappedMetaPage {
    unreachable!()
  }

  fn meta1(&self) -> MappedMetaPage {
    unreachable!()
  }

  fn page<'tx>(&self, _pg_id: PgId) -> RefPage<'tx> {
    unreachable!()
  }

  fn grow(&self, _size: u64) -> crate::Result<()> {
    unreachable!()
  }

  fn mmap(&mut self, _min_size: u64, _tx: TxCell) -> crate::Result<()> {
    unreachable!()
  }

  fn fsync(&self) -> crate::Result<()> {
    unreachable!()
  }

  fn write_all_at(&self, _buffer: &[u8], _offset: u64) -> crate::Result<usize> {
    unreachable!()
  }

  fn freelist(&self) -> MutexGuard<Freelist> {
    unreachable!()
  }
}

struct MemBackend {
  mmap: Mutex<AlignedBytes<alignment::Page>>,
  freelist: OnceLock<Mutex<Freelist>>,
  page_size: usize,
  alloc_size: u64,
  file_size: u64,
  data_size: u64,
}

unsafe impl Send for MemBackend {}
unsafe impl Sync for MemBackend {}

impl DBBackend for MemBackend {
  fn page_size(&self) -> usize {
    self.page_size
  }

  fn data_size(&self) -> u64 {
    self.data_size
  }

  fn meta0(&self) -> MappedMetaPage {
    unsafe { MappedMetaPage::new(self.mmap.lock().as_ptr().cast_mut()) }
  }

  fn meta1(&self) -> MappedMetaPage {
    unsafe { MappedMetaPage::new(self.mmap.lock().as_ptr().add(self.page_size).cast_mut()) }
  }

  fn page<'tx>(&self, pg_id: PgId) -> RefPage<'tx> {
    let mmap = self.mmap.lock();
    debug_assert!(((pg_id.0 as usize + 1) * self.page_size) <= mmap.len());
    unsafe { RefPage::new(mmap.as_ptr().byte_add(pg_id.0 as usize * self.page_size)) }
  }

  fn grow(&self, size: u64) -> crate::Result<()> {
    let mut mmap = self.mmap.lock();
    if size <= mmap.len() as u64 {
      return Ok(());
    }
    let mut new_mmap = AlignedBytes::new_zeroed(size as usize);
    new_mmap[0..mmap.len()].copy_from_slice(&mmap);
    *mmap = new_mmap;
    Ok(())
  }

  fn mmap(&mut self, min_size: u64, _tx: TxCell) -> crate::Result<()> {
    let mut size = {
      let mmap = self.mmap.lock();
      if mmap.len() < self.page_size * 2 {
        return Err(Error::MMapTooSmall(mmap.len() as u64));
      }
      (mmap.len() as u64).max(min_size)
    };
    size = mmap_size(self.page_size, size)?;
    self.validate_meta()?;

    self.data_size = size;
    Ok(())
  }

  fn fsync(&self) -> crate::Result<()> {
    Ok(())
  }

  fn write_all_at(&self, buffer: &[u8], offset: u64) -> crate::Result<usize> {
    let mut mmap = self.mmap.lock();
    let write_to = &mut mmap[offset as usize..offset as usize + buffer.len()];
    write_to.copy_from_slice(buffer);
    let written = write_to.len();
    Ok(written)
  }

  fn freelist(&self) -> MutexGuard<Freelist> {
    self
      .freelist
      .get_or_init(|| {
        let meta = self.meta();
        let freelist_pgid = meta.free_list();
        let refpage = self.page(freelist_pgid);
        let freelist_page = MappedFreeListPage::coerce_ref(&refpage).unwrap();
        let freelist = freelist_page.read();
        Mutex::new(freelist)
      })
      .lock()
  }
}

struct FileState {
  file: File,
  file_size: u64,
}

impl Deref for FileState {
  type Target = File;

  fn deref(&self) -> &Self::Target {
    &self.file
  }
}

impl DerefMut for FileState {
  fn deref_mut(&mut self) -> &mut Self::Target {
    &mut self.file
  }
}

pub struct FileBackend {
  path: Arc<PathBuf>,
  file: Mutex<FileState>,
  page_size: usize,
  mmap: Option<MmapRaw>,
  freelist: OnceLock<Mutex<Freelist>>,
  alloc_size: u64,
  data_size: u64,
  use_mlock: bool,
  grow_async: bool,
  read_only: bool,
}

impl FileBackend {
  fn invalidate(&mut self) {
    self.data_size = 0;
  }

  fn munmap(&mut self) -> crate::Result<()> {
    self.mmap = None;
    self.invalidate();
    Ok(())
  }

  fn has_synced_free_list(&self) -> bool {
    self.meta().free_list() != PGID_NO_FREE_LIST
  }

  fn mmap_unlock(&mut self) -> crate::Result<()> {
    if let Some(mmap) = &mut self.mmap {
      mmap.unlock()?;
    }
    Ok(())
  }

  fn mmap_lock(&mut self) -> crate::Result<()> {
    if let Some(mmap) = &mut self.mmap {
      mmap.lock()?;
    }
    Ok(())
  }

  fn mmap_relock(&mut self) -> crate::Result<()> {
    self.mmap_unlock()?;
    self.mmap_lock()?;
    Ok(())
  }

  fn get_page_size(file: &mut File) -> crate::Result<usize> {
    let meta0_can_read = match Self::get_page_size_from_first_meta(file) {
      Ok(page_size) => return Ok(page_size),
      Err(Error::InvalidDatabase(meta_can_read)) => meta_can_read,
      Err(e) => return Err(e),
    };

    let meta1_can_read = match Self::get_page_size_from_second_meta(file) {
      Ok(page_size) => return Ok(page_size),
      Err(Error::InvalidDatabase(meta_can_read)) => meta_can_read,
      Err(e) => return Err(e),
    };

    if meta0_can_read || meta1_can_read {
      return Ok(DEFAULT_PAGE_SIZE.bytes() as usize);
    }
    Err(Error::InvalidDatabase(false))
  }

  fn get_page_size_from_first_meta(file: &mut File) -> crate::Result<usize> {
    let mut buffer = AlignedBytes::<alignment::Page>::new_zeroed(4096);
    let refpage = RefPage::new(buffer.as_ptr());
    let mut meta_can_read = false;
    let bytes_read = file
      .seek(SeekFrom::Start(0))
      .and_then(|_| file.read(&mut buffer))
      .map_err(|_| Error::InvalidDatabase(meta_can_read))?;
    if bytes_read == buffer.len() {
      meta_can_read = true;
      if let Some(meta_page) = MappedMetaPage::coerce_ref(&refpage) {
        if meta_page.meta.validate().is_ok() {
          let page_size = meta_page.meta.page_size();
          return Ok(page_size as usize);
        }
      }
    }
    Err(Error::InvalidDatabase(meta_can_read))
  }

  fn get_page_size_from_second_meta(file: &mut File) -> crate::Result<usize> {
    let mut meta_can_read = false;
    let metadata = file.metadata()?;
    let file_size = metadata.len();
    let mut buffer = AlignedBytes::<alignment::Page>::new_zeroed(4096);
    for i in 0..15u64 {
      let pos = 1024u64 << i;
      if file_size < 1024 || pos >= file_size - 1024 {
        break;
      }
      let bytes_read = file
        .seek(SeekFrom::Start(pos))
        .and_then(|_| file.read(&mut buffer))
        .map_err(|_| Error::InvalidDatabase(meta_can_read))? as u64;
      if bytes_read == buffer.len() as u64 || bytes_read == file_size - pos {
        meta_can_read = true;
        if let Some(meta_page) = MappedMetaPage::coerce_ref(&RefPage::new(buffer.as_ptr())) {
          if meta_page.meta.validate().is_ok() {
            return Ok(meta_page.meta.page_size() as usize);
          }
        }
      }
      buffer.fill(0);
    }
    Err(Error::InvalidDatabase(meta_can_read))
  }

  pub(crate) fn file_size(&self) -> crate::Result<u64> {
    let file_lock = self.file.lock();
    let info = file_lock.metadata()?;
    let size = info.len();
    if size < (self.page_size * 2) as u64 {
      return Err(Error::FileSizeTooSmall(size));
    }
    Ok(size)
  }
}

impl DBBackend for FileBackend {
  fn page_size(&self) -> usize {
    self.page_size
  }

  fn data_size(&self) -> u64 {
    self.data_size
  }

  fn meta0(&self) -> MappedMetaPage {
    self
      .mmap
      .as_ref()
      .map(|mmap| unsafe { MappedMetaPage::new(mmap.as_mut_ptr()) })
      .unwrap()
  }

  fn meta1(&self) -> MappedMetaPage {
    self
      .mmap
      .as_ref()
      .map(|mmap| unsafe { MappedMetaPage::new(mmap.as_mut_ptr().add(self.page_size)) })
      .unwrap()
  }

  fn page<'tx>(&self, pg_id: PgId) -> RefPage<'tx> {
    let page_addr = pg_id.0 as usize * self.page_size;
    let page_ptr = unsafe { self.mmap.as_ref().unwrap().as_ptr().add(page_addr) };
    RefPage::new(page_ptr)
  }

  fn grow(&self, mut size: u64) -> crate::Result<()> {
    let file_size = self.file.lock().file_size;
    if size <= file_size {
      return Ok(());
    }
    if self.data_size <= self.alloc_size {
      size = self.data_size;
    } else {
      size += self.alloc_size;
    }

    if self.grow_async && !self.read_only {
      let file_lock = self.file.lock();
      #[cfg(mlock_supported)]
      if self.use_mlock {
        self.mmap.as_ref().unwrap().unlock()?;
      }
      if cfg!(not(target_os = "windows")) {
        file_lock.set_len(size)?;
      }
      file_lock.sync_all()?;
      #[cfg(mlock_supported)]
      if self.use_mlock {
        self.mmap.as_ref().unwrap().lock()?;
      }
    }

    self.file.lock().file_size = size;
    Ok(())
  }

  fn mmap(&mut self, min_size: u64, tx: TxCell) -> crate::Result<()> {
    let file_size = self.file_size()?;
    let mut size = file_size.max(min_size);

    size = mmap_size(self.page_size, size)?;
    if let Some(mmap) = self.mmap.take() {
      #[cfg(mlock_supported)]
      if self.use_mlock {
        mmap.unlock()?;
      }
      tx.cell.bound().own_in();
    }

    let file_lock = self.file.lock();
    let mmap = MmapOptions::new()
      .len(size as usize)
      .map_raw(&**file_lock)?;
    #[cfg(mlock_supported)]
    if self.use_mlock {
      mmap.lock()?;
    }
    #[cfg(mmap_advise_supported)]
    mmap.advise(Advice::Random)?;

    self.mmap = Some(mmap);

    let r0 = self.meta0().meta.validate();
    let r1 = self.meta1().meta.validate();

    if r0.is_err() && r1.is_err() {
      return r0;
    }

    self.data_size = size;
    Ok(())
  }

  fn fsync(&self) -> crate::Result<()> {
    self.file.lock().sync_all().map_err(Error::IO)
  }

  fn write_all_at(&self, buffer: &[u8], offset: u64) -> crate::Result<usize> {
    let mut file_lock = self.file.lock();
    file_lock.seek(SeekFrom::Start(offset)).map_err(Error::IO)?;
    file_lock
      .write_all(buffer)
      .map_err(Error::IO)
      .map(|_| buffer.len())
  }

  fn freelist(&self) -> MutexGuard<Freelist> {
    self
      .freelist
      .get_or_init(|| {
        let meta = self.meta();
        let freelist_pgid = meta.free_list();
        let refpage = self.page(freelist_pgid);
        let freelist_page = MappedFreeListPage::coerce_ref(&refpage).unwrap();
        let freelist = freelist_page.read();
        Mutex::new(freelist)
      })
      .lock()
  }
}

impl Drop for FileBackend {
  fn drop(&mut self) {
    if !self.read_only {
      match self.file.lock().unlock() {
        Ok(_) => {}
        Err(_) => {
          todo!("log unlock error")
        }
      }
    }
  }
}

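/// The outcome of a page allocation: either the new page fits within the
/// currently mapped region, or the mmap must first be grown to the given
/// minimum size.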
pub(crate) enum AllocateResult<'tx> {
  Page(SelfOwned<AlignedBytes<alignment::Page>, MutPage<'tx>>),
  PageWithNewSize(SelfOwned<AlignedBytes<alignment::Page>, MutPage<'tx>>, u64),
}

pub(crate) trait DbIApi<'tx>: 'tx {
  fn page(&self, pg_id: PgId) -> RefPage<'tx>;

  fn is_page_free(&self, pg_id: PgId) -> bool;

  fn remove_tx(&self, rem_tx: TxId, tx_stats: Arc<TxStats>);
  fn allocate(&self, tx: TxCell, page_count: u64) -> AllocateResult<'tx>;

  fn free_page(&self, txid: TxId, p: &PageHeader);
  fn free_pages(&self, state: &mut DbState);

  fn freelist_count(&self) -> u64;

  fn freelist_copyall(&self, all: &mut BVec<PgId>);

  fn commit_freelist(&self, tx: TxCell<'tx>) -> crate::Result<AllocateResult<'tx>>;

  fn write_all_at(&self, buf: &[u8], offset: u64) -> crate::Result<usize>;

  fn fsync(&self) -> crate::Result<()>;
  fn repool_allocated(&self, page: AlignedBytes<alignment::Page>);

  fn remove_rw_tx(&self, tx_closing_state: TxClosingState, rem_tx: TxId, tx_stats: Arc<TxStats>);

  fn grow(&self, size: u64) -> crate::Result<()>;
}

pub(crate) trait DbMutIApi<'tx>: DbIApi<'tx> {
  fn mmap_to_new_size(&mut self, min_size: u64, tx: TxCell) -> crate::Result<()>;
}

impl<'tx> DbIApi<'tx> for LockGuard<'tx, DbShared> {
  fn page(&self, pg_id: PgId) -> RefPage<'tx> {
    match self {
      LockGuard::R(guard) => guard.page(pg_id),
      LockGuard::U(guard) => guard.borrow().page(pg_id),
    }
  }

  fn is_page_free(&self, pg_id: PgId) -> bool {
    match self {
      LockGuard::R(guard) => guard.is_page_free(pg_id),
      LockGuard::U(guard) => guard.borrow().is_page_free(pg_id),
    }
  }

  fn remove_tx(&self, rem_tx: TxId, tx_stats: Arc<TxStats>) {
    match self {
      LockGuard::R(guard) => guard.remove_tx(rem_tx, tx_stats),
      LockGuard::U(guard) => guard.borrow().remove_tx(rem_tx, tx_stats),
    }
  }

  fn allocate(&self, tx: TxCell, page_count: u64) -> AllocateResult<'tx> {
    match self {
      LockGuard::R(guard) => guard.allocate(tx, page_count),
      LockGuard::U(guard) => guard.borrow().allocate(tx, page_count),
    }
  }

  fn free_page(&self, txid: TxId, p: &PageHeader) {
    match self {
      LockGuard::R(guard) => guard.free_page(txid, p),
      LockGuard::U(guard) => guard.borrow().free_page(txid, p),
    }
  }

  fn free_pages(&self, state: &mut DbState) {
    match self {
      LockGuard::R(guard) => guard.free_pages(state),
      LockGuard::U(guard) => guard.borrow().free_pages(state),
    }
  }

  fn freelist_count(&self) -> u64 {
    match self {
      LockGuard::R(guard) => guard.freelist_count(),
      LockGuard::U(guard) => guard.borrow().freelist_count(),
    }
  }

  fn freelist_copyall(&self, all: &mut BVec<PgId>) {
    match self {
      LockGuard::R(guard) => guard.freelist_copyall(all),
      LockGuard::U(guard) => guard.borrow().freelist_copyall(all),
    }
  }

  fn commit_freelist(&self, tx: TxCell<'tx>) -> crate::Result<AllocateResult<'tx>> {
    match self {
      LockGuard::R(guard) => guard.commit_freelist(tx),
      LockGuard::U(guard) => guard.borrow().commit_freelist(tx),
    }
  }

  fn write_all_at(&self, buf: &[u8], offset: u64) -> crate::Result<usize> {
    match self {
      LockGuard::R(guard) => guard.write_all_at(buf, offset),
      LockGuard::U(guard) => guard.borrow().write_all_at(buf, offset),
    }
  }

  fn fsync(&self) -> crate::Result<()> {
    match self {
      LockGuard::R(guard) => guard.fsync(),
      LockGuard::U(guard) => guard.borrow().fsync(),
    }
  }

  fn repool_allocated(&self, page: AlignedBytes<alignment::Page>) {
    match self {
      LockGuard::R(guard) => guard.repool_allocated(page),
      LockGuard::U(guard) => guard.borrow().repool_allocated(page),
    }
  }

  fn remove_rw_tx(&self, tx_closing_state: TxClosingState, rem_tx: TxId, tx_stats: Arc<TxStats>) {
    match self {
      LockGuard::R(guard) => guard.remove_rw_tx(tx_closing_state, rem_tx, tx_stats),
      LockGuard::U(guard) => guard
        .borrow()
        .remove_rw_tx(tx_closing_state, rem_tx, tx_stats),
    }
  }

  fn grow(&self, size: u64) -> crate::Result<()> {
    match self {
      LockGuard::R(guard) => guard.grow(size),
      LockGuard::U(guard) => guard.borrow().grow(size),
    }
  }
}

pub struct DbShared {
  pub(crate) stats: Arc<DbStats>,
  pub(crate) db_state: Arc<Mutex<DbState>>,
  page_pool: Mutex<Vec<AlignedBytes<alignment::Page>>>,
  pub(crate) backend: Box<dyn DBBackend>,
  pub(crate) options: BoltOptions,
}

unsafe impl Sync for DbShared {}
unsafe impl Send for DbShared {}

impl<'tx> DbIApi<'tx> for DbShared {
  fn page(&self, pg_id: PgId) -> RefPage<'tx> {
    self.backend.page(pg_id)
  }

  fn is_page_free(&self, pg_id: PgId) -> bool {
    self.backend.freelist().freed(pg_id)
  }

  fn remove_tx(&self, rem_tx: TxId, tx_stats: Arc<TxStats>) {
    let mut records = self.db_state.lock();
    if let Some(pos) = records.txs.iter().position(|tx| *tx == rem_tx) {
      records.txs.swap_remove(pos);
    }

    let n = records.txs.len();
    self.stats.open_tx_n.store(n as i64, Ordering::Release);
    self.stats.tx_stats.add_assign(&tx_stats);
  }

  fn allocate(&self, tx: TxCell, page_count: u64) -> AllocateResult<'tx> {
    let tx_id = tx.api_id();
    let high_water = tx.meta().pgid();
    // Single-page allocations may be served from the page pool; pop under a
    // single lock so the emptiness check and the take cannot race.
    let pooled = if page_count == 1 {
      self.page_pool.lock().pop()
    } else {
      None
    };
    let bytes = match pooled {
      Some(mut page) => {
        page.fill(0);
        page
      }
      None => AlignedBytes::new_zeroed(page_count as usize * self.backend.page_size()),
    };

    {
      let tx = tx.cell.borrow();
      let stats = tx.r.stats.as_ref().unwrap();
      stats.inc_page_count(page_count as i64);
      stats.inc_page_alloc((page_count * tx.r.meta.page_size() as u64) as i64);
    }

    let mut mut_page = SelfOwned::new_with_map(bytes, |b| MutPage::new(b.as_mut_ptr()));
    mut_page.overflow = (page_count - 1) as u32;

    if let Some(pid) = self.backend.freelist().allocate(tx_id, page_count) {
      mut_page.id = pid;
      return AllocateResult::Page(mut_page);
    }

    mut_page.id = high_water;
    let min_size = (high_water.0 + page_count + 1) * self.backend.page_size() as u64;
    tx.split_r_mut().meta.set_pgid(high_water + page_count);
    if min_size > self.backend.data_size() {
      AllocateResult::PageWithNewSize(mut_page, min_size)
    } else {
      AllocateResult::Page(mut_page)
    }
  }

  fn free_page(&self, txid: TxId, p: &PageHeader) {
    self.backend.freelist().free(txid, p)
  }

  fn free_pages(&self, state: &mut DbState) {
    let mut freelist = self.backend.freelist();
    state.txs.sort();
    let mut min_id = TxId(0xFFFFFFFFFFFFFFFF);
    if !state.txs.is_empty() {
      min_id = *state.txs.first().unwrap();
    }
    if min_id.0 > 0 {
      freelist.release(min_id - 1);
    }

    for t in &state.txs {
      freelist.release_range(min_id, *t - 1);
      min_id = *t + 1;
    }
    freelist.release_range(min_id, TxId(0xFFFFFFFFFFFFFFFF));
  }

  fn freelist_count(&self) -> u64 {
    self.backend.freelist().count()
  }

  fn freelist_copyall(&self, all: &mut BVec<PgId>) {
    self.backend.freelist().copy_all(all)
  }

  fn commit_freelist(&self, tx: TxCell<'tx>) -> crate::Result<AllocateResult<'tx>> {
    let count = {
      let page_size = self.backend.page_size();
      let freelist_size = self.backend.freelist().size();
      (freelist_size / page_size as u64) + 1
    };

    let mut freelist_page = self.allocate(tx, count);
    {
      let page = match &mut freelist_page {
        AllocateResult::Page(page) => page,
        AllocateResult::PageWithNewSize(page, _) => page,
      };
      self
        .backend
        .freelist()
        .write(MappedFreeListPage::mut_into(page))
    }

    Ok(freelist_page)
  }

  fn write_all_at(&self, buf: &[u8], offset: u64) -> crate::Result<usize> {
    self.backend.write_all_at(buf, offset)
  }

  fn fsync(&self) -> crate::Result<()> {
    self.backend.fsync()
  }

  fn repool_allocated(&self, page: AlignedBytes<alignment::Page>) {
    self.page_pool.lock().push(page);
  }

  fn remove_rw_tx(&self, tx_closing_state: TxClosingState, rem_tx: TxId, tx_stats: Arc<TxStats>) {
    let mut state = self.db_state.lock();

    let page_size = self.backend.page_size();
    let mut freelist = self.backend.freelist();
    if tx_closing_state.is_rollback() {
      freelist.rollback(rem_tx);
      if tx_closing_state.is_physical_rollback() {
        let freelist_page_id = self.backend.meta().free_list();
        let freelist_page_ref = self.backend.page(freelist_page_id);
        let freelist_page = MappedFreeListPage::coerce_ref(&freelist_page_ref).unwrap();
        freelist.reload(freelist_page);
      }
    }

    let free_list_free_n = freelist.free_count();
    let free_list_pending_n = freelist.pending_count();
    let free_list_alloc = freelist.size();

    let new_meta = self.backend.meta();
    state.current_meta = new_meta;

    state.rwtx = None;

    self.stats.set_free_page_n(free_list_free_n as i64);
    self.stats.set_pending_page_n(free_list_pending_n as i64);
    self
      .stats
      .set_free_alloc(((free_list_free_n + free_list_pending_n) * page_size as u64) as i64);
    self.stats.set_free_list_in_use(free_list_alloc as i64);
    self.stats.tx_stats.add_assign(&tx_stats);
  }

  fn grow(&self, size: u64) -> crate::Result<()> {
    self.backend.grow(size)
  }
}

impl<'tx> DbMutIApi<'tx> for DbShared {
  fn mmap_to_new_size(&mut self, min_size: u64, tx: TxCell) -> crate::Result<()> {
    self.backend.as_mut().mmap(min_size, tx)
  }
}

#[derive(Clone, Default, Debug, PartialEq, Eq, TypedBuilder)]
#[builder(doc)]
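/// Options for opening a [`Bolt`] database, assembled through the generated
/// builder. A sketch (the option values are illustrative, not recommended
/// defaults; the `bbolt_rs` import path is an assumption):
///
/// ```ignore
/// use bbolt_rs::BoltOptions;
///
/// let db = BoltOptions::builder()
///   .page_size(8192)
///   .no_sync()
///   .build()
///   .open_mem()?;
/// # Ok::<(), bbolt_rs::Error>(())
/// ```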
pub struct BoltOptions {
  #[cfg(timeout_supported)]
  #[builder(
    default,
    setter(
      strip_option,
      skip,
      doc = "Timeout is the amount of time to wait to obtain a file lock. \
             When set to zero it will wait indefinitely."
    )
  )]
  timeout: Option<Duration>,
  #[builder(
    default,
    setter(
      skip,
      doc = "Sets the DB.NoGrowSync flag before memory mapping the file."
    )
  )]
  no_grow_sync: bool,
  #[builder(
    default,
    setter(
      skip,
      doc = "Do not sync the freelist to disk. \
             This improves database write performance under normal operation, \
             but requires a full database re-sync during recovery."
    )
  )]
  no_freelist_sync: bool,
  #[builder(setter(
    strip_bool,
    doc = "Sets whether to load the free pages when opening the db file. \
           Note that when opening a db in write mode, bbolt will always load the free pages."
  ))]
  preload_freelist: bool,
  #[builder(
    default,
    setter(
      strip_option,
      doc = "InitialMmapSize is the initial mmap size of the database in bytes. \
             Read transactions won't block write transactions if the InitialMmapSize is \
             large enough to hold the database mmap size."
    )
  )]
  initial_mmap_size: Option<u64>,
  #[builder(default, setter(strip_option))]
  page_size: Option<usize>,
  #[builder(setter(strip_bool))]
  no_sync: bool,
  #[cfg(mlock_supported)]
  #[builder(setter(strip_bool))]
  mlock: bool,
  #[builder(
    default,
    setter(strip_option, doc = "max_batch_size is the maximum size of a batch.")
  )]
  max_batch_size: Option<u32>,
  #[builder(
    default,
    setter(
      strip_option,
      doc = "max_batch_delay is the maximum delay before a batch starts."
    )
  )]
  max_batch_delay: Option<Duration>,
  #[builder(default = false, setter(skip))]
  read_only: bool,
}

impl BoltOptions {
  #[inline]
  pub(crate) fn timeout(&self) -> Option<Duration> {
    // The field itself is cfg-gated, so the access must be cfg-gated as well;
    // `cfg!` would still compile the field access on unsupported targets.
    #[cfg(timeout_supported)]
    {
      self.timeout
    }
    #[cfg(not(timeout_supported))]
    {
      None
    }
  }

  #[inline]
  pub(crate) fn no_grow_sync(&self) -> bool {
    self.no_grow_sync
  }

  #[inline]
  pub(crate) fn no_freelist_sync(&self) -> bool {
    self.no_freelist_sync
  }

  #[inline]
  pub(crate) fn preload_freelist(&self) -> bool {
    if self.read_only {
      self.preload_freelist
    } else {
      true
    }
  }

  #[inline]
  pub(crate) fn initial_map_size(&self) -> Option<u64> {
    self.initial_mmap_size
  }

  #[inline]
  pub(crate) fn page_size(&self) -> Option<usize> {
    self.page_size
  }

  #[inline]
  pub(crate) fn no_sync(&self) -> bool {
    self.no_sync
  }

  #[inline]
  pub(crate) fn mlock(&self) -> bool {
    #[cfg(mlock_supported)]
    {
      self.mlock
    }
    #[cfg(not(mlock_supported))]
    {
      false
    }
  }

  #[inline]
  pub(crate) fn read_only(&self) -> bool {
    self.read_only
  }

  pub fn open<T: AsRef<Path>>(self, path: T) -> crate::Result<Bolt> {
    Bolt::open_path(path, self)
  }

  pub fn open_ro<T: AsRef<Path>>(mut self, path: T) -> crate::Result<impl DbApi> {
    self.read_only = true;
    Bolt::open_path(path, self)
  }

  pub fn open_mem(self) -> crate::Result<Bolt> {
    Bolt::new_mem_with_options(self)
  }
}
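
// Batching support: concurrent `DbRwAPI::batch` calls are queued into a
// shared `ScheduledBatch` and run together inside a single `update` once the
// batch fills (`max_batch_size`) or its timer fires (`max_batch_delay`). If a
// call fails it is removed from the batch and notified with `Error::TrySolo`
// so its caller can retry it alone, and the remaining calls are rerun
// together.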
1487
1488type BatchFn = dyn FnMut(&mut TxRwRef) -> crate::Result<()> + Send + Sync + 'static;
1489
1490struct Call {
1491 f: Box<BatchFn>,
1492 err: SyncSender<crate::Result<()>>,
1493}
1494
1495struct ScheduledBatch {
1496 timer_guard: Option<Guard>,
1497 calls: Vec<Call>,
1498}
1499
1500impl ScheduledBatch {
1501 fn cancel_schedule(&mut self) {
1502 if let Some(guard) = self.timer_guard.take() {
1503 guard.ignore()
1504 }
1505 }
1506
1507 fn run(&mut self, db: &mut Bolt) {
1508 'retry: loop {
1509 if self.calls.is_empty() {
1510 break;
1511 }
1512 let mut fail_idx = None;
1513 let _ = db.update(|mut tx| {
1514 for (i, call) in self.calls.iter_mut().enumerate() {
1515 let result = (call.f)(&mut tx);
1516 if result.is_err() {
1517 fail_idx = Some(i);
1518 return result;
1519 }
1520 }
1521 Ok(())
1522 });
1523 if let Some(idx) = fail_idx {
1524 let call = self.calls.remove(idx);
1525 call.err.send(Err(Error::TrySolo)).unwrap();
1526 continue 'retry;
1527 }
1528 for call in &self.calls {
1529 call.err.send(Ok(())).unwrap()
1530 }
1531 break;
1532 }
1533 }
1534}

struct InnerBatcher {
  timer: Timer,
  batch_pool: Arc<SyncPool<ScheduledBatch>>,
  scheduled: Mutex<SyncReusable<ScheduledBatch>>,
}

impl InnerBatcher {
  fn new(parent: &Arc<Batcher>) -> InnerBatcher {
    let b = Arc::downgrade(parent);
    let timer = Timer::new();
    let batch_pool = SyncPool::new(
      || ScheduledBatch {
        timer_guard: None,
        calls: Vec::with_capacity(0),
      },
      |batch| {
        batch.timer_guard = None;
        batch.calls.clear();
      },
    );
    let guard = timer.schedule_with_delay(parent.max_batch_delay, move || {
      let batcher = b.upgrade().unwrap();
      let mut db = Bolt {
        inner: batcher.db.upgrade().unwrap(),
      };
      batcher.take_batch().run(&mut db)
    });

    let mut scheduled = batch_pool.pull();
    scheduled.timer_guard = Some(guard);
    InnerBatcher {
      timer,
      batch_pool,
      scheduled: scheduled.into(),
    }
  }
}

struct Batcher {
  inner: OnceLock<InnerBatcher>,
  db: Weak<InnerDB>,
  max_batch_delay: Duration,
  max_batch_size: u32,
}

impl Batcher {
  fn new(db: Weak<InnerDB>, max_batch_delay: Duration, max_batch_size: u32) -> Arc<Batcher> {
    Arc::new(Batcher {
      inner: Default::default(),
      db,
      max_batch_delay,
      max_batch_size,
    })
  }

  fn inner<'a>(self: &'a Arc<Batcher>) -> &'a InnerBatcher {
    self.inner.get_or_init(move || InnerBatcher::new(self))
  }

  fn batch<F>(self: &Arc<Batcher>, mut db: Bolt, mut f: F) -> crate::Result<()>
  where
    F: FnMut(&mut TxRwRef) -> crate::Result<()> + Send + Sync + Clone + 'static,
  {
    if self.max_batch_size == 0 || self.max_batch_delay.is_zero() {
      return Err(Error::BatchDisabled);
    }
    let inner = self.inner();
    let (call_len, rx) = {
      let mut batch = inner.scheduled.lock();

      let (tx, rx): (SyncSender<crate::Result<()>>, Receiver<crate::Result<()>>) =
        mpsc::sync_channel(1);
      batch.calls.push(Call {
        f: Box::new(f.clone()),
        err: tx,
      });
      (batch.calls.len(), rx)
    };
    if call_len > self.max_batch_size as usize {
      let mut immediate = self.take_batch();
      if !immediate.calls.is_empty() {
        let mut i_db = db.clone();
        thread::spawn(move || immediate.run(&mut i_db));
      }
    }

    match rx.recv().unwrap() {
      // This call failed inside the batch; retry it on its own.
      Err(Error::TrySolo) => db.update(|mut tx| f(&mut tx)),
      result => result,
    }
  }

  fn schedule_batch(self: &Arc<Batcher>) -> Guard {
    let inner = self.inner();
    let b = Arc::downgrade(self);
    inner
      .timer
      .schedule_with_delay(self.max_batch_delay, move || {
        let batcher = b.upgrade().unwrap();
        let mut db = Bolt {
          inner: batcher.db.upgrade().unwrap(),
        };
        batcher.take_batch().run(&mut db)
      })
  }

  fn take_batch(self: &Arc<Batcher>) -> SyncReusable<ScheduledBatch> {
    let inner = self.inner();
    let mut swap_batch = inner.batch_pool.pull();
    let mut lock = inner.scheduled.lock();
    mem::swap(&mut swap_batch, &mut *lock);
    swap_batch.cancel_schedule();
    let guard = self.schedule_batch();
    lock.timer_guard = Some(guard);
    swap_batch
  }
}

pub struct InnerDB {
  path: Arc<DbPath>,
  bump_pool: Arc<SyncPool<Pin<Box<PinBump>>>>,
  db: RwLock<DbShared>,
  stats: Arc<DbStats>,
  db_state: Arc<Mutex<DbState>>,
  batcher: Arc<Batcher>,
}

unsafe impl Send for InnerDB {}
unsafe impl Sync for InnerDB {}

#[derive(Clone)]
pub struct Bolt {
  inner: Arc<InnerDB>,
}

impl Bolt {
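  /// Opens a file-backed database at `path` with default options, creating
  /// and initializing the file if it does not already exist. (The `bbolt_rs`
  /// import path in the example is an assumption.)
  ///
  /// ```ignore
  /// use bbolt_rs::{Bolt, DbApi};
  ///
  /// let db = Bolt::open("my.db")?;
  /// println!("page size: {}", db.info().page_size);
  /// db.close();
  /// # Ok::<(), bbolt_rs::Error>(())
  /// ```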
  pub fn open<T: AsRef<Path>>(path: T) -> crate::Result<Self> {
    Bolt::open_path(path, BoltOptions::default())
  }

  pub fn open_ro<T: AsRef<Path>>(path: T) -> crate::Result<impl DbApi> {
    Bolt::open_path(
      path,
      BoltOptions {
        read_only: true,
        ..Default::default()
      },
    )
  }

  fn new_file_backend(path: &Path, bolt_options: BoltOptions) -> crate::Result<Bolt> {
    let read_only = bolt_options.read_only();
    let mut file = if bolt_options.read_only() {
      let file = fs::OpenOptions::new().read(true).open(path)?;
      file.lock_shared()?;
      file
    } else {
      let mut file = fs::OpenOptions::new()
        .write(true)
        .read(true)
        .create(true)
        .truncate(false)
        .open(path)?;
      file.lock_exclusive()?;
      if !path.exists() || path.metadata()?.len() == 0 {
        let page_size = bolt_options
          .page_size()
          .unwrap_or(DEFAULT_PAGE_SIZE.bytes() as usize);
        Bolt::init(path, &mut file, page_size)?;
      }
      file
    };
    let page_size = FileBackend::get_page_size(&mut file)?;
    assert!(page_size > 0, "invalid page size");

    let file_size = file.metadata()?.len();
    let data_size = if let Some(initial_mmap_size) = bolt_options.initial_map_size() {
      file_size.max(initial_mmap_size)
    } else {
      file_size
    };
    let options = MmapOptions::new()
      .offset(0)
      .len(data_size as usize)
      .to_owned();
    let mmap = if read_only {
      options.map_raw_read_only(&file)?
    } else {
      options.map_raw(&file)?
    };
    #[cfg(mlock_supported)]
    if bolt_options.mlock() {
      mmap.lock()?;
    }

    #[cfg(mmap_advise_supported)]
    mmap.advise(Advice::Random)?;

    let backend = FileBackend {
      path: Arc::new(path.into()),
      file: Mutex::new(FileState { file, file_size }),
      page_size,
      mmap: Some(mmap),
      freelist: OnceLock::new(),
      alloc_size: DEFAULT_ALLOC_SIZE.bytes() as u64,
      data_size,
      use_mlock: bolt_options.mlock(),
      grow_async: !bolt_options.no_grow_sync(),
      read_only,
    };
    backend.file_size()?;
    let backend = Box::new(backend);
    Self::new_db(DbPath::FilePath(path.into()), bolt_options, backend)
  }

  fn new_mem_with_options(bolt_options: BoltOptions) -> crate::Result<Bolt> {
    let page_size = bolt_options
      .page_size()
      .unwrap_or(DEFAULT_PAGE_SIZE.bytes() as usize);
    let mut mmap = Bolt::init_page(page_size);
    let file_size = mmap.len() as u64;
    let data_size = if let Some(initial_mmap_size) = bolt_options.initial_map_size() {
      file_size.max(initial_mmap_size)
    } else {
      file_size
    };
    if file_size < data_size {
      let mut new_mmap = AlignedBytes::new_zeroed(data_size as usize);
      new_mmap
        .split_at_mut(file_size as usize)
        .0
        .copy_from_slice(&mmap);
      mmap = new_mmap;
    }
    let backend = MemBackend {
      mmap: Mutex::new(mmap),
      freelist: OnceLock::new(),
      page_size,
      alloc_size: DEFAULT_ALLOC_SIZE.bytes() as u64,
      file_size,
      data_size,
    };
    let backend = Box::new(backend);
    Self::new_db(DbPath::Memory, bolt_options, backend)
  }

  fn new_db(
    db_path: DbPath, bolt_options: BoltOptions, backend: Box<dyn DBBackend>,
  ) -> crate::Result<Self> {
    backend.validate_meta()?;
    let mut free_count = 0u64;
    if bolt_options.preload_freelist() {
      free_count = backend.freelist().free_count();
    }
    let meta = backend.meta();
    if meta.free_list() == PGID_NO_FREE_LIST {
      return Err(Error::Other(anyhow!(
        "PGID_NO_FREE_LIST not currently supported"
      )));
    }
    let db_state = Arc::new(Mutex::new(DbState::new(meta)));
    let stats = DbStats {
      free_page_n: (free_count as i64).into(),
      ..Default::default()
    };
    let arc_stats = Arc::new(stats);
    let bump_pool = SyncPool::new(
      || Box::pin(PinBump::default()),
      |bump| Pin::as_mut(bump).reset(),
    );

    let inner = Arc::new_cyclic(|weak| InnerDB {
      path: Arc::new(db_path),
      bump_pool,
      db: RwLock::new(DbShared {
        stats: arc_stats.clone(),
        db_state: db_state.clone(),
        backend,
        page_pool: Mutex::new(vec![]),
        options: bolt_options.clone(),
      }),
      stats: arc_stats,
      db_state,
      batcher: Arc::new(Batcher {
        inner: Default::default(),
        db: weak.clone(),
        max_batch_delay: bolt_options
          .max_batch_delay
          .unwrap_or(DEFAULT_MAX_BATCH_DELAY),
        max_batch_size: bolt_options
          .max_batch_size
          .unwrap_or(DEFAULT_MAX_BATCH_SIZE),
      }),
    });
    Ok(Bolt { inner })
  }

  fn open_path<T: AsRef<Path>>(path: T, db_options: BoltOptions) -> crate::Result<Self> {
    let pref = path.as_ref();
    Self::new_file_backend(pref, db_options)
  }
  pub fn open_mem() -> crate::Result<Self> {
    Bolt::new_mem_with_options(BoltOptions::default())
  }

  fn init_page(page_size: usize) -> AlignedBytes<alignment::Page> {
    let mut buffer = AlignedBytes::<alignment::Page>::new_zeroed(page_size * 4);
    for (i, page_bytes) in buffer.chunks_mut(page_size).enumerate() {
      let mut page = MutPage::new(page_bytes.as_mut_ptr());
      if i < 2 {
        let meta_page = MappedMetaPage::mut_into(&mut page);
        let ph = &mut meta_page.page;
        ph.id = PgId(i as u64);
        ph.count = 0;
        ph.overflow = 0;
        let meta = &mut meta_page.meta;
        meta.set_magic(MAGIC);
        meta.set_version(VERSION);
        meta.set_page_size(page_size as u32);
        meta.set_free_list(PgId(2));
        meta.set_root(BucketHeader::new(PgId(3), 0));
        meta.set_pgid(PgId(4));
        meta.set_txid(TxId(i as u64));
        meta.set_checksum(meta.sum64());
      } else if i == 2 {
        let free_list = MappedFreeListPage::mut_into(&mut page);
        free_list.id = PgId(2);
        free_list.count = 0;
        free_list.overflow = 0;
      } else if i == 3 {
        let leaf_page = MappedLeafPage::mut_into(&mut page);
        leaf_page.id = PgId(3);
        leaf_page.count = 0;
        leaf_page.overflow = 0;
      }
    }
    buffer
  }

  fn init(path: &Path, db: &mut File, page_size: usize) -> io::Result<usize> {
    let buffer = Bolt::init_page(page_size);
    #[cfg(unix)]
    {
      use std::os::unix::fs::PermissionsExt;
      let metadata = db.metadata()?;
      let mut permissions = metadata.permissions();
      permissions.set_mode(0o600);
      fs::set_permissions(path, permissions)?;
    }
    db.write_all(&buffer)?;
    db.flush()?;
    Ok(buffer.len())
  }

  fn require_open(state: &DbState) -> crate::Result<()> {
    if !state.is_open {
      return Err(Error::DatabaseNotOpen);
    }
    Ok(())
  }

  pub(crate) fn begin_tx(&self) -> crate::Result<TxImpl> {
    let mut state = self.inner.db_state.lock();
    Bolt::require_open(&state)?;
    let lock = self.inner.db.read();
    let bump = self.inner.bump_pool.pull();
    let meta = state.current_meta;
    let txid = meta.txid();
    state.txs.push(txid);
    self.inner.stats.inc_tx_n(1);
    self
      .inner
      .stats
      .open_tx_n
      .store(state.txs.len() as i64, Ordering::Release);
    Ok(TxImpl::new(bump, lock, meta))
  }

  #[cfg(feature = "try-begin")]
  pub(crate) fn try_begin_tx<'a, F>(&'a self, f: F) -> crate::Result<Option<TxImpl>>
  where
    F: Fn() -> Option<RwLockReadGuard<'a, DbShared>>,
  {
    let mut state = self.inner.db_state.lock();
    Bolt::require_open(&state)?;
    if let Some(lock) = f() {
      let bump = self.inner.bump_pool.pull();
      let meta = state.current_meta;
      let txid = meta.txid();
      state.txs.push(txid);
      self.inner.stats.inc_tx_n(1);
      self
        .inner
        .stats
        .open_tx_n
        .store(state.txs.len() as i64, Ordering::Release);
      Ok(Some(TxImpl::new(bump, lock, meta)))
    } else {
      Ok(None)
    }
  }

  pub(crate) fn begin_rw_tx(&mut self) -> crate::Result<TxRwImpl> {
    let lock = self.inner.db.upgradable_read();
    let mut state = self.inner.db_state.lock();
    Bolt::require_open(&state)?;
    lock.free_pages(&mut state);
    let bump = self.inner.bump_pool.pull();
    let mut meta = state.current_meta;
    let txid = meta.txid() + 1;
    meta.set_txid(txid);
    state.rwtx = Some(txid);
    Ok(TxRwImpl::new(bump, lock, meta))
  }

  #[cfg(feature = "try-begin")]
  pub(crate) fn try_begin_rw_tx<'a, F>(&'a self, f: F) -> crate::Result<Option<TxRwImpl>>
  where
    F: Fn() -> Option<RwLockUpgradableReadGuard<'a, DbShared>>,
  {
    if let Some(lock) = f() {
      let mut state = self.inner.db_state.lock();
      Bolt::require_open(&state)?;
      lock.free_pages(&mut state);
      let bump = self.inner.bump_pool.pull();
      let mut meta = state.current_meta;
      let txid = meta.txid() + 1;
      meta.set_txid(txid);
      state.rwtx = Some(txid);
      Ok(Some(TxRwImpl::new(bump, lock, meta)))
    } else {
      Ok(None)
    }
  }
}

impl DbApi for Bolt {
  fn begin(&self) -> crate::Result<impl TxApi> {
    self.begin_tx()
  }

  #[cfg(feature = "try-begin")]
  fn try_begin(&self) -> crate::Result<Option<impl TxApi>> {
    self.try_begin_tx(|| self.inner.db.try_read())
  }

  #[cfg(feature = "try-begin")]
  fn try_begin_for(&self, duration: Duration) -> crate::Result<Option<impl TxApi>> {
    self.try_begin_tx(|| self.inner.db.try_read_for(duration))
  }

  #[cfg(feature = "try-begin")]
  fn try_begin_until(&self, instant: Instant) -> crate::Result<Option<impl TxApi>> {
    self.try_begin_tx(|| self.inner.db.try_read_until(instant))
  }

  fn view<'tx, F: FnMut(TxRef<'tx>) -> crate::Result<()>>(
    &'tx self, mut f: F,
  ) -> crate::Result<()> {
    let tx = self.begin_tx()?;
    let tx_ref = tx.get_ref();
    f(tx_ref)
  }

  fn stats(&self) -> Arc<DbStats> {
    self.inner.stats.clone()
  }

  fn path(&self) -> &DbPath {
    &self.inner.path
  }

  fn info(&self) -> DbInfo {
    DbInfo {
      page_size: self.inner.db.read().backend.page_size(),
    }
  }

  fn close(self) {
    let mut lock = self.inner.db.write();
    let mut state = self.inner.db_state.lock();
    if Bolt::require_open(&state).is_ok() {
      state.is_open = false;
      let mut closed_db: Box<dyn DBBackend> = Box::new(ClosedBackend);
      mem::swap(&mut closed_db, &mut lock.backend);
      lock.page_pool.lock().clear();
      self.inner.bump_pool.clear();
      if let Some(inner_batcher) = self.inner.batcher.inner.get() {
        inner_batcher.batch_pool.clear();
      }
    }
  }
}

impl DbRwAPI for Bolt {
  fn begin_rw(&mut self) -> crate::Result<impl TxRwApi> {
    self.begin_rw_tx()
  }

  #[cfg(feature = "try-begin")]
  fn try_begin_rw(&self) -> crate::Result<Option<impl TxRwApi>> {
    self.try_begin_rw_tx(|| self.inner.db.try_upgradable_read())
  }

  #[cfg(feature = "try-begin")]
  fn try_begin_rw_for(&self, duration: Duration) -> crate::Result<Option<impl TxRwApi>> {
    self.try_begin_rw_tx(|| self.inner.db.try_upgradable_read_for(duration))
  }

  #[cfg(feature = "try-begin")]
  fn try_begin_rw_until(&self, instant: Instant) -> crate::Result<Option<impl TxRwApi>> {
    self.try_begin_rw_tx(|| self.inner.db.try_upgradable_read_until(instant))
  }

  fn update<'tx, F: FnMut(TxRwRef<'tx>) -> crate::Result<()>>(
    &'tx mut self, mut f: F,
  ) -> crate::Result<()> {
    let txrw = self.begin_rw_tx()?;
    let tx_ref = txrw.get_ref();
    match f(tx_ref) {
      Ok(_) => {
        txrw.commit()?;
        Ok(())
      }
      Err(e) => {
        let _ = txrw.rollback();
        Err(e)
      }
    }
  }

  fn batch<F>(&mut self, f: F) -> crate::Result<()>
  where
    F: FnMut(&mut TxRwRef) -> crate::Result<()> + Send + Sync + Clone + 'static,
  {
    self.inner.batcher.batch(self.clone(), f)
  }

  fn sync(&mut self) -> crate::Result<()> {
    self.inner.db.write().backend.fsync()?;
    Ok(())
  }
}

#[cfg(test)]
mod test {
  use crate::common::defaults::DEFAULT_PAGE_SIZE;
  use crate::common::page::meta::MappedMetaPage;
  use crate::db::DbStats;
  use crate::test_support::{temp_file, TestDb};
  use crate::{
    Bolt, BoltOptions, BucketApi, BucketRwApi, DbApi, DbPath, DbRwAPI, Error, PgId, TxApi, TxCheck,
    TxRwApi, TxRwRefApi,
  };
  use aligners::{alignment, AlignedBytes};
  use std::io::{Read, Seek, SeekFrom, Write};
  use std::sync::mpsc::channel;
  use std::sync::Arc;
  use std::thread;

  #[test]
  #[cfg(not(miri))]
  fn test_open() -> crate::Result<()> {
    let db = TestDb::new()?;
    db.clone_db().close();
    Ok(())
  }

  #[test]
  #[cfg(feature = "long-tests")]
  #[cfg(not(any(miri, feature = "test-mem-backend")))]
  fn test_open_multiple_threads() -> crate::Result<()> {
    let instances = 30;
    let iterations = 30;
    let mut threads = Vec::new();
    let temp_file = Arc::new(temp_file()?);
    let (tx, rx) = channel();
    for _ in 0..iterations {
      for _ in 0..instances {
        let t_file = temp_file.clone();
        let t_tx = tx.clone();
        let handle = thread::spawn(move || {
          let db = Bolt::open(t_file.path());
          if let Some(error) = db.err() {
            t_tx.send(error).unwrap();
          }
        });
        threads.push(handle);
      }
      while let Some(handle) = threads.pop() {
        handle.join().unwrap();
      }
    }
    drop(tx);
    if let Ok(error) = rx.try_recv() {
      panic!("Fatal error: {}", error);
    }
    Ok(())
  }

  #[test]
  #[cfg(not(any(miri, feature = "test-mem-backend")))]
  fn test_open_err_path_required() -> crate::Result<()> {
    let r = Bolt::open("");
    assert!(r.is_err());
    Ok(())
  }

  #[test]
  #[cfg(not(any(miri, feature = "test-mem-backend")))]
  fn test_open_err_not_exists() -> crate::Result<()> {
    let file = temp_file()?;
    let path = file.path().join("bad-path");
    let r = Bolt::open(path);
    assert!(r.is_err());
    Ok(())
  }

  #[test]
  #[cfg(not(any(miri, feature = "test-mem-backend")))]
  fn test_open_err_invalid() -> crate::Result<()> {
    let mut file = temp_file()?;
    file
      .as_file_mut()
      .write_all(b"this is not a bolt database")?;
    let r = Bolt::open(file.path());
    assert_eq!(Some(Error::InvalidDatabase(false)), r.err());
    Ok(())
  }

  #[test]
  #[cfg(not(any(miri, feature = "test-mem-backend")))]
  fn test_open_err_version_mismatch() -> crate::Result<()> {
    let mut file = temp_file()?;
    let db = Bolt::open(file.path())?;
    db.close();
    let mut bytes = AlignedBytes::<alignment::Page>::new_zeroed(4096 * 2);
    file.seek(SeekFrom::Start(0))?;
    file.read_exact(&mut bytes)?;
    let mut meta_0 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr()) };
    let meta0_version = meta_0.meta.version();
    meta_0.meta.set_version(meta0_version + 1);
    let mut meta_1 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr().add(4096)) };
    let meta1_version = meta_1.meta.version();
    meta_1.meta.set_version(meta1_version + 1);
    file.seek(SeekFrom::Start(0))?;
    file.write_all(&bytes)?;
    file.flush()?;
    let r = Bolt::open(file.path());
    assert_eq!(Some(Error::VersionMismatch), r.err());
    Ok(())
  }

  #[test]
  #[cfg(not(any(miri, feature = "test-mem-backend")))]
  fn test_open_err_checksum() -> crate::Result<()> {
    let mut file = temp_file()?;
    let db = Bolt::open(file.path())?;
    db.close();
    let mut bytes = AlignedBytes::<alignment::Page>::new_zeroed(4096 * 2);
    file.seek(SeekFrom::Start(0))?;
    file.read_exact(&mut bytes)?;
    let mut meta_0 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr()) };
    let meta0_pgid = meta_0.meta.pgid();
    meta_0.meta.set_pgid(meta0_pgid + 1);
    let mut meta_1 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr().add(4096)) };
    let meta1_pgid = meta_1.meta.pgid();
    meta_1.meta.set_pgid(meta1_pgid + 1);
    file.seek(SeekFrom::Start(0))?;
    file.write_all(&bytes)?;
    file.flush()?;
    let r = Bolt::open(file.path());
    assert_eq!(Some(Error::ChecksumMismatch), r.err());
    Ok(())
  }

  #[test]
  #[cfg(not(any(miri, feature = "test-mem-backend")))]
  fn test_open_read_page_size_from_meta1_os() -> crate::Result<()> {
    let mut file = temp_file()?;
    let db = Bolt::open(file.path())?;
    db.close();
    let mut bytes = AlignedBytes::<alignment::Page>::new_zeroed(4096 * 2);
    file.seek(SeekFrom::Start(0))?;
    file.read_exact(&mut bytes)?;
    let mut meta_0 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr()) };
    let meta0_pgid = meta_0.meta.pgid();
    meta_0.meta.set_pgid(meta0_pgid + 1);
    file.seek(SeekFrom::Start(0))?;
    file.write_all(&bytes)?;
    file.flush()?;
    let db = Bolt::open(file.path())?;
    assert_eq!(4096, db.info().page_size);
    Ok(())
  }

  #[test]
  #[cfg(not(any(miri, feature = "test-mem-backend")))]
  fn test_open_read_page_size_from_meta1_given() -> crate::Result<()> {
    for i in 0..=14usize {
      let given_page_size = 1024usize << i;
      let options = BoltOptions::builder().page_size(given_page_size).build();
      let mut db = TestDb::with_options(options)?;

      if i % 3 == 0 {
        db.must_close();
        let named_file = db.tmp_file.as_mut();
        let file = named_file.unwrap();
        let mut bytes = AlignedBytes::<alignment::Page>::new_zeroed(given_page_size * 2);
        file.seek(SeekFrom::Start(0))?;
        file.read_exact(&mut bytes)?;
        let mut meta_0 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr()) };
        let meta0_pgid = meta_0.meta.pgid();
        meta_0.meta.set_pgid(meta0_pgid + 1);
        file.seek(SeekFrom::Start(0))?;
        file.write_all(&bytes)?;
        file.flush()?;
        db.must_reopen();
      }

      assert_eq!(given_page_size, db.info().page_size);
    }
    Ok(())
  }

  #[test]
  #[cfg(not(any(miri, feature = "test-mem-backend")))]
  fn test_open_size() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    let page_size = db.info().page_size;
    let v = [0; 1000];
    for _tx_ct in 0..1 {
      db.update(|mut tx| {
        let mut b = tx.create_bucket_if_not_exists("data")?;
        for keys_ct in 0..10000 {
          let k = format!("{:04}", keys_ct);
          b.put(&k, &v)?;
        }
        Ok(())
      })?;
    }
    db.must_close();

    let file_size = {
      let tmp_file = db.tmp_file.as_ref();
      let file = tmp_file.unwrap();
      file.path().metadata()?.len()
    };
    assert_ne!(0, file_size, "unexpected new file size");

    db.must_reopen();
    db.update(|mut tx| {
      tx.bucket_mut("data").unwrap().put("0", "0")?;
      Ok(())
    })?;

    db.must_close();

    let new_size = {
      let tmp_file = db.tmp_file.as_ref();
      let file = tmp_file.unwrap();
      file.path().metadata()?.len()
    };

    assert!(
      file_size > (new_size - (5 * page_size) as u64),
      "unexpected file growth: {} => {}",
      file_size,
      new_size
    );

    Ok(())
  }

  #[test]
  #[cfg(not(any(miri, feature = "test-mem-backend")))]
  #[cfg(feature = "long-tests")]
  fn test_open_size_large() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    let page_size = db.info().page_size;
    let v = [0; 50];
    for _tx_ct in 0..10000 {
      db.update(|mut tx| {
        let mut b = tx.create_bucket_if_not_exists("data")?;
        for keys_ct in 0..1000u64 {
          let k = keys_ct.to_be_bytes();
          b.put(k, v)?;
        }
        Ok(())
      })?;
    }
    db.must_close();

    let file_size = {
      let tmp_file = db.tmp_file.as_ref();
      let file = tmp_file.unwrap();
      file.path().metadata()?.len()
    };
    assert_ne!(0, file_size, "unexpected new file size");

    db.must_reopen();
    db.update(|mut tx| {
      tx.bucket_mut("data").unwrap().put("0", "0")?;
      Ok(())
    })?;

    db.must_close();

    let new_size = {
      let tmp_file = db.tmp_file.as_ref();
      let file = tmp_file.unwrap();
      file.path().metadata()?.len()
    };

    assert!(
      file_size > (new_size - (5 * page_size) as u64),
      "unexpected file growth: {} => {}",
      file_size,
      new_size
    );

    Ok(())
  }
2373
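  // A fresh database should pass tx.check() both before and after a
  // close/reopen cycle.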
  #[test]
  #[cfg(not(any(miri, feature = "test-mem-backend")))]
  fn test_open_check() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    db.view(|tx| {
      assert!(tx.check().is_empty());
      Ok(())
    })?;
    db.must_close();
    db.must_reopen();
    db.view(|tx| {
      assert!(tx.check().is_empty());
      Ok(())
    })?;
    Ok(())
  }

  #[test]
  #[ignore]
  fn test_open_meta_init_write_error() {
    todo!("pending in Go")
  }

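  // Truncating the file below the minimum valid size must surface
  // Error::FileSizeTooSmall on reopen.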
  #[test]
  #[cfg(not(any(miri, feature = "test-mem-backend")))]
  fn test_open_file_too_small() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    db.must_close();
    {
      let temp_file = db.tmp_file.as_mut();
      let file = temp_file.unwrap();
      file.as_file_mut().set_len(4096)?;
    }
    assert_eq!(Some(Error::FileSizeTooSmall(4096)), db.reopen().err());
    Ok(())
  }

  #[test]
  #[ignore]
  fn test_db_open_initial_mmap_size() {
    todo!()
  }

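  // Data committed through a read-write handle should be readable through a
  // separate handle opened with Bolt::open_ro.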
  #[test]
  #[cfg(not(any(miri, feature = "test-mem-backend")))]
  fn test_db_open_read_only() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    db.update(|mut tx| {
      let mut b = tx.create_bucket("widgets")?;
      b.put("foo", "bar")?;
      Ok(())
    })?;
    let path = db.path().clone();
    db.must_close();
    let ro = match path {
      DbPath::Memory => panic!("Path is DbPath::Memory"),
      DbPath::FilePath(path) => Bolt::open_ro(path)?,
    };
    ro.view(|tx| {
      let b = tx.bucket("widgets").unwrap();
      assert_eq!(Some(b"bar".as_slice()), b.get("foo"));
      Ok(())
    })?;
    ro.close();
    Ok(())
  }

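  // A database initialized with a larger page size should occupy more bytes
  // on disk than one initialized with a smaller page size.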
  #[test]
  #[cfg(not(any(miri, feature = "test-mem-backend")))]
  fn test_open_big_page() -> crate::Result<()> {
    let page_size = DEFAULT_PAGE_SIZE.bytes() as usize;
    let options = BoltOptions::builder().page_size(page_size * 2).build();
    let db1 = TestDb::with_options(options.clone())?;
    let options = BoltOptions::builder().page_size(page_size * 4).build();
    let db2 = TestDb::with_options(options.clone())?;
    let db1_len = db1.tmp_file.as_ref().unwrap().as_file().metadata()?.len();
    let db2_len = db2.tmp_file.as_ref().unwrap().as_file().metadata()?.len();
    assert!(db1_len < db2_len, "expected {} < {}", db1_len, db2_len);
    Ok(())
  }

  #[test]
  #[ignore]
  fn test_open_recover_free_list() {
    todo!()
  }

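  // Beginning a read transaction after the database has been closed must fail
  // with Error::DatabaseNotOpen.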
  #[test]
  fn test_db_begin_err_database_not_open() -> crate::Result<()> {
    let db = TestDb::new()?;
    let t_db = db.clone_db();
    t_db.close();
    let r = db.begin_tx();
    assert_eq!(Some(Error::DatabaseNotOpen), r.err());
    Ok(())
  }

  #[test]
  fn test_db_begin_rw() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    let tx = db.begin_rw()?;
    assert!(tx.writable());
    tx.commit()?;
    Ok(())
  }

  #[test]
  #[ignore]
  fn test_db_concurrent_write_to() {
    todo!()
  }

  #[test]
  fn test_db_begin_rw_closed() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    let t_db = db.clone_db();
    t_db.close();
    let r = db.begin_rw_tx();
    assert_eq!(Some(Error::DatabaseNotOpen), r.err());
    Ok(())
  }

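  // The two tests below are still pending a port; in the Go original, Close()
  // blocks until any open read-write or read-only transaction has finished.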
  #[test]
  #[ignore]
  fn test_db_close_pending_tx_rw() {
    todo!()
  }

  #[test]
  #[ignore]
  fn test_db_close_pending_tx_ro() {
    todo!()
  }

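  // update() commits when the closure returns Ok: the surviving put and the
  // delete are both visible to a later view().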
  #[test]
  fn test_db_update() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    db.update(|mut tx| {
      let mut b = tx.create_bucket("widgets")?;
      b.put("foo", "bar")?;
      b.put("baz", "bat")?;
      b.delete("foo")?;
      Ok(())
    })?;

    db.view(|tx| {
      let b = tx.bucket("widgets").unwrap();
      assert_eq!(None, b.get("foo"));
      assert_eq!(Some(b"bat".as_slice()), b.get("baz"));
      Ok(())
    })?;
    Ok(())
  }

  #[test]
  fn test_db_update_closed() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    let t_db = db.clone_db();
    t_db.close();
    let r = db.update(|mut tx| {
      tx.create_bucket("widgets")?;
      Ok(())
    });
    assert_eq!(Some(Error::DatabaseNotOpen), r.err());
    Ok(())
  }

  #[test]
  #[ignore]
  fn test_db_update_panic() -> crate::Result<()> {
    todo!()
  }

  #[test]
  fn test_db_view_error() -> crate::Result<()> {
    let db = TestDb::new()?;
    let r = db.view(|_| Err(Error::InvalidDatabase(false))).err();
    assert_eq!(Some(Error::InvalidDatabase(false)), r);
    Ok(())
  }

  #[test]
  #[ignore]
  fn test_db_view_panic() {
    todo!()
  }

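  // A single committed bucket creation should be reflected in the aggregate
  // DbStats counters.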
  #[test]
  fn test_db_stats() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    db.update(|mut tx| {
      tx.create_bucket("widgets")?;
      Ok(())
    })?;
    let stats = db.stats();
    assert_eq!(2, stats.tx_stats.page_count());
    assert_eq!(0, stats.free_page_n());
    assert_eq!(2, stats.pending_page_n());
    Ok(())
  }

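  // After repeated single-key updates the page layout should stabilize: two
  // meta pages, two free pages, one leaf page, and one freelist page.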
  #[test]
  fn test_db_consistency() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    db.update(|mut tx| {
      tx.create_bucket("widgets")?;
      Ok(())
    })?;

    for _ in 0..10 {
      db.update(|mut tx| {
        tx.bucket_mut("widgets").unwrap().put("foo", "bar")?;
        Ok(())
      })?;
    }

    db.update(|tx| {
      let p = tx.page(PgId(0)).expect("expected page");
      assert_eq!("meta", p.t);
      let p = tx.page(PgId(1)).expect("expected page");
      assert_eq!("meta", p.t);
      let p = tx.page(PgId(2)).expect("expected page");
      assert_eq!("free", p.t);
      let p = tx.page(PgId(3)).expect("expected page");
      assert_eq!("free", p.t);
      let p = tx.page(PgId(4)).expect("expected page");
      assert_eq!("leaf", p.t);
      let p = tx.page(PgId(5)).expect("expected page");
      assert_eq!("freelist", p.t);
      assert_eq!(None, tx.page(PgId(6)));
      Ok(())
    })?;
    Ok(())
  }

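  // sub() diffs counter-style fields (page_count: 10 - 3 = 7) but reports
  // point-in-time fields such as free_page_n at the newer snapshot's value.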
  #[test]
  fn test_dbstats_sub() {
    let a = DbStats::default();
    let b = DbStats::default();
    a.tx_stats.inc_page_count(3);
    a.set_free_page_n(4);
    b.tx_stats.inc_page_count(10);
    b.set_free_page_n(14);
    let diff = b.sub(&a);
    assert_eq!(7, diff.tx_stats.page_count());
    assert_eq!(14, diff.free_page_n());
  }

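  // Concurrent batch() calls from multiple threads may be coalesced into a
  // single transaction; regardless, every thread's key must be present
  // afterwards.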
  #[test]
  fn test_db_batch() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    db.update(|mut tx| {
      let _ = tx.create_bucket("widgets")?;
      Ok(())
    })?;

    let n = 2;
    let mut threads = Vec::with_capacity(n);
    for i in 0..n {
      let mut t_db = db.clone_db();
      let join = thread::spawn(move || {
        t_db.batch(move |tx| {
          let mut b = tx.bucket_mut("widgets").unwrap();
          b.put(format!("{}", i), "")
        })
      });
      threads.push(join);
    }

    for t in threads {
      t.join().unwrap()?;
    }

    db.view(|tx| {
      let b = tx.bucket("widgets").unwrap();
      for i in 0..n {
        let g = b.get(format!("{}", i));
        assert!(g.is_some(), "key not found {}", i);
      }
      Ok(())
    })?;

    Ok(())
  }

  #[test]
  #[ignore]
  fn test_db_batch_panic() {
    todo!()
  }

  #[test]
  #[ignore]
  fn test_db_batch_full() {
    todo!()
  }

  #[test]
  #[ignore]
  fn test_db_batch_time() {
    todo!()
  }

  #[test]
  #[ignore]
  fn test_dbunmap() {
    todo!()
  }

  #[test]
  #[ignore]
  fn benchmark_dbbatch_automatic() {
    todo!()
  }

  #[test]
  #[ignore]
  fn benchmark_dbbatch_single() {
    todo!()
  }

  #[test]
  #[ignore]
  fn benchmark_dbbatch_manual10x100() {
    todo!()
  }
}