bbolt_rs/
tx.rs

1use crate::arch::size::MAX_ALLOC_SIZE;
2use crate::bucket::{
3  BucketCell, BucketIApi, BucketImpl, BucketR, BucketRW, BucketRwIApi, BucketRwImpl, BucketW,
4};
5use crate::common::bump::PinBump;
6use crate::common::cell::{Ref, RefCell, RefMut};
7use crate::common::defaults::IGNORE_NO_SYNC;
8use crate::common::lock::{LockGuard, PinLockGuard};
9use crate::common::memory::BCell;
10use crate::common::page::meta::{MappedMetaPage, Meta, MetaPage};
11use crate::common::page::tree::branch::MappedBranchPage;
12use crate::common::page::tree::TreePage;
13use crate::common::page::{CoerciblePage, MutPage, PageHeader, PageInfo, RefPage};
14use crate::common::pool::SyncReusable;
15use crate::common::self_owned::SelfOwned;
16use crate::common::{BVec, HashMap, PgId, SplitRef, TxId};
17use crate::cursor::{CursorImpl, InnerCursor};
18use crate::db::{AllocateResult, DbIApi, DbMutIApi, DbShared};
19use crate::iter::{BucketIter, BucketIterMut};
20use crate::tx::check::TxICheck;
21use crate::TxCheck;
22use aliasable::boxed::AliasableBox;
23use aligners::{alignment, AlignedBytes};
24use bumpalo::Bump;
25use parking_lot::{Mutex, RwLockReadGuard, RwLockUpgradableReadGuard};
26use std::alloc::Layout;
27use std::borrow::Cow;
28use std::fmt::{Debug, Formatter};
29use std::io::Write;
30use std::marker::PhantomData;
31use std::mem;
32use std::mem::MaybeUninit;
33use std::ops::{Deref, SubAssign};
34use std::pin::Pin;
35use std::ptr::{addr_of, addr_of_mut};
36use std::slice::from_raw_parts_mut;
37use std::sync::atomic::{AtomicI64, Ordering};
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40
/// Read-only transaction API
pub trait TxApi<'tx>: TxCheck<'tx> {
  /// Returns the transaction id.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.view(|tx| {
  ///     assert_eq!(1, tx.id().0);
  ///     Ok(())
  ///   })?;
  ///
  ///   db.update(|mut tx| {
  ///     assert_eq!(2, tx.id().0);
  ///     Ok(())
  ///   })?;
  ///
  ///   db.view(|tx| {
  ///     assert_eq!(2, tx.id().0);
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn id(&self) -> TxId;

  /// Returns current database size in bytes as seen by this transaction.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.update(|mut tx| {
  ///     let mut b = tx.create_bucket_if_not_exists("test")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.view(|tx| {
  ///     assert_eq!((page_size::get() * 6) as u64, tx.size());
  ///     Ok(())
  ///   })?;
  ///   Ok(())
  /// }
  /// ```
  fn size(&self) -> u64;

  /// Returns whether the transaction can perform write operations.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.update(|mut tx| {
  ///     assert_eq!(true, tx.writable());
  ///     Ok(())
  ///   })?;
  ///
  ///   db.view(|tx| {
  ///     assert_eq!(false, tx.writable());
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn writable(&self) -> bool;

  /// Creates a cursor associated with the root bucket.
  /// All items in the cursor will return None value because all root bucket keys point to buckets.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.update(|mut tx| {
  ///     tx.create_bucket_if_not_exists("test1")?;
  ///     tx.create_bucket_if_not_exists("test2")?;
  ///     tx.create_bucket_if_not_exists("test3")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.view(|tx| {
  ///     let mut c = tx.cursor();
  ///     assert_eq!(Some((b"test1".as_slice(), None)), c.first());
  ///     assert_eq!(Some((b"test2".as_slice(), None)), c.next());
  ///     assert_eq!(Some((b"test3".as_slice(), None)), c.next());
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn cursor<'a>(&'a self) -> CursorImpl<'tx, 'a>;

  /// Retrieves a copy of the current transaction statistics.
  fn stats(&self) -> Arc<TxStats>;

  /// Retrieves a bucket by name.
  /// Returns None if the bucket does not exist.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.update(|mut tx| {
  ///     let mut b = tx.create_bucket_if_not_exists("test")?;
  ///     b.put("key", "value")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.view(|tx| {
  ///     let b = tx.bucket("test").unwrap();
  ///     assert_eq!(Some(b"value".as_slice()), b.get("key"));
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn bucket<'a, T: AsRef<[u8]>>(&'a self, name: T) -> Option<BucketImpl<'tx, 'a>>;

  /// Retrieves a bucket by following `names` as a path of nested buckets.
  /// Returns None if any bucket along the path does not exist.
  fn bucket_path<'a, T: AsRef<[u8]>>(&'a self, names: &[T]) -> Option<BucketImpl<'tx, 'a>>;

  #[deprecated(since = "1.3.9", note = "please use `iter_*` methods instead")]
  /// Executes a function for each key/value pair in a bucket.
  /// Because ForEach uses a Cursor, the iteration over keys is in lexicographical order.
  ///
  /// If the provided function returns an error then the iteration is stopped and
  /// the error is returned to the caller.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.update(|mut tx| {
  ///     let mut b = tx.create_bucket_if_not_exists("test")?;
  ///     b.put("key1", "value1")?;
  ///     b.put("key2", "value2")?;
  ///     b.put("key3", "value3")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.view(|tx| {
  ///     tx.for_each(|bk, b| {
  ///       b.for_each(|k, v| {
  ///         println!("{:?}->{:?}, {:?}", bk, k, v);
  ///         Ok(())
  ///       })?;
  ///       Ok(())
  ///     })?;
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn for_each<'a, F: FnMut(&'a [u8], BucketImpl<'tx, 'a>) -> crate::Result<()>>(
    &self, f: F,
  ) -> crate::Result<()>
  where
    'tx: 'a;

  /// Returns page information for a given page number.
  ///
  /// This is only safe for concurrent use when used by a writable transaction.
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.update(|mut tx| {
  ///     let mut b = tx.create_bucket_if_not_exists("test")?;
  ///     b.put("key", "value")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.view(|tx| {
  ///     let b = tx.bucket("test").unwrap();
  ///     let page_id = b.root();
  ///     let page_info = tx.page(page_id).unwrap();
  ///     println!("{:?}", page_info);
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn page(&self, id: PgId) -> Option<PageInfo>;

  /// Returns an iterator over the buckets in the root bucket.
  fn iter_buckets<'a>(&'a self) -> BucketIter<'tx, 'a>;
}
247
/// RW transaction API
pub trait TxRwRefApi<'tx>: TxApi<'tx> {
  /// Retrieves a mutable bucket by name.
  ///
  /// Returns None if the bucket does not exist.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.update(|mut tx| {
  ///     let mut b = tx.create_bucket_if_not_exists("test")?;
  ///     b.put("key", "value")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.update(|mut tx| {
  ///     let mut b = tx.bucket_mut("test").unwrap();
  ///     b.put("key", "new value")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.view(|tx| {
  ///     let b = tx.bucket("test").unwrap();
  ///     assert_eq!(Some(b"new value".as_slice()), b.get("key"));
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn bucket_mut<'a, T: AsRef<[u8]>>(&'a mut self, name: T) -> Option<BucketRwImpl<'tx, 'a>>;

  /// Retrieves a mutable bucket by following `names` as a path of nested buckets.
  /// Returns None if any bucket along the path does not exist.
  fn bucket_mut_path<'a, T: AsRef<[u8]>>(
    &'a mut self, names: &[T],
  ) -> Option<BucketRwImpl<'tx, 'a>>;

  /// Creates a new bucket.
  ///
  /// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.update(|mut tx| {
  ///     let mut b = tx.create_bucket("test")?;
  ///     b.put("key", "value")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.view(|tx| {
  ///     let b = tx.bucket("test").unwrap();
  ///     assert_eq!(Some(b"value".as_slice()), b.get("key"));
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn create_bucket<'a, T: AsRef<[u8]>>(
    &'a mut self, name: T,
  ) -> crate::Result<BucketRwImpl<'tx, 'a>>;

  /// Creates a new bucket if it doesn't already exist.
  ///
  /// Returns an error if the bucket name is blank, or if the bucket name is too long.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.update(|mut tx| {
  ///     let mut b = tx.create_bucket_if_not_exists("test")?;
  ///     b.put("key", "value")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.view(|tx| {
  ///     let b = tx.bucket("test").unwrap();
  ///     assert_eq!(Some(b"value".as_slice()), b.get("key"));
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn create_bucket_if_not_exists<'a, T: AsRef<[u8]>>(
    &'a mut self, name: T,
  ) -> crate::Result<BucketRwImpl<'tx, 'a>>;

  /// Creates every bucket along `names` that does not already exist and
  /// returns the innermost bucket.
  fn create_bucket_path<'a, T: AsRef<[u8]>>(
    &'a mut self, names: &[T],
  ) -> crate::Result<BucketRwImpl<'tx, 'a>>;

  /// DeleteBucket deletes a bucket.
  /// Returns an error if the bucket cannot be found or if the key represents a non-bucket value.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.update(|mut tx| {
  ///     let mut b = tx.create_bucket("test")?;
  ///     b.put("key", "value")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.update(|mut tx| {
  ///     tx.delete_bucket("test")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.view(|tx| {
  ///     assert_eq!(false, tx.bucket("test").is_some());
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn delete_bucket<T: AsRef<[u8]>>(&mut self, name: T) -> crate::Result<()>;

  /// OnCommit adds a handler function to be executed after the transaction successfully commits.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  /// use std::cell::RefCell;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   let tx_committed = RefCell::new(false);
  ///   db.update(|mut tx| {
  ///     let mut b = tx.create_bucket_if_not_exists("test")?;
  ///     tx.on_commit(|| *tx_committed.borrow_mut() = true);
  ///     Ok(())
  ///   })?;
  ///
  ///   assert_eq!(true, *tx_committed.borrow());
  ///   Ok(())
  /// }
  /// ```
  fn on_commit<F: FnMut() + 'tx>(&mut self, f: F);

  /// Returns a mutable iterator over the buckets in the root bucket.
  fn iter_mut_buckets<'a>(&'a mut self) -> BucketIterMut<'tx, 'a>;
}
403
/// RW transaction API + Commit
pub trait TxRwApi<'tx>: TxRwRefApi<'tx> {
  /// Closes the transaction and ignores all previous updates.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.update(|mut tx| {
  ///     let mut b = tx.create_bucket_if_not_exists("test")?;
  ///     b.put("key", "value")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   let mut tx = db.begin_rw()?;
  ///   let mut b = tx.bucket_mut("test").unwrap();
  ///   b.put("key", "new value")?;
  ///   tx.rollback()?;
  ///
  ///   db.view(|tx| {
  ///     let b = tx.bucket("test").unwrap();
  ///     assert_eq!(Some(b"value".as_slice()), b.get("key"));
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn rollback(self) -> crate::Result<()>;

  /// Writes all changes to disk and updates the meta page.
  /// Returns an error if a disk write error occurs.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   let mut tx = db.begin_rw()?;
  ///   tx.create_bucket_if_not_exists("test1")?;
  ///   tx.create_bucket_if_not_exists("test2")?;
  ///   tx.create_bucket_if_not_exists("test3")?;
  ///   tx.commit()?;
  ///
  ///   db.view(|tx| {
  ///     let mut c = tx.cursor();
  ///     assert_eq!(Some((b"test1".as_slice(), None)), c.first());
  ///     assert_eq!(Some((b"test2".as_slice(), None)), c.next());
  ///     assert_eq!(Some((b"test3".as_slice(), None)), c.next());
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn commit(self) -> crate::Result<()>;
}
465
/// Stats for the transaction.
///
/// Counters are stored as atomics (and durations behind mutexes) so a single
/// `TxStats` can be shared across threads via `Arc` and updated concurrently.
#[derive(Default)]
pub struct TxStats {
  // Page statistics.
  //
  /// number of page allocations
  page_count: AtomicI64,
  /// total bytes allocated
  page_alloc: AtomicI64,

  // Cursor statistics.
  //
  /// number of cursors created
  cursor_count: AtomicI64,

  // Node statistics
  //
  /// number of node allocations
  node_count: AtomicI64,
  /// number of node dereferences
  node_deref: AtomicI64,

  // Rebalance statistics.
  //
  /// number of node rebalances
  rebalance: AtomicI64,
  /// total time spent rebalancing
  rebalance_time: Mutex<Duration>,

