1use crate::arch::size::MAX_ALLOC_SIZE;
2use crate::bucket::{
3 BucketCell, BucketIApi, BucketImpl, BucketR, BucketRW, BucketRwIApi, BucketRwImpl, BucketW,
4};
5use crate::common::bump::PinBump;
6use crate::common::cell::{Ref, RefCell, RefMut};
7use crate::common::defaults::IGNORE_NO_SYNC;
8use crate::common::lock::{LockGuard, PinLockGuard};
9use crate::common::memory::BCell;
10use crate::common::page::meta::{MappedMetaPage, Meta, MetaPage};
11use crate::common::page::tree::branch::MappedBranchPage;
12use crate::common::page::tree::TreePage;
13use crate::common::page::{CoerciblePage, MutPage, PageHeader, PageInfo, RefPage};
14use crate::common::pool::SyncReusable;
15use crate::common::self_owned::SelfOwned;
16use crate::common::{BVec, HashMap, PgId, SplitRef, TxId};
17use crate::cursor::{CursorImpl, InnerCursor};
18use crate::db::{AllocateResult, DbIApi, DbMutIApi, DbShared};
19use crate::iter::{BucketIter, BucketIterMut};
20use crate::tx::check::TxICheck;
21use crate::TxCheck;
22use aliasable::boxed::AliasableBox;
23use aligners::{alignment, AlignedBytes};
24use bumpalo::Bump;
25use parking_lot::{Mutex, RwLockReadGuard, RwLockUpgradableReadGuard};
26use std::alloc::Layout;
27use std::borrow::Cow;
28use std::fmt::{Debug, Formatter};
29use std::io::Write;
30use std::marker::PhantomData;
31use std::mem;
32use std::mem::MaybeUninit;
33use std::ops::{Deref, SubAssign};
34use std::pin::Pin;
35use std::ptr::{addr_of, addr_of_mut};
36use std::slice::from_raw_parts_mut;
37use std::sync::atomic::{AtomicI64, Ordering};
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40
41pub trait TxApi<'tx>: TxCheck<'tx> {
43 fn id(&self) -> TxId;
70
71 fn size(&self) -> u64;
92
93 fn writable(&self) -> bool;
115
116 fn cursor<'a>(&'a self) -> CursorImpl<'tx, 'a>;
144
145 fn stats(&self) -> Arc<TxStats>;
147
148 fn bucket<'a, T: AsRef<[u8]>>(&'a self, name: T) -> Option<BucketImpl<'tx, 'a>>;
173
174 fn bucket_path<'a, T: AsRef<[u8]>>(&'a self, names: &[T]) -> Option<BucketImpl<'tx, 'a>>;
175
176 #[deprecated(since = "1.3.9", note = "please use `iter_*` methods instead")]
177 fn for_each<'a, F: FnMut(&'a [u8], BucketImpl<'tx, 'a>) -> crate::Result<()>>(
212 &self, f: F,
213 ) -> crate::Result<()>
214 where
215 'tx: 'a;
216
217 fn page(&self, id: PgId) -> Option<PageInfo>;
244
245 fn iter_buckets<'a>(&'a self) -> BucketIter<'tx, 'a>;
246}
247
248pub trait TxRwRefApi<'tx>: TxApi<'tx> {
250 fn bucket_mut<'a, T: AsRef<[u8]>>(&'a mut self, name: T) -> Option<BucketRwImpl<'tx, 'a>>;
282
283 fn bucket_mut_path<'a, T: AsRef<[u8]>>(
284 &'a mut self, names: &[T],
285 ) -> Option<BucketRwImpl<'tx, 'a>>;
286
287 fn create_bucket<'a, T: AsRef<[u8]>>(
313 &'a mut self, name: T,
314 ) -> crate::Result<BucketRwImpl<'tx, 'a>>;
315
316 fn create_bucket_if_not_exists<'a, T: AsRef<[u8]>>(
342 &'a mut self, name: T,
343 ) -> crate::Result<BucketRwImpl<'tx, 'a>>;
344
345 fn create_bucket_path<'a, T: AsRef<[u8]>>(
346 &'a mut self, names: &[T],
347 ) -> crate::Result<BucketRwImpl<'tx, 'a>>;
348
349 fn delete_bucket<T: AsRef<[u8]>>(&mut self, name: T) -> crate::Result<()>;
378
379 fn on_commit<F: FnMut() + 'tx>(&mut self, f: F);
400
401 fn iter_mut_buckets<'a>(&'a mut self) -> BucketIterMut<'tx, 'a>;
402}
403
/// Operations that end a read/write transaction; both consume the handle.
pub trait TxRwApi<'tx>: TxRwRefApi<'tx> {
  /// Ends the transaction without keeping its changes.
  fn rollback(self) -> crate::Result<()>;

  /// Ends the transaction and attempts to persist its changes.
  fn commit(self) -> crate::Result<()>;
}
465
/// Counters and timers describing the work done by a transaction.
///
/// Every field is an atomic or a mutex, so all updates go through `&self`.
#[derive(Default)]
pub struct TxStats {
  /// Page counter; read with `page_count`, updated with `inc_page_count`.
  page_count: AtomicI64,
  /// Page allocation counter; read with `page_alloc`, updated with `inc_page_alloc`.
  page_alloc: AtomicI64,

  /// Cursor counter; read with `cursor_count`, updated with `inc_cursor_count`.
  cursor_count: AtomicI64,

  /// Node counter; read with `node_count`, updated with `inc_node_count`.
  node_count: AtomicI64,
  /// Node dereference counter; read with `node_deref`, updated with `inc_node_deref`.
  node_deref: AtomicI64,

  /// Rebalance counter; read with `rebalance`, updated with `inc_rebalance`.
  rebalance: AtomicI64,
  /// Accumulated time attributed to rebalancing.
  rebalance_time: Mutex<Duration>,

  /// Split counter; read with `split`, updated with `inc_split`.
  split: AtomicI64,
  /// Spill counter; read with `spill`, updated with `inc_spill`.
  spill: AtomicI64,
  /// Accumulated time attributed to spilling.
  spill_time: Mutex<Duration>,

  /// Write counter; incremented once per write call issued to the database.
  write: AtomicI64,
  /// Accumulated time attributed to writing.
  write_time: Mutex<Duration>,
}
511
512impl TxStats {
513 pub fn page_alloc(&self) -> i64 {
515 self.page_alloc.load(Ordering::Acquire)
516 }
517
518 pub(crate) fn inc_page_alloc(&self, delta: i64) {
519 self.page_alloc.fetch_add(delta, Ordering::Relaxed);
520 }
521
522 pub fn page_count(&self) -> i64 {
524 self.page_count.load(Ordering::Acquire)
525 }
526
527 pub(crate) fn inc_page_count(&self, delta: i64) {
528 self.page_count.fetch_add(delta, Ordering::Relaxed);
529 }
530
531 pub fn cursor_count(&self) -> i64 {
533 self.cursor_count.load(Ordering::Acquire)
534 }
535
536 pub(crate) fn inc_cursor_count(&self, delta: i64) {
537 self.cursor_count.fetch_add(delta, Ordering::Relaxed);
538 }
539
540 pub fn node_count(&self) -> i64 {
542 self.node_count.load(Ordering::Acquire)
543 }
544
545 pub(crate) fn inc_node_count(&self, delta: i64) {
546 self.node_count.fetch_add(delta, Ordering::Relaxed);
547 }
548
549 pub fn node_deref(&self) -> i64 {
551 self.node_deref.load(Ordering::Acquire)
552 }
553
554 pub(crate) fn inc_node_deref(&self, delta: i64) {
555 self.node_deref.fetch_add(delta, Ordering::Relaxed);
556 }
557
558 pub fn rebalance(&self) -> i64 {
560 self.rebalance.load(Ordering::Acquire)
561 }
562
563 pub(crate) fn inc_rebalance(&self, delta: i64) {
564 self.rebalance.fetch_add(delta, Ordering::Relaxed);
565 }
566
567 pub fn rebalance_time(&self) -> Duration {
569 *self.rebalance_time.lock()
570 }
571
572 pub(crate) fn inc_rebalance_time(&self, delta: Duration) {
573 *self.rebalance_time.lock() += delta;
574 }
575
576 pub fn split(&self) -> i64 {
578 self.split.load(Ordering::Acquire)
579 }
580
581 pub(crate) fn inc_split(&self, delta: i64) {
582 self.split.fetch_add(delta, Ordering::Relaxed);
583 }
584
585 pub fn spill(&self) -> i64 {
587 self.spill.load(Ordering::Acquire)
588 }
589
590 pub(crate) fn inc_spill(&self, delta: i64) {
591 self.spill.fetch_add(delta, Ordering::Relaxed);
592 }
593
594 pub fn spill_time(&self) -> Duration {
596 *self.spill_time.lock()
597 }
598
599 pub(crate) fn inc_spill_time(&self, delta: Duration) {
600 *self.spill_time.lock() += delta;
601 }
602
603 pub fn write(&self) -> i64 {
605 self.write.load(Ordering::Acquire)
606 }
607
608 pub(crate) fn inc_write(&self, delta: i64) {
609 self.write.fetch_add(delta, Ordering::Relaxed);
610 }
611
612 pub fn write_time(&self) -> Duration {
614 *self.write_time.lock()
615 }
616
617 pub(crate) fn inc_write_time(&self, delta: Duration) {
618 *self.write_time.lock() += delta;
619 }
620
621 pub(crate) fn add_assign(&self, rhs: &TxStats) {
622 self.inc_page_count(rhs.page_count());
623 self.inc_page_alloc(rhs.page_alloc());
624 self.inc_cursor_count(rhs.cursor_count());
625 self.inc_node_count(rhs.node_count());
626 self.inc_node_deref(rhs.node_deref());
627 self.inc_rebalance(rhs.rebalance());
628 self.inc_rebalance_time(rhs.rebalance_time());
629 self.inc_split(rhs.split());
630 self.inc_spill(rhs.spill());
631 self.inc_spill_time(rhs.spill_time());
632 self.inc_write(rhs.write());
633 self.inc_write_time(rhs.write_time());
634 }
635
636 pub(crate) fn add(&self, rhs: &TxStats) -> TxStats {
637 let add = self.clone();
638 add.add_assign(rhs);
639 add
640 }
641
642 pub(crate) fn sub_assign(&self, rhs: &TxStats) {
643 self.inc_page_count(-rhs.page_count());
644 self.inc_page_alloc(-rhs.page_alloc());
645 self.inc_cursor_count(-rhs.cursor_count());
646 self.inc_node_count(-rhs.node_count());
647 self.inc_node_deref(-rhs.node_deref());
648 self.inc_rebalance(-rhs.rebalance());
649 self.rebalance_time.lock().sub_assign(rhs.rebalance_time());
650 self.inc_split(-rhs.split());
651 self.inc_spill(-rhs.spill());
652 self.spill_time.lock().sub_assign(rhs.spill_time());
653 self.inc_write(-rhs.write());
654 self.write_time.lock().sub_assign(rhs.write_time());
655 }
656
657 pub(crate) fn sub(&self, rhs: &TxStats) -> TxStats {
658 let sub = self.clone();
659 sub.sub_assign(rhs);
660 sub
661 }
662}
663
664impl Clone for TxStats {
665 fn clone(&self) -> Self {
666 TxStats {
667 page_count: self.page_count().into(),
668 page_alloc: self.page_alloc().into(),
669 cursor_count: self.cursor_count().into(),
670 node_count: self.node_count().into(),
671 node_deref: self.node_deref().into(),
672 rebalance: self.rebalance().into(),
673 rebalance_time: self.rebalance_time().into(),
674 split: self.split().into(),
675 spill: self.spill().into(),
676 spill_time: self.spill_time().into(),
677 write: self.write().into(),
678 write_time: self.write_time().into(),
679 }
680 }
681}
682
683impl PartialEq for TxStats {
684 fn eq(&self, other: &Self) -> bool {
685 self.page_count() == other.page_count()
686 && self.page_alloc() == other.page_alloc()
687 && self.cursor_count() == other.cursor_count()
688 && self.node_count() == other.node_count()
689 && self.node_deref() == other.node_deref()
690 && self.rebalance() == other.rebalance()
691 && self.rebalance_time() == other.rebalance_time()
692 && self.split() == other.split()
693 && self.spill() == other.spill()
694 && self.spill_time() == other.spill_time()
695 && self.write() == other.write()
696 && self.write_time() == other.write_time()
697 }
698}
699
700impl Eq for TxStats {}
701
702impl Debug for TxStats {
703 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
704 f.debug_struct("TxStats")
705 .field("page_count", &self.page_count())
706 .field("page_alloc", &self.page_alloc())
707 .field("cursor_count", &self.cursor_count())
708 .field("node_count", &self.node_count())
709 .field("node_deref", &self.node_deref())
710 .field("rebalance", &self.rebalance())
711 .field("rebalance_time", &self.rebalance_time())
712 .field("split", &self.split())
713 .field("spill", &self.spill())
714 .field("spill_time", &self.spill_time())
715 .field("write", &self.write())
716 .field("write_time", &self.write_time())
717 .finish()
718 }
719}
720
721pub(crate) enum AnyPage<'tx: 'a, 'a> {
722 Ref(RefPage<'tx>),
723 Pending(RefPage<'a>),
724}
725
726impl<'tx: 'a, 'a> Deref for AnyPage<'tx, 'a> {
727 type Target = RefPage<'a>;
728
729 #[inline]
730 fn deref(&self) -> &Self::Target {
731 match self {
732 AnyPage::Ref(r) => r,
733 AnyPage::Pending(p) => p,
734 }
735 }
736}
737
/// How a read/write transaction is being closed.
///
/// The default is `Rollback`.
#[derive(Copy, Clone, Default, PartialOrd, Ord, PartialEq, Eq)]
pub(crate) enum TxClosingState {
  /// No explicit outcome was requested.
  #[default]
  Rollback,
  /// Rollback was requested through the transaction's `rollback` call.
  ExplicitRollback,
  /// Rollback was requested through the transaction's `physical_rollback` call.
  PhysicalRollback,
  /// The transaction is being committed.
  Commit,
}

