bbolt_rs/
db.rs

use crate::arch::size::MAX_MAP_SIZE;
use crate::bucket::BucketRwIApi;
use crate::common::bucket::BucketHeader;
use crate::common::bump::PinBump;
use crate::common::defaults::{
  DEFAULT_ALLOC_SIZE, DEFAULT_MAX_BATCH_DELAY, DEFAULT_MAX_BATCH_SIZE, DEFAULT_PAGE_SIZE, MAGIC,
  MAX_MMAP_STEP, PGID_NO_FREE_LIST, VERSION,
};
use crate::common::lock::LockGuard;
use crate::common::page::freelist::{Freelist, MappedFreeListPage};
use crate::common::page::meta::{MappedMetaPage, Meta};
use crate::common::page::tree::leaf::MappedLeafPage;
use crate::common::page::{CoerciblePage, MutPage, PageHeader, RefPage};
use crate::common::pool::{SyncPool, SyncReusable};
use crate::common::self_owned::SelfOwned;
use crate::common::{BVec, PgId, SplitRef, TxId};
use crate::tx::{
  TxCell, TxClosingState, TxIApi, TxImpl, TxRef, TxRwApi, TxRwImpl, TxRwRef, TxStats,
};
use crate::{Error, TxApi};
use aligners::{alignment, AlignedBytes};
use anyhow::anyhow;
use fs4::fs_std::FileExt;
use memmap2::{Advice, MmapOptions, MmapRaw};
use monotonic_timer::{Guard, Timer};
use parking_lot::{Mutex, MutexGuard, RwLock};
use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::mpsc::{Receiver, SyncSender};
use std::sync::{mpsc, Arc, OnceLock, Weak};
use std::time::Duration;
#[cfg(feature = "try-begin")]
use std::time::Instant;
use std::{fs, io, mem, thread};
use typed_builder::TypedBuilder;

/// Read-only DB API
pub trait DbApi: Clone + Send + Sync
where
  Self: Sized,
{
  /// Begin starts a new transaction.
  ///
  /// Multiple read-only transactions can be used concurrently but only one
  /// write transaction can be used at a time. Starting multiple write transactions
  /// will cause the calls to block and be serialized until the current write
  /// transaction finishes.
  ///
  /// Transactions should not be dependent on one another. Opening a read
  /// transaction and a write transaction in the same thread can cause the
  /// writer to deadlock because the database periodically needs to re-mmap itself
  /// as it grows and it cannot do that while a read transaction is open.
  ///
  /// If a long running read transaction (for example, a snapshot transaction) is
  /// needed, you might want to set BoltOptions.initial_mmap_size to a large enough value
  /// to avoid potential blocking of the write transaction.
  ///
  /// IMPORTANT: You must drop the read-only transactions after you are finished or
  /// else the database will not reclaim old pages.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.update(|mut tx| {
  ///     let mut b = tx.create_bucket_if_not_exists("test")?;
  ///     b.put("key", "value")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   let tx = db.begin()?;
  ///   let b = tx.bucket("test").unwrap();
  ///   assert_eq!(Some(b"value".as_ref()), b.get("key"));
  ///
  ///   Ok(())
  /// }
  /// ```
  fn begin(&self) -> crate::Result<impl TxApi>;

  /// Attempts to start a new read-only transaction without blocking,
  /// returning `None` if one cannot be started immediately.
  #[cfg(feature = "try-begin")]
  fn try_begin(&self) -> crate::Result<Option<impl TxApi>>;

  /// Attempts to start a new read-only transaction, giving up after
  /// `duration` and returning `None`.
  #[cfg(feature = "try-begin")]
  fn try_begin_for(&self, duration: Duration) -> crate::Result<Option<impl TxApi>>;

  /// Attempts to start a new read-only transaction, giving up at `instant`
  /// and returning `None`.
  #[cfg(feature = "try-begin")]
  fn try_begin_until(&self, instant: Instant) -> crate::Result<Option<impl TxApi>>;

  /// View executes a function within the context of a managed read-only transaction.
  /// Any error that is returned from the function is returned from `view`.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.update(|mut tx| {
  ///     let mut b = tx.create_bucket_if_not_exists("test")?;
  ///     b.put("key", "value")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.view(|tx| {
  ///     let b = tx.bucket("test").unwrap();
  ///     assert_eq!(Some(b"value".as_ref()), b.get("key"));
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn view<'tx, F: FnMut(TxRef<'tx>) -> crate::Result<()>>(&'tx self, f: F) -> crate::Result<()>;

  /// Stats retrieves ongoing performance stats for the database.
  ///
  /// This is only updated when a transaction closes.
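  ///
  /// A minimal sketch of reading stats (the counter checked below is an
  /// assumption about state after `update` returns, not an API guarantee):
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.update(|mut tx| {
  ///     tx.create_bucket_if_not_exists("test")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   let stats = db.stats();
  ///   // No read transactions are open once `update` has returned.
  ///   assert_eq!(0, stats.open_tx_n());
  ///   Ok(())
  /// }
  /// ```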
  fn stats(&self) -> Arc<DbStats>;

  /// Returns the database's path
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let db = Bolt::open_mem()?;
  ///
  ///   assert_eq!(&DbPath::Memory, db.path());
  ///   Ok(())
  /// }
  /// ```
  fn path(&self) -> &DbPath;

  /// Returns the database internal information
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let db = Bolt::open_mem()?;
  ///
  ///   assert_eq!(page_size::get(), db.info().page_size);
  ///   Ok(())
  /// }
  /// ```
  fn info(&self) -> DbInfo;

  /// Close releases all database resources.
  ///
  /// It will block waiting for any open transactions to finish
  /// before closing the database and returning.
  ///
  /// Once closed, other instances return [Error::DatabaseNotOpen]
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///   let cloned = db.clone();
  ///   db.update(|mut tx| {
  ///     let mut b = tx.create_bucket_if_not_exists("test")?;
  ///     b.put("key", "value")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.close();
  ///   assert_eq!(Some(Error::DatabaseNotOpen), cloned.begin().err());
  ///   Ok(())
  /// }
  /// ```
  fn close(self);
}

/// RW DB API
pub trait DbRwAPI: DbApi {
  /// Starts a new read-write transaction.
  ///
  /// Multiple read-only transactions can be used concurrently but only one
  /// write transaction can be used at a time. Starting multiple write transactions
  /// will cause the calls to block and be serialized until the current write
  /// transaction finishes.
  ///
  /// Transactions should not be dependent on one another. Opening a read
  /// transaction and a write transaction in the same thread can cause the
  /// writer to deadlock because the database periodically needs to re-mmap itself
  /// as it grows and it cannot do that while a read transaction is open.
  ///
  /// If a long running read transaction (for example, a snapshot transaction) is
  /// needed, you might want to set BoltOptions.initial_mmap_size to a large enough value
  /// to avoid potential blocking of the write transaction.
  ///
  /// Dropping the transaction will cause it to rollback.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   let mut tx = db.begin_rw()?;
  ///   let mut b = tx.create_bucket_if_not_exists("test")?;
  ///   b.put("key", "value")?;
  ///   tx.commit()?;
  ///
  ///   db.view(|tx| {
  ///     let b = tx.bucket("test").unwrap();
  ///     assert_eq!(Some(b"value".as_ref()), b.get("key"));
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn begin_rw(&mut self) -> crate::Result<impl TxRwApi>;

  /// Attempts to start a new read-write transaction without blocking,
  /// returning `None` if one cannot be started immediately.
  #[cfg(feature = "try-begin")]
  fn try_begin_rw(&self) -> crate::Result<Option<impl TxRwApi>>;

  /// Attempts to start a new read-write transaction, giving up after
  /// `duration` and returning `None`.
  #[cfg(feature = "try-begin")]
  fn try_begin_rw_for(&self, duration: Duration) -> crate::Result<Option<impl TxRwApi>>;

  /// Attempts to start a new read-write transaction, giving up at `instant`
  /// and returning `None`.
  #[cfg(feature = "try-begin")]
  fn try_begin_rw_until(&self, instant: Instant) -> crate::Result<Option<impl TxRwApi>>;

  /// Executes a function within the context of a read-write managed transaction.
  ///
  /// If no error is returned from the function then the transaction is committed.
  /// If an error is returned then the entire transaction is rolled back.
  /// Any error that is returned from the function or returned from the commit is
  /// returned from `update`.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.update(|mut tx| {
  ///     let mut b = tx.create_bucket_if_not_exists("test")?;
  ///     b.put("key", "value")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.view(|tx| {
  ///     let b = tx.bucket("test").unwrap();
  ///     assert_eq!(Some(b"value".as_ref()), b.get("key"));
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn update<'tx, F: FnMut(TxRwRef<'tx>) -> crate::Result<()>>(
    &'tx mut self, f: F,
  ) -> crate::Result<()>;

  /// Calls a function as part of a batch. It behaves similarly to `update`,
  /// except:
  ///
  /// 1. concurrent batch calls can be combined into a single Bolt
  /// transaction.
  ///
  /// 2. the function passed to batch may be called multiple times,
  /// regardless of whether it returns an error or not.
  ///
  /// This means that batch function side effects must be idempotent and
  /// take permanent effect only after a successful return is seen in the
  /// caller.
  ///
  /// The maximum batch size and delay can be adjusted with `max_batch_size`
  /// and `max_batch_delay`, respectively.
  ///
  /// Batch is only useful when there are multiple threads calling it.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.batch(|mut tx| {
  ///     let mut b = tx.create_bucket_if_not_exists("test")?;
  ///     b.put("key", "value")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.view(|tx| {
  ///     let b = tx.bucket("test").unwrap();
  ///     assert_eq!(Some(b"value".as_ref()), b.get("key"));
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn batch<F>(&mut self, f: F) -> crate::Result<()>
  where
    F: FnMut(&mut TxRwRef) -> crate::Result<()> + Send + Sync + Clone + 'static;

  /// Executes fdatasync() against the database file handle.
  ///
  /// This is not necessary under normal operation; however, if you use the
  /// `no_sync` option it allows you to force the database file to sync to disk.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.batch(|mut tx| {
  ///     let mut b = tx.create_bucket_if_not_exists("test")?;
  ///     b.put("key", "value")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.sync()?;
  ///
  ///   Ok(())
  /// }
  /// ```
  fn sync(&mut self) -> crate::Result<()>;
}

/// Stats represents statistics about the database.
#[derive(Default)]
pub struct DbStats {
  /// global, ongoing stats.
  tx_stats: TxStats,

  // Freelist stats
  /// total number of free pages on the freelist
  free_page_n: AtomicI64,
  /// total number of pending pages on the freelist
  pending_page_n: AtomicI64,
  /// total bytes allocated in free pages
  free_alloc: AtomicI64,
  /// total bytes used by the freelist
  free_list_in_use: AtomicI64,

  // transaction stats
  /// total number of started read transactions
  tx_n: AtomicI64,
  /// number of currently open read transactions
  open_tx_n: AtomicI64,
}

impl DbStats {
  /// global, ongoing stats.
  pub fn tx_stats(&self) -> &TxStats {
    &self.tx_stats
  }

  /// total number of free pages on the freelist
  pub fn free_page_n(&self) -> i64 {
    self.free_page_n.load(Ordering::Acquire)
  }

  pub(crate) fn set_free_page_n(&self, value: i64) {
    self.free_page_n.store(value, Ordering::Release);
  }

  /// total number of pending pages on the freelist
  pub fn pending_page_n(&self) -> i64 {
    self.pending_page_n.load(Ordering::Acquire)
  }

  pub(crate) fn set_pending_page_n(&self, value: i64) {
    self.pending_page_n.store(value, Ordering::Release);
  }

  /// total bytes allocated in free pages
  pub fn free_alloc(&self) -> i64 {
    self.free_alloc.load(Ordering::Acquire)
  }

  pub(crate) fn set_free_alloc(&self, value: i64) {
    self.free_alloc.store(value, Ordering::Release);
  }

  /// total bytes used by the freelist
  pub fn free_list_in_use(&self) -> i64 {
    self.free_list_in_use.load(Ordering::Acquire)
  }

  pub(crate) fn set_free_list_in_use(&self, value: i64) {
    self.free_list_in_use.store(value, Ordering::Release);
  }

  /// total number of started read transactions
  pub fn tx_n(&self) -> i64 {
    self.tx_n.load(Ordering::Acquire)
  }

  pub(crate) fn inc_tx_n(&self, delta: i64) {
    self.tx_n.fetch_add(delta, Ordering::Relaxed);
  }

  /// number of currently open read transactions
  pub fn open_tx_n(&self) -> i64 {
    self.open_tx_n.load(Ordering::Acquire)
  }

  pub(crate) fn sub(&self, rhs: &DbStats) -> DbStats {
    let diff = self.clone();
    diff.inc_tx_n(-rhs.tx_n());
    diff.tx_stats.sub_assign(&rhs.tx_stats);
    diff
  }
}

impl Clone for DbStats {
  fn clone(&self) -> Self {
    DbStats {
      tx_stats: self.tx_stats.clone(),
      free_page_n: self.free_page_n().into(),
      pending_page_n: self.pending_page_n().into(),
      free_alloc: self.free_alloc().into(),
      free_list_in_use: self.free_list_in_use().into(),
      tx_n: self.tx_n().into(),
      open_tx_n: self.open_tx_n().into(),
    }
  }
}

pub struct DbState {
  txs: Vec<TxId>,
  rwtx: Option<TxId>,
  is_open: bool,
  current_meta: Meta,
}

impl DbState {
  fn new(current_meta: Meta) -> DbState {
    DbState {
      txs: vec![],
      rwtx: None,
      is_open: true,
      current_meta,
    }
  }
}
fn mmap_size(page_size: usize, size: u64) -> crate::Result<u64> {
  for i in 15..=30usize {
    if size <= 1 << i {
      return Ok(1 << i);
    }
  }
  if size > MAX_MAP_SIZE.bytes() as u64 {
    return Err(Error::MMapTooLarge);
  }

  let mut sz = size;
  let remainder = sz % MAX_MMAP_STEP.bytes() as u64;
  if remainder > 0 {
    sz += MAX_MMAP_STEP.bytes() as u64 - remainder;
  }

  let ps = page_size as u64;
  if sz % ps != 0 {
    sz = ((sz / ps) + 1) * ps;
  }

  if sz > MAX_MAP_SIZE.bytes() as u64 {
    sz = MAX_MAP_SIZE.bytes() as u64;
  }

  Ok(sz)
}

/// Database path
#[derive(Clone, PartialOrd, PartialEq, Ord, Eq, Debug)]
pub enum DbPath {
  Memory,
  FilePath(PathBuf),
}

impl DbPath {
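  /// Returns the backing file path, or `None` for in-memory databases.
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let db = Bolt::open_mem()?;
  ///
  ///   assert!(db.path().file_path().is_none());
  ///   Ok(())
  /// }
  /// ```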
  pub fn file_path(&self) -> Option<&Path> {
    match self {
      DbPath::Memory => None,
      DbPath::FilePath(p) => Some(p),
    }
  }
}

/// Database information
#[derive(Clone, Debug)]
pub struct DbInfo {
  pub page_size: usize,
}

pub(crate) trait DBBackend: Send + Sync {
  fn page_size(&self) -> usize;
  fn data_size(&self) -> u64;

  /// Fails only if *both* meta pages are invalid; a single valid meta page
  /// is enough to open the database.
  fn validate_meta(&self) -> crate::Result<()> {
    let meta0 = self.meta0();
    let meta1 = self.meta1();
    if let (Err(error), Err(_)) = (meta0.meta.validate(), meta1.meta.validate()) {
      return Err(error);
    }
    Ok(())
  }

  /// Returns the current meta page: the page with the higher txid is
  /// preferred, falling back to the other if it fails validation.
  fn meta(&self) -> Meta {
    let meta0 = self.meta0();
    let meta1 = self.meta1();
    let (meta_a, meta_b) = {
      if meta1.meta.txid() > meta0.meta.txid() {
        (meta1.meta, meta0.meta)
      } else {
        (meta0.meta, meta1.meta)
      }
    };
    if meta_a.validate().is_ok() {
      return meta_a;
    } else if meta_b.validate().is_ok() {
      return meta_b;
    }
    panic!("bolt.db.meta: invalid meta page")
  }

  fn meta0(&self) -> MappedMetaPage;

  fn meta1(&self) -> MappedMetaPage;

  fn page<'tx>(&self, pg_id: PgId) -> RefPage<'tx>;

  /// grow grows the size of the database to the given `size`.
  fn grow(&self, size: u64) -> crate::Result<()>;

  /// mmap opens the underlying memory-mapped file and initializes the meta references.
  /// min_size is the minimum size that the new mmap can be.
  fn mmap(&mut self, min_size: u64, tx: TxCell) -> crate::Result<()>;

  fn fsync(&self) -> crate::Result<()>;
  fn write_all_at(&self, buffer: &[u8], offset: u64) -> crate::Result<usize>;

  fn freelist(&self) -> MutexGuard<Freelist>;
}

struct ClosedBackend {}

impl DBBackend for ClosedBackend {
  fn page_size(&self) -> usize {
    unreachable!()
  }

  fn data_size(&self) -> u64 {
    unreachable!()
  }

  fn meta0(&self) -> MappedMetaPage {
    unreachable!()
  }

  fn meta1(&self) -> MappedMetaPage {
    unreachable!()
  }

  fn page<'tx>(&self, _pg_id: PgId) -> RefPage<'tx> {
    unreachable!()
  }

  fn grow(&self, _size: u64) -> crate::Result<()> {
    unreachable!()
  }

  fn mmap(&mut self, _min_size: u64, _tx: TxCell) -> crate::Result<()> {
    unreachable!()
  }

  fn fsync(&self) -> crate::Result<()> {
    unreachable!()
  }

  fn write_all_at(&self, _buffer: &[u8], _offset: u64) -> crate::Result<usize> {
    unreachable!()
  }

  fn freelist(&self) -> MutexGuard<Freelist> {
    unreachable!()
  }
}

struct MemBackend {
  mmap: Mutex<AlignedBytes<alignment::Page>>,
  freelist: OnceLock<Mutex<Freelist>>,
  page_size: usize,
  alloc_size: u64,
  /// current on disk file size
  file_size: u64,
  data_size: u64,
}

unsafe impl Send for MemBackend {}
unsafe impl Sync for MemBackend {}

impl DBBackend for MemBackend {
  fn page_size(&self) -> usize {
    self.page_size
  }

  fn data_size(&self) -> u64 {
    self.data_size
  }

  fn meta0(&self) -> MappedMetaPage {
    // Safe because we will never actually mutate this ptr
    unsafe { MappedMetaPage::new(self.mmap.lock().as_ptr().cast_mut()) }
  }

  fn meta1(&self) -> MappedMetaPage {
    // Safe because we will never actually mutate this ptr
    unsafe { MappedMetaPage::new(self.mmap.lock().as_ptr().add(self.page_size).cast_mut()) }
  }

  fn page<'tx>(&self, pg_id: PgId) -> RefPage<'tx> {
    let mmap = self.mmap.lock();
    debug_assert!(((pg_id.0 as usize + 1) * self.page_size) <= mmap.len());
    unsafe { RefPage::new(mmap.as_ptr().byte_add(pg_id.0 as usize * self.page_size)) }
  }

  fn grow(&self, size: u64) -> crate::Result<()> {
    let mut mmap = self.mmap.lock();
    if size <= mmap.len() as u64 {
      return Ok(());
    }
    let mut new_mmap = AlignedBytes::new_zeroed(size as usize);
    new_mmap[0..mmap.len()].copy_from_slice(&mmap);
    *mmap = new_mmap;
    Ok(())
  }

  //TODO: This is a mess, too
  fn mmap(&mut self, min_size: u64, _tx: TxCell) -> crate::Result<()> {
    let mut size = {
      let mmap = self.mmap.lock();
      if mmap.len() < self.page_size * 2 {
        return Err(Error::MMapTooSmall(mmap.len() as u64));
      }
      (mmap.len() as u64).max(min_size)
    };
    size = mmap_size(self.page_size, size)?;
    self.validate_meta()?;

    self.data_size = size;
    Ok(())
  }

  fn fsync(&self) -> crate::Result<()> {
    Ok(())
  }

  fn write_all_at(&self, buffer: &[u8], offset: u64) -> crate::Result<usize> {
    let mut mmap = self.mmap.lock();
    let write_to = &mut mmap[offset as usize..offset as usize + buffer.len()];
    write_to.copy_from_slice(buffer);
    let written = write_to.len();
    Ok(written)
  }

  fn freelist(&self) -> MutexGuard<Freelist> {
    self
      .freelist
      .get_or_init(|| {
        let meta = self.meta();
        let freelist_pgid = meta.free_list();
        let refpage = self.page(freelist_pgid);
        let freelist_page = MappedFreeListPage::coerce_ref(&refpage).unwrap();
        let freelist = freelist_page.read();
        Mutex::new(freelist)
      })
      .lock()
  }
}

struct FileState {
  file: File,
  /// current on disk file size
  file_size: u64,
}

impl Deref for FileState {
  type Target = File;

  fn deref(&self) -> &Self::Target {
    &self.file
  }
}

impl DerefMut for FileState {
  fn deref_mut(&mut self) -> &mut Self::Target {
    &mut self.file
  }
}

pub struct FileBackend {
  path: Arc<PathBuf>,
  file: Mutex<FileState>,
  page_size: usize,
  mmap: Option<MmapRaw>,
  freelist: OnceLock<Mutex<Freelist>>,
  alloc_size: u64,
  data_size: u64,
  use_mlock: bool,
  grow_async: bool,
  read_only: bool,
}

impl FileBackend {
  fn invalidate(&mut self) {
    self.data_size = 0;
  }

  fn munmap(&mut self) -> crate::Result<()> {
    self.mmap = None;
    self.invalidate();
    Ok(())
  }

  fn has_synced_free_list(&self) -> bool {
    self.meta().free_list() != PGID_NO_FREE_LIST
  }

  fn mmap_unlock(&mut self) -> crate::Result<()> {
    if let Some(mmap) = &mut self.mmap {
      mmap.unlock()?;
    }
    Ok(())
  }

  fn mmap_lock(&mut self) -> crate::Result<()> {
    if let Some(mmap) = &mut self.mmap {
      mmap.lock()?;
    }
    Ok(())
  }

  fn mmap_relock(&mut self) -> crate::Result<()> {
    self.mmap_unlock()?;
    self.mmap_lock()?;
    Ok(())
  }

  fn get_page_size(file: &mut File) -> crate::Result<usize> {
    // Read the first meta page to determine the page size.
    let meta0_can_read = match Self::get_page_size_from_first_meta(file) {
      Ok(page_size) => return Ok(page_size),
      // The page size couldn't be read from page 0; the flag records whether
      // page 0 itself was readable.
      Err(Error::InvalidDatabase(meta_can_read)) => meta_can_read,
      Err(e) => return Err(e),
    };

    // Read the second meta page to determine the page size.
    let meta1_can_read = match Self::get_page_size_from_second_meta(file) {
      Ok(page_size) => return Ok(page_size),
      // The page size couldn't be read from page 1; the flag records whether
      // page 1 itself was readable.
      Err(Error::InvalidDatabase(meta_can_read)) => meta_can_read,
      Err(e) => return Err(e),
    };

    // If we can't read the page size from both pages, but can read
    // either page, then we assume it's the same as the OS or the one
    // given, since that's how the page size was chosen in the first place.
    //
    // If both pages are invalid, and (this OS uses a different page size
    // from what the database was created with or the given page size is
    // different from what the database was created with), then we are out
    // of luck and cannot access the database.
    if meta0_can_read || meta1_can_read {
      return Ok(DEFAULT_PAGE_SIZE.bytes() as usize);
    }
    Err(Error::InvalidDatabase(false))
  }

  //TODO: These can be done better
  fn get_page_size_from_first_meta(file: &mut File) -> crate::Result<usize> {
    // we need this aligned to Page so we don't hit any runtime issues
    let mut buffer = AlignedBytes::<alignment::Page>::new_zeroed(4096);
    let refpage = RefPage::new(buffer.as_ptr());
    let mut meta_can_read = false;
    let bytes_read = file
      .seek(SeekFrom::Start(0))
      .and_then(|_| file.read(&mut buffer))
      .map_err(|_| Error::InvalidDatabase(meta_can_read))?;
    if bytes_read == buffer.len() {
      meta_can_read = true;
      if let Some(meta_page) = MappedMetaPage::coerce_ref(&refpage) {
        if meta_page.meta.validate().is_ok() {
          let page_size = meta_page.meta.page_size();
          return Ok(page_size as usize);
        }
      }
    }
    Err(Error::InvalidDatabase(meta_can_read))
  }

  fn get_page_size_from_second_meta(file: &mut File) -> crate::Result<usize> {
    let mut meta_can_read = false;
    let metadata = file.metadata()?;
    let file_size = metadata.len();
    // we need this aligned to Page so we don't hit any runtime issues
    let mut buffer = AlignedBytes::<alignment::Page>::new_zeroed(4096);
    // The second meta page sits one page past the first, so probe each
    // candidate page size (1024 << i) until we run past the end of the file.
    for i in 0..15u64 {
      let pos = 1024u64 << i;
      if file_size < 1024 || pos >= file_size - 1024 {
        break;
      }
      let bytes_read = file
        .seek(SeekFrom::Start(pos))
        .and_then(|_| file.read(&mut buffer))
        .map_err(|_| Error::InvalidDatabase(meta_can_read))? as u64;
      if bytes_read == buffer.len() as u64 || bytes_read == file_size - pos {
        meta_can_read = true;
        if let Some(meta_page) = MappedMetaPage::coerce_ref(&RefPage::new(buffer.as_ptr())) {
          if meta_page.meta.validate().is_ok() {
            return Ok(meta_page.meta.page_size() as usize);
          }
        }
      }
      // reset the buffer
      buffer.fill(0);
    }
    Err(Error::InvalidDatabase(meta_can_read))
  }

  pub(crate) fn file_size(&self) -> crate::Result<u64> {
    let file_lock = self.file.lock();
    let info = file_lock.metadata()?;
    let size = info.len();
    if size < (self.page_size * 2) as u64 {
      return Err(Error::FileSizeTooSmall(size));
    }
    Ok(size)
  }
}

impl DBBackend for FileBackend {
  fn page_size(&self) -> usize {
    self.page_size
  }

  fn data_size(&self) -> u64 {
    self.data_size
  }

  fn meta0(&self) -> MappedMetaPage {
    self
      .mmap
      .as_ref()
      .map(|mmap| unsafe { MappedMetaPage::new(mmap.as_mut_ptr()) })
      .unwrap()
  }

  fn meta1(&self) -> MappedMetaPage {
    self
      .mmap
      .as_ref()
      .map(|mmap| unsafe { MappedMetaPage::new(mmap.as_mut_ptr().add(self.page_size)) })
      .unwrap()
  }

  fn page<'tx>(&self, pg_id: PgId) -> RefPage<'tx> {
    let page_addr = pg_id.0 as usize * self.page_size;
    let page_ptr = unsafe { self.mmap.as_ref().unwrap().as_ptr().add(page_addr) };
    RefPage::new(page_ptr)
  }

  fn grow(&self, mut size: u64) -> crate::Result<()> {
    // Ignore if the new size is less than available file size.
    let file_size = self.file.lock().file_size;
    if size <= file_size {
      return Ok(());
    }
    // If the data is smaller than the alloc size then only allocate what's needed.
    // Once it goes over the allocation size then allocate in chunks.
    if self.data_size <= self.alloc_size {
      size = self.data_size;
    } else {
      size += self.alloc_size;
    }

    // Truncate and fsync to ensure file size metadata is flushed.
    // https://github.com/boltdb/bolt/issues/284
    if self.grow_async && !self.read_only {
      let file_lock = self.file.lock();
      #[cfg(mlock_supported)]
      if self.use_mlock {
        self.mmap.as_ref().unwrap().unlock()?;
      }
      if cfg!(not(target_os = "windows")) {
        file_lock.set_len(size)?;
      }
      file_lock.sync_all()?;
      #[cfg(mlock_supported)]
      if self.use_mlock {
        self.mmap.as_ref().unwrap().lock()?;
      }
    }

    // TODO: This is overkill. Move file_size behind the file mutex
    self.file.lock().file_size = size;
    Ok(())
  }

  /// mmap opens the underlying memory-mapped file and initializes the meta references.
  /// min_size is the minimum size that the new mmap can be.
  fn mmap(&mut self, min_size: u64, tx: TxCell) -> crate::Result<()> {
    let file_size = self.file_size()?;
    let mut size = file_size.max(min_size);

    size = mmap_size(self.page_size, size)?;
    if let Some(mmap) = self.mmap.take() {
      #[cfg(mlock_supported)]
      if self.use_mlock {
        mmap.unlock()?;
      }
      tx.cell.bound().own_in();
    }

    let file_lock = self.file.lock();
    let mmap = MmapOptions::new()
      .len(size as usize)
      .map_raw(&**file_lock)?;
    #[cfg(mlock_supported)]
    if self.use_mlock {
      mmap.lock()?;
    }
    #[cfg(mmap_advise_supported)]
    mmap.advise(Advice::Random)?;

    self.mmap = Some(mmap);

    let r0 = self.meta0().meta.validate();
    let r1 = self.meta1().meta.validate();

    if r0.is_err() && r1.is_err() {
      return r0;
    }

    self.data_size = size;
    Ok(())
  }

  fn fsync(&self) -> crate::Result<()> {
    self.file.lock().sync_all().map_err(Error::IO)
  }

  // TODO: take all of the pages and handle it here
  fn write_all_at(&self, buffer: &[u8], offset: u64) -> crate::Result<usize> {
    let mut file_lock = self.file.lock();
    file_lock.seek(SeekFrom::Start(offset)).map_err(Error::IO)?;
    file_lock
      .write_all(buffer)
      .map_err(Error::IO)
      .map(|_| buffer.len())
  }

  fn freelist(&self) -> MutexGuard<Freelist> {
    self
      .freelist
      .get_or_init(|| {
        let meta = self.meta();
        let freelist_pgid = meta.free_list();
        let refpage = self.page(freelist_pgid);
        let freelist_page = MappedFreeListPage::coerce_ref(&refpage).unwrap();
        let freelist = freelist_page.read();
        Mutex::new(freelist)
      })
      .lock()
  }
}

impl Drop for FileBackend {
  fn drop(&mut self) {
    if !self.read_only {
      match self.file.lock().unlock() {
        Ok(_) => {}
        // TODO: log error
        Err(_) => {
          todo!("log unlock error")
        }
      }
    }
  }
}

pub(crate) enum AllocateResult<'tx> {
  Page(SelfOwned<AlignedBytes<alignment::Page>, MutPage<'tx>>),
  PageWithNewSize(SelfOwned<AlignedBytes<alignment::Page>, MutPage<'tx>>, u64),
}

pub(crate) trait DbIApi<'tx>: 'tx {
  fn page(&self, pg_id: PgId) -> RefPage<'tx>;

  fn is_page_free(&self, pg_id: PgId) -> bool;

  fn remove_tx(&self, rem_tx: TxId, tx_stats: Arc<TxStats>);
  fn allocate(&self, tx: TxCell, page_count: u64) -> AllocateResult<'tx>;

  fn free_page(&self, txid: TxId, p: &PageHeader);
  fn free_pages(&self, state: &mut DbState);

  fn freelist_count(&self) -> u64;

  fn freelist_copyall(&self, all: &mut BVec<PgId>);

  fn commit_freelist(&self, tx: TxCell<'tx>) -> crate::Result<AllocateResult<'tx>>;

  fn write_all_at(&self, buf: &[u8], offset: u64) -> crate::Result<usize>;

  fn fsync(&self) -> crate::Result<()>;
  fn repool_allocated(&self, page: AlignedBytes<alignment::Page>);

  fn remove_rw_tx(&self, tx_closing_state: TxClosingState, rem_tx: TxId, tx_stats: Arc<TxStats>);

  fn grow(&self, size: u64) -> crate::Result<()>;
}

pub(crate) trait DbMutIApi<'tx>: DbIApi<'tx> {
  fn mmap_to_new_size(&mut self, min_size: u64, tx: TxCell) -> crate::Result<()>;
}

impl<'tx> DbIApi<'tx> for LockGuard<'tx, DbShared> {
  fn page(&self, pg_id: PgId) -> RefPage<'tx> {
    match self {
      LockGuard::R(guard) => guard.page(pg_id),
      LockGuard::U(guard) => guard.borrow().page(pg_id),
    }
  }

  fn is_page_free(&self, pg_id: PgId) -> bool {
    match self {
      LockGuard::R(guard) => guard.is_page_free(pg_id),
      LockGuard::U(guard) => guard.borrow().is_page_free(pg_id),
    }
  }

  fn remove_tx(&self, rem_tx: TxId, tx_stats: Arc<TxStats>) {
    match self {
      LockGuard::R(guard) => guard.remove_tx(rem_tx, tx_stats),
      LockGuard::U(guard) => guard.borrow().remove_tx(rem_tx, tx_stats),
    }
  }

  fn allocate(&self, tx: TxCell, page_count: u64) -> AllocateResult<'tx> {
    match self {
      LockGuard::R(guard) => guard.allocate(tx, page_count),
      LockGuard::U(guard) => guard.borrow().allocate(tx, page_count),
    }
  }

  fn free_page(&self, txid: TxId, p: &PageHeader) {
    match self {
      LockGuard::R(guard) => guard.free_page(txid, p),
      LockGuard::U(guard) => guard.borrow().free_page(txid, p),
    }
  }

  fn free_pages(&self, state: &mut DbState) {
    match self {
      LockGuard::R(guard) => guard.free_pages(state),
      LockGuard::U(guard) => guard.borrow().free_pages(state),
    }
  }

  fn freelist_count(&self) -> u64 {
    match self {
      LockGuard::R(guard) => guard.freelist_count(),
      LockGuard::U(guard) => guard.borrow().freelist_count(),
    }
  }

  fn freelist_copyall(&self, all: &mut BVec<PgId>) {
    match self {
      LockGuard::R(guard) => guard.freelist_copyall(all),
      LockGuard::U(guard) => guard.borrow().freelist_copyall(all),
    }
  }

  fn commit_freelist(&self, tx: TxCell<'tx>) -> crate::Result<AllocateResult<'tx>> {
    match self {
      LockGuard::R(guard) => guard.commit_freelist(tx),
      LockGuard::U(guard) => guard.borrow().commit_freelist(tx),
    }
  }

  fn write_all_at(&self, buf: &[u8], offset: u64) -> crate::Result<usize> {
    match self {
      LockGuard::R(guard) => guard.write_all_at(buf, offset),
      LockGuard::U(guard) => guard.borrow().write_all_at(buf, offset),
    }
  }

  fn fsync(&self) -> crate::Result<()> {
    match self {
      LockGuard::R(guard) => guard.fsync(),
      LockGuard::U(guard) => guard.borrow().fsync(),
    }
  }

  fn repool_allocated(&self, page: AlignedBytes<alignment::Page>) {
    match self {
      LockGuard::R(guard) => guard.repool_allocated(page),
      LockGuard::U(guard) => guard.borrow().repool_allocated(page),
    }
  }

  fn remove_rw_tx(&self, tx_closing_state: TxClosingState, rem_tx: TxId, tx_stats: Arc<TxStats>) {
    match self {
      LockGuard::R(guard) => guard.remove_rw_tx(tx_closing_state, rem_tx, tx_stats),
      LockGuard::U(guard) => guard
        .borrow()
        .remove_rw_tx(tx_closing_state, rem_tx, tx_stats),
    }
  }

  fn grow(&self, size: u64) -> crate::Result<()> {
    match self {
      LockGuard::R(guard) => guard.grow(size),
      LockGuard::U(guard) => guard.borrow().grow(size),
    }
  }
}

// In theory things are wired up ok. Here's hoping Miri is happy
pub struct DbShared {
  pub(crate) stats: Arc<DbStats>,
  pub(crate) db_state: Arc<Mutex<DbState>>,
  page_pool: Mutex<Vec<AlignedBytes<alignment::Page>>>,
  pub(crate) backend: Box<dyn DBBackend>,
  pub(crate) options: BoltOptions,
}

// Safe because this is all protected by RwLock
unsafe impl Sync for DbShared {}
unsafe impl Send for DbShared {}

impl<'tx> DbIApi<'tx> for DbShared {
  fn page(&self, pg_id: PgId) -> RefPage<'tx> {
    self.backend.page(pg_id)
  }

  fn is_page_free(&self, pg_id: PgId) -> bool {
    self.backend.freelist().freed(pg_id)
  }

  fn remove_tx(&self, rem_tx: TxId, tx_stats: Arc<TxStats>) {
    let mut records = self.db_state.lock();
    if let Some(pos) = records.txs.iter().position(|tx| *tx == rem_tx) {
      records.txs.swap_remove(pos);
    }

    let n = records.txs.len();
    self.stats.open_tx_n.store(n as i64, Ordering::Release);
    self.stats.tx_stats.add_assign(&tx_stats);
  }

  fn allocate(&self, tx: TxCell, page_count: u64) -> AllocateResult<'tx> {
    let tx_id = tx.api_id();
    let high_water = tx.meta().pgid();
    let bytes = if page_count == 1 && !self.page_pool.lock().is_empty() {
      let mut page = self.page_pool.lock().pop().unwrap();
      page.fill(0);
      page
    } else {
      AlignedBytes::new_zeroed(page_count as usize * self.backend.page_size())
    };

    //TODO: This should reside in tx.allocate
    {
      let tx = tx.cell.borrow();
      let stats = tx.r.stats.as_ref().unwrap();
      stats.inc_page_count(page_count as i64);
      stats.inc_page_alloc((page_count * tx.r.meta.page_size() as u64) as i64);
    }

    let mut mut_page = SelfOwned::new_with_map(bytes, |b| MutPage::new(b.as_mut_ptr()));
    mut_page.overflow = (page_count - 1) as u32;

    if let Some(pid) = self.backend.freelist().allocate(tx_id, page_count) {
      mut_page.id = pid;
      return AllocateResult::Page(mut_page);
    }

    // Resize mmap() if we're at the end.
    mut_page.id = high_water;
    let min_size = (high_water.0 + page_count + 1) * self.backend.page_size() as u64;
    tx.split_r_mut().meta.set_pgid(high_water + page_count);
    if min_size > self.backend.data_size() {
      AllocateResult::PageWithNewSize(mut_page, min_size)
    } else {
      AllocateResult::Page(mut_page)
    }
  }

  fn free_page(&self, txid: TxId, p: &PageHeader) {
    self.backend.freelist().free(txid, p)
  }

  fn free_pages(&self, state: &mut DbState) {
    let mut freelist = self.backend.freelist();

    // Free all pending pages prior to the earliest open transaction.
    state.txs.sort();
    let mut min_id = TxId(u64::MAX);
    if !state.txs.is_empty() {
      min_id = *state.txs.first().unwrap();
    }
    if min_id.0 > 0 {
      freelist.release(min_id - 1);
    }

    // Release unused txid extents between the open transactions.
    for t in &state.txs {
      freelist.release_range(min_id, *t - 1);
      min_id = *t + 1;
    }
    freelist.release_range(min_id, TxId(u64::MAX));
    // Any page both allocated and freed in an extent is safe to release.
  }

  fn freelist_count(&self) -> u64 {
    self.backend.freelist().count()
  }

  fn freelist_copyall(&self, all: &mut BVec<PgId>) {
    self.backend.freelist().copy_all(all)
  }

  fn commit_freelist(&self, tx: TxCell<'tx>) -> crate::Result<AllocateResult<'tx>> {
    // Allocate new pages for the new free list. This will overestimate
    // the size of the freelist but not underestimate the size (which would be bad).
    let count = {
      let page_size = self.backend.page_size();
      let freelist_size = self.backend.freelist().size();
      (freelist_size / page_size as u64) + 1
    };

    let mut freelist_page = self.allocate(tx, count);
    {
      let page = match &mut freelist_page {
        AllocateResult::Page(page) => page,
        AllocateResult::PageWithNewSize(page, _) => page,
      };
      self
        .backend
        .freelist()
        .write(MappedFreeListPage::mut_into(page))
    }

    Ok(freelist_page)
  }

  fn write_all_at(&self, buf: &[u8], offset: u64) -> crate::Result<usize> {
    self.backend.write_all_at(buf, offset)
  }

  fn fsync(&self) -> crate::Result<()> {
    self.backend.fsync()
  }

  fn repool_allocated(&self, page: AlignedBytes<alignment::Page>) {
    self.page_pool.lock().push(page);
  }

  fn remove_rw_tx(&self, tx_closing_state: TxClosingState, rem_tx: TxId, tx_stats: Arc<TxStats>) {
    let mut state = self.db_state.lock();

    let page_size = self.backend.page_size();
    let mut freelist = self.backend.freelist();
    if tx_closing_state.is_rollback() {
      freelist.rollback(rem_tx);
      if tx_closing_state.is_physical_rollback() {
        let freelist_page_id = self.backend.meta().free_list();
        let freelist_page_ref = self.backend.page(freelist_page_id);
        let freelist_page = MappedFreeListPage::coerce_ref(&freelist_page_ref).unwrap();
        freelist.reload(freelist_page);
      }
    }

    let free_list_free_n = freelist.free_count();
    let free_list_pending_n = freelist.pending_count();
    let free_list_alloc = freelist.size();

    let new_meta = self.backend.meta();
    state.current_meta = new_meta;

    state.rwtx = None;

    self.stats.set_free_page_n(free_list_free_n as i64);
    self.stats.set_pending_page_n(free_list_pending_n as i64);
    self
      .stats
      .set_free_alloc(((free_list_free_n + free_list_pending_n) * page_size as u64) as i64);
    self.stats.set_free_list_in_use(free_list_alloc as i64);
    self.stats.tx_stats.add_assign(&tx_stats);
  }

  fn grow(&self, size: u64) -> crate::Result<()> {
    self.backend.grow(size)
  }
}

impl<'tx> DbMutIApi<'tx> for DbShared {
  fn mmap_to_new_size(&mut self, min_size: u64, tx: TxCell) -> crate::Result<()> {
    self.backend.as_mut().mmap(min_size, tx)
  }
}

/// Database options
#[derive(Clone, Default, Debug, PartialEq, Eq, TypedBuilder)]
#[builder(doc)]
pub struct BoltOptions {
  // TODO: How do we handle this?
  #[cfg(timeout_supported)]
  #[builder(
    default,
    setter(
      strip_option,
      skip,
      doc = "Timeout is the amount of time to wait to obtain a file lock. \
    When set to zero it will wait indefinitely."
    )
  )]
  timeout: Option<Duration>,
  #[builder(
    default,
    setter(
      skip,
      doc = "Sets the no_grow_sync flag before memory mapping the file."
    )
  )]
  no_grow_sync: bool,
  // TODO: How do we handle this?
  #[builder(
    default,
    setter(
      skip,
      doc = "Do not sync the freelist to disk. \
    This improves database write performance under normal operation, \
    but requires a full database re-sync during recovery."
    )
  )]
  no_freelist_sync: bool,
  #[builder(setter(
    strip_bool,
    doc = "Sets whether to load the free pages when opening the db file. \
    Note when opening the db in write mode, bbolt will always load the free pages."
  ))]
  preload_freelist: bool,
  //mmap_flags,
  #[builder(
    default,
    setter(
      strip_option,
      doc = "initial_mmap_size is the initial mmap size of the database in bytes. \
    Read transactions won't block the write transaction if initial_mmap_size is \
    large enough to hold the database mmap size."
    )
  )]
  /// initial_mmap_size is the initial mmap size of the database
  /// in bytes. Read transactions won't block the write transaction
  /// if initial_mmap_size is large enough to hold the database mmap
  /// size. (See [DbApi::begin] for more information)
  ///
  /// If `None`, the initial map size is 0.
  /// If initial_mmap_size is smaller than the previous database size,
  /// it takes no effect.
  initial_mmap_size: Option<u64>,
  #[builder(default, setter(strip_option))]
  /// page_size overrides the default OS page size.
  page_size: Option<usize>,
  /// no_sync sets the initial value of the database's NoSync flag. This
  /// option is useful in APIs which expose options but not the underlying
  /// database handle.
  #[builder(setter(strip_bool))]
  no_sync: bool,
  /// mlock locks the database file in memory when set to true.
  /// It prevents potential page faults, however
  /// used memory can't be reclaimed. (UNIX only)
  #[cfg(mlock_supported)]
  #[builder(setter(strip_bool))]
  mlock: bool,
  #[builder(
    default,
    setter(strip_option, doc = "max_batch_size is the maximum size of a batch.")
  )]
  max_batch_size: Option<u32>,
  #[builder(
    default,
    setter(
      strip_option,
      doc = "max_batch_delay is the maximum delay before a batch starts."
    )
  )]
  max_batch_delay: Option<Duration>,
  #[builder(default = false, setter(skip))]
  /// Open the database in read-only mode. Uses flock(..., LOCK_SH | LOCK_NB)
  /// to grab a shared lock (UNIX).
  read_only: bool,
}

impl BoltOptions {
  #[inline]
  pub(crate) fn timeout(&self) -> Option<Duration> {
    if cfg!(timeout_supported) {
      self.timeout
    } else {
      None
    }
  }

  #[inline]
  pub(crate) fn no_grow_sync(&self) -> bool {
    self.no_grow_sync
  }

  #[inline]
  pub(crate) fn no_freelist_sync(&self) -> bool {
    self.no_freelist_sync
  }

  #[inline]
  pub(crate) fn preload_freelist(&self) -> bool {
    if self.read_only {
      self.preload_freelist
    } else {
      // In write mode the free pages are always loaded.
      true
    }
  }

  #[inline]
  pub(crate) fn initial_map_size(&self) -> Option<u64> {
    self.initial_mmap_size
  }

  #[inline]
  pub(crate) fn page_size(&self) -> Option<usize> {
    self.page_size
  }

  #[inline]
  pub(crate) fn no_sync(&self) -> bool {
    self.no_sync
  }

  #[inline]
  pub(crate) fn mlock(&self) -> bool {
    if cfg!(mlock_supported) {
      self.mlock
    } else {
      false
    }
  }

  #[inline]
  pub(crate) fn read_only(&self) -> bool {
    self.read_only
  }

  /// Open creates and opens a database at the given path.
  /// If the file does not exist then it will be created automatically.
  pub fn open<T: AsRef<Path>>(self, path: T) -> crate::Result<Bolt> {
    Bolt::open_path(path, self)
  }

  /// Opens a database in read-only mode at the given path.
  /// The file must already exist; it will not be created.
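  ///
  /// A minimal sketch (hypothetical file name; the database must already exist):
  ///
  /// ```rust,no_run
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let db = BoltOptions::default().open_ro("my.db")?;
  ///   db.view(|tx| Ok(()))?;
  ///   Ok(())
  /// }
  /// ```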
  pub fn open_ro<T: AsRef<Path>>(mut self, path: T) -> crate::Result<impl DbApi> {
    self.read_only = true;
    Bolt::open_path(path, self)
  }

  /// Opens an in-memory database
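  ///
  /// A minimal builder sketch (the setter names below are generated by
  /// `TypedBuilder` from the fields above, so treat them as assumptions):
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let db = BoltOptions::builder().no_sync().build().open_mem()?;
  ///   db.view(|tx| Ok(()))?;
  ///   Ok(())
  /// }
  /// ```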
  pub fn open_mem(self) -> crate::Result<Bolt> {
    Bolt::new_mem_with_options(self)
  }
}

type BatchFn = dyn FnMut(&mut TxRwRef) -> crate::Result<()> + Send + Sync + 'static;

struct Call {
  f: Box<BatchFn>,
  err: SyncSender<crate::Result<()>>,
}

struct ScheduledBatch {
  timer_guard: Option<Guard>,
  calls: Vec<Call>,
}

impl ScheduledBatch {
  fn cancel_schedule(&mut self) {
    if let Some(guard) = self.timer_guard.take() {
      guard.ignore()
    }
  }

  fn run(&mut self, db: &mut Bolt) {
    'retry: loop {
      if self.calls.is_empty() {
        break;
      }
      let mut fail_idx = None;
      let _ = db.update(|mut tx| {
        for (i, call) in self.calls.iter_mut().enumerate() {
          let result = (call.f)(&mut tx);
          if result.is_err() {
            fail_idx = Some(i);
            return result;
          }
        }
        Ok(())
      });
      if let Some(idx) = fail_idx {
        // Take the failing call out of the batch. It is signaled to retry
        // solo, and the remaining calls are re-run in a fresh transaction.
        let call = self.calls.remove(idx);
        call.err.send(Err(Error::TrySolo)).unwrap();
        continue 'retry;
      }
      for call in &self.calls {
        call.err.send(Ok(())).unwrap()
      }
      break;
    }
  }
}

struct InnerBatcher {
  timer: Timer,
  batch_pool: Arc<SyncPool<ScheduledBatch>>,
  scheduled: Mutex<SyncReusable<ScheduledBatch>>,
}

impl InnerBatcher {
  fn new(parent: &Arc<Batcher>) -> InnerBatcher {
    let b = Arc::downgrade(parent);
    let timer = Timer::new();
    let batch_pool = SyncPool::new(
      || ScheduledBatch {
        timer_guard: None,
        calls: Vec::with_capacity(0),
      },
      |batch| {
        batch.timer_guard = None;
        batch.calls.clear();
      },
    );
    // This duplicates `Batcher::schedule_batch` because the `InnerBatcher`
    // is still being constructed here; calling back into `Batcher::inner`
    // would re-enter the `OnceLock` initialization.
    let guard = timer.schedule_with_delay(parent.max_batch_delay, move || {
      let batcher = b.upgrade().unwrap();
      let mut db = Bolt {
        inner: batcher.db.upgrade().unwrap(),
      };
      batcher.take_batch().run(&mut db)
    });

    let mut scheduled = batch_pool.pull();
    scheduled.timer_guard = Some(guard);
    InnerBatcher {
      timer,
      batch_pool,
      scheduled: scheduled.into(),
    }
  }
}

struct Batcher {
  inner: OnceLock<InnerBatcher>,
  db: Weak<InnerDB>,
  max_batch_delay: Duration,
  max_batch_size: u32,
}

impl Batcher {
  fn new(db: Weak<InnerDB>, max_batch_delay: Duration, max_batch_size: u32) -> Arc<Batcher> {
    Arc::new(Batcher {
      inner: Default::default(),
      db,
      max_batch_delay,
      max_batch_size,
    })
  }

  fn inner<'a>(self: &'a Arc<Batcher>) -> &'a InnerBatcher {
    self.inner.get_or_init(move || InnerBatcher::new(self))
  }

  fn batch<F>(self: &Arc<Batcher>, mut db: Bolt, mut f: F) -> crate::Result<()>
  where
    F: FnMut(&mut TxRwRef) -> crate::Result<()> + Send + Sync + Clone + 'static,
  {
    if self.max_batch_size == 0 || self.max_batch_delay.is_zero() {
      return Err(Error::BatchDisabled);
    }
    let inner = self.inner();
    let (call_len, rx) = {
      let mut batch = inner.scheduled.lock();

      let (tx, rx): (SyncSender<crate::Result<()>>, Receiver<crate::Result<()>>) =
        mpsc::sync_channel(1);
      batch.calls.push(Call {
        f: Box::new(f.clone()),
        err: tx,
      });
      (batch.calls.len(), rx)
    };
    if call_len > self.max_batch_size as usize {
      let mut immediate = self.take_batch();
      if !immediate.calls.is_empty() {
        let mut i_db = db.clone();
        thread::spawn(move || immediate.run(&mut i_db));
      }
    }

    let result = rx.recv().unwrap();
    if Err(Error::TrySolo) == result {
      db.update(|mut tx| f(&mut tx))?;
    }
    Ok(())
  }

  fn schedule_batch(self: &Arc<Batcher>) -> Guard {
    let inner = self.inner();
    let b = Arc::downgrade(self);
    inner
      .timer
      .schedule_with_delay(self.max_batch_delay, move || {
        let batcher = b.upgrade().unwrap();
        let mut db = Bolt {
          inner: batcher.db.upgrade().unwrap(),
        };
        batcher.take_batch().run(&mut db)
      })
  }

  fn take_batch(self: &Arc<Batcher>) -> SyncReusable<ScheduledBatch> {
    let inner = self.inner();
    let mut swap_batch = inner.batch_pool.pull();
    let mut lock = inner.scheduled.lock();
    mem::swap(&mut swap_batch, &mut *lock);
    swap_batch.cancel_schedule();
    let guard = self.schedule_batch();
    lock.timer_guard = Some(guard);
    swap_batch
  }
}

/// Shared inner state of a BBolt database
pub struct InnerDB {
  path: Arc<DbPath>,
  bump_pool: Arc<SyncPool<Pin<Box<PinBump>>>>,
  db: RwLock<DbShared>,
  stats: Arc<DbStats>,
  db_state: Arc<Mutex<DbState>>,
  batcher: Arc<Batcher>,
}

unsafe impl Send for InnerDB {}
unsafe impl Sync for InnerDB {}

/// The Bolt Database
#[derive(Clone)]
pub struct Bolt {
  inner: Arc<InnerDB>,
}

impl Bolt {
  /// Open creates and opens a database at the given path.
  /// If the file does not exist then it will be created automatically.
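  ///
  /// A minimal sketch (hypothetical file name):
  ///
  /// ```rust,no_run
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open("my.db")?;
  ///
  ///   db.update(|mut tx| {
  ///     tx.create_bucket_if_not_exists("test")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```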
  pub fn open<T: AsRef<Path>>(path: T) -> crate::Result<Self> {
    Bolt::open_path(path, BoltOptions::default())
  }

  /// Opens a database in read-only mode at the given path.
  /// The file must already exist; it will not be created.
  pub fn open_ro<T: AsRef<Path>>(path: T) -> crate::Result<impl DbApi> {
    Bolt::open_path(
      path,
      BoltOptions {
        read_only: true,
        ..Default::default()
      },
    )
  }

  fn new_file_backend(path: &Path, bolt_options: BoltOptions) -> crate::Result<Bolt> {
    let read_only = bolt_options.read_only();
    let mut file = if bolt_options.read_only() {
      let file = fs::OpenOptions::new().read(true).open(path)?;
      file.lock_shared()?;
      file
    } else {
      let mut file = fs::OpenOptions::new()
        .write(true)
        .read(true)
        .create(true)
        .truncate(false)
        .open(path)?;
      file.lock_exclusive()?;
      // A freshly created (or otherwise empty) file needs its initial
      // meta/freelist/root pages written before it can be mapped.
      if file.metadata()?.len() == 0 {
        let page_size = bolt_options
          .page_size()
          .unwrap_or(DEFAULT_PAGE_SIZE.bytes() as usize);
        Bolt::init(path, &mut file, page_size)?;
      }
      file
    };
    let page_size = FileBackend::get_page_size(&mut file)?;
    assert!(page_size > 0, "invalid page size");

    let file_size = file.metadata()?.len();
    let data_size = if let Some(initial_mmap_size) = bolt_options.initial_map_size() {
      file_size.max(initial_mmap_size)
    } else {
      file_size
    };
    let options = MmapOptions::new()
      .offset(0)
      .len(data_size as usize)
      .to_owned();
    let mmap = if read_only {
      options.map_raw_read_only(&file)?
    } else {
      options.map_raw(&file)?
    };
    #[cfg(mlock_supported)]
    if bolt_options.mlock() {
      mmap.lock()?;
    }

    #[cfg(mmap_advise_supported)]
    mmap.advise(Advice::Random)?;

    let backend = FileBackend {
      path: Arc::new(path.into()),
      file: Mutex::new(FileState { file, file_size }),
      page_size,
      mmap: Some(mmap),
      freelist: OnceLock::new(),
      alloc_size: DEFAULT_ALLOC_SIZE.bytes() as u64,
      data_size,
      use_mlock: bolt_options.mlock(),
      grow_async: !bolt_options.no_grow_sync(),
      read_only,
    };
    backend.file_size()?;
    let backend = Box::new(backend);
    Self::new_db(DbPath::FilePath(path.into()), bolt_options, backend)
  }

  fn new_mem_with_options(bolt_options: BoltOptions) -> crate::Result<Bolt> {
    let page_size = bolt_options
      .page_size()
      .unwrap_or(DEFAULT_PAGE_SIZE.bytes() as usize);
    let mut mmap = Bolt::init_page(page_size);
    let file_size = mmap.len() as u64;
    let data_size = if let Some(initial_mmap_size) = bolt_options.initial_map_size() {
      file_size.max(initial_mmap_size)
    } else {
      file_size
    };
    if file_size < data_size {
      let mut new_mmap = AlignedBytes::new_zeroed(data_size as usize);
      new_mmap
        .split_at_mut(file_size as usize)
        .0
        .copy_from_slice(&mmap);
      mmap = new_mmap;
    }
    let backend = MemBackend {
      mmap: Mutex::new(mmap),
      freelist: OnceLock::new(),
      page_size,
      alloc_size: DEFAULT_ALLOC_SIZE.bytes() as u64,
      file_size,
      data_size,
    };
    let backend = Box::new(backend);
    Self::new_db(DbPath::Memory, bolt_options, backend)
  }
1788
1789  fn new_db(
1790    db_path: DbPath, bolt_options: BoltOptions, backend: Box<dyn DBBackend>,
1791  ) -> crate::Result<Self> {
1792    backend.validate_meta()?;
1793    let mut free_count = 0u64;
1794    if bolt_options.preload_freelist() {
1795      free_count = backend.freelist().free_count();
1796    }
1797    let meta = backend.meta();
1798    if meta.free_list() == PGID_NO_FREE_LIST {
1799      return Err(Error::Other(anyhow!(
1800        "PGID_NO_FREE_LIST not currently supported"
1801      )));
1802    }
1803    let db_state = Arc::new(Mutex::new(DbState::new(meta)));
1804    let stats = DbStats {
1805      free_page_n: (free_count as i64).into(),
1806      ..Default::default()
1807    };
1808    let arc_stats = Arc::new(stats);
1809    let bump_pool = SyncPool::new(
1810      || Box::pin(PinBump::default()),
1811      |bump| Pin::as_mut(bump).reset(),
1812    );
1813
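    // new_cyclic hands the batcher a Weak back-reference to this InnerDB,
    // avoiding a reference cycle between the database and its batcher.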
1814    let inner = Arc::new_cyclic(|weak| InnerDB {
1815      path: Arc::new(db_path),
1816      bump_pool,
1817      db: RwLock::new(DbShared {
1818        stats: arc_stats.clone(),
1819        db_state: db_state.clone(),
1820        backend,
1821        page_pool: Mutex::new(vec![]),
1822        options: bolt_options.clone(),
1823      }),
1824      stats: arc_stats,
1825      db_state,
1826      batcher: Arc::new(Batcher {
1827        inner: Default::default(),
1828        db: weak.clone(),
1829        max_batch_delay: bolt_options
1830          .max_batch_delay
1831          .unwrap_or(DEFAULT_MAX_BATCH_DELAY),
1832        max_batch_size: bolt_options
1833          .max_batch_size
1834          .unwrap_or(DEFAULT_MAX_BATCH_SIZE),
1835      }),
1836    });
1837    Ok(Bolt { inner })
1838  }
1839
1840  fn open_path<T: AsRef<Path>>(path: T, db_options: BoltOptions) -> crate::Result<Self> {
1841    let path = path.as_ref();
1842    Self::new_file_backend(path, db_options)
1843  }
1844
1845  /// Opens an in-memory database
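  ///
  /// The database lives entirely in memory and is discarded when the last
  /// handle is dropped. A minimal sketch of typical usage:
  ///
  /// ```rust
  /// use bbolt_rs::*;
  ///
  /// fn main() -> Result<()> {
  ///   let mut db = Bolt::open_mem()?;
  ///
  ///   db.update(|mut tx| {
  ///     let mut b = tx.create_bucket("config")?;
  ///     b.put("version", "1")?;
  ///     Ok(())
  ///   })?;
  ///
  ///   db.view(|tx| {
  ///     let b = tx.bucket("config").unwrap();
  ///     assert_eq!(Some(b"1".as_ref()), b.get("version"));
  ///     Ok(())
  ///   })?;
  ///
  ///   Ok(())
  /// }
  /// ```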
1846  pub fn open_mem() -> crate::Result<Self> {
1847    Bolt::new_mem_with_options(BoltOptions::default())
1848  }
1849
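  /// Lays out the four pages of an empty database in a single page-aligned
  /// buffer: meta pages 0 and 1, an empty freelist at page 2, and an empty
  /// root leaf at page 3.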
1850  fn init_page(page_size: usize) -> AlignedBytes<alignment::Page> {
1851    let mut buffer = AlignedBytes::<alignment::Page>::new_zeroed(page_size * 4);
1852    for (i, page_bytes) in buffer.chunks_mut(page_size).enumerate() {
1853      let mut page = MutPage::new(page_bytes.as_mut_ptr());
1854      if i < 2 {
1855        let meta_page = MappedMetaPage::mut_into(&mut page);
1856        let ph = &mut meta_page.page;
1857        ph.id = PgId(i as u64);
1858        // set overflow and count to keep Miri happy
1859        ph.count = 0;
1860        ph.overflow = 0;
1861        let meta = &mut meta_page.meta;
1862        meta.set_magic(MAGIC);
1863        meta.set_version(VERSION);
1864        meta.set_page_size(page_size as u32);
1865        meta.set_free_list(PgId(2));
1866        meta.set_root(BucketHeader::new(PgId(3), 0));
1867        meta.set_pgid(PgId(4));
1868        meta.set_txid(TxId(i as u64));
1869        meta.set_checksum(meta.sum64());
1870      } else if i == 2 {
1871        let free_list = MappedFreeListPage::mut_into(&mut page);
1872        free_list.id = PgId(2);
1873        free_list.count = 0;
1874        free_list.overflow = 0;
1875      } else if i == 3 {
1876        let leaf_page = MappedLeafPage::mut_into(&mut page);
1877        leaf_page.id = PgId(3);
1878        leaf_page.count = 0;
1879        leaf_page.overflow = 0;
1880      }
1881    }
1882    buffer
1883  }
1884
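  /// Writes the initial pages to a freshly created database file (restricted
  /// to owner read/write on unix) and returns the number of bytes written.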
1885  fn init(path: &Path, db: &mut File, page_size: usize) -> io::Result<usize> {
1886    let buffer = Bolt::init_page(page_size);
1887    #[cfg(unix)]
1888    {
1889      use std::os::unix::fs::PermissionsExt;
1890      let metadata = db.metadata()?;
1891      let mut permissions = metadata.permissions();
1892      permissions.set_mode(0o600);
1893      fs::set_permissions(path, permissions)?;
1894    }
1895    db.write_all(&buffer)?;
1896    db.sync_all()?; // Write::flush is a no-op for File; fsync the init pages.
1897    Ok(buffer.len())
1898  }
1899
1900  fn require_open(state: &DbState) -> crate::Result<()> {
1901    if !state.is_open {
1902      return Err(Error::DatabaseNotOpen);
1903    }
1904    Ok(())
1905  }
1906
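  /// Begins a read-only transaction: snapshots the current meta under the
  /// state lock, records its txid as open, and holds a shared read lock on
  /// the database for the transaction's lifetime.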
1907  pub(crate) fn begin_tx(&self) -> crate::Result<TxImpl> {
1908    let mut state = self.inner.db_state.lock();
1909    Bolt::require_open(&state)?;
1910    let lock = self.inner.db.read();
1911    let bump = self.inner.bump_pool.pull();
1912    let meta = state.current_meta;
1913    let txid = meta.txid();
1914    state.txs.push(txid);
1915    self.inner.stats.inc_tx_n(1);
1916    self
1917      .inner
1918      .stats
1919      .open_tx_n
1920      .store(state.txs.len() as i64, Ordering::Release);
1921    Ok(TxImpl::new(bump, lock, meta))
1922  }
1923
1924  #[cfg(feature = "try-begin")]
1925  pub(crate) fn try_begin_tx<'a, F>(&'a self, f: F) -> crate::Result<Option<TxImpl>>
1926  where
1927    F: Fn() -> Option<RwLockReadGuard<'a, DbShared>>,
1928  {
1929    let mut state = self.inner.db_state.lock();
1930    Bolt::require_open(&state)?;
1931    if let Some(lock) = f() {
1932      let bump = self.inner.bump_pool.pull();
1933      let meta = state.current_meta;
1934      let txid = meta.txid();
1935      state.txs.push(txid);
1936      self.inner.stats.inc_tx_n(1);
1937      self
1938        .inner
1939        .stats
1940        .open_tx_n
1941        .store(state.txs.len() as i64, Ordering::Release);
1942      Ok(Some(TxImpl::new(bump, lock, meta)))
1943    } else {
1944      Ok(None)
1945    }
1946  }
1947
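  /// Begins a read-write transaction: the upgradable read lock serializes
  /// writers, pages pending from completed read transactions are freed, and
  /// the snapshot meta's txid is bumped for this transaction.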
1948  pub(crate) fn begin_rw_tx(&mut self) -> crate::Result<TxRwImpl> {
1949    let lock = self.inner.db.upgradable_read();
1950    let mut state = self.inner.db_state.lock();
1951    Bolt::require_open(&state)?;
1952    lock.free_pages(&mut state);
1953    let bump = self.inner.bump_pool.pull();
1954    let mut meta = state.current_meta;
1955    let txid = meta.txid() + 1;
1956    meta.set_txid(txid);
1957    state.rwtx = Some(txid);
1958    Ok(TxRwImpl::new(bump, lock, meta))
1959  }
1960
1961  #[cfg(feature = "try-begin")]
1962  pub(crate) fn try_begin_rw_tx<'a, F>(&'a self, f: F) -> crate::Result<Option<TxRwImpl>>
1963  where
1964    F: Fn() -> Option<RwLockUpgradableReadGuard<'a, DbShared>>,
1965  {
1966    if let Some(lock) = f() {
1967      let mut state = self.inner.db_state.lock();
1968      Bolt::require_open(&state)?;
1969      lock.free_pages(&mut state);
1970      let bump = self.inner.bump_pool.pull();
1971      let mut meta = state.current_meta;
1972      let txid = meta.txid() + 1;
1973      meta.set_txid(txid);
1974      state.rwtx = Some(txid);
1975      Ok(Some(TxRwImpl::new(bump, lock, meta)))
1976    } else {
1977      Ok(None)
1978    }
1979  }
1980}
1981
1982impl DbApi for Bolt {
1983  fn begin(&self) -> crate::Result<impl TxApi> {
1984    self.begin_tx()
1985  }
1986
1987  #[cfg(feature = "try-begin")]
1988  fn try_begin(&self) -> crate::Result<Option<impl TxApi>> {
1989    self.try_begin_tx(|| self.inner.db.try_read())
1990  }
1991
1992  #[cfg(feature = "try-begin")]
1993  fn try_begin_for(&self, duration: Duration) -> crate::Result<Option<impl TxApi>> {
1994    self.try_begin_tx(|| self.inner.db.try_read_for(duration))
1995  }
1996
1997  #[cfg(feature = "try-begin")]
1998  fn try_begin_until(&self, instant: Instant) -> crate::Result<Option<impl TxApi>> {
1999    self.try_begin_tx(|| self.inner.db.try_read_until(instant))
2000  }
2001
2002  fn view<'tx, F: FnMut(TxRef<'tx>) -> crate::Result<()>>(
2003    &'tx self, mut f: F,
2004  ) -> crate::Result<()> {
2005    let tx = self.begin_tx()?;
2006    let tx_ref = tx.get_ref();
2007    f(tx_ref)
2009  }
2010
2011  fn stats(&self) -> Arc<DbStats> {
2012    self.inner.stats.clone()
2013  }
2014
2015  fn path(&self) -> &DbPath {
2016    &self.inner.path
2017  }
2018
2019  fn info(&self) -> DbInfo {
2020    DbInfo {
2021      page_size: self.inner.db.read().backend.page_size(),
2022    }
2023  }
2024
2025  fn close(self) {
2026    let mut lock = self.inner.db.write();
2027    let mut state = self.inner.db_state.lock();
2028    if Bolt::require_open(&state).is_ok() {
2029      state.is_open = false;
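      // Swap in a no-op backend; the real backend (and with it the mmap and
      // file lock) is released when `closed_db` drops.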
2030      let mut closed_db: Box<dyn DBBackend> = Box::new(ClosedBackend {});
2031      mem::swap(&mut closed_db, &mut lock.backend);
2032      lock.page_pool.lock().clear();
2033      self.inner.bump_pool.clear();
2034      if let Some(inner_batcher) = self.inner.batcher.inner.get() {
2035        inner_batcher.batch_pool.clear();
2036      }
2037    }
2038  }
2039}
2040
2041impl DbRwAPI for Bolt {
2042  fn begin_rw(&mut self) -> crate::Result<impl TxRwApi> {
2043    self.begin_rw_tx()
2044  }
2045
2046  #[cfg(feature = "try-begin")]
2047  fn try_begin_rw(&self) -> crate::Result<Option<impl TxRwApi>> {
2048    self.try_begin_rw_tx(|| self.inner.db.try_upgradable_read())
2049  }
2050
2051  #[cfg(feature = "try-begin")]
2052  fn try_begin_rw_for(&self, duration: Duration) -> crate::Result<Option<impl TxRwApi>> {
2053    self.try_begin_rw_tx(|| self.inner.db.try_upgradable_read_for(duration))
2054  }
2055
2056  #[cfg(feature = "try-begin")]
2057  fn try_begin_rw_until(&self, instant: Instant) -> crate::Result<Option<impl TxRwApi>> {
2058    self.try_begin_rw_tx(|| self.inner.db.try_upgradable_read_until(instant))
2059  }
2060
2061  fn update<'tx, F: FnMut(TxRwRef<'tx>) -> crate::Result<()>>(
2062    &'tx mut self, mut f: F,
2063  ) -> crate::Result<()> {
2064    let txrw = self.begin_rw_tx()?;
2065    let tx_ref = txrw.get_ref();
2066    match f(tx_ref) {
2067      Ok(_) => {
2068        txrw.commit()?;
2069        Ok(())
2070      }
2071      Err(e) => {
2072        let _ = txrw.rollback();
2073        Err(e)
2074      }
2075    }
2076  }
2077
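  // Delegates to the shared Batcher, which coalesces closures submitted from
  // multiple threads into a single write transaction, bounded by
  // max_batch_delay / max_batch_size.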
2078  fn batch<F>(&mut self, f: F) -> crate::Result<()>
2079  where
2080    F: FnMut(&mut TxRwRef) -> crate::Result<()> + Send + Sync + Clone + 'static,
2081  {
2082    self.inner.batcher.batch(self.clone(), f)
2083  }
2084
2085  fn sync(&mut self) -> crate::Result<()> {
2086    self.inner.db.write().backend.fsync()?;
2087    Ok(())
2088  }
2089}
2090
2091#[cfg(test)]
2092mod test {
2093  use crate::common::defaults::DEFAULT_PAGE_SIZE;
2094  use crate::common::page::meta::MappedMetaPage;
2095  use crate::db::DbStats;
2096  use crate::test_support::{temp_file, TestDb};
2097  use crate::{
2098    Bolt, BoltOptions, BucketApi, BucketRwApi, DbApi, DbPath, DbRwAPI, Error, PgId, TxApi, TxCheck,
2099    TxRwApi, TxRwRefApi,
2100  };
2101  use aligners::{alignment, AlignedBytes};
2102  use std::io::{Read, Seek, SeekFrom, Write};
2103  use std::sync::mpsc::channel;
2104  use std::sync::Arc;
2105  use std::thread;
2106
2107  #[test]
2108  #[cfg(not(miri))]
2109  fn test_open() -> crate::Result<()> {
2110    let db = TestDb::new()?;
2111    db.clone_db().close();
2112    Ok(())
2113  }
2114
2115  #[test]
2116  #[cfg(feature = "long-tests")]
2117  #[cfg(not(any(miri, feature = "test-mem-backend")))]
2118  fn test_open_multiple_threads() -> crate::Result<()> {
2119    let instances = 30;
2120    let iterations = 30;
2121    let mut threads = Vec::new();
2122    let temp_file = Arc::new(temp_file()?);
2123    let (tx, rx) = channel();
2124    for _ in 0..iterations {
2125      for _ in 0..instances {
2126        let t_file = temp_file.clone();
2127        let t_tx = tx.clone();
2128        let handle = thread::spawn(move || {
2129          let db = Bolt::open(t_file.path());
2130          if let Some(error) = db.err() {
2132            t_tx.send(error).unwrap();
2133          }
2134        });
2135        threads.push(handle);
2136      }
2137      while let Some(handle) = threads.pop() {
2138        handle.join().unwrap();
2139      }
2140    }
2141    drop(tx);
2142    if let Ok(error) = rx.try_recv() {
2143      panic!("Fatal error: {}", error);
2144    }
2145    Ok(())
2146  }
2147
2148  #[test]
2149  #[cfg(not(any(miri, feature = "test-mem-backend")))]
2150  fn test_open_err_path_required() -> crate::Result<()> {
2151    let r = Bolt::open("");
2152    assert!(r.is_err());
2153    Ok(())
2154  }
2155
2156  #[test]
2157  #[cfg(not(any(miri, feature = "test-mem-backend")))]
2158  fn test_open_err_not_exists() -> crate::Result<()> {
2159    let file = temp_file()?;
2160    let path = file.path().join("bad-path");
2161    let r = Bolt::open(path);
2162    assert!(r.is_err());
2163    Ok(())
2164  }
2165
2166  #[test]
2167  #[cfg(not(any(miri, feature = "test-mem-backend")))]
2168  fn test_open_err_invalid() -> crate::Result<()> {
2169    let mut file = temp_file()?;
2170    file
2171      .as_file_mut()
2172      .write_all(b"this is not a bolt database")?;
2173    let r = Bolt::open(file.path());
2174    assert_eq!(Some(Error::InvalidDatabase(false)), r.err());
2175    Ok(())
2176  }
2177
2178  #[test]
2179  #[cfg(not(any(miri, feature = "test-mem-backend")))]
2180  fn test_open_err_version_mismatch() -> crate::Result<()> {
2181    // TODO: Make this cleaner
2182    let mut file = temp_file()?;
2183    let db = Bolt::open(file.path())?;
2184    db.close();
2185    let mut bytes = AlignedBytes::<alignment::Page>::new_zeroed(4096 * 2);
2186    file.seek(SeekFrom::Start(0))?;
2187    file.read_exact(&mut bytes)?;
2188    let mut meta_0 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr()) };
2189    let meta0_version = meta_0.meta.version();
2190    meta_0.meta.set_version(meta0_version + 1);
2191    let mut meta_1 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr().add(4096)) };
2192    let meta1_version = meta_1.meta.version();
2193    meta_1.meta.set_version(meta1_version + 1);
2194    file.seek(SeekFrom::Start(0))?;
2195    file.write_all(&bytes)?;
2196    file.flush()?;
2197    let r = Bolt::open(file.path());
2198    assert_eq!(Some(Error::VersionMismatch), r.err());
2199    Ok(())
2200  }
2201
2202  #[test]
2203  #[cfg(not(any(miri, feature = "test-mem-backend")))]
2204  fn test_open_err_checksum() -> crate::Result<()> {
2205    // TODO: Make this cleaner
2206    let mut file = temp_file()?;
2207    let db = Bolt::open(file.path())?;
2208    db.close();
2209    let mut bytes = AlignedBytes::<alignment::Page>::new_zeroed(4096 * 2);
2210    file.seek(SeekFrom::Start(0))?;
2211    file.read_exact(&mut bytes)?;
2212    let mut meta_0 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr()) };
2213    let meta0_pgid = meta_0.meta.pgid();
2214    meta_0.meta.set_pgid(meta0_pgid + 1);
2215    let mut meta_1 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr().add(4096)) };
2216    let meta1_pgid = meta_1.meta.pgid();
2217    meta_1.meta.set_pgid(meta1_pgid + 1);
2218    file.seek(SeekFrom::Start(0))?;
2219    file.write_all(&bytes)?;
2220    file.flush()?;
2221    let r = Bolt::open(file.path());
2222    assert_eq!(Some(Error::ChecksumMismatch), r.err());
2223    Ok(())
2224  }
2225
2226  #[test]
2227  #[cfg(not(any(miri, feature = "test-mem-backend")))]
2228  fn test_open_read_page_size_from_meta1_os() -> crate::Result<()> {
2229    // TODO: Make this cleaner
2230    let mut file = temp_file()?;
2231    let db = Bolt::open(file.path())?;
2232    db.close();
2233    let mut bytes = AlignedBytes::<alignment::Page>::new_zeroed(4096 * 2);
2234    file.seek(SeekFrom::Start(0))?;
2235    file.read_exact(&mut bytes)?;
2236    let mut meta_0 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr()) };
2237    let meta0_pgid = meta_0.meta.pgid();
2238    meta_0.meta.set_pgid(meta0_pgid + 1);
2239    file.seek(SeekFrom::Start(0))?;
2240    file.write_all(&bytes)?;
2241    file.flush()?;
2242    let db = Bolt::open(file.path())?;
2243    assert_eq!(4096, db.info().page_size);
2244    Ok(())
2245  }
2246
2247  #[test]
2248  #[cfg(not(any(miri, feature = "test-mem-backend")))]
2249  fn test_open_read_page_size_from_meta1_given() -> crate::Result<()> {
2250    for i in 0..=14usize {
2251      let given_page_size = 1024usize << i;
2252      let mut db = TestDb::with_options(BoltOptions::builder().page_size(given_page_size).build())?;
2253
2254      if i % 3 == 0 {
2255        db.must_close();
2256        let named_file = db.tmp_file.as_mut();
2257        let file = named_file.unwrap();
2258        let mut bytes = AlignedBytes::<alignment::Page>::new_zeroed(given_page_size * 2);
2259        file.seek(SeekFrom::Start(0))?;
2260        file.read_exact(&mut bytes)?;
2261        let mut meta_0 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr()) };
2262        let meta0_pgid = meta_0.meta.pgid();
2263        meta_0.meta.set_pgid(meta0_pgid + 1);
2264        file.seek(SeekFrom::Start(0))?;
2265        file.write_all(&bytes)?;
2266        file.flush()?;
2267        db.must_reopen();
2268      }
2269
2270      assert_eq!(given_page_size, db.info().page_size);
2271    }
2272    Ok(())
2273  }
2274
2275  #[test]
2276  #[cfg(not(any(miri, feature = "test-mem-backend")))]
2277  fn test_open_size() -> crate::Result<()> {
2278    let mut db = TestDb::new()?;
2279    let page_size = db.info().page_size;
2280    let v = [0; 1000];
2281    for _tx_ct in 0..1 {
2282      db.update(|mut tx| {
2283        let mut b = tx.create_bucket_if_not_exists("data")?;
2284        for keys_ct in 0..10000 {
2285          let k = format!("{:04}", keys_ct);
2286          b.put(&k, &v)?;
2287        }
2288        Ok(())
2289      })?;
2290    }
2291    db.must_close();
2292
2293    let file_size = {
2294      let tmp_file = db.tmp_file.as_ref();
2295      let file = tmp_file.unwrap();
2296      file.path().metadata()?.len()
2297    };
2298    assert_ne!(0, file_size, "unexpected new file size");
2299
2300    db.must_reopen();
2301    db.update(|mut tx| {
2302      tx.bucket_mut("data").unwrap().put("0", "0")?;
2303      Ok(())
2304    })?;
2305
2306    db.must_close();
2307
2308    let new_size = {
2309      let tmp_file = db.tmp_file.as_ref();
2310      let file = tmp_file.unwrap();
2311      file.path().metadata()?.len()
2312    };
2313
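    // Reopening plus one small write should reuse freed pages, so the file
    // may grow by at most ~5 pages.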
2314    assert!(
2315      file_size > (new_size - (5 * page_size) as u64),
2316      "unexpected file growth: {} => {}",
2317      file_size,
2318      new_size
2319    );
2320
2321    Ok(())
2322  }
2323
2324  #[test]
2325  #[cfg(not(any(miri, feature = "test-mem-backend")))]
2326  #[cfg(feature = "long-tests")]
2327  fn test_open_size_large() -> crate::Result<()> {
2328    let mut db = TestDb::new()?;
2329    let page_size = db.info().page_size;
2330    let v = [0; 50];
2331    for _tx_ct in 0..10000 {
2332      db.update(|mut tx| {
2333        let mut b = tx.create_bucket_if_not_exists("data")?;
2334        for keys_ct in 0..1000u64 {
2335          let k = keys_ct.to_be_bytes();
2336          b.put(k, v)?;
2337        }
2338        Ok(())
2339      })?;
2340    }
2341    db.must_close();
2342
2343    let file_size = {
2344      let tmp_file = db.tmp_file.as_ref();
2345      let file = tmp_file.unwrap();
2346      file.path().metadata()?.len()
2347    };
2348    assert_ne!(0, file_size, "unexpected new file size");
2349
2350    db.must_reopen();
2351    db.update(|mut tx| {
2352      tx.bucket_mut("data").unwrap().put("0", "0")?;
2353      Ok(())
2354    })?;
2355
2356    db.must_close();
2357
2358    let new_size = {
2359      let tmp_file = db.tmp_file.as_ref();
2360      let file = tmp_file.unwrap();
2361      file.path().metadata()?.len()
2362    };
2363
2364    assert!(
2365      file_size > (new_size - (5 * page_size) as u64),
2366      "unexpected file growth: {} => {}",
2367      file_size,
2368      new_size
2369    );
2370
2371    Ok(())
2372  }
2373
2374  #[test]
2375  #[cfg(not(any(miri, feature = "test-mem-backend")))]
2376  fn test_open_check() -> crate::Result<()> {
2377    let mut db = TestDb::new()?;
2378    db.view(|tx| {
2379      assert!(tx.check().is_empty());
2380      Ok(())
2381    })?;
2382    db.must_close();
2383    db.must_reopen();
2384    db.view(|tx| {
2385      assert!(tx.check().is_empty());
2386      Ok(())
2387    })?;
2388    Ok(())
2389  }
2390
2391  #[test]
2392  #[ignore]
2393  fn test_open_meta_init_write_error() {
2394    todo!("pending in go")
2395  }
2396
2397  #[test]
2398  #[cfg(not(any(miri, feature = "test-mem-backend")))]
2399  fn test_open_file_too_small() -> crate::Result<()> {
2400    let mut db = TestDb::new()?;
2401    db.must_close();
2402    {
2403      let temp_file = db.tmp_file.as_mut();
2404      let file = temp_file.unwrap();
2405      file.as_file_mut().set_len(4096)?;
2406    }
2407    assert_eq!(Some(Error::FileSizeTooSmall(4096)), db.reopen().err());
2408    Ok(())
2409  }
2410
2411  #[test]
2412  #[ignore]
2413  fn test_db_open_initial_mmap_size() {
2414    todo!()
2415  }
2416
2417  #[test]
2418  #[cfg(not(any(miri, feature = "test-mem-backend")))]
2419  fn test_db_open_read_only() -> crate::Result<()> {
2420    let mut db = TestDb::new()?;
2421    db.update(|mut tx| {
2422      let mut b = tx.create_bucket("widgets")?;
2423      b.put("foo", "bar")?;
2424      Ok(())
2425    })?;
2426    let path = db.path().clone();
2427    db.must_close();
2428    let ro = match path {
2429      DbPath::Memory => panic!("Path is DbPath::Memory"),
2430      DbPath::FilePath(path) => Bolt::open_ro(path)?,
2431    };
2432    ro.view(|tx| {
2433      let b = tx.bucket("widgets").unwrap();
2434      assert_eq!(Some(b"bar".as_slice()), b.get("foo"));
2435      Ok(())
2436    })?;
2437    ro.close();
2438    Ok(())
2439  }
2440
2441  #[test]
2442  #[cfg(not(any(miri, feature = "test-mem-backend")))]
2443  fn test_open_big_page() -> crate::Result<()> {
2444    let page_size = DEFAULT_PAGE_SIZE.bytes() as usize;
2445    let options = BoltOptions::builder().page_size(page_size * 2).build();
2446    let db1 = TestDb::with_options(options.clone())?;
2447    let options = BoltOptions::builder().page_size(page_size * 4).build();
2448    let db2 = TestDb::with_options(options.clone())?;
2449    let db1_len = db1.tmp_file.as_ref().unwrap().as_file().metadata()?.len();
2450    let db2_len = db2.tmp_file.as_ref().unwrap().as_file().metadata()?.len();
2451    assert!(db1_len < db2_len, "expected {} < {}", db1_len, db2_len);
2452    Ok(())
2453  }
2454
2455  #[test]
2456  #[ignore]
2457  fn test_open_recover_free_list() {
2458    todo!()
2459  }
2460
2461  #[test]
2462  fn test_db_begin_err_database_not_open() -> crate::Result<()> {
2463    let db = TestDb::new()?;
2464    let t_db = db.clone_db();
2465    t_db.close();
2466    let r = db.begin_tx();
2467    assert_eq!(Some(Error::DatabaseNotOpen), r.err());
2468    Ok(())
2469  }
2470
2471  #[test]
2472  fn test_db_begin_rw() -> crate::Result<()> {
2473    let mut db = TestDb::new()?;
2474    let tx = db.begin_rw()?;
2475    assert!(tx.writable());
2476    tx.commit()?;
2477    Ok(())
2478  }
2479
2480  #[test]
2481  #[ignore]
2482  fn test_db_concurrent_write_to() {
2483    todo!()
2484  }
2485
2486  #[test]
2487  fn test_db_begin_rw_closed() -> crate::Result<()> {
2488    let mut db = TestDb::new()?;
2489    let t_db = db.clone_db();
2490    t_db.close();
2491    let r = db.begin_rw_tx();
2492    assert_eq!(Some(Error::DatabaseNotOpen), r.err());
2493    Ok(())
2494  }
2495
2496  #[test]
2497  #[ignore]
2498  fn test_db_close_pending_tx_rw() {
2499    todo!()
2500  }
2501
2502  #[test]
2503  #[ignore]
2504  fn test_db_close_pending_tx_ro() {
2505    todo!()
2506  }
2507
2508  #[test]
2509  fn test_db_update() -> crate::Result<()> {
2510    let mut db = TestDb::new()?;
2511    db.update(|mut tx| {
2512      let mut b = tx.create_bucket("widgets")?;
2513      b.put("foo", "bar")?;
2514      b.put("baz", "bat")?;
2515      b.delete("foo")?;
2516      Ok(())
2517    })?;
2518
2519    db.view(|tx| {
2520      let b = tx.bucket("widgets").unwrap();
2521      assert_eq!(None, b.get("foo"));
2522      assert_eq!(Some(b"bat".as_slice()), b.get("baz"));
2523      Ok(())
2524    })?;
2525    Ok(())
2526  }
2527
2528  #[test]
2529  fn test_db_update_closed() -> crate::Result<()> {
2530    let mut db = TestDb::new()?;
2531    let t_db = db.clone_db();
2532    t_db.close();
2533    let r = db.update(|mut tx| {
2534      tx.create_bucket("widgets")?;
2535      Ok(())
2536    });
2537    assert_eq!(Some(Error::DatabaseNotOpen), r.err());
2538    Ok(())
2539  }
2540
2541  #[test]
2542  #[ignore]
2543  fn test_db_update_panic() -> crate::Result<()> {
2544    todo!()
2545  }
2546
2547  #[test]
2548  fn test_db_view_error() -> crate::Result<()> {
2549    let db = TestDb::new()?;
2550    let r = db.view(|_| Err(Error::InvalidDatabase(false))).err();
2551    assert_eq!(Some(Error::InvalidDatabase(false)), r);
2552    Ok(())
2553  }
2554
2555  #[test]
2556  #[ignore]
2557  fn test_db_view_panic() {
2558    todo!()
2559  }
2560
2561  #[test]
2562  fn test_db_stats() -> crate::Result<()> {
2563    let mut db = TestDb::new()?;
2564    db.update(|mut tx| {
2565      tx.create_bucket("widgets")?;
2566      Ok(())
2567    })?;
2568    let stats = db.stats();
2569    assert_eq!(2, stats.tx_stats.page_count());
2570    assert_eq!(0, stats.free_page_n());
2571    assert_eq!(2, stats.pending_page_n());
2572    Ok(())
2573  }
2574
2575  #[test]
2576  fn test_db_consistency() -> crate::Result<()> {
2577    let mut db = TestDb::new()?;
2578    db.update(|mut tx| {
2579      tx.create_bucket("widgets")?;
2580      Ok(())
2581    })?;
2582
2583    for _ in 0..10 {
2584      db.update(|mut tx| {
2585        tx.bucket_mut("widgets").unwrap().put("foo", "bar")?;
2586        Ok(())
2587      })?;
2588    }
2589
2590    db.update(|tx| {
2591      let p = tx.page(PgId(0)).expect("expected page");
2592      assert_eq!("meta", p.t);
2593      let p = tx.page(PgId(1)).expect("expected page");
2594      assert_eq!("meta", p.t);
2595      let p = tx.page(PgId(2)).expect("expected page");
2596      assert_eq!("free", p.t);
2597      let p = tx.page(PgId(3)).expect("expected page");
2598      assert_eq!("free", p.t);
2599      let p = tx.page(PgId(4)).expect("expected page");
2600      assert_eq!("leaf", p.t);
2601      let p = tx.page(PgId(5)).expect("expected page");
2602      assert_eq!("freelist", p.t);
2603      assert_eq!(None, tx.page(PgId(6)));
2604      Ok(())
2605    })?;
2606    Ok(())
2607  }
2608
2609  #[test]
2610  fn test_dbstats_sub() {
2611    let a = DbStats::default();
2612    let b = DbStats::default();
2613    a.tx_stats.inc_page_count(3);
2614    a.set_free_page_n(4);
2615    b.tx_stats.inc_page_count(10);
2616    b.set_free_page_n(14);
2617    let diff = b.sub(&a);
2618    assert_eq!(7, diff.tx_stats.page_count());
2619    assert_eq!(14, diff.free_page_n());
2620  }
2621
2622  #[test]
2623  fn test_db_batch() -> crate::Result<()> {
2624    let mut db = TestDb::new()?;
2625    db.update(|mut tx| {
2626      let _ = tx.create_bucket("widgets")?;
2627      Ok(())
2628    })?;
2629
2630    let n = 2;
2631    let mut threads = Vec::with_capacity(n);
2632    for i in 0..n {
2633      let mut t_db = db.clone_db();
2634      let join = thread::spawn(move || {
2635        t_db.batch(move |tx| {
2636          let mut b = tx.bucket_mut("widgets").unwrap();
2637          b.put(format!("{}", i), "")
2638        })
2639      });
2640      threads.push(join);
2641    }
2642
2643    for t in threads {
2644      t.join().unwrap()?;
2645    }
2646
2647    db.view(|tx| {
2648      let b = tx.bucket("widgets").unwrap();
2649      for i in 0..n {
2650        let g = b.get(format!("{}", i));
2651        assert!(g.is_some(), "key not found {}", i);
2652      }
2653      Ok(())
2654    })?;
2655
2656    Ok(())
2657  }
2658
2659  #[test]
2660  #[ignore]
2661  fn test_db_batch_panic() {
2662    todo!()
2663  }
2664
2665  #[test]
2666  #[ignore]
2667  fn test_db_batch_full() {
2668    todo!()
2669  }
2670
2671  #[test]
2672  #[ignore]
2673  fn test_db_batch_time() {
2674    todo!()
2675  }
2676
2677  #[test]
2678  #[ignore]
2679  fn test_dbunmap() {
2680    todo!()
2681  }
2682
2683  #[test]
2684  #[ignore]
2685  fn benchmark_dbbatch_automatic() {
2686    todo!()
2687  }
2688
2689  #[test]
2690  #[ignore]
2691  fn benchmark_dbbatch_single() {
2692    todo!()
2693  }
2694
2695  #[test]
2696  #[ignore]
2697  fn benchmark_dbbatch_manual10x100() {
2698    todo!()
2699  }
2700}