  // Split/Spill statistics.
  //
  /// number of nodes split
  split: AtomicI64,
  /// number of nodes spilled
  spill: AtomicI64,
  /// total time spent spilling
  spill_time: Mutex<Duration>,

  // Write statistics.
  //
  /// number of writes performed
  write: AtomicI64,
  /// total time spent writing to disk
  write_time: Mutex<Duration>,
}
511
impl TxStats {
  // Counters are bumped with `Relaxed` read-modify-writes and observed with
  // `Acquire` loads; each counter is independent, so no cross-field ordering
  // is required.

  /// total bytes allocated
  pub fn page_alloc(&self) -> i64 {
    self.page_alloc.load(Ordering::Acquire)
  }

  pub(crate) fn inc_page_alloc(&self, delta: i64) {
    self.page_alloc.fetch_add(delta, Ordering::Relaxed);
  }

  /// number of page allocations
  pub fn page_count(&self) -> i64 {
    self.page_count.load(Ordering::Acquire)
  }

  pub(crate) fn inc_page_count(&self, delta: i64) {
    self.page_count.fetch_add(delta, Ordering::Relaxed);
  }

  /// number of cursors created
  pub fn cursor_count(&self) -> i64 {
    self.cursor_count.load(Ordering::Acquire)
  }

  pub(crate) fn inc_cursor_count(&self, delta: i64) {
    self.cursor_count.fetch_add(delta, Ordering::Relaxed);
  }

  /// number of node allocations
  pub fn node_count(&self) -> i64 {
    self.node_count.load(Ordering::Acquire)
  }

  pub(crate) fn inc_node_count(&self, delta: i64) {
    self.node_count.fetch_add(delta, Ordering::Relaxed);
  }

  /// number of node dereferences
  pub fn node_deref(&self) -> i64 {
    self.node_deref.load(Ordering::Acquire)
  }

  pub(crate) fn inc_node_deref(&self, delta: i64) {
    self.node_deref.fetch_add(delta, Ordering::Relaxed);
  }

  /// number of node rebalances
  pub fn rebalance(&self) -> i64 {
    self.rebalance.load(Ordering::Acquire)
  }

  pub(crate) fn inc_rebalance(&self, delta: i64) {
    self.rebalance.fetch_add(delta, Ordering::Relaxed);
  }

  /// total time spent rebalancing
  pub fn rebalance_time(&self) -> Duration {
    *self.rebalance_time.lock()
  }

  pub(crate) fn inc_rebalance_time(&self, delta: Duration) {
    *self.rebalance_time.lock() += delta;
  }

  /// number of nodes split
  pub fn split(&self) -> i64 {
    self.split.load(Ordering::Acquire)
  }

  pub(crate) fn inc_split(&self, delta: i64) {
    self.split.fetch_add(delta, Ordering::Relaxed);
  }

  /// number of nodes spilled
  pub fn spill(&self) -> i64 {
    self.spill.load(Ordering::Acquire)
  }

  pub(crate) fn inc_spill(&self, delta: i64) {
    self.spill.fetch_add(delta, Ordering::Relaxed);
  }

  /// total time spent spilling
  pub fn spill_time(&self) -> Duration {
    *self.spill_time.lock()
  }

  pub(crate) fn inc_spill_time(&self, delta: Duration) {
    *self.spill_time.lock() += delta;
  }

  /// number of writes performed
  pub fn write(&self) -> i64 {
    self.write.load(Ordering::Acquire)
  }

  pub(crate) fn inc_write(&self, delta: i64) {
    self.write.fetch_add(delta, Ordering::Relaxed);
  }

  /// total time spent writing to disk
  pub fn write_time(&self) -> Duration {
    *self.write_time.lock()
  }

  pub(crate) fn inc_write_time(&self, delta: Duration) {
    *self.write_time.lock() += delta;
  }

  /// Accumulates every counter and duration from `rhs` into `self`.
  pub(crate) fn add_assign(&self, rhs: &TxStats) {
    self.inc_page_count(rhs.page_count());
    self.inc_page_alloc(rhs.page_alloc());
    self.inc_cursor_count(rhs.cursor_count());
    self.inc_node_count(rhs.node_count());
    self.inc_node_deref(rhs.node_deref());
    self.inc_rebalance(rhs.rebalance());
    self.inc_rebalance_time(rhs.rebalance_time());
    self.inc_split(rhs.split());
    self.inc_spill(rhs.spill());
    self.inc_spill_time(rhs.spill_time());
    self.inc_write(rhs.write());
    self.inc_write_time(rhs.write_time());
  }

  /// Returns a new snapshot equal to `self + rhs`; neither input is mutated.
  pub(crate) fn add(&self, rhs: &TxStats) -> TxStats {
    let add = self.clone();
    add.add_assign(rhs);
    add
  }

  /// Subtracts every counter and duration of `rhs` from `self`.
  ///
  /// NOTE(review): `Duration::sub_assign` panics on underflow — callers must
  /// guarantee each `rhs` duration is <= the corresponding `self` duration;
  /// confirm at call sites.
  pub(crate) fn sub_assign(&self, rhs: &TxStats) {
    self.inc_page_count(-rhs.page_count());
    self.inc_page_alloc(-rhs.page_alloc());
    self.inc_cursor_count(-rhs.cursor_count());
    self.inc_node_count(-rhs.node_count());
    self.inc_node_deref(-rhs.node_deref());
    self.inc_rebalance(-rhs.rebalance());
    self.rebalance_time.lock().sub_assign(rhs.rebalance_time());
    self.inc_split(-rhs.split());
    self.inc_spill(-rhs.spill());
    self.spill_time.lock().sub_assign(rhs.spill_time());
    self.inc_write(-rhs.write());
    self.write_time.lock().sub_assign(rhs.write_time());
  }