impl TxClosingState {
  /// Returns `true` for every rollback flavour and `false` for `Commit`.
  #[inline]
  pub(crate) fn is_rollback(&self) -> bool {
    match self {
      TxClosingState::Rollback
      | TxClosingState::ExplicitRollback
      | TxClosingState::PhysicalRollback => true,
      TxClosingState::Commit => false,
    }
  }

  /// Returns `true` only for `PhysicalRollback`.
  #[inline]
  pub(crate) fn is_physical_rollback(&self) -> bool {
    *self == TxClosingState::PhysicalRollback
  }
}
763
764pub(crate) trait TxIApi<'tx> {
765 fn bump(self) -> &'tx Bump;
766
767 fn page_size(self) -> usize;
768
769 fn meta<'a>(&'a self) -> Ref<'a, Meta>
770 where
771 'tx: 'a;
772
773 fn mem_page(self, id: PgId) -> RefPage<'tx>;
774
775 fn any_page<'a>(&'a self, id: PgId) -> AnyPage<'tx, 'a>;
776
777 fn api_id(self) -> TxId;
779
780 fn api_size(self) -> u64;
782
783 fn api_cursor(self) -> InnerCursor<'tx>;
785
786 fn api_stats(self) -> Arc<TxStats>;
788
789 fn root_bucket(self) -> BucketCell<'tx>;
790
791 fn api_bucket(self, name: &[u8]) -> Option<BucketCell<'tx>>;
793
794 fn api_bucket_path<T: AsRef<[u8]>>(self, names: &[T]) -> Option<BucketCell<'tx>>;
795
796 fn api_for_each<'a, F: FnMut(&'a [u8], BucketImpl<'tx, 'a>) -> crate::Result<()>>(
798 &self, f: F,
799 ) -> crate::Result<()>
800 where
801 'tx: 'a;
802
803 fn for_each_page<F: FnMut(&RefPage<'tx>, usize, &mut BVec<PgId>)>(self, pg_id: PgId, f: &mut F);
805
806 fn for_each_page_internal<F: FnMut(&RefPage<'tx>, usize, &mut BVec<PgId>)>(
807 self, pgid_stack: &mut BVec<PgId>, f: &mut F,
808 );
809
810 fn rollback(self) -> crate::Result<()>;
811
812 fn api_page(&self, id: PgId) -> Option<PageInfo>;
814}
815
816pub(crate) trait TxRwIApi<'tx>: TxIApi<'tx> + TxICheck<'tx> {
817 fn freelist_free_page(self, txid: TxId, p: &PageHeader);
818
819 fn root_bucket_mut(self) -> BucketCell<'tx>;
820
821 fn allocate(
822 self, count: usize,
823 ) -> crate::Result<SelfOwned<AlignedBytes<alignment::Page>, MutPage<'tx>>>;
824
825 fn queue_page(self, page: SelfOwned<AlignedBytes<alignment::Page>, MutPage<'tx>>);
826
827 fn api_create_bucket(self, name: &[u8]) -> crate::Result<BucketCell<'tx>>;
829
830 fn api_create_bucket_if_not_exist(self, name: &[u8]) -> crate::Result<BucketCell<'tx>>;
832
833 fn api_create_bucket_path<T: AsRef<[u8]>>(self, names: &[T]) -> crate::Result<BucketCell<'tx>>;
834
835 fn api_delete_bucket(self, name: &[u8]) -> crate::Result<()>;
837
838 fn write(self) -> crate::Result<()>;
839
840 fn write_meta(self) -> crate::Result<()>;
841
842 fn api_on_commit(self, f: Box<dyn FnOnce() + 'tx>);
844
845 fn physical_rollback(self) -> crate::Result<()>;
846}
847
/// Read-side state of a transaction.
pub struct TxR<'tx> {
  /// Bump arena used for allocations made on behalf of this transaction.
  b: &'tx Bump,
  /// Page size in bytes, taken from the meta data when the transaction is created.
  page_size: usize,
  /// Database handle used for page lookups, writes, and freelist operations.
  pub(crate) db: &'tx LockGuard<'tx, DbShared>,
  /// Transaction statistics; taken (left as `None`) when the owning handle is dropped.
  pub(crate) stats: Option<Arc<TxStats>>,
  /// The meta data this transaction operates on.
  pub(crate) meta: Meta,
  marker: PhantomData<&'tx u8>,
}
856
/// Write-side state of a transaction.
pub struct TxW<'tx> {
  /// Pages queued for writing, keyed by page id.
  pages: HashMap<'tx, PgId, SelfOwned<AlignedBytes<alignment::Page>, MutPage<'tx>>>,
  /// Handlers registered through `api_on_commit`.
  commit_handlers: BVec<'tx, Box<dyn FnOnce() + 'tx>>,
  /// When set, `fsync` is skipped after writes unless `IGNORE_NO_SYNC` overrides it.
  no_sync: bool,
  /// How the transaction is being closed; reported to the database on drop.
  tx_closing_state: TxClosingState,
  marker: PhantomData<&'tx u8>,
}
864
/// Combined transaction state: the read half plus an optional write half.
pub struct TxRW<'tx> {
  /// Read-side state, always present.
  pub(crate) r: TxR<'tx>,
  /// Write-side state; `None` for transactions created by `TxImpl::new`.
  w: Option<TxW<'tx>>,
}
869
/// Copyable handle to the transaction state, paired with its root bucket.
#[derive(Copy, Clone)]
pub struct TxCell<'tx> {
  /// Cell holding the `TxRW` state, bound to the root `BucketCell`.
  pub(crate) cell: BCell<'tx, TxRW<'tx>, BucketCell<'tx>>,
}
874
875impl<'tx> SplitRef<TxR<'tx>, BucketCell<'tx>, TxW<'tx>> for TxCell<'tx> {
876 fn split_r(&self) -> Ref<TxR<'tx>> {
877 Ref::map(self.cell.borrow(), |c| &c.r)
878 }
879
880 fn split_ref(&self) -> (Ref<TxR<'tx>>, Ref<Option<TxW<'tx>>>) {
881 let (r, w) = Ref::map_split(self.cell.borrow(), |b| (&b.r, &b.w));
882 (r, w)
883 }
884
885 fn split_ow(&self) -> Ref<Option<TxW<'tx>>> {
886 Ref::map(self.cell.borrow(), |c| &c.w)
887 }
888
889 #[inline]
890 fn split_bound(&self) -> BucketCell<'tx> {
891 self.cell.bound()
892 }
893
894 fn split_r_mut(&self) -> RefMut<TxR<'tx>> {
895 RefMut::map(self.cell.borrow_mut(), |c| &mut c.r)
896 }
897
898 fn split_ow_mut(&self) -> RefMut<Option<TxW<'tx>>> {
899 RefMut::map(self.cell.borrow_mut(), |c| &mut c.w)
900 }
901}
902
903impl<'tx> TxIApi<'tx> for TxCell<'tx> {
904 #[inline]
905 fn bump(self) -> &'tx Bump {
906 self.split_r().b
907 }
908
909 #[inline]
910 fn page_size(self) -> usize {
911 self.split_r().page_size
912 }
913
914 fn meta<'a>(&'a self) -> Ref<'a, Meta>
915 where
916 'tx: 'a,
917 {
918 Ref::map(self.split_r(), |tx| &tx.meta)
919 }
920
921 fn mem_page(self, id: PgId) -> RefPage<'tx> {
922 self.split_r().db.page(id)
923 }
924
925 fn any_page<'a>(&'a self, id: PgId) -> AnyPage<'tx, 'a> {
926 if let Some(tx) = self.split_ow().as_ref() {
927 if let Some(page) = tx.pages.get(&id).map(|p| p.as_ref()) {
928 page.fast_check(id);
929 return AnyPage::Pending(*page);
930 }
931 }
932 let page = self.split_r().db.page(id);
933 page.fast_check(id);
934 AnyPage::Ref(page)
935 }
936
937 #[inline]
939 fn api_id(self) -> TxId {
940 self.split_r().meta.txid()
941 }
942
943 #[inline]
945 fn api_size(self) -> u64 {
946 let r = self.split_r();
947 r.meta.pgid().0 * r.meta.page_size() as u64
948 }
949
950 fn api_cursor(self) -> InnerCursor<'tx> {
952 let root_bucket = self.root_bucket();
953 root_bucket.i_cursor()
954 }
955
956 fn api_stats(self) -> Arc<TxStats> {
958 self.split_r().stats.as_ref().unwrap().clone()
959 }
960
961 #[inline]
962 fn root_bucket(self) -> BucketCell<'tx> {
963 self.split_bound()
964 }
965
966 fn api_bucket(self, name: &[u8]) -> Option<BucketCell<'tx>> {
968 let root_bucket = self.root_bucket();
969 root_bucket.api_bucket(name)
970 }
971
972 fn api_bucket_path<T: AsRef<[u8]>>(self, names: &[T]) -> Option<BucketCell<'tx>> {
973 let mut b = self.root_bucket();
974 for n in names {
975 let name = n.as_ref();
976 b = match b.api_bucket(name) {
977 None => return None,
978 Some(next_b) => next_b,
979 };
980 }
981 Some(b)
982 }
983
984 fn api_for_each<'a, F: FnMut(&'a [u8], BucketImpl<'tx, 'a>) -> crate::Result<()>>(
986 &self, mut f: F,
987 ) -> crate::Result<()>
988 where
989 'tx: 'a,
990 {
991 let root_bucket = self.root_bucket();
992 root_bucket.api_for_each_bucket(|k| {
993 let bucket = root_bucket.api_bucket(k).unwrap();
994 f(k, bucket.into_impl())?;
995 Ok(())
996 })
997 }
998
999 fn for_each_page<F: FnMut(&RefPage<'tx>, usize, &mut BVec<PgId>)>(self, pg_id: PgId, f: &mut F) {
1001 let mut stack = BVec::with_capacity_in(10, self.bump());
1002 stack.push(pg_id);
1003 self.for_each_page_internal(&mut stack, f);
1004 }
1005
1006 fn for_each_page_internal<F: FnMut(&RefPage<'tx>, usize, &mut BVec<PgId>)>(
1007 self, pgid_stack: &mut BVec<PgId>, f: &mut F,
1008 ) {
1009 let p = self.mem_page(*pgid_stack.last().unwrap());
1010
1011 f(&p, pgid_stack.len() - 1, pgid_stack);
1013
1014 if let Some(branch_page) = MappedBranchPage::coerce_ref(&p) {
1016 for elem in branch_page.elements() {
1017 pgid_stack.push(elem.pgid());
1018 self.for_each_page_internal(pgid_stack, f);
1019 pgid_stack.pop();
1020 }
1021 }
1022 }
1023
1024 fn rollback(self) -> crate::Result<()> {
1025 if let Some(w) = self.split_ow_mut().as_mut() {
1026 w.tx_closing_state = TxClosingState::ExplicitRollback;
1027 }
1028 Ok(())
1029 }
1030
1031 fn api_page(&self, id: PgId) -> Option<PageInfo> {
1033 let r = self.split_r();
1034 if id >= r.meta.pgid() {
1035 return None;
1036 }
1037 let p = r.db.page(id);
1041 let id = p.id;
1042 let count = p.count as u64;
1043 let overflow_count = p.overflow as u64;
1044
1045 let t = if r.db.is_page_free(id) {
1046 Cow::Borrowed("free")
1047 } else {
1048 p.page_type()
1049 };
1050 let info = PageInfo {
1051 id: id.0,
1052 t,
1053 count,
1054 overflow_count,
1055 };
1056 Some(info)
1057 }
1058}
1059
1060impl<'tx> TxRwIApi<'tx> for TxCell<'tx> {
1061 fn freelist_free_page(self, txid: TxId, p: &PageHeader) {
1062 self.cell.borrow().r.db.free_page(txid, p)
1063 }
1064
1065 fn root_bucket_mut(self) -> BucketCell<'tx> {
1066 self.split_bound()
1067 }
1068
1069 fn allocate(
1070 self, count: usize,
1071 ) -> crate::Result<SelfOwned<AlignedBytes<alignment::Page>, MutPage<'tx>>> {
1072 let db = { self.cell.borrow().r.db };
1073 let page = match db.allocate(self, count as u64) {
1074 AllocateResult::Page(page) => page,
1075 AllocateResult::PageWithNewSize(page, min_size) => {
1076 db.get_mut().unwrap().mmap_to_new_size(min_size, self)?;
1077 page
1078 }
1079 };
1080
1081 Ok(page)
1082 }
1083
1084 fn queue_page(self, page: SelfOwned<AlignedBytes<alignment::Page>, MutPage<'tx>>) {
1085 let mut tx = self.cell.borrow_mut();
1086 if let Some(pending) = tx.w.as_mut().unwrap().pages.insert(page.id, page) {
1087 if pending.overflow == 0 {
1088 tx.r
1089 .db
1090 .get_mut()
1091 .unwrap()
1092 .repool_allocated(pending.into_owner());
1093 }
1094 }
1095 }
1096
1097 fn api_create_bucket(self, name: &[u8]) -> crate::Result<BucketCell<'tx>> {
1098 let root_bucket = self.root_bucket();
1099 root_bucket.api_create_bucket(name)
1100 }
1101
1102 fn api_create_bucket_if_not_exist(self, name: &[u8]) -> crate::Result<BucketCell<'tx>> {
1103 let root_bucket = self.root_bucket();
1104 root_bucket.api_create_bucket_if_not_exists(name)
1105 }
1106
1107 fn api_create_bucket_path<T: AsRef<[u8]>>(self, names: &[T]) -> crate::Result<BucketCell<'tx>> {
1108 let mut b = self.root_bucket();
1109 for n in names {
1110 let name = n.as_ref();
1111 b = b.api_create_bucket_if_not_exists(name)?;
1112 }
1113 Ok(b)
1114 }
1115
1116 fn api_delete_bucket(self, name: &[u8]) -> crate::Result<()> {
1117 let root_bucket = self.root_bucket();
1118 root_bucket.api_delete_bucket(name)
1119 }
1120
1121 fn write(self) -> crate::Result<()> {
1122 let (pages, db, page_size, no_sync) = {
1123 let mut tx = self.cell.borrow_mut();
1124 let mut swap_pages = HashMap::with_capacity_in(0, tx.r.b);
1125 mem::swap(&mut swap_pages, &mut tx.w.as_mut().unwrap().pages);
1127 let mut pages = BVec::from_iter_in(swap_pages.into_iter().map(|(_, page)| page), tx.r.b);
1128
1129 pages.sort_by_key(|page| page.id);
1131 (
1132 pages,
1133 tx.r.db,
1134 tx.r.page_size,
1135 tx.w.as_ref().unwrap().no_sync,
1136 )
1137 };
1138
1139 let r = self.split_r();
1140
1141 for page in &pages {
1143 let mut rem = (page.overflow as usize + 1) * page_size;
1144 let mut offset = page.id.0 * page_size as u64;
1145 let mut written = 0;
1146
1147 loop {
1149 let size = rem.min(MAX_ALLOC_SIZE.bytes() as usize - 1);
1150 let buf = &page.ref_owner()[written..size];
1151
1152 let size = db.write_all_at(buf, offset)?;
1153
1154 r.stats.as_ref().unwrap().inc_write(1);
1156
1157 rem -= size;
1158 if rem == 0 {
1159 break;
1160 }
1161
1162 offset += size as u64;
1163 written += size;
1164 }
1165 }
1166
1167 if !no_sync || IGNORE_NO_SYNC {
1168 db.fsync()?;
1169 }
1170
1171 for page in pages.into_iter() {
1172 if page.overflow == 0 {
1173 db.repool_allocated(page.into_owner());
1174 }
1175 }
1176 Ok(())
1177 }
1178
1179 fn write_meta(self) -> crate::Result<()> {
1180 let tx = self.cell.borrow();
1181 let page_size = tx.r.page_size;
1182
1183 let layout = Layout::from_size_align(page_size, mem::align_of::<MetaPage>()).unwrap();
1184 let ptr = tx.r.b.alloc_layout(layout);
1185
1186 let mut meta_page = unsafe { MappedMetaPage::new(ptr.as_ptr()) };
1187 tx.r.meta.write(&mut meta_page);
1188
1189 let db = tx.r.db;
1190 let offset = meta_page.page.id.0 * page_size as u64;
1191 let buf = unsafe { from_raw_parts_mut(ptr.as_ptr(), page_size) };
1192 db.write_all_at(buf, offset)?;
1193
1194 if !tx.w.as_ref().unwrap().no_sync || IGNORE_NO_SYNC {
1195 db.fsync()?;
1196 }
1197
1198 tx.r.stats.as_ref().unwrap().inc_write(1);
1199
1200 Ok(())
1201 }
1202
1203 fn api_on_commit(self, f: Box<dyn FnOnce() + 'tx>) {
1204 self
1205 .cell
1206 .borrow_mut()
1207 .w
1208 .as_mut()
1209 .unwrap()
1210 .commit_handlers
1211 .push(f);
1212 }
1213
1214 fn physical_rollback(self) -> crate::Result<()> {
1215 if let Some(w) = self.split_ow_mut().as_mut() {
1216 w.tx_closing_state = TxClosingState::PhysicalRollback;
1217 }
1218 Ok(())
1219 }
1220}
1221
/// Owning handle for a read-only transaction.
pub struct TxImpl<'tx> {
  /// Pinned bump allocator; the transaction state is allocated inside it.
  bump: SyncReusable<Pin<Box<PinBump>>>,
  /// Pinned guard over the shared database state.
  db: Pin<AliasableBox<PinLockGuard<'tx, DbShared>>>,
  /// Handle to the transaction state; built from references into `bump` and `db`.
  pub(crate) tx: TxCell<'tx>,
}
1228
1229impl<'tx> TxImpl<'tx> {
1230 pub(crate) fn new(
1231 bump: SyncReusable<Pin<Box<PinBump>>>, lock: RwLockReadGuard<'tx, DbShared>, meta: Meta,
1232 ) -> TxImpl<'tx> {
1233 let page_size = meta.page_size() as usize;
1234 let inline_bucket = meta.root();
1235 let mut uninit: MaybeUninit<TxImpl<'tx>> = MaybeUninit::uninit();
1236 let ptr = uninit.as_mut_ptr();
1237 unsafe {
1238 addr_of_mut!((*ptr).bump).write(bump);
1239
1240 let bump = Pin::as_ref(&*addr_of!((*ptr).bump)).bump().get_ref();
1241 addr_of_mut!((*ptr).db).write(AliasableBox::from_unique_pin(Box::pin(PinLockGuard::new(
1242 lock,
1243 ))));
1244 let db = Pin::as_ref(&*addr_of!((*ptr).db)).guard().get_ref();
1245 let tx = {
1246 let r = TxR {
1247 b: bump,
1248 page_size,
1249 db,
1250 meta,
1251 stats: Some(Default::default()),
1252 marker: Default::default(),
1253 };
1254
1255 let uninit_tx: MaybeUninit<(RefCell<TxRW>, BucketCell<'tx>)> = MaybeUninit::uninit();
1256 let cell_tx = bump.alloc(uninit_tx);
1257 let cell_tx_ptr = cell_tx.as_ptr().cast_mut();
1258 let const_cell_ptr = cell_tx_ptr.cast_const();
1259
1260 addr_of_mut!((*cell_tx_ptr).0).write(RefCell::new(TxRW { r, w: None }));
1261 addr_of_mut!((*cell_tx_ptr).1).write(BucketCell::new_r_in(
1262 bump,
1263 inline_bucket,
1264 TxCell {
1265 cell: BCell(const_cell_ptr, PhantomData),
1266 },
1267 None,
1268 ));
1269 TxCell {
1270 cell: BCell(cell_tx.assume_init_ref(), PhantomData),
1271 }
1272 };
1273 addr_of_mut!((*ptr).tx).write(tx);
1274 uninit.assume_init()
1275 }
1276 }
1277
1278 pub(crate) fn get_ref(&self) -> TxRef<'tx> {
1279 TxRef {
1280 tx: TxCell { cell: self.tx.cell },
1281 }
1282 }
1283}
1284
1285impl<'tx> Drop for TxImpl<'tx> {
1286 fn drop(&mut self) {
1287 let tx_id = self.id();
1288 let stats = self.tx.cell.borrow_mut().r.stats.take().unwrap();
1289 Pin::as_ref(&self.db).guard().remove_tx(tx_id, stats);
1290 }
1291}
1292
1293impl<'tx> TxApi<'tx> for TxImpl<'tx> {
1294 #[inline]
1295 fn id(&self) -> TxId {
1296 self.tx.api_id()
1297 }
1298
1299 #[inline]
1300 fn size(&self) -> u64 {
1301 self.tx.api_size()
1302 }
1303
1304 #[inline]
1305 fn writable(&self) -> bool {
1306 false
1307 }
1308
1309 fn cursor<'a>(&'a self) -> CursorImpl<'tx, 'a> {
1310 self.tx.api_cursor().into()
1311 }
1312
1313 fn stats(&self) -> Arc<TxStats> {
1314 self.tx.api_stats()
1315 }
1316
1317 fn bucket<'a, T: AsRef<[u8]>>(&'a self, name: T) -> Option<BucketImpl<'tx, 'a>> {
1318 self.tx.api_bucket(name.as_ref()).map(BucketImpl::from)
1319 }
1320
1321 fn bucket_path<'a, T: AsRef<[u8]>>(&'a self, names: &[T]) -> Option<BucketImpl<'tx, 'a>> {
1322 self.tx.api_bucket_path(names).map(BucketImpl::from)
1323 }
1324
1325 fn for_each<'a, F: FnMut(&'a [u8], BucketImpl<'tx, 'a>) -> crate::Result<()>>(
1326 &self, f: F,
1327 ) -> crate::Result<()>
1328 where
1329 'tx: 'a,
1330 {
1331 self.tx.api_for_each(f)
1332 }
1333
1334 fn page(&self, id: PgId) -> Option<PageInfo> {
1335 self.tx.api_page(id)
1336 }
1337
1338 fn iter_buckets<'a>(&'a self) -> BucketIter<'tx, 'a> {
1339 BucketIter::new(self.tx.api_cursor())
1340 }
1341}
1342
/// Non-owning handle to a transaction's state, as produced by `TxImpl::get_ref`.
pub struct TxRef<'tx> {
  /// Handle to the shared transaction state.
  pub(crate) tx: TxCell<'tx>,
}
1347
1348impl<'tx> TxApi<'tx> for TxRef<'tx> {
1349 #[inline]
1350 fn id(&self) -> TxId {
1351 self.tx.api_id()
1352 }
1353
1354 #[inline]
1355 fn size(&self) -> u64 {
1356 self.tx.api_size()
1357 }
1358
1359 #[inline]
1360 fn writable(&self) -> bool {
1361 false
1362 }
1363
1364 fn cursor<'a>(&'a self) -> CursorImpl<'tx, 'a> {
1365 self.tx.api_cursor().into()
1366 }
1367
1368 fn stats(&self) -> Arc<TxStats> {
1369 self.tx.api_stats()
1370 }
1371
1372 fn bucket<'a, T: AsRef<[u8]>>(&'a self, name: T) -> Option<BucketImpl<'tx, 'a>> {
1373 self.tx.api_bucket(name.as_ref()).map(BucketImpl::from)
1374 }
1375
1376 fn bucket_path<'a, T: AsRef<[u8]>>(&'a self, names: &[T]) -> Option<BucketImpl<'tx, 'a>> {
1377 self.tx.api_bucket_path(names).map(BucketImpl::from)
1378 }
1379
1380 fn for_each<'a, F: FnMut(&'a [u8], BucketImpl<'tx, 'a>) -> crate::Result<()>>(
1381 &self, f: F,
1382 ) -> crate::Result<()>
1383 where
1384 'tx: 'a,
1385 {
1386 self.tx.api_for_each(f)
1387 }
1388
1389 fn page(&self, id: PgId) -> Option<PageInfo> {
1390 self.tx.api_page(id)
1391 }
1392
1393 fn iter_buckets<'a>(&'a self) -> BucketIter<'tx, 'a> {
1394 BucketIter::new(self.tx.api_cursor())
1395 }
1396}
1397
/// Owning handle for a read/write transaction.
pub struct TxRwImpl<'tx> {
  /// Pinned bump allocator; the transaction and root bucket state are allocated inside it.
  bump: SyncReusable<Pin<Box<PinBump>>>,
  /// Pinned guard over the shared database state.
  db: Pin<AliasableBox<PinLockGuard<'tx, DbShared>>>,
  /// Handle to the transaction state; built from references into `bump` and `db`.
  pub(crate) tx: TxCell<'tx>,
}
1404
1405impl<'tx> TxRwImpl<'tx> {
1406 pub(crate) fn get_ref(&self) -> TxRwRef<'tx> {
1407 TxRwRef {
1408 tx: TxCell { cell: self.tx.cell },
1409 }
1410 }
1411
1412 pub(crate) fn new(
1413 bump: SyncReusable<Pin<Box<PinBump>>>, lock: RwLockUpgradableReadGuard<'tx, DbShared>,
1414 meta: Meta,
1415 ) -> TxRwImpl<'tx> {
1416 let no_sync = lock.options.no_sync();
1417 let page_size = meta.page_size() as usize;
1418 let inline_bucket = meta.root();
1419 let mut uninit: MaybeUninit<TxRwImpl<'tx>> = MaybeUninit::uninit();
1420 let ptr = uninit.as_mut_ptr();
1421 unsafe {
1422 addr_of_mut!((*ptr).bump).write(bump);
1423 let bump = Pin::as_ref(&*addr_of!((*ptr).bump)).bump().get_ref();
1424 addr_of_mut!((*ptr).db).write(AliasableBox::from_unique_pin(Box::pin(PinLockGuard::new(
1425 lock,
1426 ))));
1427
1428 let db = Pin::as_ref(&*addr_of!((*ptr).db)).guard().get_ref();
1429 let tx = {
1430 let tx_r = TxR {
1431 b: bump,
1432 page_size,
1433 db,
1434 meta,
1435 stats: Some(Default::default()),
1436 marker: Default::default(),
1437 };
1438 let tx_w = TxW {
1439 pages: HashMap::with_capacity_in(0, bump),
1440 commit_handlers: BVec::with_capacity_in(0, bump),
1441 no_sync,
1442 tx_closing_state: TxClosingState::Rollback,
1443 marker: Default::default(),
1444 };
1445
1446 let bucket_r = BucketR::new(inline_bucket);
1447 let bucket_w = BucketW::new_in(bump);
1448
1449 let uninit_tx: MaybeUninit<(RefCell<TxRW>, BucketCell<'tx>)> = MaybeUninit::uninit();
1450 let uninit_bucket: MaybeUninit<(RefCell<BucketRW<'tx>>, TxCell<'tx>)> =
1451 MaybeUninit::uninit();
1452 let cell_tx = bump.alloc(uninit_tx);
1453 let cell_tx_ptr = cell_tx.as_mut_ptr();
1454 let const_cell_tx_ptr = cell_tx_ptr.cast_const();
1455 let cell_bucket = bump.alloc(uninit_bucket);
1456 let cell_bucket_ptr = cell_bucket.as_mut_ptr();
1457
1458 addr_of_mut!((*cell_tx_ptr).0).write(RefCell::new(TxRW {
1459 r: tx_r,
1460 w: Some(tx_w),
1461 }));
1462 addr_of_mut!((*cell_bucket_ptr).0).write(RefCell::new(BucketRW {
1463 r: bucket_r,
1464 w: Some(bucket_w),
1465 }));
1466 addr_of_mut!((*cell_bucket_ptr).1).write(TxCell {
1467 cell: BCell(const_cell_tx_ptr, PhantomData),
1468 });
1469 addr_of_mut!((*cell_tx_ptr).1).write(BucketCell {
1470 cell: BCell(cell_bucket.assume_init_ref(), PhantomData),
1471 });
1472 TxCell {
1473 cell: BCell(cell_tx.assume_init_ref(), PhantomData),
1474 }
1475 };
1476 addr_of_mut!((*ptr).tx).write(tx);
1477 uninit.assume_init()
1478 }
1479 }
1480
1481 fn commit_freelist(&mut self) -> crate::Result<()> {
1482 let allocated_page = Pin::as_ref(&self.db).guard().commit_freelist(self.tx)?;
1483
1484 let freelist_page = match allocated_page {
1485 AllocateResult::Page(page) => page,
1486 AllocateResult::PageWithNewSize(page, min_size) => {
1487 Pin::as_ref(&self.db)
1488 .guard()
1489 .get_mut()
1490 .unwrap()
1491 .mmap_to_new_size(min_size, self.tx)?;
1492 page
1493 }
1494 };
1495 let pg_id = freelist_page.id;
1496 let mut tx = self.tx.cell.borrow_mut();
1497 tx.r.meta.set_free_list(pg_id);
1498 tx.w.as_mut().unwrap().pages.insert(pg_id, freelist_page);
1499 Ok(())
1500 }
1501}
1502
1503impl<'tx> Drop for TxRwImpl<'tx> {
1504 fn drop(&mut self) {
1505 let mut cell = self.tx.cell.borrow_mut();
1506 let tx_closing_state = cell.w.as_ref().unwrap().tx_closing_state;
1507 let tx_id = cell.r.meta.txid();
1508 let stats = cell.r.stats.take().unwrap();
1509 Pin::as_ref(&self.db)
1510 .guard()
1511 .remove_rw_tx(tx_closing_state, tx_id, stats);
1512 }
1513}
1514
1515impl<'tx> TxApi<'tx> for TxRwImpl<'tx> {
1516 #[inline]
1517 fn id(&self) -> TxId {
1518 self.tx.api_id()
1519 }
1520
1521 fn size(&self) -> u64 {
1522 self.tx.api_size()
1523 }
1524
1525 #[inline]
1526 fn writable(&self) -> bool {
1527 true
1528 }
1529
1530 fn cursor<'a>(&'a self) -> CursorImpl<'tx, 'a> {
1531 self.tx.api_cursor().into()
1532 }
1533
1534 fn stats(&self) -> Arc<TxStats> {
1535 self.tx.api_stats()
1536 }
1537
1538 fn bucket<'a, T: AsRef<[u8]>>(&'a self, name: T) -> Option<BucketImpl<'tx, 'a>> {
1539 self.tx.api_bucket(name.as_ref()).map(BucketImpl::from)
1540 }
1541
1542 fn bucket_path<'a, T: AsRef<[u8]>>(&'a self, names: &[T]) -> Option<BucketImpl<'tx, 'a>> {
1543 self.tx.api_bucket_path(names).map(BucketImpl::from)
1544 }
1545
1546 fn for_each<'a, F: FnMut(&'a [u8], BucketImpl<'tx, 'a>) -> crate::Result<()>>(
1547 &self, f: F,
1548 ) -> crate::Result<()>
1549 where
1550 'tx: 'a,
1551 {
1552 self.tx.api_for_each(f)
1553 }
1554
1555 fn page(&self, id: PgId) -> Option<PageInfo> {
1556 self.tx.api_page(id)
1557 }
1558
1559 fn iter_buckets<'a>(&'a self) -> BucketIter<'tx, 'a> {
1560 BucketIter::new(self.tx.api_cursor())
1561 }
1562}
1563
1564impl<'tx> TxRwRefApi<'tx> for TxRwImpl<'tx> {
1565 fn bucket_mut<'a, T: AsRef<[u8]>>(&'a mut self, name: T) -> Option<BucketRwImpl<'tx, 'a>> {
1566 self.tx.api_bucket(name.as_ref()).map(BucketRwImpl::from)
1567 }
1568
1569 fn bucket_mut_path<'a, T: AsRef<[u8]>>(
1570 &'a mut self, names: &[T],
1571 ) -> Option<BucketRwImpl<'tx, 'a>> {
1572 self.tx.api_bucket_path(names).map(BucketRwImpl::from)
1573 }
1574
1575 fn create_bucket<'a, T: AsRef<[u8]>>(
1576 &'a mut self, name: T,
1577 ) -> crate::Result<BucketRwImpl<'tx, 'a>> {
1578 self
1579 .tx
1580 .api_create_bucket(name.as_ref())
1581 .map(BucketRwImpl::from)
1582 }
1583
1584 fn create_bucket_if_not_exists<'a, T: AsRef<[u8]>>(
1585 &'a mut self, name: T,
1586 ) -> crate::Result<BucketRwImpl<'tx, 'a>> {
1587 self
1588 .tx
1589 .api_create_bucket_if_not_exist(name.as_ref())
1590 .map(BucketRwImpl::from)
1591 }
1592
1593 fn create_bucket_path<'a, T: AsRef<[u8]>>(
1594 &'a mut self, names: &[T],
1595 ) -> crate::Result<BucketRwImpl<'tx, 'a>> {
1596 self
1597 .tx
1598 .api_create_bucket_path(names)
1599 .map(BucketRwImpl::from)
1600 }
1601
1602 fn delete_bucket<T: AsRef<[u8]>>(&mut self, name: T) -> crate::Result<()> {
1603 self.tx.api_delete_bucket(name.as_ref())
1604 }
1605
1606 fn on_commit<F: FnOnce() + 'tx>(&mut self, f: F) {
1607 self.tx.api_on_commit(Box::new(f))
1608 }
1609
1610 fn iter_mut_buckets<'a>(&'a mut self) -> BucketIterMut<'tx, 'a> {
1611 BucketIterMut::new(self.tx.api_cursor())
1612 }
1613}
1614
1615impl<'tx> TxRwApi<'tx> for TxRwImpl<'tx> {
1616 fn rollback(self) -> crate::Result<()> {
1617 self.tx.rollback()
1618 }
1619
1620 fn commit(mut self) -> crate::Result<()> {
1621 let tx_stats = {
1622 let mut tx = self.tx.cell.borrow_mut();
1623
1624 if tx.w.as_ref().unwrap().tx_closing_state == TxClosingState::ExplicitRollback {
1626 return Ok(());
1627 }
1628 tx.w.as_mut().unwrap().tx_closing_state = TxClosingState::Commit;
1629 tx.r.stats.as_ref().cloned().unwrap()
1630 };
1631
1632 let bump = self.tx.bump();
1633
1634 let start_time = Instant::now();
1635 self.tx.root_bucket().rebalance();
1636 if tx_stats.rebalance() > 0 {
1637 tx_stats.inc_rebalance_time(start_time.elapsed());
1638 }
1639 let opgid = self.tx.meta().pgid();
1640 let start_time = Instant::now();
1641 match self.tx.root_bucket().spill(bump) {
1642 Ok(_) => {
1643 tx_stats.inc_spill_time(start_time.elapsed());
1644 }
1645 Err(e) => {
1646 let _ = self.tx.physical_rollback();
1647 return Err(e);
1648 }
1649 }
1650 {
1651 let new_bucket = self.tx.cell.bound().split_r().bucket_header;
1652 let mut tx = self.tx.cell.borrow_mut();
1653 tx.r.meta.set_root(new_bucket);
1654
1655 let freelist_pg = tx.r.db.page(tx.r.meta.free_list());
1657 let tx_id = tx.r.meta.txid();
1658 Pin::as_ref(&self.db).guard().free_page(tx_id, &freelist_pg);
1659 }
1660 match self.commit_freelist() {
1663 Ok(_) => {}
1664 Err(e) => {
1665 let _ = self.tx.physical_rollback();
1666 return Err(e);
1667 }
1668 }
1669
1670 let new_pgid = self.tx.meta().pgid();
1671 let page_size = self.tx.meta().page_size();
1672 {
1673 let tx = self.tx.cell.borrow();
1674 for page in tx.w.as_ref().unwrap().pages.values() {
1675 assert!(page.id.0 > 1, "Invalid page id");
1676 }
1677 }
1678 if new_pgid > opgid {
1679 Pin::as_ref(&self.db)
1680 .guard()
1681 .grow((new_pgid.0 + 1) * page_size as u64)?;
1682 }
1683 let start_time = Instant::now();
1684 match self.tx.write() {
1685 Ok(_) => {}
1686 Err(e) => {
1687 let _ = self.tx.physical_rollback();
1688 return Err(e);
1689 }
1690 };
1691
1692 #[cfg(feature = "strict")]
1693 {
1694 let errors = self.tx.check();
1695 if !errors.is_empty() {
1696 panic!("check fail: {}", errors.join("\n"))
1697 }
1698 }
1699
1700 match self.tx.write_meta() {
1701 Ok(_) => {
1702 tx_stats.inc_write_time(start_time.elapsed());
1703 }
1704 Err(e) => {
1705 let _ = self.tx.physical_rollback();
1706 return Err(e);
1707 }
1708 }
1709
1710 let mut tx = self.tx.cell.borrow_mut();
1711 let mut commit_handlers = BVec::with_capacity_in(0, tx.r.b);
1712 mem::swap(
1713 &mut commit_handlers,
1714 &mut tx.w.as_mut().unwrap().commit_handlers,
1715 );
1716 for f in commit_handlers.into_iter() {
1717 f();
1718 }
1719 Ok(())
1720 }
1721}
1722
/// Read/write transaction handle that wraps a [`TxCell`].
///
/// Its `TxApi` and `TxRwRefApi` implementations forward every call to `tx`.
pub struct TxRwRef<'tx> {
  // Inner transaction cell that the API methods delegate to.
  pub(crate) tx: TxCell<'tx>,
}
1727
1728impl<'tx> TxApi<'tx> for TxRwRef<'tx> {
1729 #[inline]
1730 fn id(&self) -> TxId {
1731 self.tx.api_id()
1732 }
1733
1734 fn size(&self) -> u64 {
1735 self.tx.api_size()
1736 }
1737
1738 #[inline]
1739 fn writable(&self) -> bool {
1740 true
1741 }
1742
1743 fn cursor<'a>(&'a self) -> CursorImpl<'tx, 'a> {
1744 self.tx.api_cursor().into()
1745 }
1746
1747 fn stats(&self) -> Arc<TxStats> {
1748 self.tx.api_stats()
1749 }
1750
1751 fn bucket<'a, T: AsRef<[u8]>>(&'a self, name: T) -> Option<BucketImpl<'tx, 'a>> {
1752 self.tx.api_bucket(name.as_ref()).map(BucketImpl::from)
1753 }
1754
1755 fn bucket_path<'a, T: AsRef<[u8]>>(&'a self, names: &[T]) -> Option<BucketImpl<'tx, 'a>> {
1756 self.tx.api_bucket_path(names).map(BucketImpl::from)
1757 }
1758
1759 fn for_each<'a, F: FnMut(&'a [u8], BucketImpl<'tx, 'a>) -> crate::Result<()>>(
1760 &self, f: F,
1761 ) -> crate::Result<()>
1762 where
1763 'tx: 'a,
1764 {
1765 self.tx.api_for_each(f)
1766 }
1767
1768 fn page(&self, id: PgId) -> Option<PageInfo> {
1769 self.tx.api_page(id)
1770 }
1771
1772 fn iter_buckets<'a>(&'a self) -> BucketIter<'tx, 'a> {
1773 BucketIter::new(self.tx.api_cursor())
1774 }
1775}
1776
1777impl<'tx> TxRwRefApi<'tx> for TxRwRef<'tx> {
1778 fn bucket_mut<'a, T: AsRef<[u8]>>(&'a mut self, name: T) -> Option<BucketRwImpl<'tx, 'a>> {
1779 self.tx.api_bucket(name.as_ref()).map(BucketRwImpl::from)
1780 }
1781
1782 fn bucket_mut_path<'a, T: AsRef<[u8]>>(
1783 &'a mut self, names: &[T],
1784 ) -> Option<BucketRwImpl<'tx, 'a>> {
1785 self.tx.api_bucket_path(names).map(BucketRwImpl::from)
1786 }
1787
1788 fn create_bucket<'a, T: AsRef<[u8]>>(
1789 &'a mut self, name: T,
1790 ) -> crate::Result<BucketRwImpl<'tx, 'a>> {
1791 self
1792 .tx
1793 .api_create_bucket(name.as_ref())
1794 .map(BucketRwImpl::from)
1795 }
1796
1797 fn create_bucket_if_not_exists<'a, T: AsRef<[u8]>>(
1798 &'a mut self, name: T,
1799 ) -> crate::Result<BucketRwImpl<'tx, 'a>> {
1800 self
1801 .tx
1802 .api_create_bucket_if_not_exist(name.as_ref())
1803 .map(BucketRwImpl::from)
1804 }
1805
1806 fn create_bucket_path<'a, T: AsRef<[u8]>>(
1807 &'a mut self, names: &[T],
1808 ) -> crate::Result<BucketRwImpl<'tx, 'a>> {
1809 self
1810 .tx
1811 .api_create_bucket_path(names)
1812 .map(BucketRwImpl::from)
1813 }
1814
1815 fn delete_bucket<T: AsRef<[u8]>>(&mut self, name: T) -> crate::Result<()> {
1816 self.tx.api_delete_bucket(name.as_ref())
1817 }
1818
1819 fn on_commit<F: FnOnce() + 'tx>(&mut self, f: F) {
1820 self.tx.api_on_commit(Box::new(f))
1821 }
1822
1823 fn iter_mut_buckets<'a>(&'a mut self) -> BucketIterMut<'tx, 'a> {
1824 BucketIterMut::new(self.tx.api_cursor())
1825 }
1826}
1827
1828pub(crate) mod check {
1829 use crate::bucket::{BucketCell, BucketIApi};
1830 use crate::common::page::tree::branch::MappedBranchPage;
1831 use crate::common::page::tree::leaf::MappedLeafPage;
1832 use crate::common::page::tree::TreePage;
1833 use crate::common::page::{CoerciblePage, RefPage};
1834 use crate::common::refstack::RefStack;
1835 use crate::common::{BVec, HashMap, HashSet, PgId, SplitRef, ZERO_PGID};
1836 use crate::db::DbIApi;
1837 use crate::tx::{TxCell, TxIApi, TxImpl, TxR, TxRef, TxRwIApi, TxRwImpl, TxRwRef, TxW};
1838
  /// Crate-internal access from a public transaction handle to the internal transaction API.
  pub(crate) trait UnsealTx<'tx> {
    /// Returns a value implementing the internal read API and the internal check API.
    fn unseal(&self) -> impl TxIApi<'tx> + TxICheck<'tx>;
  }