  /// Returns a new snapshot equal to `self - rhs`; neither input is mutated.
  pub(crate) fn sub(&self, rhs: &TxStats) -> TxStats {
    let sub = self.clone();
    sub.sub_assign(rhs);
    sub
  }
}
663
664impl Clone for TxStats {
665  fn clone(&self) -> Self {
666    TxStats {
667      page_count: self.page_count().into(),
668      page_alloc: self.page_alloc().into(),
669      cursor_count: self.cursor_count().into(),
670      node_count: self.node_count().into(),
671      node_deref: self.node_deref().into(),
672      rebalance: self.rebalance().into(),
673      rebalance_time: self.rebalance_time().into(),
674      split: self.split().into(),
675      spill: self.spill().into(),
676      spill_time: self.spill_time().into(),
677      write: self.write().into(),
678      write_time: self.write_time().into(),
679    }
680  }
681}
682
683impl PartialEq for TxStats {
684  fn eq(&self, other: &Self) -> bool {
685    self.page_count() == other.page_count()
686      && self.page_alloc() == other.page_alloc()
687      && self.cursor_count() == other.cursor_count()
688      && self.node_count() == other.node_count()
689      && self.node_deref() == other.node_deref()
690      && self.rebalance() == other.rebalance()
691      && self.rebalance_time() == other.rebalance_time()
692      && self.split() == other.split()
693      && self.spill() == other.spill()
694      && self.spill_time() == other.spill_time()
695      && self.write() == other.write()
696      && self.write_time() == other.write_time()
697  }
698}
699
700impl Eq for TxStats {}
701
702impl Debug for TxStats {
703  fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
704    f.debug_struct("TxStats")
705      .field("page_count", &self.page_count())
706      .field("page_alloc", &self.page_alloc())
707      .field("cursor_count", &self.cursor_count())
708      .field("node_count", &self.node_count())
709      .field("node_deref", &self.node_deref())
710      .field("rebalance", &self.rebalance())
711      .field("rebalance_time", &self.rebalance_time())
712      .field("split", &self.split())
713      .field("spill", &self.spill())
714      .field("spill_time", &self.spill_time())
715      .field("write", &self.write())
716      .field("write_time", &self.write_time())
717      .finish()
718  }
719}
720
/// A page reference that is either backed by the database for the whole
/// transaction (`Ref`) or by a dirty page still held in the write
/// transaction's page cache (`Pending`, with the shorter `'a` borrow).
pub(crate) enum AnyPage<'tx: 'a, 'a> {
  /// Page read from the database; valid for the transaction lifetime `'tx`.
  Ref(RefPage<'tx>),
  /// Dirty page from the write transaction's cache; valid for borrow `'a`.
  Pending(RefPage<'a>),
}
725
726impl<'tx: 'a, 'a> Deref for AnyPage<'tx, 'a> {
727  type Target = RefPage<'a>;
728
729  #[inline]
730  fn deref(&self) -> &Self::Target {
731    match self {
732      AnyPage::Ref(r) => r,
733      AnyPage::Pending(p) => p,
734    }
735  }
736}
737
/// How a transaction is being closed.
#[derive(Copy, Clone, Default, PartialOrd, Ord, PartialEq, Eq)]
pub(crate) enum TxClosingState {
  /// Default: the transaction rolls back unless told otherwise.
  #[default]
  Rollback,
  /// Rollback explicitly requested via `rollback()`.
  ExplicitRollback,
  /// Rollback that additionally needs to undo persisted state —
  /// NOTE(review): confirm the exact trigger against `physical_rollback`.
  PhysicalRollback,
  /// The transaction is being committed.
  Commit,
}
746
747impl TxClosingState {
748  #[inline]
749  pub(crate) fn is_rollback(&self) -> bool {
750    matches!(
751      self,
752      TxClosingState::Rollback
753        | TxClosingState::ExplicitRollback
754        | TxClosingState::PhysicalRollback
755    )
756  }
757
758  #[inline]
759  pub(crate) fn is_physical_rollback(&self) -> bool {
760    matches!(self, TxClosingState::PhysicalRollback)
761  }
762}
763
/// Internal read-only transaction API shared by the public wrappers.
pub(crate) trait TxIApi<'tx> {
  /// Returns the bump allocator backing this transaction's allocations.
  fn bump(self) -> &'tx Bump;

  /// Returns the database page size for this transaction.
  fn page_size(self) -> usize;

  /// Borrows this transaction's private copy of the meta page.
  fn meta<'a>(&'a self) -> Ref<'a, Meta>
  where
    'tx: 'a;

  /// Reads a page straight from the database.
  fn mem_page(self, id: PgId) -> RefPage<'tx>;

  /// Returns the page for `id`, preferring a dirty page held by the write
  /// transaction over the database copy.
  fn any_page<'a>(&'a self, id: PgId) -> AnyPage<'tx, 'a>;

  /// See [TxApi::id]
  fn api_id(self) -> TxId;

  /// See [TxApi::size]
  fn api_size(self) -> u64;

  /// See [TxApi::cursor]
  fn api_cursor(self) -> InnerCursor<'tx>;

  /// See [TxApi::stats]
  fn api_stats(self) -> Arc<TxStats>;

  /// Returns the transaction's root bucket.
  fn root_bucket(self) -> BucketCell<'tx>;

  /// See [TxApi::bucket]
  fn api_bucket(self, name: &[u8]) -> Option<BucketCell<'tx>>;

  /// Walks `names` as a path of nested buckets; None if any segment is missing.
  fn api_bucket_path<T: AsRef<[u8]>>(self, names: &[T]) -> Option<BucketCell<'tx>>;

  /// See [TxApi::for_each]
  fn api_for_each<'a, F: FnMut(&'a [u8], BucketImpl<'tx, 'a>) -> crate::Result<()>>(
    &self, f: F,
  ) -> crate::Result<()>
  where
    'tx: 'a;

  /// forEachPage iterates over every page within a given page and executes a function.
  fn for_each_page<F: FnMut(&RefPage<'tx>, usize, &mut BVec<PgId>)>(self, pg_id: PgId, f: &mut F);

  /// Recursive worker for [TxIApi::for_each_page]; `pgid_stack` holds the
  /// page-id path from the starting page down to the current page.
  fn for_each_page_internal<F: FnMut(&RefPage<'tx>, usize, &mut BVec<PgId>)>(
    self, pgid_stack: &mut BVec<PgId>, f: &mut F,
  );

  /// Flags a writable transaction for explicit rollback; read-only is a no-op.
  fn rollback(self) -> crate::Result<()>;

  /// See [TxApi::page]
  fn api_page(&self, id: PgId) -> Option<PageInfo>;
}
815
/// Internal read-write transaction API layered on top of [TxIApi].
pub(crate) trait TxRwIApi<'tx>: TxIApi<'tx> + TxICheck<'tx> {
  /// Hands page `p` back to the freelist, recorded as freed by `txid`.
  fn freelist_free_page(self, txid: TxId, p: &PageHeader);

  /// Returns the root bucket for mutation.
  fn root_bucket_mut(self) -> BucketCell<'tx>;

  /// Allocates a contiguous run of `count` pages, remapping the data file to
  /// a larger size first when the allocator asks for it.
  fn allocate(
    self, count: usize,
  ) -> crate::Result<SelfOwned<AlignedBytes<alignment::Page>, MutPage<'tx>>>;

  /// Queues a dirty page to be written out when the transaction commits.
  fn queue_page(self, page: SelfOwned<AlignedBytes<alignment::Page>, MutPage<'tx>>);

  /// See [TxRwRefApi::create_bucket]
  fn api_create_bucket(self, name: &[u8]) -> crate::Result<BucketCell<'tx>>;

  /// See [TxRwRefApi::create_bucket_if_not_exists]
  fn api_create_bucket_if_not_exist(self, name: &[u8]) -> crate::Result<BucketCell<'tx>>;

  /// Creates every bucket along `names`, returning the innermost one.
  fn api_create_bucket_path<T: AsRef<[u8]>>(self, names: &[T]) -> crate::Result<BucketCell<'tx>>;

  /// See [TxRwRefApi::delete_bucket]
  fn api_delete_bucket(self, name: &[u8]) -> crate::Result<()>;

  /// Writes the transaction's dirty pages to disk in page-id order.
  fn write(self) -> crate::Result<()>;

  /// Persists the transaction's meta page — TODO confirm which meta slot.
  fn write_meta(self) -> crate::Result<()>;

  /// See [TxRwRefApi::on_commit]
  fn api_on_commit(self, f: Box<dyn FnOnce() + 'tx>);

  /// Rollback that also undoes state already pushed toward disk —
  /// NOTE(review): confirm exact semantics against the implementation.
  fn physical_rollback(self) -> crate::Result<()>;
}
847
/// Read-only half of a transaction's state.
pub struct TxR<'tx> {
  /// Bump allocator scoped to this transaction's lifetime.
  b: &'tx Bump,
  /// Page size the database was opened with.
  page_size: usize,
  /// Guard over the shared database state, held for the whole transaction.
  pub(crate) db: &'tx LockGuard<'tx, DbShared>,
  /// Stats handle shared with callers via `Arc`; `None` presumably only
  /// around teardown — TODO confirm when this is taken.
  pub(crate) stats: Option<Arc<TxStats>>,
  /// The transaction's private copy of the database meta page.
  pub(crate) meta: Meta,
  marker: PhantomData<&'tx u8>,
}
856
/// Write-only half of a transaction's state (present only for RW transactions).
pub struct TxW<'tx> {
  /// Dirty pages queued for commit, keyed by page id.
  pages: HashMap<'tx, PgId, SelfOwned<AlignedBytes<alignment::Page>, MutPage<'tx>>>,
  /// Callbacks to run after a successful commit.
  commit_handlers: BVec<'tx, Box<dyn FnOnce() + 'tx>>,
  /// When set, syncing after writes is skipped — NOTE(review): confirm
  /// against the db's no-sync option.
  no_sync: bool,
  /// How this transaction is being closed (commit vs rollback flavors).
  tx_closing_state: TxClosingState,
  marker: PhantomData<&'tx u8>,
}
864
/// Combined transaction state; `w` is `None` for read-only transactions.
pub struct TxRW<'tx> {
  pub(crate) r: TxR<'tx>,
  w: Option<TxW<'tx>>,
}
869
/// Cheap, copyable handle to the shared transaction state, bound to the
/// transaction's root bucket.
#[derive(Copy, Clone)]
pub struct TxCell<'tx> {
  pub(crate) cell: BCell<'tx, TxRW<'tx>, BucketCell<'tx>>,
}
874
875impl<'tx> SplitRef<TxR<'tx>, BucketCell<'tx>, TxW<'tx>> for TxCell<'tx> {
876  fn split_r(&self) -> Ref<TxR<'tx>> {
877    Ref::map(self.cell.borrow(), |c| &c.r)
878  }
879
880  fn split_ref(&self) -> (Ref<TxR<'tx>>, Ref<Option<TxW<'tx>>>) {
881    let (r, w) = Ref::map_split(self.cell.borrow(), |b| (&b.r, &b.w));
882    (r, w)
883  }
884
885  fn split_ow(&self) -> Ref<Option<TxW<'tx>>> {
886    Ref::map(self.cell.borrow(), |c| &c.w)
887  }
888
889  #[inline]
890  fn split_bound(&self) -> BucketCell<'tx> {
891    self.cell.bound()
892  }
893
894  fn split_r_mut(&self) -> RefMut<TxR<'tx>> {
895    RefMut::map(self.cell.borrow_mut(), |c| &mut c.r)
896  }
897
898  fn split_ow_mut(&self) -> RefMut<Option<TxW<'tx>>> {
899    RefMut::map(self.cell.borrow_mut(), |c| &mut c.w)
900  }
901}
902
impl<'tx> TxIApi<'tx> for TxCell<'tx> {
  #[inline]
  fn bump(self) -> &'tx Bump {
    self.split_r().b
  }

  #[inline]
  fn page_size(self) -> usize {
    self.split_r().page_size
  }

  fn meta<'a>(&'a self) -> Ref<'a, Meta>
  where
    'tx: 'a,
  {
    Ref::map(self.split_r(), |tx| &tx.meta)
  }

  fn mem_page(self, id: PgId) -> RefPage<'tx> {
    self.split_r().db.page(id)
  }

  fn any_page<'a>(&'a self, id: PgId) -> AnyPage<'tx, 'a> {
    // Prefer a dirty page owned by the write transaction, if there is one.
    if let Some(tx) = self.split_ow().as_ref() {
      if let Some(page) = tx.pages.get(&id).map(|p| p.as_ref()) {
        page.fast_check(id);
        return AnyPage::Pending(*page);
      }
    }
    // Otherwise fall back to the database's copy of the page.
    let page = self.split_r().db.page(id);
    page.fast_check(id);
    AnyPage::Ref(page)
  }

  /// See [TxApi::id]
  #[inline]
  fn api_id(self) -> TxId {
    self.split_r().meta.txid()
  }

  /// See [TxApi::size]
  #[inline]
  fn api_size(self) -> u64 {
    let r = self.split_r();
    // High-water page id times the page size = bytes visible to this tx.
    r.meta.pgid().0 * r.meta.page_size() as u64
  }

  /// See [TxApi::cursor]
  fn api_cursor(self) -> InnerCursor<'tx> {
    let root_bucket = self.root_bucket();
    root_bucket.i_cursor()
  }

  /// See [TxApi::stats]
  fn api_stats(self) -> Arc<TxStats> {
    self.split_r().stats.as_ref().unwrap().clone()
  }

  #[inline]
  fn root_bucket(self) -> BucketCell<'tx> {
    self.split_bound()
  }

  /// See [TxApi::bucket]
  fn api_bucket(self, name: &[u8]) -> Option<BucketCell<'tx>> {
    let root_bucket = self.root_bucket();
    root_bucket.api_bucket(name)
  }

  fn api_bucket_path<T: AsRef<[u8]>>(self, names: &[T]) -> Option<BucketCell<'tx>> {
    // Descend one level per name; any missing segment aborts the walk.
    let mut b = self.root_bucket();
    for n in names {
      let name = n.as_ref();
      b = match b.api_bucket(name) {
        None => return None,
        Some(next_b) => next_b,
      };
    }
    Some(b)
  }

  /// See [TxApi::for_each]
  fn api_for_each<'a, F: FnMut(&'a [u8], BucketImpl<'tx, 'a>) -> crate::Result<()>>(
    &self, mut f: F,
  ) -> crate::Result<()>
  where
    'tx: 'a,
  {
    let root_bucket = self.root_bucket();
    root_bucket.api_for_each_bucket(|k| {
      // `k` came from the root bucket's own iteration, so the lookup succeeds.
      let bucket = root_bucket.api_bucket(k).unwrap();
      f(k, bucket.into_impl())?;
      Ok(())
    })
  }

  /// forEachPage iterates over every page within a given page and executes a function.
  fn for_each_page<F: FnMut(&RefPage<'tx>, usize, &mut BVec<PgId>)>(self, pg_id: PgId, f: &mut F) {
    let mut stack = BVec::with_capacity_in(10, self.bump());
    stack.push(pg_id);
    self.for_each_page_internal(&mut stack, f);
  }

  fn for_each_page_internal<F: FnMut(&RefPage<'tx>, usize, &mut BVec<PgId>)>(
    self, pgid_stack: &mut BVec<PgId>, f: &mut F,
  ) {
    // Current page is always the top of the stack.
    let p = self.mem_page(*pgid_stack.last().unwrap());

    // Execute function; depth is the number of ancestors on the stack.
    f(&p, pgid_stack.len() - 1, pgid_stack);

    // Recursively loop over children.
    if let Some(branch_page) = MappedBranchPage::coerce_ref(&p) {
      for elem in branch_page.elements() {
        pgid_stack.push(elem.pgid());
        self.for_each_page_internal(pgid_stack, f);
        pgid_stack.pop();
      }
    }
  }

  fn rollback(self) -> crate::Result<()> {
    // Only writable transactions track a closing state; read-only is a no-op.
    if let Some(w) = self.split_ow_mut().as_mut() {
      w.tx_closing_state = TxClosingState::ExplicitRollback;
    }
    Ok(())
  }

  /// See [TxApi::page]
  fn api_page(&self, id: PgId) -> Option<PageInfo> {
    let r = self.split_r();
    // Ids at or past the meta high-water mark are not valid pages.
    if id >= r.meta.pgid() {
      return None;
    }
    //TODO: Check if freelist loaded
    //WHEN: Freelists can be unloaded

    let p = r.db.page(id);
    let id = p.id;
    let count = p.count as u64;
    let overflow_count = p.overflow as u64;

    // Pages sitting on the freelist report the synthetic type "free".
    let t = if r.db.is_page_free(id) {
      Cow::Borrowed("free")
    } else {
      p.page_type()
    };
    let info = PageInfo {
      id: id.0,
      t,
      count,
      overflow_count,
    };
    Some(info)
  }
}
1059
1060impl<'tx> TxRwIApi<'tx> for TxCell<'tx> {
1061  fn freelist_free_page(self, txid: TxId, p: &PageHeader) {
1062    self.cell.borrow().r.db.free_page(txid, p)
1063  }
1064
1065  fn root_bucket_mut(self) -> BucketCell<'tx> {
1066    self.split_bound()
1067  }
1068
1069  fn allocate(
1070    self, count: usize,
1071  ) -> crate::Result<SelfOwned<AlignedBytes<alignment::Page>, MutPage<'tx>>> {
1072    let db = { self.cell.borrow().r.db };
1073    let page = match db.allocate(self, count as u64) {
1074      AllocateResult::Page(page) => page,
1075      AllocateResult::PageWithNewSize(page, min_size) => {
1076        db.get_mut().unwrap().mmap_to_new_size(min_size, self)?;
1077        page
1078      }
1079    };
1080
1081    Ok(page)
1082  }
1083
1084  fn queue_page(self, page: SelfOwned<AlignedBytes<alignment::Page>, MutPage<'tx>>) {
1085    let mut tx = self.cell.borrow_mut();
1086    if let Some(pending) = tx.w.as_mut().unwrap().pages.insert(page.id, page) {
1087      if pending.overflow == 0 {
1088        tx.r
1089          .db
1090          .get_mut()
1091          .unwrap()
1092          .repool_allocated(pending.into_owner());
1093      }
1094    }
1095  }
1096
1097  fn api_create_bucket(self, name: &[u8]) -> crate::Result<BucketCell<'tx>> {
1098    let root_bucket = self.root_bucket();
1099    root_bucket.api_create_bucket(name)
1100  }
1101
1102  fn api_create_bucket_if_not_exist(self, name: &[u8]) -> crate::Result<BucketCell<'tx>> {
1103    let root_bucket = self.root_bucket();
1104    root_bucket.api_create_bucket_if_not_exists(name)
1105  }
1106
1107  fn api_create_bucket_path<T: AsRef<[u8]>>(self, names: &[T]) -> crate::Result<BucketCell<'tx>> {
1108    let mut b = self.root_bucket();
1109    for n in names {
1110      let name = n.as_ref();
1111      b = b.api_create_bucket_if_not_exists(name)?;
1112    }
1113    Ok(b)
1114  }
1115
1116  fn api_delete_bucket(self, name: &[u8]) -> crate::Result<()> {
1117    let root_bucket = self.root_bucket();
1118    root_bucket.api_delete_bucket(name)
1119  }
1120
1121  fn write(self) -> crate::Result<()> {
1122    let (pages, db, page_size, no_sync) = {
1123      let mut tx = self.cell.borrow_mut();
1124      let mut swap_pages = HashMap::with_capacity_in(0, tx.r.b);
1125      // Clear out page cache early.
1126      mem::swap(&mut swap_pages, &mut tx.w.as_mut().unwrap().pages);
1127      let mut pages = BVec::from_iter_in(swap_pages.into_iter().map(|(_, page)| page), tx.r.b);
1128
1129      // Sort pages by id.
1130      pages.sort_by_key(|page| page.id);
1131      (
1132        pages,
1133        tx.r.db,
1134        tx.r.page_size,
1135        tx.w.as_ref().unwrap().no_sync,
1136      )
1137    };
1138
1139    let r = self.split_r();
1140
1141    // Write pages to disk in order.
1142    for page in &pages {
1143      let mut rem = (page.overflow as usize + 1) * page_size;
1144      let mut offset = page.id.0 * page_size as u64;
1145      let mut written = 0;
1146
1147      // Write out page in "max allocation" sized chunks.
1148      loop {
1149        let size = rem.min(MAX_ALLOC_SIZE.bytes() as usize - 1);
1150        let buf = &page.ref_owner()[written..size];
1151
1152        let size = db.write_all_at(buf, offset)?;
1153
1154        // Update statistics.
1155        r.stats.as_ref().unwrap().inc_write(1);
1156
1157        rem -= size;
1158        if rem == 0 {
1159          break;
1160        }
1161
1162        offset += size as u64;
1163        written += size;
1164      }
1165    }
1166
1167    if !no_sync || IGNORE_NO_SYNC {
1168      db.fsync()?;
1169    }
1170
1171    for page in pages.into_iter() {
1172      if page.overflow == 0 {
1173        db.repool_allocated(page.into_owner());
1174      }
1175    }
1176    Ok(())
1177  }
1178
1179  fn write_meta(self) -> crate::Result<()> {
1180    let tx = self.cell.borrow();
1181    let page_size = tx.r.page_size;
1182
1183    let layout = Layout::from_size_align(page_size, mem::align_of::<MetaPage>()).unwrap();
1184    let ptr = tx.r.b.alloc_layout(layout);
1185
1186    let mut meta_page = unsafe { MappedMetaPage::new(ptr.as_ptr()) };
1187    tx.r.meta.write(&mut meta_page);
1188
1189    let db = tx.r.db;
1190    let offset = meta_page.page.id.0 * page_size as u64;
1191    let buf = unsafe { from_raw_parts_mut(ptr.as_ptr(), page_size) };
1192    db.write_all_at(buf, offset)?;
1193
1194    if !tx.w.as_ref().unwrap().no_sync || IGNORE_NO_SYNC {
1195      db.fsync()?;
1196    }
1197
1198    tx.r.stats.as_ref().unwrap().inc_write(1);
1199
1200    Ok(())
1201  }
1202
1203  fn api_on_commit(self, f: Box<dyn FnOnce() + 'tx>) {
1204    self
1205      .cell
1206      .borrow_mut()
1207      .w
1208      .as_mut()
1209      .unwrap()
1210      .commit_handlers
1211      .push(f);
1212  }
1213
1214  fn physical_rollback(self) -> crate::Result<()> {
1215    if let Some(w) = self.split_ow_mut().as_mut() {
1216      w.tx_closing_state = TxClosingState::PhysicalRollback;
1217    }
1218    Ok(())
1219  }
1220}
1221
1222/// Read-only Transaction
1223pub struct TxImpl<'tx> {
1224  bump: SyncReusable<Pin<Box<PinBump>>>,
1225  db: Pin<AliasableBox<PinLockGuard<'tx, DbShared>>>,
1226  pub(crate) tx: TxCell<'tx>,
1227}
1228
impl<'tx> TxImpl<'tx> {
  /// Builds a read-only transaction.
  ///
  /// The result is self-referential: `tx` points into memory owned by
  /// `bump`, and its `TxR` holds a pointer derived from the pinned `db`
  /// lock guard. The struct is therefore assembled field-by-field in place
  /// via `MaybeUninit` + `addr_of_mut!` so no intermediate move invalidates
  /// those pointers.
  pub(crate) fn new(
    bump: SyncReusable<Pin<Box<PinBump>>>, lock: RwLockReadGuard<'tx, DbShared>, meta: Meta,
  ) -> TxImpl<'tx> {
    let page_size = meta.page_size() as usize;
    let inline_bucket = meta.root();
    let mut uninit: MaybeUninit<TxImpl<'tx>> = MaybeUninit::uninit();
    let ptr = uninit.as_mut_ptr();
    // SAFETY(review): every field of `uninit` is written exactly once below
    // before `assume_init`; `bump` and `db` are pinned, so pointers taken
    // from them are expected to stay stable after the final move — confirm
    // against PinBump/PinLockGuard guarantees.
    unsafe {
      addr_of_mut!((*ptr).bump).write(bump);

      // Re-borrow the bump from its final location inside the struct.
      let bump = Pin::as_ref(&*addr_of!((*ptr).bump)).bump().get_ref();
      addr_of_mut!((*ptr).db).write(AliasableBox::from_unique_pin(Box::pin(PinLockGuard::new(
        lock,
      ))));
      let db = Pin::as_ref(&*addr_of!((*ptr).db)).guard().get_ref();
      let tx = {
        let r = TxR {
          b: bump,
          page_size,
          db,
          meta,
          stats: Some(Default::default()),
          marker: Default::default(),
        };

        // The tx cell and the root bucket reference each other, so the
        // pair is allocated uninitialized and stitched together in place.
        let uninit_tx: MaybeUninit<(RefCell<TxRW>, BucketCell<'tx>)> = MaybeUninit::uninit();
        let cell_tx = bump.alloc(uninit_tx);
        let cell_tx_ptr = cell_tx.as_ptr().cast_mut();
        let const_cell_ptr = cell_tx_ptr.cast_const();

        // Read-only: no writer state (`w: None`).
        addr_of_mut!((*cell_tx_ptr).0).write(RefCell::new(TxRW { r, w: None }));
        addr_of_mut!((*cell_tx_ptr).1).write(BucketCell::new_r_in(
          bump,
          inline_bucket,
          TxCell {
            cell: BCell(const_cell_ptr, PhantomData),
          },
          None,
        ));
        TxCell {
          cell: BCell(cell_tx.assume_init_ref(), PhantomData),
        }
      };
      addr_of_mut!((*ptr).tx).write(tx);
      uninit.assume_init()
    }
  }

  /// Creates a borrowed handle (used by managed/closure-based transactions)
  /// sharing the same bump-allocated transaction cell.
  pub(crate) fn get_ref(&self) -> TxRef<'tx> {
    TxRef {
      tx: TxCell { cell: self.tx.cell },
    }
  }
}
1284
1285impl<'tx> Drop for TxImpl<'tx> {
1286  fn drop(&mut self) {
1287    let tx_id = self.id();
1288    let stats = self.tx.cell.borrow_mut().r.stats.take().unwrap();
1289    Pin::as_ref(&self.db).guard().remove_tx(tx_id, stats);
1290  }
1291}
1292
1293impl<'tx> TxApi<'tx> for TxImpl<'tx> {
1294  #[inline]
1295  fn id(&self) -> TxId {
1296    self.tx.api_id()
1297  }
1298
1299  #[inline]
1300  fn size(&self) -> u64 {
1301    self.tx.api_size()
1302  }
1303
1304  #[inline]
1305  fn writable(&self) -> bool {
1306    false
1307  }
1308
1309  fn cursor<'a>(&'a self) -> CursorImpl<'tx, 'a> {
1310    self.tx.api_cursor().into()
1311  }
1312
1313  fn stats(&self) -> Arc<TxStats> {
1314    self.tx.api_stats()
1315  }
1316
1317  fn bucket<'a, T: AsRef<[u8]>>(&'a self, name: T) -> Option<BucketImpl<'tx, 'a>> {
1318    self.tx.api_bucket(name.as_ref()).map(BucketImpl::from)
1319  }
1320
1321  fn bucket_path<'a, T: AsRef<[u8]>>(&'a self, names: &[T]) -> Option<BucketImpl<'tx, 'a>> {
1322    self.tx.api_bucket_path(names).map(BucketImpl::from)
1323  }
1324
1325  fn for_each<'a, F: FnMut(&'a [u8], BucketImpl<'tx, 'a>) -> crate::Result<()>>(
1326    &self, f: F,
1327  ) -> crate::Result<()>
1328  where
1329    'tx: 'a,
1330  {
1331    self.tx.api_for_each(f)
1332  }
1333
1334  fn page(&self, id: PgId) -> Option<PageInfo> {
1335    self.tx.api_page(id)
1336  }
1337
1338  fn iter_buckets<'a>(&'a self) -> BucketIter<'tx, 'a> {
1339    BucketIter::new(self.tx.api_cursor())
1340  }
1341}
1342
/// Read-only Transaction reference used in managed transactions
pub struct TxRef<'tx> {
  // Shared handle into the bump-allocated transaction cell owned by a
  // `TxImpl`; this reference owns nothing and performs no cleanup.
  pub(crate) tx: TxCell<'tx>,
}
1347
1348impl<'tx> TxApi<'tx> for TxRef<'tx> {
1349  #[inline]
1350  fn id(&self) -> TxId {
1351    self.tx.api_id()
1352  }
1353
1354  #[inline]
1355  fn size(&self) -> u64 {
1356    self.tx.api_size()
1357  }
1358
1359  #[inline]
1360  fn writable(&self) -> bool {
1361    false
1362  }
1363
1364  fn cursor<'a>(&'a self) -> CursorImpl<'tx, 'a> {
1365    self.tx.api_cursor().into()
1366  }
1367
1368  fn stats(&self) -> Arc<TxStats> {
1369    self.tx.api_stats()
1370  }
1371
1372  fn bucket<'a, T: AsRef<[u8]>>(&'a self, name: T) -> Option<BucketImpl<'tx, 'a>> {
1373    self.tx.api_bucket(name.as_ref()).map(BucketImpl::from)
1374  }
1375
1376  fn bucket_path<'a, T: AsRef<[u8]>>(&'a self, names: &[T]) -> Option<BucketImpl<'tx, 'a>> {
1377    self.tx.api_bucket_path(names).map(BucketImpl::from)
1378  }
1379
1380  fn for_each<'a, F: FnMut(&'a [u8], BucketImpl<'tx, 'a>) -> crate::Result<()>>(
1381    &self, f: F,
1382  ) -> crate::Result<()>
1383  where
1384    'tx: 'a,
1385  {
1386    self.tx.api_for_each(f)
1387  }
1388
1389  fn page(&self, id: PgId) -> Option<PageInfo> {
1390    self.tx.api_page(id)
1391  }
1392
1393  fn iter_buckets<'a>(&'a self) -> BucketIter<'tx, 'a> {
1394    BucketIter::new(self.tx.api_cursor())
1395  }
1396}
1397
/// Read/Write Transaction
pub struct TxRwImpl<'tx> {
  // Per-transaction bump arena; all tx-scoped state referenced by `tx`
  // (tx cell, root bucket cell, dirty-page map) lives in it.
  bump: SyncReusable<Pin<Box<PinBump>>>,
  // Pinned upgradable-lock guard over the shared db state; `tx`'s TxR
  // holds a pointer derived from this guard (self-referential struct).
  db: Pin<AliasableBox<PinLockGuard<'tx, DbShared>>>,
  // NOTE(review): field declaration order is also drop order — confirm it
  // is intended that `bump` is released before `db`.
  pub(crate) tx: TxCell<'tx>,
}
1404
impl<'tx> TxRwImpl<'tx> {
  /// Creates a borrowed handle (used by managed/closure-based transactions)
  /// sharing the same bump-allocated transaction cell.
  pub(crate) fn get_ref(&self) -> TxRwRef<'tx> {
    TxRwRef {
      tx: TxCell { cell: self.tx.cell },
    }
  }

  /// Builds a read/write transaction.
  ///
  /// Like [TxImpl::new], the result is self-referential: `tx` points into
  /// `bump`-owned memory and its `TxR` holds a pointer derived from the
  /// pinned `db` lock guard. Assembly happens field-by-field in place via
  /// `MaybeUninit` + `addr_of_mut!`.
  pub(crate) fn new(
    bump: SyncReusable<Pin<Box<PinBump>>>, lock: RwLockUpgradableReadGuard<'tx, DbShared>,
    meta: Meta,
  ) -> TxRwImpl<'tx> {
    let no_sync = lock.options.no_sync();
    let page_size = meta.page_size() as usize;
    let inline_bucket = meta.root();
    let mut uninit: MaybeUninit<TxRwImpl<'tx>> = MaybeUninit::uninit();
    let ptr = uninit.as_mut_ptr();
    // SAFETY(review): every field of `uninit` (and of the bump-allocated
    // tx/bucket cells) is initialized exactly once below before
    // `assume_init`; `bump` and `db` are pinned so pointers taken from them
    // are expected to remain valid — confirm against PinBump/PinLockGuard.
    unsafe {
      addr_of_mut!((*ptr).bump).write(bump);
      let bump = Pin::as_ref(&*addr_of!((*ptr).bump)).bump().get_ref();
      addr_of_mut!((*ptr).db).write(AliasableBox::from_unique_pin(Box::pin(PinLockGuard::new(
        lock,
      ))));

      let db = Pin::as_ref(&*addr_of!((*ptr).db)).guard().get_ref();
      let tx = {
        let tx_r = TxR {
          b: bump,
          page_size,
          db,
          meta,
          stats: Some(Default::default()),
          marker: Default::default(),
        };
        // Writer state: dirty-page cache, commit handlers, closing state.
        let tx_w = TxW {
          pages: HashMap::with_capacity_in(0, bump),
          commit_handlers: BVec::with_capacity_in(0, bump),
          no_sync,
          tx_closing_state: TxClosingState::Rollback,
          marker: Default::default(),
        };

        let bucket_r = BucketR::new(inline_bucket);
        let bucket_w = BucketW::new_in(bump);

        // The tx cell and root-bucket cell reference each other, so both
        // are allocated uninitialized first and then stitched together.
        let uninit_tx: MaybeUninit<(RefCell<TxRW>, BucketCell<'tx>)> = MaybeUninit::uninit();
        let uninit_bucket: MaybeUninit<(RefCell<BucketRW<'tx>>, TxCell<'tx>)> =
          MaybeUninit::uninit();
        let cell_tx = bump.alloc(uninit_tx);
        let cell_tx_ptr = cell_tx.as_mut_ptr();
        let const_cell_tx_ptr = cell_tx_ptr.cast_const();
        let cell_bucket = bump.alloc(uninit_bucket);
        let cell_bucket_ptr = cell_bucket.as_mut_ptr();

        addr_of_mut!((*cell_tx_ptr).0).write(RefCell::new(TxRW {
          r: tx_r,
          w: Some(tx_w),
        }));
        addr_of_mut!((*cell_bucket_ptr).0).write(RefCell::new(BucketRW {
          r: bucket_r,
          w: Some(bucket_w),
        }));
        addr_of_mut!((*cell_bucket_ptr).1).write(TxCell {
          cell: BCell(const_cell_tx_ptr, PhantomData),
        });
        addr_of_mut!((*cell_tx_ptr).1).write(BucketCell {
          cell: BCell(cell_bucket.assume_init_ref(), PhantomData),
        });
        TxCell {
          cell: BCell(cell_tx.assume_init_ref(), PhantomData),
        }
      };
      addr_of_mut!((*ptr).tx).write(tx);
      uninit.assume_init()
    }
  }

  /// Persists the freelist into a freshly allocated page, points the meta
  /// at it, and queues the page for the next write pass.
  fn commit_freelist(&mut self) -> crate::Result<()> {
    let allocated_page = Pin::as_ref(&self.db).guard().commit_freelist(self.tx)?;

    let freelist_page = match allocated_page {
      AllocateResult::Page(page) => page,
      AllocateResult::PageWithNewSize(page, min_size) => {
        // The allocation outgrew the current mmap; remap before continuing.
        Pin::as_ref(&self.db)
          .guard()
          .get_mut()
          .unwrap()
          .mmap_to_new_size(min_size, self.tx)?;
        page
      }
    };
    let pg_id = freelist_page.id;
    let mut tx = self.tx.cell.borrow_mut();
    tx.r.meta.set_free_list(pg_id);
    tx.w.as_mut().unwrap().pages.insert(pg_id, freelist_page);
    Ok(())
  }
}
1502
1503impl<'tx> Drop for TxRwImpl<'tx> {
1504  fn drop(&mut self) {
1505    let mut cell = self.tx.cell.borrow_mut();
1506    let tx_closing_state = cell.w.as_ref().unwrap().tx_closing_state;
1507    let tx_id = cell.r.meta.txid();
1508    let stats = cell.r.stats.take().unwrap();
1509    Pin::as_ref(&self.db)
1510      .guard()
1511      .remove_rw_tx(tx_closing_state, tx_id, stats);
1512  }
1513}
1514
1515impl<'tx> TxApi<'tx> for TxRwImpl<'tx> {
1516  #[inline]
1517  fn id(&self) -> TxId {
1518    self.tx.api_id()
1519  }
1520
1521  fn size(&self) -> u64 {
1522    self.tx.api_size()
1523  }
1524
1525  #[inline]
1526  fn writable(&self) -> bool {
1527    true
1528  }
1529
1530  fn cursor<'a>(&'a self) -> CursorImpl<'tx, 'a> {
1531    self.tx.api_cursor().into()
1532  }
1533
1534  fn stats(&self) -> Arc<TxStats> {
1535    self.tx.api_stats()
1536  }
1537
1538  fn bucket<'a, T: AsRef<[u8]>>(&'a self, name: T) -> Option<BucketImpl<'tx, 'a>> {
1539    self.tx.api_bucket(name.as_ref()).map(BucketImpl::from)
1540  }
1541
1542  fn bucket_path<'a, T: AsRef<[u8]>>(&'a self, names: &[T]) -> Option<BucketImpl<'tx, 'a>> {
1543    self.tx.api_bucket_path(names).map(BucketImpl::from)
1544  }
1545
1546  fn for_each<'a, F: FnMut(&'a [u8], BucketImpl<'tx, 'a>) -> crate::Result<()>>(
1547    &self, f: F,
1548  ) -> crate::Result<()>
1549  where
1550    'tx: 'a,
1551  {
1552    self.tx.api_for_each(f)
1553  }
1554
1555  fn page(&self, id: PgId) -> Option<PageInfo> {
1556    self.tx.api_page(id)
1557  }
1558
1559  fn iter_buckets<'a>(&'a self) -> BucketIter<'tx, 'a> {
1560    BucketIter::new(self.tx.api_cursor())
1561  }
1562}
1563
1564impl<'tx> TxRwRefApi<'tx> for TxRwImpl<'tx> {
1565  fn bucket_mut<'a, T: AsRef<[u8]>>(&'a mut self, name: T) -> Option<BucketRwImpl<'tx, 'a>> {
1566    self.tx.api_bucket(name.as_ref()).map(BucketRwImpl::from)
1567  }
1568
1569  fn bucket_mut_path<'a, T: AsRef<[u8]>>(
1570    &'a mut self, names: &[T],
1571  ) -> Option<BucketRwImpl<'tx, 'a>> {
1572    self.tx.api_bucket_path(names).map(BucketRwImpl::from)
1573  }
1574
1575  fn create_bucket<'a, T: AsRef<[u8]>>(
1576    &'a mut self, name: T,
1577  ) -> crate::Result<BucketRwImpl<'tx, 'a>> {
1578    self
1579      .tx
1580      .api_create_bucket(name.as_ref())
1581      .map(BucketRwImpl::from)
1582  }
1583
1584  fn create_bucket_if_not_exists<'a, T: AsRef<[u8]>>(
1585    &'a mut self, name: T,
1586  ) -> crate::Result<BucketRwImpl<'tx, 'a>> {
1587    self
1588      .tx
1589      .api_create_bucket_if_not_exist(name.as_ref())
1590      .map(BucketRwImpl::from)
1591  }
1592
1593  fn create_bucket_path<'a, T: AsRef<[u8]>>(
1594    &'a mut self, names: &[T],
1595  ) -> crate::Result<BucketRwImpl<'tx, 'a>> {
1596    self
1597      .tx
1598      .api_create_bucket_path(names)
1599      .map(BucketRwImpl::from)
1600  }
1601
1602  fn delete_bucket<T: AsRef<[u8]>>(&mut self, name: T) -> crate::Result<()> {
1603    self.tx.api_delete_bucket(name.as_ref())
1604  }
1605
1606  fn on_commit<F: FnOnce() + 'tx>(&mut self, f: F) {
1607    self.tx.api_on_commit(Box::new(f))
1608  }
1609
1610  fn iter_mut_buckets<'a>(&'a mut self) -> BucketIterMut<'tx, 'a> {
1611    BucketIterMut::new(self.tx.api_cursor())
1612  }
1613}
1614
1615impl<'tx> TxRwApi<'tx> for TxRwImpl<'tx> {
1616  fn rollback(self) -> crate::Result<()> {
1617    self.tx.rollback()
1618  }
1619
1620  fn commit(mut self) -> crate::Result<()> {
1621    let tx_stats = {
1622      let mut tx = self.tx.cell.borrow_mut();
1623
1624      // Handle the case where the rollback is called within a managed transaction
1625      if tx.w.as_ref().unwrap().tx_closing_state == TxClosingState::ExplicitRollback {
1626        return Ok(());
1627      }
1628      tx.w.as_mut().unwrap().tx_closing_state = TxClosingState::Commit;
1629      tx.r.stats.as_ref().cloned().unwrap()
1630    };
1631
1632    let bump = self.tx.bump();
1633
1634    let start_time = Instant::now();
1635    self.tx.root_bucket().rebalance();
1636    if tx_stats.rebalance() > 0 {
1637      tx_stats.inc_rebalance_time(start_time.elapsed());
1638    }
1639    let opgid = self.tx.meta().pgid();
1640    let start_time = Instant::now();
1641    match self.tx.root_bucket().spill(bump) {
1642      Ok(_) => {
1643        tx_stats.inc_spill_time(start_time.elapsed());
1644      }
1645      Err(e) => {
1646        let _ = self.tx.physical_rollback();
1647        return Err(e);
1648      }
1649    }
1650    {
1651      let new_bucket = self.tx.cell.bound().split_r().bucket_header;
1652      let mut tx = self.tx.cell.borrow_mut();
1653      tx.r.meta.set_root(new_bucket);
1654
1655      //TODO: implement pgidNoFreeList
1656      let freelist_pg = tx.r.db.page(tx.r.meta.free_list());
1657      let tx_id = tx.r.meta.txid();
1658      Pin::as_ref(&self.db).guard().free_page(tx_id, &freelist_pg);
1659    }
1660    // TODO: implement noFreelistSync
1661
1662    match self.commit_freelist() {
1663      Ok(_) => {}
1664      Err(e) => {
1665        let _ = self.tx.physical_rollback();
1666        return Err(e);
1667      }
1668    }
1669
1670    let new_pgid = self.tx.meta().pgid();
1671    let page_size = self.tx.meta().page_size();
1672    {
1673      let tx = self.tx.cell.borrow();
1674      for page in tx.w.as_ref().unwrap().pages.values() {
1675        assert!(page.id.0 > 1, "Invalid page id");
1676      }
1677    }
1678    if new_pgid > opgid {
1679      Pin::as_ref(&self.db)
1680        .guard()
1681        .grow((new_pgid.0 + 1) * page_size as u64)?;
1682    }
1683    let start_time = Instant::now();
1684    match self.tx.write() {
1685      Ok(_) => {}
1686      Err(e) => {
1687        let _ = self.tx.physical_rollback();
1688        return Err(e);
1689      }
1690    };
1691
1692    #[cfg(feature = "strict")]
1693    {
1694      let errors = self.tx.check();
1695      if !errors.is_empty() {
1696        panic!("check fail: {}", errors.join("\n"))
1697      }
1698    }
1699
1700    match self.tx.write_meta() {
1701      Ok(_) => {
1702        tx_stats.inc_write_time(start_time.elapsed());
1703      }
1704      Err(e) => {
1705        let _ = self.tx.physical_rollback();
1706        return Err(e);
1707      }
1708    }
1709
1710    let mut tx = self.tx.cell.borrow_mut();
1711    let mut commit_handlers = BVec::with_capacity_in(0, tx.r.b);
1712    mem::swap(
1713      &mut commit_handlers,
1714      &mut tx.w.as_mut().unwrap().commit_handlers,
1715    );
1716    for f in commit_handlers.into_iter() {
1717      f();
1718    }
1719    Ok(())
1720  }
1721}
1722
/// Read/Write Transaction reference used in managed transactions
pub struct TxRwRef<'tx> {
  // Shared handle into the bump-allocated transaction cell owned by a
  // `TxRwImpl`; this reference owns nothing and performs no cleanup.
  pub(crate) tx: TxCell<'tx>,
}
1727
1728impl<'tx> TxApi<'tx> for TxRwRef<'tx> {
1729  #[inline]
1730  fn id(&self) -> TxId {
1731    self.tx.api_id()
1732  }
1733
1734  fn size(&self) -> u64 {
1735    self.tx.api_size()
1736  }
1737
1738  #[inline]
1739  fn writable(&self) -> bool {
1740    true
1741  }
1742
1743  fn cursor<'a>(&'a self) -> CursorImpl<'tx, 'a> {
1744    self.tx.api_cursor().into()
1745  }
1746
1747  fn stats(&self) -> Arc<TxStats> {
1748    self.tx.api_stats()
1749  }
1750
1751  fn bucket<'a, T: AsRef<[u8]>>(&'a self, name: T) -> Option<BucketImpl<'tx, 'a>> {
1752    self.tx.api_bucket(name.as_ref()).map(BucketImpl::from)
1753  }
1754
1755  fn bucket_path<'a, T: AsRef<[u8]>>(&'a self, names: &[T]) -> Option<BucketImpl<'tx, 'a>> {
1756    self.tx.api_bucket_path(names).map(BucketImpl::from)
1757  }
1758
1759  fn for_each<'a, F: FnMut(&'a [u8], BucketImpl<'tx, 'a>) -> crate::Result<()>>(
1760    &self, f: F,
1761  ) -> crate::Result<()>
1762  where
1763    'tx: 'a,
1764  {
1765    self.tx.api_for_each(f)
1766  }
1767
1768  fn page(&self, id: PgId) -> Option<PageInfo> {
1769    self.tx.api_page(id)
1770  }
1771
1772  fn iter_buckets<'a>(&'a self) -> BucketIter<'tx, 'a> {
1773    BucketIter::new(self.tx.api_cursor())
1774  }
1775}
1776
1777impl<'tx> TxRwRefApi<'tx> for TxRwRef<'tx> {
1778  fn bucket_mut<'a, T: AsRef<[u8]>>(&'a mut self, name: T) -> Option<BucketRwImpl<'tx, 'a>> {
1779    self.tx.api_bucket(name.as_ref()).map(BucketRwImpl::from)
1780  }
1781
1782  fn bucket_mut_path<'a, T: AsRef<[u8]>>(
1783    &'a mut self, names: &[T],
1784  ) -> Option<BucketRwImpl<'tx, 'a>> {
1785    self.tx.api_bucket_path(names).map(BucketRwImpl::from)
1786  }
1787
1788  fn create_bucket<'a, T: AsRef<[u8]>>(
1789    &'a mut self, name: T,
1790  ) -> crate::Result<BucketRwImpl<'tx, 'a>> {
1791    self
1792      .tx
1793      .api_create_bucket(name.as_ref())
1794      .map(BucketRwImpl::from)
1795  }
1796
1797  fn create_bucket_if_not_exists<'a, T: AsRef<[u8]>>(
1798    &'a mut self, name: T,
1799  ) -> crate::Result<BucketRwImpl<'tx, 'a>> {
1800    self
1801      .tx
1802      .api_create_bucket_if_not_exist(name.as_ref())
1803      .map(BucketRwImpl::from)
1804  }
1805
1806  fn create_bucket_path<'a, T: AsRef<[u8]>>(
1807    &'a mut self, names: &[T],
1808  ) -> crate::Result<BucketRwImpl<'tx, 'a>> {
1809    self
1810      .tx
1811      .api_create_bucket_path(names)
1812      .map(BucketRwImpl::from)
1813  }
1814
1815  fn delete_bucket<T: AsRef<[u8]>>(&mut self, name: T) -> crate::Result<()> {
1816    self.tx.api_delete_bucket(name.as_ref())
1817  }
1818
1819  fn on_commit<F: FnOnce() + 'tx>(&mut self, f: F) {
1820    self.tx.api_on_commit(Box::new(f))
1821  }
1822
1823  fn iter_mut_buckets<'a>(&'a mut self) -> BucketIterMut<'tx, 'a> {
1824    BucketIterMut::new(self.tx.api_cursor())
1825  }
1826}
1827
1828pub(crate) mod check {
1829  use crate::bucket::{BucketCell, BucketIApi};
1830  use crate::common::page::tree::branch::MappedBranchPage;
1831  use crate::common::page::tree::leaf::MappedLeafPage;
1832  use crate::common::page::tree::TreePage;
1833  use crate::common::page::{CoerciblePage, RefPage};
1834  use crate::common::refstack::RefStack;
1835  use crate::common::{BVec, HashMap, HashSet, PgId, SplitRef, ZERO_PGID};
1836  use crate::db::DbIApi;
1837  use crate::tx::{TxCell, TxIApi, TxImpl, TxR, TxRef, TxRwIApi, TxRwImpl, TxRwRef, TxW};
1838
1839  pub(crate) trait UnsealTx<'tx> {
1840    fn unseal(&self) -> impl TxIApi<'tx> + TxICheck<'tx>;
1841  }
1842
1843  pub(crate) trait UnsealRwTx<'tx>: UnsealTx<'tx> {
1844    fn unseal_rw(&self) -> impl TxRwIApi<'tx>;
1845  }
1846
1847  impl<'tx> UnsealTx<'tx> for TxImpl<'tx> {
1848    #[inline]
1849    fn unseal(&self) -> impl TxIApi<'tx> + TxICheck<'tx> {
1850      TxCell { cell: self.tx.cell }
1851    }
1852  }
1853
1854  impl<'tx> UnsealTx<'tx> for TxRef<'tx> {
1855    #[inline]
1856    fn unseal(&self) -> impl TxIApi<'tx> + TxICheck<'tx> {
1857      TxCell { cell: self.tx.cell }
1858    }
1859  }
1860
1861  impl<'tx> UnsealTx<'tx> for TxRwImpl<'tx> {
1862    #[inline]
1863    fn unseal(&self) -> impl TxIApi<'tx> + TxICheck<'tx> {
1864      TxCell { cell: self.tx.cell }
1865    }
1866  }
1867
1868  impl<'tx> UnsealTx<'tx> for TxRwRef<'tx> {
1869    #[inline]
1870    fn unseal(&self) -> impl TxIApi<'tx> + TxICheck<'tx> {
1871      self.tx
1872    }
1873  }
1874
1875  impl<'tx> UnsealRwTx<'tx> for TxRwImpl<'tx> {
1876    #[inline]
1877    fn unseal_rw(&self) -> impl TxRwIApi<'tx> {
1878      TxCell { cell: self.tx.cell }
1879    }
1880  }
1881
1882  impl<'tx> UnsealRwTx<'tx> for TxRwRef<'tx> {
1883    #[inline]
1884    fn unseal_rw(&self) -> impl TxRwIApi<'tx> {
1885      self.tx
1886    }
1887  }
1888
1889  /// Check performs several consistency checks on the database for this transaction.
1890  /// An error is returned if any inconsistency is found.
1891  ///
1892  /// It can be safely run concurrently on a writable transaction. However, this
1893  /// incurs a high cost for large databases and databases with a lot of subbuckets
1894  /// because of caching. This overhead can be removed if running on a read-only
1895  /// transaction, however, it is not safe to execute other writer transactions at
1896  /// the same time.
1897  pub trait TxCheck<'tx> {
1898    fn check(&self) -> Vec<String>;
1899  }
1900
1901  impl<'tx, T> TxCheck<'tx> for T
1902  where
1903    T: UnsealTx<'tx>,
1904  {
1905    fn check(&self) -> Vec<String> {
1906      let tx = self.unseal();
1907      tx.check()
1908    }
1909  }
1910
1911  pub(crate) trait TxICheck<'tx>:
1912    TxIApi<'tx> + SplitRef<TxR<'tx>, BucketCell<'tx>, TxW<'tx>>
1913  {
1914    fn check(self) -> Vec<String>;
1915
1916    fn check_bucket(
1917      &self, bucket: BucketCell<'tx>, reachable: &mut HashMap<PgId, RefPage<'tx>>,
1918      freed: &mut HashSet<PgId>, errors: &mut Vec<String>,
1919    );
1920
1921    fn recursively_check_pages(self, pg_id: PgId, errors: &mut Vec<String>);
1922
1923    fn recursively_check_pages_internal(
1924      self, pg_id: PgId, min_key_closed: &[u8], max_key_open: &[u8], pageid_stack: &RefStack<PgId>,
1925      errors: &mut Vec<String>,
1926    ) -> &'tx [u8];
1927
1928    /***
1929     * verifyKeyOrder checks whether an entry with given #index on pgId (pageType: "branch|leaf") that has given "key",
1930     * is within range determined by (previousKey..maxKeyOpen) and reports found violations to the channel (ch).
1931     */
1932    fn verify_key_order(
1933      self, pg_id: PgId, page_type: &str, index: usize, key: &[u8], previous_key: &[u8],
1934      max_key_open: &[u8], pageid_stack: &RefStack<PgId>, errors: &mut Vec<String>,
1935    );
1936  }
1937
1938  impl<'tx> TxICheck<'tx> for TxCell<'tx> {
1939    fn check(self) -> Vec<String> {
1940      let mut errors = Vec::new();
1941      let bump = self.bump();
1942      let db = self.split_r().db;
1943      let freelist_count = db.freelist_count();
1944      let high_water = self.meta().pgid();
1945      // TODO: ReadOnly mode handling
1946
1947      // Check if any pages are double freed.
1948      let mut freed = HashSet::new_in(bump);
1949      let mut all = BVec::with_capacity_in(freelist_count as usize, bump);
1950      for _ in 0..freelist_count {
1951        all.push(ZERO_PGID);
1952      }
1953      db.freelist_copyall(&mut all);
1954      for id in &all {
1955        if freed.contains(id) {
1956          errors.push(format!("page {}: already freed", id));
1957        } else {
1958          freed.insert(*id);
1959        }
1960      }
1961
1962      // Track every reachable page.
1963      let mut reachable = HashMap::new_in(bump);
1964      reachable.insert(PgId(0), RefPage::new(std::ptr::null_mut())); //meta 0
1965      reachable.insert(PgId(1), RefPage::new(std::ptr::null_mut())); // meta 1
1966      let freelist_pgid = self.meta().free_list();
1967      for i in 0..=self.mem_page(freelist_pgid).overflow {
1968        let pg_id = freelist_pgid + i as u64;
1969        reachable.insert(pg_id, self.mem_page(freelist_pgid));
1970      }
1971
1972      // Recursively check buckets.
1973      self.check_bucket(self.split_bound(), &mut reachable, &mut freed, &mut errors);
1974
1975      // Ensure all pages below high water mark are either reachable or freed.
1976      for i in 0..high_water.0 {
1977        let pg_id = PgId(i);
1978        if !reachable.contains_key(&pg_id) && !freed.contains(&pg_id) {
1979          errors.push(format!("page {}: unreachable unfreed", pg_id));
1980        }
1981      }
1982
1983      errors
1984    }
1985
1986    fn check_bucket(
1987      &self, bucket: BucketCell<'tx>, reachable: &mut HashMap<PgId, RefPage<'tx>>,
1988      freed: &mut HashSet<PgId>, errors: &mut Vec<String>,
1989    ) {
1990      // ignore inline buckets
1991      if bucket.root() == ZERO_PGID {
1992        return;
1993      }
1994
1995      self.for_each_page(bucket.root(), &mut |p, _, pgid_stack| {
1996        if p.id > self.meta().pgid() {
1997          errors.push(format!(
1998            "page {}: out of bounds: {} (stack: {:?})",
1999            p.id,
2000            self.meta().pgid(),
2001            pgid_stack
2002          ));
2003        }
2004        for i in 0..=p.overflow {
2005          let id = p.id + i as u64;
2006          if reachable.contains_key(&id) {
2007            errors.push(format!(
2008              "page {}: multiple references (stack: {:?})",
2009              id, pgid_stack
2010            ));
2011          }
2012          reachable.insert(id, *p);
2013        }
2014
2015        if freed.contains(&p.id) {
2016          errors.push(format!("page {}: reachable freed", p.id));
2017        } else if !p.is_branch() && !p.is_leaf() {
2018          errors.push(format!(
2019            "page {}: invalid type: {} (stack: {:?})",
2020            p.id,
2021            p.page_type(),
2022            pgid_stack
2023          ));
2024        }
2025      });
2026
2027      self.recursively_check_pages(bucket.root(), errors);
2028
2029      bucket
2030        .api_for_each_bucket(|key| {
2031          let child = bucket.api_bucket(key).unwrap();
2032          self.check_bucket(child, reachable, freed, errors);
2033          Ok(())
2034        })
2035        .unwrap();
2036    }
2037
2038    fn recursively_check_pages(self, pg_id: PgId, errors: &mut Vec<String>) {
2039      let pgid_stack = RefStack::new(pg_id);
2040      self.recursively_check_pages_internal(pg_id, &[], &[], &pgid_stack, errors);
2041    }
2042
    /// Recursively walks the B+tree rooted at `pg_id`, verifying that every
    /// key lies in the half-open range `[min_key_closed, max_key_open)` and
    /// that keys are strictly increasing within each page. Violations are
    /// appended to `errors`; `pageid_stack` records the descent path for
    /// diagnostics.
    ///
    /// Returns the greatest key found in the subtree (the empty slice when
    /// the subtree holds no keys); the caller feeds this back as the running
    /// lower bound for the next sibling subtree.
    fn recursively_check_pages_internal(
      self, pg_id: PgId, min_key_closed: &[u8], max_key_open: &[u8], pageid_stack: &RefStack<PgId>,
      errors: &mut Vec<String>,
    ) -> &'tx [u8] {
      let p = self.mem_page(pg_id);
      // Record the current page so error messages can print the page stack.
      // NOTE(review): there is no matching pop in this function — presumably
      // RefStack unwinds elsewhere (or via a guard); confirm entries don't
      // accumulate across sibling subtrees.
      pageid_stack.push(pg_id);
      let mut max_key_in_subtree = [].as_slice();
      if let Some(branch_page) = MappedBranchPage::coerce_ref(&p) {
        // Branch page: validate each separator key, then recurse into its child.
        let mut running_min = min_key_closed;
        let elements_len = branch_page.elements().len();
        for (i, (pg_id, key)) in branch_page
          .elements()
          .iter()
          .map(|e| {
            // Key bytes are resolved relative to the mapped page pointer.
            (e.pgid(), unsafe {
              e.key(branch_page.page_ptr().cast_const())
            })
          })
          .enumerate()
        {
          self.verify_key_order(
            pg_id,
            "branch",
            i,
            key,
            running_min,
            max_key_open,
            pageid_stack,
            errors,
          );
          // The child's open upper bound is the next separator key, or the
          // inherited bound for the last child. (elements_len >= 1 here, so
          // `elements_len - 1` cannot underflow.)
          let mut max_key = max_key_open;
          if i < elements_len - 1 {
            max_key = branch_page.get_elem(i as u16 + 1).unwrap().key();
          }
          max_key_in_subtree =
            self.recursively_check_pages_internal(pg_id, key, max_key, pageid_stack, errors);
          // Keys of the next sibling subtree must exceed everything seen so far.
          running_min = max_key_in_subtree;
        }
        return max_key_in_subtree;
      } else if let Some(leaf_page) = MappedLeafPage::coerce_ref(&p) {
        // Leaf page: keys themselves must be strictly increasing in range.
        let mut running_min = min_key_closed;
        for (i, key) in leaf_page
          .elements()
          .iter()
          .map(|e| unsafe { e.key(leaf_page.page_ptr().cast_const()) })
          .enumerate()
        {
          self.verify_key_order(
            pg_id,
            "leaf",
            i,
            key,
            running_min,
            max_key_open,
            pageid_stack,
            errors,
          );
          running_min = key;
        }
        // The last key of a non-empty leaf is the subtree maximum.
        if p.count > 0 {
          return leaf_page.get_elem(p.count - 1).unwrap().key();
        }
      } else {
        errors.push(format!("unexpected page type for pgId: {}", pg_id));
      }
      // Empty leaf or unrecognized page: no keys to report.
      &[]
    }