1842
  /// Crate-internal access from a read/write transaction handle to the internal write API.
  pub(crate) trait UnsealRwTx<'tx>: UnsealTx<'tx> {
    /// Returns a value implementing the internal read/write API.
    fn unseal_rw(&self) -> impl TxRwIApi<'tx>;
  }
1846
1847 impl<'tx> UnsealTx<'tx> for TxImpl<'tx> {
1848 #[inline]
1849 fn unseal(&self) -> impl TxIApi<'tx> + TxICheck<'tx> {
1850 TxCell { cell: self.tx.cell }
1851 }
1852 }
1853
1854 impl<'tx> UnsealTx<'tx> for TxRef<'tx> {
1855 #[inline]
1856 fn unseal(&self) -> impl TxIApi<'tx> + TxICheck<'tx> {
1857 TxCell { cell: self.tx.cell }
1858 }
1859 }
1860
1861 impl<'tx> UnsealTx<'tx> for TxRwImpl<'tx> {
1862 #[inline]
1863 fn unseal(&self) -> impl TxIApi<'tx> + TxICheck<'tx> {
1864 TxCell { cell: self.tx.cell }
1865 }
1866 }
1867
1868 impl<'tx> UnsealTx<'tx> for TxRwRef<'tx> {
1869 #[inline]
1870 fn unseal(&self) -> impl TxIApi<'tx> + TxICheck<'tx> {
1871 self.tx
1872 }
1873 }
1874
1875 impl<'tx> UnsealRwTx<'tx> for TxRwImpl<'tx> {
1876 #[inline]
1877 fn unseal_rw(&self) -> impl TxRwIApi<'tx> {
1878 TxCell { cell: self.tx.cell }
1879 }
1880 }
1881
1882 impl<'tx> UnsealRwTx<'tx> for TxRwRef<'tx> {
1883 #[inline]
1884 fn unseal_rw(&self) -> impl TxRwIApi<'tx> {
1885 self.tx
1886 }
1887 }
1888
  /// Public consistency-check entry point for transactions.
  pub trait TxCheck<'tx> {
    /// Runs the consistency check and returns one message per problem found.
    ///
    /// An empty vector means no problem was reported.
    fn check(&self) -> Vec<String>;
  }