2110
    /***
     * verify_key_order checks whether the entry at `index` on `pg_id` (page_type: "branch" or "leaf")
     * with the given `key` lies within the range (previous_key..max_key_open) and appends any
     * violations found to `errors`.
     */
2115    fn verify_key_order(
2116      self, pg_id: PgId, page_type: &str, index: usize, key: &[u8], previous_key: &[u8],
2117      max_key_open: &[u8], pageid_stack: &RefStack<PgId>, errors: &mut Vec<String>,
2118    ) {
2119      if index == 0 && !previous_key.is_empty() && previous_key > key {
2120        errors.push(format!("the first key[{}]={:02X?} on {} page({}) needs to be >= the key in the ancestor ({:02X?}). Stack: {:?}", index, key, page_type, pg_id, previous_key, pageid_stack));
2121      }
2122      if index > 0 {
2123        if previous_key > key {
2124          errors.push(format!("key[{}]=(hex){:02X?} on {} page({}) needs to be > (found <) than previous element (hex){:02X?}. Stack: {:?}", index, key, page_type, pg_id, previous_key, pageid_stack));
2125        } else if previous_key == key {
2126          errors.push(format!("key[{}]=(hex){:02X?} on {} page({}) needs to be > (found =) than previous element (hex){:02X?}. Stack: {:?}", index, key, page_type, pg_id, previous_key, pageid_stack));
2127        }
2128      }
2129      if !max_key_open.is_empty() && key >= max_key_open {
2130        errors.push(format!("key[{}]=(hex){:02X?} on {} page({}) needs to be < than key of the next element in ancestor (hex){:02X?}. Pages stack: {:?}", index, key, page_type, pg_id, previous_key, pageid_stack));
2131      }
2132    }
2133  }
2134}
2135
2136#[cfg(test)]
2137mod test {
2138  use crate::common::cell::RefCell;
2139  use crate::common::defaults::DEFAULT_PAGE_SIZE;
2140  use crate::test_support::TestDb;
2141  use crate::tx::check::TxCheck;
2142  use crate::tx::{TxRwApi, TxStats};
2143  use crate::{
2144    Bolt, BoltOptions, BucketApi, BucketRwApi, CursorApi, DbApi, DbRwAPI, Error, TxApi, TxImpl,
2145    TxRwRefApi,
2146  };
2147  use anyhow::anyhow;
2148  use std::time::Duration;
2149
2150  #[test]
2151  #[cfg(not(any(miri, feature = "test-mem-backend")))]
2152  fn test_tx_check_read_only() -> crate::Result<()> {
2153    let mut db = TestDb::new()?;
2154    db.update(|mut tx| {
2155      let mut b = tx.create_bucket("widgets")?;
2156      b.put("foo", "bar")?;
2157      Ok(())
2158    })?;
2159    let close_db = db.clone_db();
2160    close_db.close();
2161
2162    let file = db.tmp_file.as_ref().unwrap();
2163    let ro = Bolt::open_ro(file.as_ref());
2164    let ro_db = ro.unwrap();
2165    let tx = ro_db.begin()?;
2166    let errors = tx.check();
2167    assert!(errors.is_empty(), "{:?}", errors);
2168
2169    Ok(())
2170  }
2171
2172  #[test]
2173  fn test_tx_cursor() -> crate::Result<()> {
2174    let mut db = TestDb::new()?;
2175    db.update(|mut tx| {
2176      tx.create_bucket("widgets")?;
2177      tx.create_bucket("woojits")?;
2178      let mut c = tx.cursor();
2179      assert_eq!(Some(("widgets".as_bytes(), None)), c.first());
2180      assert_eq!(Some(("woojits".as_bytes(), None)), c.next());
2181      Ok(())
2182    })?;
2183    Ok(())
2184  }
2185
2186  #[test]
2187  fn test_tx_bucket() -> crate::Result<()> {
2188    let mut db = TestDb::new()?;
2189    db.update(|mut tx| {
2190      tx.create_bucket("widgets")?;
2191      assert!(tx.bucket("widgets").is_some(), "expected bucket");
2192      Ok(())
2193    })?;
2194    Ok(())
2195  }
2196
2197  #[test]
2198  fn test_tx_get_not_found() -> crate::Result<()> {
2199    let mut db = TestDb::new()?;
2200    db.update(|mut tx| {
2201      let mut b = tx.create_bucket("widgets")?;
2202      b.put("foo", "bar")?;
2203      assert_eq!(None, b.get("no_such_key"), "expected None");
2204      Ok(())
2205    })?;
2206    Ok(())
2207  }
2208
2209  #[test]
2210  fn test_tx_create_bucket() -> crate::Result<()> {
2211    let mut db = TestDb::new()?;
2212    db.update(|mut tx| {
2213      tx.create_bucket(b"widgets")?;
2214      Ok(())
2215    })?;
2216    db.view(|tx| {
2217      let bucket = tx.bucket(b"widgets");
2218      assert!(bucket.is_some(), "expected bucket");
2219      Ok(())
2220    })
2221  }
2222
2223  #[test]
2224  fn test_tx_create_bucket_if_not_exists() -> crate::Result<()> {
2225    let mut db = TestDb::new()?;
2226    db.update(|mut tx| {
2227      tx.create_bucket_if_not_exists("widgets")?;
2228      tx.create_bucket_if_not_exists("widgets")?;
2229      Ok(())
2230    })?;
2231    db.view(|tx| {
2232      assert!(tx.bucket("widgets").is_some());
2233      Ok(())
2234    })?;
2235    Ok(())
2236  }
2237
2238  #[test]
2239  fn test_tx_create_bucket_if_not_exists_err_bucket_name_required() -> crate::Result<()> {
2240    let mut db = TestDb::new()?;
2241    db.update(|mut tx| {
2242      assert_eq!(
2243        Some(Error::BucketNameRequired),
2244        tx.create_bucket_if_not_exists("").err()
2245      );
2246      Ok(())
2247    })?;
2248    Ok(())
2249  }
2250
2251  #[test]
2252  fn test_tx_create_bucket_err_bucket_exists() -> crate::Result<()> {
2253    let mut db = TestDb::new()?;
2254    db.update(|mut tx| {
2255      tx.create_bucket("widgets")?;
2256      Ok(())
2257    })?;
2258    db.update(|mut tx| {
2259      assert_eq!(Some(Error::BucketExists), tx.create_bucket("widgets").err());
2260      Ok(())
2261    })?;
2262    Ok(())
2263  }
2264
2265  #[test]
2266  fn test_tx_create_bucket_err_bucket_name_required() -> crate::Result<()> {
2267    let mut db = TestDb::new()?;
2268    db.update(|mut tx| {
2269      assert_eq!(Some(Error::BucketNameRequired), tx.create_bucket("").err());
2270      Ok(())
2271    })?;
2272    Ok(())
2273  }
2274
2275  #[test]
2276  fn test_tx_delete_bucket() -> crate::Result<()> {
2277    let mut db = TestDb::new()?;
2278    db.update(|mut tx| {
2279      let mut b = tx.create_bucket("widgets")?;
2280      b.put("foo", "bar")?;
2281      Ok(())
2282    })?;
2283    db.update(|mut tx| {
2284      tx.delete_bucket("widgets")?;
2285      assert!(tx.bucket("widgets").is_none());
2286      Ok(())
2287    })?;
2288    db.update(|mut tx| {
2289      let b = tx.create_bucket("widgets")?;
2290      assert!(b.get("widgets").is_none());
2291      Ok(())
2292    })?;
2293    Ok(())
2294  }
2295
2296  #[test]
2297  fn test_tx_delete_bucket_not_found() -> crate::Result<()> {
2298    let mut db = TestDb::new()?;
2299    db.update(|mut tx| {
2300      assert_eq!(
2301        Some(Error::BucketNotFound),
2302        tx.delete_bucket("widgets").err()
2303      );
2304      Ok(())
2305    })?;
2306    Ok(())
2307  }
2308
2309  #[test]
2310  fn test_tx_for_each_no_error() -> crate::Result<()> {
2311    let mut db = TestDb::new()?;
2312    db.update(|mut tx| {
2313      let mut b = tx.create_bucket("widgets")?;
2314      b.put("foo", "bar")?;
2315      tx.for_each(|_, _| Ok(()))?;
2316      Ok(())
2317    })?;
2318    Ok(())
2319  }
2320
2321  #[test]
2322  fn test_tx_for_each_with_error() -> crate::Result<()> {
2323    let mut db = TestDb::new()?;
2324    let result = db.update(|mut tx| {
2325      let mut b = tx.create_bucket("widgets")?;
2326      b.put("foo", "bar")?;
2327      tx.for_each(|_, _| Err(Error::Other(anyhow!("marker"))))?;
2328      Ok(())
2329    });
2330    let e = result.map_err(|e| e.to_string()).err().unwrap();
2331    assert_eq!("marker", e);
2332    Ok(())
2333  }
2334
2335  #[test]
2336  fn test_tx_on_commit() -> crate::Result<()> {
2337    let x = RefCell::new(0u64);
2338    let mut db = TestDb::new()?;
2339    db.update(|mut tx| {
2340      tx.on_commit(|| {
2341        *x.borrow_mut() += 1;
2342      });
2343      tx.on_commit(|| {
2344        *x.borrow_mut() += 2;
2345      });
2346      let mut b = tx.create_bucket("widgets")?;
2347      b.put("foo", "bar")?;
2348      Ok(())
2349    })?;
2350    assert_eq!(3, *x.borrow());
2351    Ok(())
2352  }
2353
2354  #[test]
2355  fn test_tx_on_commit_rollback() -> crate::Result<()> {
2356    let x = RefCell::new(0u64);
2357    let mut db = TestDb::new()?;
2358    let _ = db.update(|mut tx| {
2359      tx.on_commit(|| {
2360        *x.borrow_mut() += 1;
2361      });
2362      tx.on_commit(|| {
2363        *x.borrow_mut() += 2;
2364      });
2365      tx.create_bucket("widgets")?;
2366      Err(Error::Other(anyhow!("rollback")))
2367    });
2368    assert_eq!(0, *x.borrow());
2369    Ok(())
2370  }
2371
  // TODO: port of Go bbolt's TestTx_CopyFile; ignored until implemented.
  #[test]
  #[ignore]
  fn test_tx_copy_file() {
    todo!()
  }
2377
  // TODO: port of Go bbolt's TestTx_CopyFile_Error_Meta; ignored until implemented.
  #[test]
  #[ignore]
  fn test_tx_copy_file_error_meta() {
    todo!()
  }
2383
  // TODO: port of Go bbolt's TestTx_CopyFile_Error_Normal; ignored until implemented.
  #[test]
  #[ignore]
  fn test_tx_copy_file_error_normal() {
    todo!()
  }
2389
2390  #[test]
2391  fn test_tx_rollback() -> crate::Result<()> {
2392    let mut db = TestDb::new()?;
2393    let mut tx = db.begin_rw_tx()?;
2394    tx.create_bucket("mybucket")?;
2395    tx.commit()?;
2396    let mut tx = db.begin_rw_tx()?;
2397    let mut b = tx.bucket_mut("mybucket").unwrap();
2398    b.put("k", "v")?;
2399    tx.rollback()?;
2400    let tx = db.begin_tx()?;
2401    let b = tx.bucket("mybucket").unwrap();
2402    assert_eq!(None, b.get("k"));
2403    drop(tx);
2404    //todo!("noSyncFreelist");
2405    Ok(())
2406  }
2407
  /// Exercises the release-range path: interleaves long-lived read
  /// transactions with writes/deletes so freed pages fall into multiple
  /// pending ranges, then verifies a later write frees only the ranges not
  /// pinned by the remaining readers.
  #[test]
  fn test_tx_release_range() -> crate::Result<()> {
    // Set initial mmap size well beyond the limit we will hit in this
    // test, since we are testing with long running read transactions
    // and will deadlock if db.grow is triggered.
    let initial_mmap_size = DEFAULT_PAGE_SIZE.bytes() as u64 * 100;
    let db_options = BoltOptions::builder()
      .initial_mmap_size(initial_mmap_size)
      .build();
    let db = TestDb::with_options(db_options)?;
    let bucket = "bucket";

    // Helper: upsert `key` -> `value` in its own write transaction.
    let mut put_db = db.clone_db();
    let mut put = move |key, value| {
      put_db
        .update(|mut tx| {
          let mut b = tx.create_bucket_if_not_exists(bucket)?;
          b.put(key, value)?;
          Ok(())
        })
        .unwrap();
    };

    // Helper: delete `key` in its own write transaction.
    let mut del_db = db.clone_db();
    let mut del = move |key| {
      del_db
        .update(|mut tx| {
          let mut b = tx.create_bucket_if_not_exists(bucket)?;
          b.delete(key)?;
          Ok(())
        })
        .unwrap();
    };

    // Helper: open a long-lived read transaction pinning the current state.
    let open_read_tx = || db.begin_tx().unwrap();

    // Helper: assert the value visible for `key` through a pinned reader.
    let check_with_read_tx = |tx: &TxImpl, key, want_value| {
      let bucket = tx.bucket(bucket).unwrap();
      let value = bucket.get(key);
      assert_eq!(want_value, value);
    };

    // Interleave writes with read transactions so that freed pages end up
    // in several distinct pending ranges separated by the open readers.
    put("k1", "v1");
    let rtx1 = open_read_tx();
    put("k2", "v2");
    let hold1 = open_read_tx();
    put("k3", "v3");
    let hold2 = open_read_tx();
    del("k3");
    let rtx2 = open_read_tx();
    del("k1");
    let hold3 = open_read_tx();
    del("k2");
    let hold4 = open_read_tx();
    put("k4", "v4");
    let hold5 = open_read_tx();

    // Close the read transactions we established to hold a portion of the pages in pending state.
    drop(hold1);
    drop(hold2);
    drop(hold3);
    drop(hold4);
    drop(hold5);

    // Execute a write transaction to trigger a releaseRange operation in the db
    // that will free multiple ranges between the remaining open read transactions, now that the
    // holds have been rolled back.
    put("k4", "v4");

    // Check that all long running reads still read correct values.
    check_with_read_tx(&rtx1, "k1", Some("v1".as_bytes()));
    check_with_read_tx(&rtx2, "k2", Some("v2".as_bytes()));
    drop(rtx1);
    drop(rtx2);

    // Check that the final state is correct.
    let rtx7 = open_read_tx();
    check_with_read_tx(&rtx7, "k1", None);
    check_with_read_tx(&rtx7, "k2", None);
    check_with_read_tx(&rtx7, "k3", None);
    check_with_read_tx(&rtx7, "k4", Some("v4".as_bytes()));
    Ok(())
  }
2491
  // TODO: port of Go bbolt's ExampleTx_CopyFile; ignored until implemented.
  #[test]
  #[ignore]
  fn example_tx_copy_file() {
    todo!()
  }
2497
2498  #[test]
2499  fn test_tx_stats_get_and_inc_atomically() {
2500    let stats = TxStats::default();
2501
2502    stats.inc_page_count(1);
2503    assert_eq!(1, stats.page_count());
2504
2505    stats.inc_page_alloc(2);
2506    assert_eq!(2, stats.page_alloc());
2507
2508    stats.inc_cursor_count(3);
2509    assert_eq!(3, stats.cursor_count());
2510
2511    stats.inc_node_count(100);
2512    assert_eq!(100, stats.node_count());
2513
2514    stats.inc_node_deref(101);
2515    assert_eq!(101, stats.node_deref());
2516
2517    stats.inc_rebalance(1000);
2518    assert_eq!(1000, stats.rebalance());
2519
2520    stats.inc_rebalance_time(Duration::from_secs(1001));
2521    assert_eq!(1001, stats.rebalance_time().as_secs());
2522
2523    stats.inc_split(10000);
2524    assert_eq!(10000, stats.split());
2525
2526    stats.inc_spill(10001);
2527    assert_eq!(10001, stats.spill());
2528
2529    stats.inc_spill_time(Duration::from_secs(10001));
2530    assert_eq!(10001, stats.spill_time().as_secs());
2531
2532    stats.inc_write(100_000);
2533    assert_eq!(100_000, stats.write());
2534
2535    stats.inc_write_time(Duration::from_secs(100_001));
2536    assert_eq!(100_001, stats.write_time().as_secs());
2537
2538    let expected_stats = TxStats {
2539      page_count: 1.into(),
2540      page_alloc: 2.into(),
2541      cursor_count: 3.into(),
2542      node_count: 100.into(),
2543      node_deref: 101.into(),
2544      rebalance: 1000.into(),
2545      rebalance_time: Duration::from_secs(1001).into(),
2546      split: 10000.into(),
2547      spill: 10001.into(),
2548      spill_time: Duration::from_secs(10001).into(),
2549      write: 100_000.into(),
2550      write_time: Duration::from_secs(100_001).into(),
2551    };
2552
2553    assert_eq!(expected_stats, stats);
2554  }
2555
2556  #[test]
2557  fn test_tx_stats_sub() {
2558    let stats_a = TxStats {
2559      page_count: 1.into(),
2560      page_alloc: 2.into(),
2561      cursor_count: 3.into(),
2562      node_count: 100.into(),
2563      node_deref: 101.into(),
2564      rebalance: 1000.into(),
2565      rebalance_time: Duration::from_secs(1001).into(),
2566      split: 10000.into(),
2567      spill: 10001.into(),
2568      spill_time: Duration::from_secs(10001).into(),
2569      write: 100_000.into(),
2570      write_time: Duration::from_secs(100_001).into(),
2571    };
2572
2573    let stats_b = TxStats {
2574      page_count: 2.into(),
2575      page_alloc: 3.into(),
2576      cursor_count: 4.into(),
2577      node_count: 101.into(),
2578      node_deref: 102.into(),
2579      rebalance: 1001.into(),
2580      rebalance_time: Duration::from_secs(1002).into(),
2581      split: 11001.into(),
2582      spill: 11002.into(),
2583      spill_time: Duration::from_secs(11002).into(),
2584      write: 110_001.into(),
2585      write_time: Duration::from_secs(110_010).into(),
2586    };
2587
2588    let diff = stats_b.sub(&stats_a);
2589    let expected_stats = TxStats {
2590      page_count: 1.into(),
2591      page_alloc: 1.into(),
2592      cursor_count: 1.into(),
2593      node_count: 1.into(),
2594      node_deref: 1.into(),
2595      rebalance: 1.into(),
2596      rebalance_time: Duration::from_secs(1).into(),
2597      split: 1001.into(),
2598      spill: 1001.into(),
2599      spill_time: Duration::from_secs(1001).into(),
2600      write: 10001.into(),
2601      write_time: Duration::from_secs(10009).into(),
2602    };
2603
2604    assert_eq!(expected_stats, diff);
2605  }
2606
  // TODO: port of Go bbolt's TestTx_truncateBeforeWrite; ignored until implemented.
  #[test]
  #[ignore]
  fn test_tx_truncate_before_write() {
    todo!()
  }
2612
2613  #[test]
2614  fn test_tx_stats_add() {
2615    let stats_a = TxStats {
2616      page_count: 1.into(),
2617      page_alloc: 2.into(),
2618      cursor_count: 3.into(),
2619      node_count: 100.into(),
2620      node_deref: 101.into(),
2621      rebalance: 1000.into(),
2622      rebalance_time: Duration::from_secs(1001).into(),
2623      split: 10000.into(),
2624      spill: 10001.into(),
2625      spill_time: Duration::from_secs(10001).into(),
2626      write: 100_000.into(),
2627      write_time: Duration::from_secs(100_001).into(),
2628    };
2629
2630    let stats_b = TxStats {
2631      page_count: 2.into(),
2632      page_alloc: 3.into(),
2633      cursor_count: 4.into(),
2634      node_count: 101.into(),
2635      node_deref: 102.into(),
2636      rebalance: 1001.into(),
2637      rebalance_time: Duration::from_secs(1002).into(),
2638      split: 11001.into(),
2639      spill: 11002.into(),
2640      spill_time: Duration::from_secs(11002).into(),
2641      write: 110_001.into(),
2642      write_time: Duration::from_secs(110_010).into(),
2643    };
2644
2645    let add = stats_b.add(&stats_a);
2646    let expected_stats = TxStats {
2647      page_count: 3.into(),
2648      page_alloc: 5.into(),
2649      cursor_count: 7.into(),
2650      node_count: 201.into(),
2651      node_deref: 203.into(),
2652      rebalance: 2001.into(),
2653      rebalance_time: Duration::from_secs(2003).into(),
2654      split: 21001.into(),
2655      spill: 21003.into(),
2656      spill_time: Duration::from_secs(21003).into(),
2657      write: 210001.into(),
2658      write_time: Duration::from_secs(210011).into(),
2659    };
2660
2661    assert_eq!(expected_stats, add);
2662  }
2663}