1900
1901 impl<'tx, T> TxCheck<'tx> for T
1902 where
1903 T: UnsealTx<'tx>,
1904 {
1905 fn check(&self) -> Vec<String> {
1906 let tx = self.unseal();
1907 tx.check()
1908 }
1909 }
1910
  /// Internal consistency-check operations on a transaction.
  pub(crate) trait TxICheck<'tx>:
    TxIApi<'tx> + SplitRef<TxR<'tx>, BucketCell<'tx>, TxW<'tx>>
  {
    /// Runs the full check and returns one message per problem found.
    fn check(self) -> Vec<String>;

    /// Checks the pages of `bucket` and its child buckets, recording every visited page
    /// id in `reachable` and appending problems to `errors`.
    fn check_bucket(
      &self, bucket: BucketCell<'tx>, reachable: &mut HashMap<PgId, RefPage<'tx>>,
      freed: &mut HashSet<PgId>, errors: &mut Vec<String>,
    );

    /// Checks key ordering for the page tree rooted at `pg_id`.
    fn recursively_check_pages(self, pg_id: PgId, errors: &mut Vec<String>);

    /// Recursive worker for `recursively_check_pages`.
    ///
    /// `min_key_closed` and `max_key_open` bound the keys expected below `pg_id`; an empty
    /// slice means "no bound". Returns the last key found in the subtree, or an empty
    /// slice when there is none.
    fn recursively_check_pages_internal(
      self, pg_id: PgId, min_key_closed: &[u8], max_key_open: &[u8], pageid_stack: &RefStack<PgId>,
      errors: &mut Vec<String>,
    ) -> &'tx [u8];

    /// Appends a message to `errors` when `key` is out of order relative to `previous_key`
    /// or not below `max_key_open`.
    fn verify_key_order(
      self, pg_id: PgId, page_type: &str, index: usize, key: &[u8], previous_key: &[u8],
      max_key_open: &[u8], pageid_stack: &RefStack<PgId>, errors: &mut Vec<String>,
    );
  }
1937
1938 impl<'tx> TxICheck<'tx> for TxCell<'tx> {
1939 fn check(self) -> Vec<String> {
1940 let mut errors = Vec::new();
1941 let bump = self.bump();
1942 let db = self.split_r().db;
1943 let freelist_count = db.freelist_count();
1944 let high_water = self.meta().pgid();
1945 let mut freed = HashSet::new_in(bump);
1949 let mut all = BVec::with_capacity_in(freelist_count as usize, bump);
1950 for _ in 0..freelist_count {
1951 all.push(ZERO_PGID);
1952 }
1953 db.freelist_copyall(&mut all);
1954 for id in &all {
1955 if freed.contains(id) {
1956 errors.push(format!("page {}: already freed", id));
1957 } else {
1958 freed.insert(*id);
1959 }
1960 }
1961
1962 let mut reachable = HashMap::new_in(bump);
1964 reachable.insert(PgId(0), RefPage::new(std::ptr::null_mut())); reachable.insert(PgId(1), RefPage::new(std::ptr::null_mut())); let freelist_pgid = self.meta().free_list();
1967 for i in 0..=self.mem_page(freelist_pgid).overflow {
1968 let pg_id = freelist_pgid + i as u64;
1969 reachable.insert(pg_id, self.mem_page(freelist_pgid));
1970 }
1971
1972 self.check_bucket(self.split_bound(), &mut reachable, &mut freed, &mut errors);
1974
1975 for i in 0..high_water.0 {
1977 let pg_id = PgId(i);
1978 if !reachable.contains_key(&pg_id) && !freed.contains(&pg_id) {
1979 errors.push(format!("page {}: unreachable unfreed", pg_id));
1980 }
1981 }
1982
1983 errors
1984 }
1985
1986 fn check_bucket(
1987 &self, bucket: BucketCell<'tx>, reachable: &mut HashMap<PgId, RefPage<'tx>>,
1988 freed: &mut HashSet<PgId>, errors: &mut Vec<String>,
1989 ) {
1990 if bucket.root() == ZERO_PGID {
1992 return;
1993 }
1994
1995 self.for_each_page(bucket.root(), &mut |p, _, pgid_stack| {
1996 if p.id > self.meta().pgid() {
1997 errors.push(format!(
1998 "page {}: out of bounds: {} (stack: {:?})",
1999 p.id,
2000 self.meta().pgid(),
2001 pgid_stack
2002 ));
2003 }
2004 for i in 0..=p.overflow {
2005 let id = p.id + i as u64;
2006 if reachable.contains_key(&id) {
2007 errors.push(format!(
2008 "page {}: multiple references (stack: {:?})",
2009 id, pgid_stack
2010 ));
2011 }
2012 reachable.insert(id, *p);
2013 }
2014
2015 if freed.contains(&p.id) {
2016 errors.push(format!("page {}: reachable freed", p.id));
2017 } else if !p.is_branch() && !p.is_leaf() {
2018 errors.push(format!(
2019 "page {}: invalid type: {} (stack: {:?})",
2020 p.id,
2021 p.page_type(),
2022 pgid_stack
2023 ));
2024 }
2025 });
2026
2027 self.recursively_check_pages(bucket.root(), errors);
2028
2029 bucket
2030 .api_for_each_bucket(|key| {
2031 let child = bucket.api_bucket(key).unwrap();
2032 self.check_bucket(child, reachable, freed, errors);
2033 Ok(())
2034 })
2035 .unwrap();
2036 }
2037
2038 fn recursively_check_pages(self, pg_id: PgId, errors: &mut Vec<String>) {
2039 let pgid_stack = RefStack::new(pg_id);
2040 self.recursively_check_pages_internal(pg_id, &[], &[], &pgid_stack, errors);
2041 }
2042
2043 fn recursively_check_pages_internal(
2044 self, pg_id: PgId, min_key_closed: &[u8], max_key_open: &[u8], pageid_stack: &RefStack<PgId>,
2045 errors: &mut Vec<String>,
2046 ) -> &'tx [u8] {
2047 let p = self.mem_page(pg_id);
2048 pageid_stack.push(pg_id);
2049 let mut max_key_in_subtree = [].as_slice();
2050 if let Some(branch_page) = MappedBranchPage::coerce_ref(&p) {
2051 let mut running_min = min_key_closed;
2052 let elements_len = branch_page.elements().len();
2053 for (i, (pg_id, key)) in branch_page
2054 .elements()
2055 .iter()
2056 .map(|e| {
2057 (e.pgid(), unsafe {
2058 e.key(branch_page.page_ptr().cast_const())
2059 })
2060 })
2061 .enumerate()
2062 {
2063 self.verify_key_order(
2064 pg_id,
2065 "branch",
2066 i,
2067 key,
2068 running_min,
2069 max_key_open,
2070 pageid_stack,
2071 errors,
2072 );
2073 let mut max_key = max_key_open;
2074 if i < elements_len - 1 {
2075 max_key = branch_page.get_elem(i as u16 + 1).unwrap().key();
2076 }
2077 max_key_in_subtree =
2078 self.recursively_check_pages_internal(pg_id, key, max_key, pageid_stack, errors);
2079 running_min = max_key_in_subtree;
2080 }
2081 return max_key_in_subtree;
2082 } else if let Some(leaf_page) = MappedLeafPage::coerce_ref(&p) {
2083 let mut running_min = min_key_closed;
2084 for (i, key) in leaf_page
2085 .elements()
2086 .iter()
2087 .map(|e| unsafe { e.key(leaf_page.page_ptr().cast_const()) })
2088 .enumerate()
2089 {
2090 self.verify_key_order(
2091 pg_id,
2092 "leaf",
2093 i,
2094 key,
2095 running_min,
2096 max_key_open,
2097 pageid_stack,
2098 errors,
2099 );
2100 running_min = key;
2101 }
2102 if p.count > 0 {
2103 return leaf_page.get_elem(p.count - 1).unwrap().key();
2104 }
2105 } else {
2106 errors.push(format!("unexpected page type for pgId: {}", pg_id));
2107 }
2108 &[]
2109 }
2110
2111 fn verify_key_order(
2116 self, pg_id: PgId, page_type: &str, index: usize, key: &[u8], previous_key: &[u8],
2117 max_key_open: &[u8], pageid_stack: &RefStack<PgId>, errors: &mut Vec<String>,
2118 ) {
2119 if index == 0 && !previous_key.is_empty() && previous_key > key {
2120 errors.push(format!("the first key[{}]={:02X?} on {} page({}) needs to be >= the key in the ancestor ({:02X?}). Stack: {:?}", index, key, page_type, pg_id, previous_key, pageid_stack));
2121 }
2122 if index > 0 {
2123 if previous_key > key {
2124 errors.push(format!("key[{}]=(hex){:02X?} on {} page({}) needs to be > (found <) than previous element (hex){:02X?}. Stack: {:?}", index, key, page_type, pg_id, previous_key, pageid_stack));
2125 } else if previous_key == key {
2126 errors.push(format!("key[{}]=(hex){:02X?} on {} page({}) needs to be > (found =) than previous element (hex){:02X?}. Stack: {:?}", index, key, page_type, pg_id, previous_key, pageid_stack));
2127 }
2128 }
2129 if !max_key_open.is_empty() && key >= max_key_open {
2130 errors.push(format!("key[{}]=(hex){:02X?} on {} page({}) needs to be < than key of the next element in ancestor (hex){:02X?}. Pages stack: {:?}", index, key, page_type, pg_id, previous_key, pageid_stack));
2131 }
2132 }
2133 }
2134}
2135
2136#[cfg(test)]
2137mod test {
2138 use crate::common::cell::RefCell;
2139 use crate::common::defaults::DEFAULT_PAGE_SIZE;
2140 use crate::test_support::TestDb;
2141 use crate::tx::check::TxCheck;
2142 use crate::tx::{TxRwApi, TxStats};
2143 use crate::{
2144 Bolt, BoltOptions, BucketApi, BucketRwApi, CursorApi, DbApi, DbRwAPI, Error, TxApi, TxImpl,
2145 TxRwRefApi,
2146 };
2147 use anyhow::anyhow;
2148 use std::time::Duration;
2149
2150 #[test]
2151 #[cfg(not(any(miri, feature = "test-mem-backend")))]
2152 fn test_tx_check_read_only() -> crate::Result<()> {
2153 let mut db = TestDb::new()?;
2154 db.update(|mut tx| {
2155 let mut b = tx.create_bucket("widgets")?;
2156 b.put("foo", "bar")?;
2157 Ok(())
2158 })?;
2159 let close_db = db.clone_db();
2160 close_db.close();
2161
2162 let file = db.tmp_file.as_ref().unwrap();
2163 let ro = Bolt::open_ro(file.as_ref());
2164 let ro_db = ro.unwrap();
2165 let tx = ro_db.begin()?;
2166 let errors = tx.check();
2167 assert!(errors.is_empty(), "{:?}", errors);
2168
2169 Ok(())
2170 }
2171
2172 #[test]
2173 fn test_tx_cursor() -> crate::Result<()> {
2174 let mut db = TestDb::new()?;
2175 db.update(|mut tx| {
2176 tx.create_bucket("widgets")?;
2177 tx.create_bucket("woojits")?;
2178 let mut c = tx.cursor();
2179 assert_eq!(Some(("widgets".as_bytes(), None)), c.first());
2180 assert_eq!(Some(("woojits".as_bytes(), None)), c.next());
2181 Ok(())
2182 })?;
2183 Ok(())
2184 }
2185
2186 #[test]
2187 fn test_tx_bucket() -> crate::Result<()> {
2188 let mut db = TestDb::new()?;
2189 db.update(|mut tx| {
2190 tx.create_bucket("widgets")?;
2191 assert!(tx.bucket("widgets").is_some(), "expected bucket");
2192 Ok(())
2193 })?;
2194 Ok(())
2195 }
2196
2197 #[test]
2198 fn test_tx_get_not_found() -> crate::Result<()> {
2199 let mut db = TestDb::new()?;
2200 db.update(|mut tx| {
2201 let mut b = tx.create_bucket("widgets")?;
2202 b.put("foo", "bar")?;
2203 assert_eq!(None, b.get("no_such_key"), "expected None");
2204 Ok(())
2205 })?;
2206 Ok(())
2207 }
2208
2209 #[test]
2210 fn test_tx_create_bucket() -> crate::Result<()> {
2211 let mut db = TestDb::new()?;
2212 db.update(|mut tx| {
2213 tx.create_bucket(b"widgets")?;
2214 Ok(())
2215 })?;
2216 db.view(|tx| {
2217 let bucket = tx.bucket(b"widgets");
2218 assert!(bucket.is_some(), "expected bucket");
2219 Ok(())
2220 })
2221 }
2222
2223 #[test]
2224 fn test_tx_create_bucket_if_not_exists() -> crate::Result<()> {
2225 let mut db = TestDb::new()?;
2226 db.update(|mut tx| {
2227 tx.create_bucket_if_not_exists("widgets")?;
2228 tx.create_bucket_if_not_exists("widgets")?;
2229 Ok(())
2230 })?;
2231 db.view(|tx| {
2232 assert!(tx.bucket("widgets").is_some());
2233 Ok(())
2234 })?;
2235 Ok(())
2236 }
2237
2238 #[test]
2239 fn test_tx_create_bucket_if_not_exists_err_bucket_name_required() -> crate::Result<()> {
2240 let mut db = TestDb::new()?;
2241 db.update(|mut tx| {
2242 assert_eq!(
2243 Some(Error::BucketNameRequired),
2244 tx.create_bucket_if_not_exists("").err()
2245 );
2246 Ok(())
2247 })?;
2248 Ok(())
2249 }
2250
2251 #[test]
2252 fn test_tx_create_bucket_err_bucket_exists() -> crate::Result<()> {
2253 let mut db = TestDb::new()?;
2254 db.update(|mut tx| {
2255 tx.create_bucket("widgets")?;
2256 Ok(())
2257 })?;
2258 db.update(|mut tx| {
2259 assert_eq!(Some(Error::BucketExists), tx.create_bucket("widgets").err());
2260 Ok(())
2261 })?;
2262 Ok(())
2263 }
2264
2265 #[test]
2266 fn test_tx_create_bucket_err_bucket_name_required() -> crate::Result<()> {
2267 let mut db = TestDb::new()?;
2268 db.update(|mut tx| {
2269 assert_eq!(Some(Error::BucketNameRequired), tx.create_bucket("").err());
2270 Ok(())
2271 })?;
2272 Ok(())
2273 }
2274
2275 #[test]
2276 fn test_tx_delete_bucket() -> crate::Result<()> {
2277 let mut db = TestDb::new()?;
2278 db.update(|mut tx| {
2279 let mut b = tx.create_bucket("widgets")?;
2280 b.put("foo", "bar")?;
2281 Ok(())
2282 })?;
2283 db.update(|mut tx| {
2284 tx.delete_bucket("widgets")?;
2285 assert!(tx.bucket("widgets").is_none());
2286 Ok(())
2287 })?;
2288 db.update(|mut tx| {
2289 let b = tx.create_bucket("widgets")?;
2290 assert!(b.get("widgets").is_none());
2291 Ok(())
2292 })?;
2293 Ok(())
2294 }
2295
2296 #[test]
2297 fn test_tx_delete_bucket_not_found() -> crate::Result<()> {
2298 let mut db = TestDb::new()?;
2299 db.update(|mut tx| {
2300 assert_eq!(
2301 Some(Error::BucketNotFound),
2302 tx.delete_bucket("widgets").err()
2303 );
2304 Ok(())
2305 })?;
2306 Ok(())
2307 }
2308
2309 #[test]
2310 fn test_tx_for_each_no_error() -> crate::Result<()> {
2311 let mut db = TestDb::new()?;
2312 db.update(|mut tx| {
2313 let mut b = tx.create_bucket("widgets")?;
2314 b.put("foo", "bar")?;
2315 tx.for_each(|_, _| Ok(()))?;
2316 Ok(())
2317 })?;
2318 Ok(())
2319 }
2320
2321 #[test]
2322 fn test_tx_for_each_with_error() -> crate::Result<()> {
2323 let mut db = TestDb::new()?;
2324 let result = db.update(|mut tx| {
2325 let mut b = tx.create_bucket("widgets")?;
2326 b.put("foo", "bar")?;
2327 tx.for_each(|_, _| Err(Error::Other(anyhow!("marker"))))?;
2328 Ok(())
2329 });
2330 let e = result.map_err(|e| e.to_string()).err().unwrap();
2331 assert_eq!("marker", e);
2332 Ok(())
2333 }
2334
2335 #[test]
2336 fn test_tx_on_commit() -> crate::Result<()> {
2337 let x = RefCell::new(0u64);
2338 let mut db = TestDb::new()?;
2339 db.update(|mut tx| {
2340 tx.on_commit(|| {
2341 *x.borrow_mut() += 1;
2342 });
2343 tx.on_commit(|| {
2344 *x.borrow_mut() += 2;
2345 });
2346 let mut b = tx.create_bucket("widgets")?;
2347 b.put("foo", "bar")?;
2348 Ok(())
2349 })?;
2350 assert_eq!(3, *x.borrow());
2351 Ok(())
2352 }
2353
2354 #[test]
2355 fn test_tx_on_commit_rollback() -> crate::Result<()> {
2356 let x = RefCell::new(0u64);
2357 let mut db = TestDb::new()?;
2358 let _ = db.update(|mut tx| {
2359 tx.on_commit(|| {
2360 *x.borrow_mut() += 1;
2361 });
2362 tx.on_commit(|| {
2363 *x.borrow_mut() += 2;
2364 });
2365 tx.create_bucket("widgets")?;
2366 Err(Error::Other(anyhow!("rollback")))
2367 });
2368 assert_eq!(0, *x.borrow());
2369 Ok(())
2370 }
2371
  // Placeholder: not implemented yet, so the test is ignored.
  #[test]
  #[ignore]
  fn test_tx_copy_file() {
    todo!()
  }
2377
  // Placeholder: not implemented yet, so the test is ignored.
  #[test]
  #[ignore]
  fn test_tx_copy_file_error_meta() {
    todo!()
  }
2383
  // Placeholder: not implemented yet, so the test is ignored.
  #[test]
  #[ignore]
  fn test_tx_copy_file_error_normal() {
    todo!()
  }
2389
2390 #[test]
2391 fn test_tx_rollback() -> crate::Result<()> {
2392 let mut db = TestDb::new()?;
2393 let mut tx = db.begin_rw_tx()?;
2394 tx.create_bucket("mybucket")?;
2395 tx.commit()?;
2396 let mut tx = db.begin_rw_tx()?;
2397 let mut b = tx.bucket_mut("mybucket").unwrap();
2398 b.put("k", "v")?;
2399 tx.rollback()?;
2400 let tx = db.begin_tx()?;
2401 let b = tx.bucket("mybucket").unwrap();
2402 assert_eq!(None, b.get("k"));
2403 drop(tx);
2404 Ok(())
2406 }
2407
2408 #[test]
2409 fn test_tx_release_range() -> crate::Result<()> {
2410 let initial_mmap_size = DEFAULT_PAGE_SIZE.bytes() as u64 * 100;
2414 let db_options = BoltOptions::builder()
2415 .initial_mmap_size(initial_mmap_size)
2416 .build();
2417 let db = TestDb::with_options(db_options)?;
2418 let bucket = "bucket";
2419
2420 let mut put_db = db.clone_db();
2421 let mut put = move |key, value| {
2422 put_db
2423 .update(|mut tx| {
2424 let mut b = tx.create_bucket_if_not_exists(bucket)?;
2425 b.put(key, value)?;
2426 Ok(())
2427 })
2428 .unwrap();
2429 };
2430
2431 let mut del_db = db.clone_db();
2432 let mut del = move |key| {
2433 del_db
2434 .update(|mut tx| {
2435 let mut b = tx.create_bucket_if_not_exists(bucket)?;
2436 b.delete(key)?;
2437 Ok(())
2438 })
2439 .unwrap();
2440 };
2441
2442 let open_read_tx = || db.begin_tx().unwrap();
2443
2444 let check_with_read_tx = |tx: &TxImpl, key, want_value| {
2445 let bucket = tx.bucket(bucket).unwrap();
2446 let value = bucket.get(key);
2447 assert_eq!(want_value, value);
2448 };
2449
2450 put("k1", "v1");
2451 let rtx1 = open_read_tx();
2452 put("k2", "v2");
2453 let hold1 = open_read_tx();
2454 put("k3", "v3");
2455 let hold2 = open_read_tx();
2456 del("k3");
2457 let rtx2 = open_read_tx();
2458 del("k1");
2459 let hold3 = open_read_tx();
2460 del("k2");
2461 let hold4 = open_read_tx();
2462 put("k4", "v4");
2463 let hold5 = open_read_tx();
2464
2465 drop(hold1);
2467 drop(hold2);
2468 drop(hold3);
2469 drop(hold4);
2470 drop(hold5);
2471
2472 put("k4", "v4");
2476
2477 check_with_read_tx(&rtx1, "k1", Some("v1".as_bytes()));
2479 check_with_read_tx(&rtx2, "k2", Some("v2".as_bytes()));
2480 drop(rtx1);
2481 drop(rtx2);
2482
2483 let rtx7 = open_read_tx();
2485 check_with_read_tx(&rtx7, "k1", None);
2486 check_with_read_tx(&rtx7, "k2", None);
2487 check_with_read_tx(&rtx7, "k3", None);
2488 check_with_read_tx(&rtx7, "k4", Some("v4".as_bytes()));
2489 Ok(())
2490 }
2491
  // Placeholder: not implemented yet, so the test is ignored.
  #[test]
  #[ignore]
  fn example_tx_copy_file() {
    todo!()
  }
2497
2498 #[test]
2499 fn test_tx_stats_get_and_inc_atomically() {
2500 let stats = TxStats::default();
2501
2502 stats.inc_page_count(1);
2503 assert_eq!(1, stats.page_count());
2504
2505 stats.inc_page_alloc(2);
2506 assert_eq!(2, stats.page_alloc());
2507
2508 stats.inc_cursor_count(3);
2509 assert_eq!(3, stats.cursor_count());
2510
2511 stats.inc_node_count(100);
2512 assert_eq!(100, stats.node_count());
2513
2514 stats.inc_node_deref(101);
2515 assert_eq!(101, stats.node_deref());
2516
2517 stats.inc_rebalance(1000);
2518 assert_eq!(1000, stats.rebalance());
2519
2520 stats.inc_rebalance_time(Duration::from_secs(1001));
2521 assert_eq!(1001, stats.rebalance_time().as_secs());
2522
2523 stats.inc_split(10000);
2524 assert_eq!(10000, stats.split());
2525
2526 stats.inc_spill(10001);
2527 assert_eq!(10001, stats.spill());
2528
2529 stats.inc_spill_time(Duration::from_secs(10001));
2530 assert_eq!(10001, stats.spill_time().as_secs());
2531
2532 stats.inc_write(100_000);
2533 assert_eq!(100_000, stats.write());
2534
2535 stats.inc_write_time(Duration::from_secs(100_001));
2536 assert_eq!(100_001, stats.write_time().as_secs());
2537
2538 let expected_stats = TxStats {
2539 page_count: 1.into(),
2540 page_alloc: 2.into(),
2541 cursor_count: 3.into(),
2542 node_count: 100.into(),
2543 node_deref: 101.into(),
2544 rebalance: 1000.into(),
2545 rebalance_time: Duration::from_secs(1001).into(),
2546 split: 10000.into(),
2547 spill: 10001.into(),
2548 spill_time: Duration::from_secs(10001).into(),
2549 write: 100_000.into(),
2550 write_time: Duration::from_secs(100_001).into(),
2551 };
2552
2553 assert_eq!(expected_stats, stats);
2554 }
2555
2556 #[test]
2557 fn test_tx_stats_sub() {
2558 let stats_a = TxStats {
2559 page_count: 1.into(),
2560 page_alloc: 2.into(),
2561 cursor_count: 3.into(),
2562 node_count: 100.into(),
2563 node_deref: 101.into(),
2564 rebalance: 1000.into(),
2565 rebalance_time: Duration::from_secs(1001).into(),
2566 split: 10000.into(),
2567 spill: 10001.into(),
2568 spill_time: Duration::from_secs(10001).into(),
2569 write: 100_000.into(),
2570 write_time: Duration::from_secs(100_001).into(),
2571 };
2572
2573 let stats_b = TxStats {
2574 page_count: 2.into(),
2575 page_alloc: 3.into(),
2576 cursor_count: 4.into(),
2577 node_count: 101.into(),
2578 node_deref: 102.into(),
2579 rebalance: 1001.into(),
2580 rebalance_time: Duration::from_secs(1002).into(),
2581 split: 11001.into(),
2582 spill: 11002.into(),
2583 spill_time: Duration::from_secs(11002).into(),
2584 write: 110_001.into(),
2585 write_time: Duration::from_secs(110_010).into(),
2586 };
2587
2588 let diff = stats_b.sub(&stats_a);
2589 let expected_stats = TxStats {
2590 page_count: 1.into(),
2591 page_alloc: 1.into(),
2592 cursor_count: 1.into(),
2593 node_count: 1.into(),
2594 node_deref: 1.into(),
2595 rebalance: 1.into(),
2596 rebalance_time: Duration::from_secs(1).into(),
2597 split: 1001.into(),
2598 spill: 1001.into(),
2599 spill_time: Duration::from_secs(1001).into(),
2600 write: 10001.into(),
2601 write_time: Duration::from_secs(10009).into(),
2602 };
2603
2604 assert_eq!(expected_stats, diff);
2605 }
2606
  // Placeholder: not implemented yet, so the test is ignored.
  #[test]
  #[ignore]
  fn test_tx_truncate_before_write() {
    todo!()
  }
2612
2613 #[test]
2614 fn test_tx_stats_add() {
2615 let stats_a = TxStats {
2616 page_count: 1.into(),
2617 page_alloc: 2.into(),
2618 cursor_count: 3.into(),
2619 node_count: 100.into(),
2620 node_deref: 101.into(),
2621 rebalance: 1000.into(),
2622 rebalance_time: Duration::from_secs(1001).into(),
2623 split: 10000.into(),
2624 spill: 10001.into(),
2625 spill_time: Duration::from_secs(10001).into(),
2626 write: 100_000.into(),
2627 write_time: Duration::from_secs(100_001).into(),
2628 };
2629
2630 let stats_b = TxStats {
2631 page_count: 2.into(),
2632 page_alloc: 3.into(),
2633 cursor_count: 4.into(),
2634 node_count: 101.into(),
2635 node_deref: 102.into(),
2636 rebalance: 1001.into(),
2637 rebalance_time: Duration::from_secs(1002).into(),
2638 split: 11001.into(),
2639 spill: 11002.into(),
2640 spill_time: Duration::from_secs(11002).into(),
2641 write: 110_001.into(),
2642 write_time: Duration::from_secs(110_010).into(),
2643 };
2644
2645 let add = stats_b.add(&stats_a);
2646 let expected_stats = TxStats {
2647 page_count: 3.into(),
2648 page_alloc: 5.into(),
2649 cursor_count: 7.into(),
2650 node_count: 201.into(),
2651 node_deref: 203.into(),
2652 rebalance: 2001.into(),
2653 rebalance_time: Duration::from_secs(2003).into(),
2654 split: 21001.into(),
2655 spill: 21003.into(),
2656 spill_time: Duration::from_secs(21003).into(),
2657 write: 210001.into(),
2658 write_time: Duration::from_secs(210011).into(),
2659 };
2660
2661 assert_eq!(expected_stats, add);
2662 }
2663}