use crate::arch::size::MAX_MAP_SIZE;
use crate::bucket::BucketRwIApi;
use crate::common::bucket::BucketHeader;
use crate::common::bump::PinBump;
use crate::common::defaults::{
DEFAULT_ALLOC_SIZE, DEFAULT_MAX_BATCH_DELAY, DEFAULT_MAX_BATCH_SIZE, DEFAULT_PAGE_SIZE, MAGIC,
MAX_MMAP_STEP, PGID_NO_FREE_LIST, VERSION,
};
use crate::common::lock::LockGuard;
use crate::common::meta::{MappedMetaPage, Meta};
use crate::common::page::{CoerciblePage, MutPage, PageHeader, RefPage};
use crate::common::pool::{SyncPool, SyncReusable};
use crate::common::self_owned::SelfOwned;
use crate::common::tree::MappedLeafPage;
use crate::common::{BVec, PgId, SplitRef, TxId};
use crate::freelist::{Freelist, MappedFreeListPage};
use crate::tx::{
TxClosingState, TxIApi, TxImpl, TxRef, TxRwApi, TxRwCell, TxRwImpl, TxRwRef, TxStats,
};
use crate::{Error, TxApi};
use aligners::{alignment, AlignedBytes};
use anyhow::anyhow;
use fs4::FileExt;
use memmap2::{Advice, MmapOptions, MmapRaw};
use monotonic_timer::{Guard, Timer};
use parking_lot::{Mutex, MutexGuard, RwLock};
#[cfg(feature = "try-begin")]
use parking_lot::{RwLockReadGuard, RwLockUpgradableReadGuard};
use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::mpsc::{Receiver, SyncSender};
use std::sync::{mpsc, Arc, OnceLock, Weak};
use std::time::Duration;
#[cfg(feature = "try-begin")]
use std::time::Instant;
use std::{fs, io, mem, thread};
use typed_builder::TypedBuilder;
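/// Read-only operations available on every database handle.
///
/// `begin` starts a read-only transaction, `view` runs a closure inside one,
/// and `stats`, `path`, `info` and `close` expose handle-level metadata.
/// A minimal usage sketch (illustrative only, marked `ignore` so it is not
/// compiled as a doctest; setup and error handling are elided):
///
/// ```ignore
/// let db = Bolt::open("my.db")?;
/// db.view(|tx| {
///   if let Some(b) = tx.bucket("widgets") {
///     assert_eq!(Some(b"bar".as_slice()), b.get("foo"));
///   }
///   Ok(())
/// })?;
/// db.close();
/// ```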
pub trait DbApi: Clone + Send + Sync
where
Self: Sized,
{
fn begin(&self) -> crate::Result<impl TxApi>;
#[cfg(feature = "try-begin")]
fn try_begin(&self) -> crate::Result<Option<impl TxApi>>;
#[cfg(feature = "try-begin")]
fn try_begin_for(&self, duration: Duration) -> crate::Result<Option<impl TxApi>>;
#[cfg(feature = "try-begin")]
fn try_begin_until(&self, instant: Instant) -> crate::Result<Option<impl TxApi>>;
fn view<'tx, F: Fn(TxRef<'tx>) -> crate::Result<()>>(&'tx self, f: F) -> crate::Result<()>;
fn stats(&self) -> Arc<DbStats>;
fn path(&self) -> &DbPath;
fn info(&self) -> DbInfo;
fn close(self);
}
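/// Read-write operations layered on top of `DbApi` for writable handles.
///
/// `begin_rw` starts a read-write transaction, `update` runs a closure inside
/// one and commits on success (rolling back on error), `batch` coalesces
/// closures from several callers into a single transaction, and `sync` forces
/// the backing file to be fsynced. A minimal sketch (illustrative only, not
/// compiled as a doctest):
///
/// ```ignore
/// let mut db = Bolt::open_mem()?;
/// db.update(|mut tx| {
///   let mut b = tx.create_bucket_if_not_exists("widgets")?;
///   b.put("foo", "bar")?;
///   Ok(())
/// })?;
/// ```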
pub trait DbRwAPI: DbApi {
fn begin_rw(&mut self) -> crate::Result<impl TxRwApi>;
#[cfg(feature = "try-begin")]
fn try_begin_rw(&self) -> crate::Result<Option<impl TxRwApi>>;
#[cfg(feature = "try-begin")]
fn try_begin_rw_for(&self, duration: Duration) -> crate::Result<Option<impl TxRwApi>>;
#[cfg(feature = "try-begin")]
fn try_begin_rw_until(&self, instant: Instant) -> crate::Result<Option<impl TxRwApi>>;
fn update<'tx, F: FnMut(TxRwRef<'tx>) -> crate::Result<()>>(
&'tx mut self, f: F,
) -> crate::Result<()>;
fn batch<F>(&mut self, f: F) -> crate::Result<()>
where
F: FnMut(&mut TxRwRef) -> crate::Result<()> + Send + Sync + Clone + 'static;
fn sync(&mut self) -> crate::Result<()>;
}
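/// Runtime statistics for a database: transaction counters plus freelist usage
/// figures. All counters are atomics, so the shared `Arc<DbStats>` returned by
/// `DbApi::stats` can be read while transactions continue to update it.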
#[derive(Default)]
pub struct DbStats {
tx_stats: TxStats,
free_page_n: AtomicI64,
pending_page_n: AtomicI64,
free_alloc: AtomicI64,
free_list_in_use: AtomicI64,
tx_n: AtomicI64,
open_tx_n: AtomicI64,
}
impl DbStats {
pub fn tx_stats(&self) -> &TxStats {
&self.tx_stats
}
pub fn free_page_n(&self) -> i64 {
self.free_page_n.load(Ordering::Acquire)
}
pub(crate) fn set_free_page_n(&self, value: i64) {
self.free_page_n.store(value, Ordering::Release);
}
pub fn pending_page_n(&self) -> i64 {
self.pending_page_n.load(Ordering::Acquire)
}
pub(crate) fn set_pending_page_n(&self, value: i64) {
self.pending_page_n.store(value, Ordering::Release);
}
pub fn free_alloc(&self) -> i64 {
self.free_alloc.load(Ordering::Acquire)
}
pub(crate) fn set_free_alloc(&self, value: i64) {
self.free_alloc.store(value, Ordering::Release);
}
pub fn free_list_in_use(&self) -> i64 {
self.free_list_in_use.load(Ordering::Acquire)
}
pub(crate) fn set_free_list_in_use(&self, value: i64) {
self.free_list_in_use.store(value, Ordering::Release);
}
pub fn tx_n(&self) -> i64 {
self.tx_n.load(Ordering::Acquire)
}
pub(crate) fn inc_tx_n(&self, delta: i64) {
self.tx_n.fetch_add(delta, Ordering::AcqRel);
}
pub fn open_tx_n(&self) -> i64 {
self.open_tx_n.load(Ordering::Acquire)
}
pub(crate) fn sub(&self, rhs: &DbStats) -> DbStats {
let diff = self.clone();
diff.inc_tx_n(-rhs.tx_n());
diff.tx_stats.sub_assign(&rhs.tx_stats);
diff
}
}
impl Clone for DbStats {
fn clone(&self) -> Self {
DbStats {
tx_stats: self.tx_stats.clone(),
free_page_n: self.free_page_n().into(),
pending_page_n: self.pending_page_n().into(),
free_alloc: self.free_alloc().into(),
free_list_in_use: self.free_list_in_use().into(),
tx_n: self.tx_n().into(),
open_tx_n: self.open_tx_n().into(),
}
}
}
pub struct DbState {
txs: Vec<TxId>,
rwtx: Option<TxId>,
is_open: bool,
current_meta: Meta,
}
impl DbState {
fn new(current_meta: Meta) -> DbState {
DbState {
txs: vec![],
rwtx: None,
is_open: true,
current_meta,
}
}
}
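/// Determines the mmap size for the requested data size: power-of-two steps
/// from 32 KiB up to 1 GiB, then rounded up to the next `MAX_MMAP_STEP`
/// increment and to a multiple of the page size, capped at `MAX_MAP_SIZE`.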
fn mmap_size(page_size: usize, size: u64) -> crate::Result<u64> {
for i in 15..=30usize {
if size <= 1 << i {
return Ok(1 << i);
}
}
if size > MAX_MAP_SIZE.bytes() as u64 {
return Err(Error::MMapTooLarge);
}
let mut sz = size;
let remainder = sz % MAX_MMAP_STEP.bytes() as u64;
if remainder > 0 {
sz += MAX_MMAP_STEP.bytes() as u64 - remainder;
}
let ps = page_size as u64;
if sz % ps != 0 {
sz = ((sz / ps) + 1) * ps;
}
if sz > MAX_MAP_SIZE.bytes() as u64 {
sz = MAX_MAP_SIZE.bytes() as u64;
}
Ok(sz)
}
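/// Location of an open database: an in-memory buffer or a file on disk.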
#[derive(Clone, PartialOrd, PartialEq, Ord, Eq, Debug)]
pub enum DbPath {
Memory,
FilePath(PathBuf),
}
impl DbPath {
pub fn file_path(&self) -> Option<&Path> {
match self {
DbPath::Memory => None,
DbPath::FilePath(p) => Some(p),
}
}
}
#[derive(Clone, Debug)]
pub struct DbInfo {
pub page_size: usize,
}
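/// Storage abstraction shared by the file-backed and in-memory databases:
/// page access, meta-page handling, growth and mmap management, and access to
/// the freelist.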
pub(crate) trait DBBackend: Send + Sync {
fn page_size(&self) -> usize;
fn data_size(&self) -> u64;
fn validate_meta(&self) -> crate::Result<()> {
let meta0 = self.meta0();
let meta1 = self.meta1();
if let (Err(error), Err(_)) = (meta0.meta.validate(), meta1.meta.validate()) {
return Err(error);
}
Ok(())
}
fn meta(&self) -> Meta {
let meta0 = self.meta0();
let meta1 = self.meta1();
let (meta_a, meta_b) = {
if meta1.meta.txid() > meta0.meta.txid() {
(meta1.meta, meta0.meta)
} else {
(meta0.meta, meta1.meta)
}
};
if meta_a.validate().is_ok() {
return meta_a;
} else if meta_b.validate().is_ok() {
return meta_b;
}
panic!("bolt.db.meta: invalid meta page")
}
fn meta0(&self) -> MappedMetaPage;
fn meta1(&self) -> MappedMetaPage;
fn page<'tx>(&self, pg_id: PgId) -> RefPage<'tx>;
fn grow(&self, size: u64) -> crate::Result<()>;
fn mmap(&mut self, min_size: u64, tx: TxRwCell) -> crate::Result<()>;
fn fsync(&self) -> crate::Result<()>;
fn write_all_at(&self, buffer: &[u8], offset: u64) -> crate::Result<usize>;
fn freelist(&self) -> MutexGuard<Freelist>;
}
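/// Placeholder backend swapped in by `close()`. Every method is unreachable
/// because no new transaction can start once the database is marked closed.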
struct ClosedBackend {}
impl DBBackend for ClosedBackend {
fn page_size(&self) -> usize {
unreachable!()
}
fn data_size(&self) -> u64 {
unreachable!()
}
fn meta0(&self) -> MappedMetaPage {
unreachable!()
}
fn meta1(&self) -> MappedMetaPage {
unreachable!()
}
fn page<'tx>(&self, _pg_id: PgId) -> RefPage<'tx> {
unreachable!()
}
fn grow(&self, _size: u64) -> crate::Result<()> {
unreachable!()
}
fn mmap(&mut self, _min_size: u64, _tx: TxRwCell) -> crate::Result<()> {
unreachable!()
}
fn fsync(&self) -> crate::Result<()> {
unreachable!()
}
fn write_all_at(&self, _buffer: &[u8], _offset: u64) -> crate::Result<usize> {
unreachable!()
}
fn freelist(&self) -> MutexGuard<Freelist> {
unreachable!()
}
}
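/// Backend for purely in-memory databases. All pages live in one page-aligned
/// heap buffer that is reallocated and copied when the database grows.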
struct MemBackend {
mmap: Mutex<AlignedBytes<alignment::Page>>,
freelist: OnceLock<Mutex<Freelist>>,
page_size: usize,
alloc_size: u64,
file_size: u64,
data_size: u64,
}
unsafe impl Send for MemBackend {}
unsafe impl Sync for MemBackend {}
impl DBBackend for MemBackend {
fn page_size(&self) -> usize {
self.page_size
}
fn data_size(&self) -> u64 {
self.data_size
}
fn meta0(&self) -> MappedMetaPage {
unsafe { MappedMetaPage::new(self.mmap.lock().as_ptr().cast_mut()) }
}
fn meta1(&self) -> MappedMetaPage {
unsafe { MappedMetaPage::new(self.mmap.lock().as_ptr().add(self.page_size).cast_mut()) }
}
fn page<'tx>(&self, pg_id: PgId) -> RefPage<'tx> {
let mmap = self.mmap.lock();
debug_assert!(((pg_id.0 as usize + 1) * self.page_size) <= mmap.len());
unsafe { RefPage::new(mmap.as_ptr().byte_add(pg_id.0 as usize * self.page_size)) }
}
fn grow(&self, size: u64) -> crate::Result<()> {
let mut mmap = self.mmap.lock();
if size <= mmap.len() as u64 {
return Ok(());
}
let mut new_mmap = AlignedBytes::new_zeroed(size as usize);
new_mmap[0..mmap.len()].copy_from_slice(&mmap);
*mmap = new_mmap;
Ok(())
}
fn mmap(&mut self, min_size: u64, _tx: TxRwCell) -> crate::Result<()> {
let mut size = {
let mmap = self.mmap.lock();
if mmap.len() < self.page_size * 2 {
return Err(Error::MMapTooSmall(mmap.len() as u64));
}
(mmap.len() as u64).max(min_size)
};
size = mmap_size(self.page_size, size)?;
self.validate_meta()?;
self.data_size = size;
Ok(())
}
fn fsync(&self) -> crate::Result<()> {
Ok(())
}
fn write_all_at(&self, buffer: &[u8], offset: u64) -> crate::Result<usize> {
let mut mmap = self.mmap.lock();
let write_to = &mut mmap[offset as usize..offset as usize + buffer.len()];
write_to.copy_from_slice(buffer);
let written = write_to.len();
Ok(written)
}
fn freelist(&self) -> MutexGuard<Freelist> {
self
.freelist
.get_or_init(|| {
let meta = self.meta();
let freelist_pgid = meta.free_list();
let refpage = self.page(freelist_pgid);
let freelist_page = MappedFreeListPage::coerce_ref(&refpage).unwrap();
let freelist = freelist_page.read();
Mutex::new(freelist)
})
.lock()
}
}
struct FileState {
file: File,
file_size: u64,
}
impl Deref for FileState {
type Target = File;
fn deref(&self) -> &Self::Target {
&self.file
}
}
impl DerefMut for FileState {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.file
}
}
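/// Backend for file-based databases: the file is locked through `fs4`,
/// memory-mapped with `memmap2`, and optionally mlocked into RAM.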
pub struct FileBackend {
path: Arc<PathBuf>,
file: Mutex<FileState>,
page_size: usize,
mmap: Option<MmapRaw>,
freelist: OnceLock<Mutex<Freelist>>,
alloc_size: u64,
data_size: u64,
use_mlock: bool,
grow_async: bool,
read_only: bool,
}
impl FileBackend {
fn invalidate(&mut self) {
self.data_size = 0;
}
fn munmap(&mut self) -> crate::Result<()> {
self.mmap = None;
self.invalidate();
Ok(())
}
fn has_synced_free_list(&self) -> bool {
self.meta().free_list() != PGID_NO_FREE_LIST
}
fn mmap_unlock(&mut self) -> crate::Result<()> {
if let Some(mmap) = &mut self.mmap {
mmap.unlock()?;
}
Ok(())
}
fn mmap_lock(&mut self) -> crate::Result<()> {
if let Some(mmap) = &mut self.mmap {
mmap.lock()?;
}
Ok(())
}
fn mmap_relock(&mut self) -> crate::Result<()> {
self.mmap_unlock()?;
self.mmap_lock()?;
Ok(())
}
fn get_page_size(file: &mut File) -> crate::Result<usize> {
let meta0_can_read = match Self::get_page_size_from_first_meta(file) {
Ok(page_size) => return Ok(page_size),
Err(Error::InvalidDatabase(meta_can_read)) => meta_can_read,
Err(e) => return Err(e),
};
let meta1_can_read = match Self::get_page_size_from_second_meta(file) {
Ok(page_size) => return Ok(page_size),
Err(Error::InvalidDatabase(meta_can_read)) => meta_can_read,
Err(e) => return Err(e),
};
if meta0_can_read || meta1_can_read {
return Ok(DEFAULT_PAGE_SIZE.bytes() as usize);
}
Err(Error::InvalidDatabase(false))
}
fn get_page_size_from_first_meta(file: &mut File) -> crate::Result<usize> {
let mut buffer = AlignedBytes::<alignment::Page>::new_zeroed(4096);
let refpage = RefPage::new(buffer.as_ptr());
let mut meta_can_read = false;
let bw = file
.seek(SeekFrom::Start(0))
.and_then(|_| file.read(&mut buffer))
.map_err(|_| Error::InvalidDatabase(meta_can_read))?;
if bw == buffer.len() {
meta_can_read = true;
if let Some(meta_page) = MappedMetaPage::coerce_ref(&refpage) {
if meta_page.meta.validate().is_ok() {
let page_size = meta_page.meta.page_size();
return Ok(page_size as usize);
}
}
}
Err(Error::InvalidDatabase(meta_can_read))
}
fn get_page_size_from_second_meta(file: &mut File) -> crate::Result<usize> {
let mut meta_can_read = false;
let metadata = file.metadata()?;
let file_size = metadata.len();
let mut buffer = AlignedBytes::<alignment::Page>::new_zeroed(4096);
for i in 0..15u64 {
let pos = 1024u64 << i;
if file_size < 1024 || pos >= file_size - 1024 {
break;
}
let bw = file
.seek(SeekFrom::Start(pos))
.and_then(|_| file.read(&mut buffer))
.map_err(|_| Error::InvalidDatabase(meta_can_read))? as u64;
if bw == buffer.len() as u64 || bw == file_size - pos {
meta_can_read = true;
if let Some(meta_page) = MappedMetaPage::coerce_ref(&RefPage::new(buffer.as_ptr())) {
if meta_page.meta.validate().is_ok() {
return Ok(meta_page.meta.page_size() as usize);
}
}
}
buffer.fill(0);
}
Err(Error::InvalidDatabase(meta_can_read))
}
pub(crate) fn file_size(&self) -> crate::Result<u64> {
let file_lock = self.file.lock();
let info = file_lock.metadata()?;
let size = info.len();
if size < (self.page_size * 2) as u64 {
return Err(Error::FileSizeTooSmall(size));
}
Ok(size)
}
}
impl DBBackend for FileBackend {
fn page_size(&self) -> usize {
self.page_size
}
fn data_size(&self) -> u64 {
self.data_size
}
fn meta0(&self) -> MappedMetaPage {
self
.mmap
.as_ref()
.map(|mmap| unsafe { MappedMetaPage::new(mmap.as_mut_ptr()) })
.unwrap()
}
fn meta1(&self) -> MappedMetaPage {
self
.mmap
.as_ref()
.map(|mmap| unsafe { MappedMetaPage::new(mmap.as_mut_ptr().add(self.page_size)) })
.unwrap()
}
fn page<'tx>(&self, pg_id: PgId) -> RefPage<'tx> {
let page_addr = pg_id.0 as usize * self.page_size;
let page_ptr = unsafe { self.mmap.as_ref().unwrap().as_ptr().add(page_addr) };
RefPage::new(page_ptr)
}
fn grow(&self, mut size: u64) -> crate::Result<()> {
let file_size = self.file.lock().file_size;
if size <= file_size {
return Ok(());
}
if self.data_size <= self.alloc_size {
size = self.data_size;
} else {
size += self.alloc_size;
}
if self.grow_async && !self.read_only {
let file_lock = self.file.lock();
#[cfg(mlock_supported)]
if self.use_mlock {
self.mmap.as_ref().unwrap().unlock()?;
}
if cfg!(not(target_os = "windows")) {
file_lock.set_len(size)?;
}
file_lock.sync_all()?;
#[cfg(mlock_supported)]
if self.use_mlock {
self.mmap.as_ref().unwrap().lock()?;
}
}
self.file.lock().file_size = size;
Ok(())
}
fn mmap(&mut self, min_size: u64, tx: TxRwCell) -> crate::Result<()> {
let file_size = self.file_size()?;
let mut size = file_size.max(min_size);
size = mmap_size(self.page_size, size)?;
if let Some(mmap) = self.mmap.take() {
#[cfg(mlock_supported)]
if self.use_mlock {
mmap.unlock()?;
}
tx.cell.bound().own_in();
}
let file_lock = self.file.lock();
let mmap = MmapOptions::new()
.len(size as usize)
.map_raw(&**file_lock)?;
#[cfg(mlock_supported)]
if self.use_mlock {
mmap.lock()?;
}
#[cfg(mmap_advise_supported)]
mmap.advise(Advice::Random)?;
self.mmap = Some(mmap);
let r0 = self.meta0().meta.validate();
let r1 = self.meta1().meta.validate();
if r0.is_err() && r1.is_err() {
return r0;
}
self.data_size = size;
Ok(())
}
fn fsync(&self) -> crate::Result<()> {
self.file.lock().sync_all().map_err(Error::IO)
}
fn write_all_at(&self, buffer: &[u8], offset: u64) -> crate::Result<usize> {
let mut file_lock = self.file.lock();
file_lock.seek(SeekFrom::Start(offset)).map_err(Error::IO)?;
file_lock
.write_all(buffer)
.map_err(Error::IO)
.map(|_| buffer.len())
}
fn freelist(&self) -> MutexGuard<Freelist> {
self
.freelist
.get_or_init(|| {
let meta = self.meta();
let freelist_pgid = meta.free_list();
let refpage = self.page(freelist_pgid);
let freelist_page = MappedFreeListPage::coerce_ref(&refpage).unwrap();
let freelist = freelist_page.read();
Mutex::new(freelist)
})
.lock()
}
}
impl Drop for FileBackend {
  fn drop(&mut self) {
    if !self.read_only {
      // Avoid panicking in Drop; just report a failed file unlock.
      if let Err(error) = self.file.lock().unlock() {
        eprintln!("bolt: failed to unlock database file: {}", error);
      }
    }
  }
}
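/// Outcome of allocating contiguous pages for a read-write transaction:
/// either the new pages fit within the current mapping, or the mapping must
/// first grow to the reported minimum size.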
pub(crate) enum AllocateResult<'tx> {
Page(SelfOwned<AlignedBytes<alignment::Page>, MutPage<'tx>>),
PageWithNewSize(SelfOwned<AlignedBytes<alignment::Page>, MutPage<'tx>>, u64),
}
pub(crate) trait DbIApi<'tx>: 'tx {
fn page(&self, pg_id: PgId) -> RefPage<'tx>;
fn is_page_free(&self, pg_id: PgId) -> bool;
fn remove_tx(&self, rem_tx: TxId, tx_stats: Arc<TxStats>);
fn allocate(&self, tx: TxRwCell, page_count: u64) -> AllocateResult<'tx>;
fn free_page(&self, txid: TxId, p: &PageHeader);
fn free_pages(&self, state: &mut DbState);
fn freelist_count(&self) -> u64;
fn freelist_copyall(&self, all: &mut BVec<PgId>);
fn commit_freelist(&self, tx: TxRwCell<'tx>) -> crate::Result<AllocateResult<'tx>>;
fn write_all_at(&self, buf: &[u8], offset: u64) -> crate::Result<usize>;
fn fsync(&self) -> crate::Result<()>;
fn repool_allocated(&self, page: AlignedBytes<alignment::Page>);
fn remove_rw_tx(&self, tx_closing_state: TxClosingState, rem_tx: TxId, tx_stats: Arc<TxStats>);
fn grow(&self, size: u64) -> crate::Result<()>;
}
pub(crate) trait DbMutIApi<'tx>: DbIApi<'tx> {
fn mmap_to_new_size(&mut self, min_size: u64, tx: TxRwCell) -> crate::Result<()>;
}
impl<'tx> DbIApi<'tx> for LockGuard<'tx, DbShared> {
fn page(&self, pg_id: PgId) -> RefPage<'tx> {
match self {
LockGuard::R(guard) => guard.page(pg_id),
LockGuard::U(guard) => guard.borrow().page(pg_id),
}
}
fn is_page_free(&self, pg_id: PgId) -> bool {
match self {
LockGuard::R(guard) => guard.is_page_free(pg_id),
LockGuard::U(guard) => guard.borrow().is_page_free(pg_id),
}
}
fn remove_tx(&self, rem_tx: TxId, tx_stats: Arc<TxStats>) {
match self {
LockGuard::R(guard) => guard.remove_tx(rem_tx, tx_stats),
LockGuard::U(guard) => guard.borrow().remove_tx(rem_tx, tx_stats),
}
}
fn allocate(&self, tx: TxRwCell, page_count: u64) -> AllocateResult<'tx> {
match self {
LockGuard::R(guard) => guard.allocate(tx, page_count),
LockGuard::U(guard) => guard.borrow().allocate(tx, page_count),
}
}
fn free_page(&self, txid: TxId, p: &PageHeader) {
match self {
LockGuard::R(guard) => guard.free_page(txid, p),
LockGuard::U(guard) => guard.borrow().free_page(txid, p),
}
}
fn free_pages(&self, state: &mut DbState) {
match self {
LockGuard::R(guard) => guard.free_pages(state),
LockGuard::U(guard) => guard.borrow().free_pages(state),
}
}
fn freelist_count(&self) -> u64 {
match self {
LockGuard::R(guard) => guard.freelist_count(),
LockGuard::U(guard) => guard.borrow().freelist_count(),
}
}
fn freelist_copyall(&self, all: &mut BVec<PgId>) {
match self {
LockGuard::R(guard) => guard.freelist_copyall(all),
LockGuard::U(guard) => guard.borrow().freelist_copyall(all),
}
}
fn commit_freelist(&self, tx: TxRwCell<'tx>) -> crate::Result<AllocateResult<'tx>> {
match self {
LockGuard::R(guard) => guard.commit_freelist(tx),
LockGuard::U(guard) => guard.borrow().commit_freelist(tx),
}
}
fn write_all_at(&self, buf: &[u8], offset: u64) -> crate::Result<usize> {
match self {
LockGuard::R(guard) => guard.write_all_at(buf, offset),
LockGuard::U(guard) => guard.borrow().write_all_at(buf, offset),
}
}
fn fsync(&self) -> crate::Result<()> {
match self {
LockGuard::R(guard) => guard.fsync(),
LockGuard::U(guard) => guard.borrow().fsync(),
}
}
fn repool_allocated(&self, page: AlignedBytes<alignment::Page>) {
match self {
LockGuard::R(guard) => guard.repool_allocated(page),
LockGuard::U(guard) => guard.borrow().repool_allocated(page),
}
}
fn remove_rw_tx(&self, tx_closing_state: TxClosingState, rem_tx: TxId, tx_stats: Arc<TxStats>) {
match self {
LockGuard::R(guard) => guard.remove_rw_tx(tx_closing_state, rem_tx, tx_stats),
LockGuard::U(guard) => guard
.borrow()
.remove_rw_tx(tx_closing_state, rem_tx, tx_stats),
}
}
fn grow(&self, size: u64) -> crate::Result<()> {
match self {
LockGuard::R(guard) => guard.grow(size),
LockGuard::U(guard) => guard.borrow().grow(size),
}
}
}
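/// State shared behind the database `RwLock`: statistics, open-transaction
/// bookkeeping, a small pool of single-page buffers, and the active backend.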
pub struct DbShared {
pub(crate) stats: Arc<DbStats>,
pub(crate) db_state: Arc<Mutex<DbState>>,
page_pool: Mutex<Vec<AlignedBytes<alignment::Page>>>,
pub(crate) backend: Box<dyn DBBackend>,
pub(crate) options: BoltOptions,
}
unsafe impl Sync for DbShared {}
unsafe impl Send for DbShared {}
impl<'tx> DbIApi<'tx> for DbShared {
fn page(&self, pg_id: PgId) -> RefPage<'tx> {
self.backend.page(pg_id)
}
fn is_page_free(&self, pg_id: PgId) -> bool {
self.backend.freelist().freed(pg_id)
}
fn remove_tx(&self, rem_tx: TxId, tx_stats: Arc<TxStats>) {
let mut records = self.db_state.lock();
if let Some(pos) = records.txs.iter().position(|tx| *tx == rem_tx) {
records.txs.swap_remove(pos);
}
let n = records.txs.len();
self.stats.open_tx_n.store(n as i64, Ordering::Release);
self.stats.tx_stats.add_assign(&tx_stats);
}
fn allocate(&self, tx: TxRwCell, page_count: u64) -> AllocateResult<'tx> {
let tx_id = tx.api_id();
let high_water = tx.meta().pgid();
    // Lock the pool once to avoid a check-then-pop race on `unwrap`.
    let pooled = if page_count == 1 {
      self.page_pool.lock().pop()
    } else {
      None
    };
    let bytes = match pooled {
      Some(mut page) => {
        page.fill(0);
        page
      }
      None => AlignedBytes::new_zeroed(page_count as usize * self.backend.page_size()),
    };
{
let tx = tx.cell.borrow();
let stats = tx.r.stats.as_ref().unwrap();
stats.inc_page_count(page_count as i64);
stats.inc_page_alloc((page_count * tx.r.meta.page_size() as u64) as i64);
}
let mut mut_page = SelfOwned::new_with_map(bytes, |b| MutPage::new(b.as_mut_ptr()));
mut_page.overflow = (page_count - 1) as u32;
if let Some(pid) = self.backend.freelist().allocate(tx_id, page_count) {
mut_page.id = pid;
return AllocateResult::Page(mut_page);
}
mut_page.id = high_water;
let min_size = (high_water.0 + page_count + 1) * self.backend.page_size() as u64;
tx.split_r_mut().meta.set_pgid(high_water + page_count);
if min_size > self.backend.data_size() {
AllocateResult::PageWithNewSize(mut_page, min_size)
} else {
AllocateResult::Page(mut_page)
}
}
fn free_page(&self, txid: TxId, p: &PageHeader) {
self.backend.freelist().free(txid, p)
}
fn free_pages(&self, state: &mut DbState) {
let mut freelist = self.backend.freelist();
state.txs.sort();
    let mut min_id = TxId(u64::MAX);
if !state.txs.is_empty() {
min_id = *state.txs.first().unwrap();
}
if min_id.0 > 0 {
freelist.release(min_id - 1);
}
for t in &state.txs {
freelist.release_range(min_id, *t - 1);
min_id = *t + 1;
}
    freelist.release_range(min_id, TxId(u64::MAX));
}
fn freelist_count(&self) -> u64 {
self.backend.freelist().count()
}
fn freelist_copyall(&self, all: &mut BVec<PgId>) {
self.backend.freelist().copy_all(all)
}
fn commit_freelist(&self, tx: TxRwCell<'tx>) -> crate::Result<AllocateResult<'tx>> {
let count = {
let page_size = self.backend.page_size();
let freelist_size = self.backend.freelist().size();
(freelist_size / page_size as u64) + 1
};
let mut freelist_page = self.allocate(tx, count);
{
let page = match &mut freelist_page {
AllocateResult::Page(page) => page,
AllocateResult::PageWithNewSize(page, _) => page,
};
self
.backend
.freelist()
.write(MappedFreeListPage::mut_into(page))
}
Ok(freelist_page)
}
fn write_all_at(&self, buf: &[u8], offset: u64) -> crate::Result<usize> {
self.backend.write_all_at(buf, offset)
}
fn fsync(&self) -> crate::Result<()> {
self.backend.fsync()
}
fn repool_allocated(&self, page: AlignedBytes<alignment::Page>) {
self.page_pool.lock().push(page);
}
fn remove_rw_tx(&self, tx_closing_state: TxClosingState, rem_tx: TxId, tx_stats: Arc<TxStats>) {
let mut state = self.db_state.lock();
let page_size = self.backend.page_size();
let mut freelist = self.backend.freelist();
if tx_closing_state.is_rollback() {
freelist.rollback(rem_tx);
if tx_closing_state.is_physical_rollback() {
let freelist_page_id = self.backend.meta().free_list();
let freelist_page_ref = self.backend.page(freelist_page_id);
let freelist_page = MappedFreeListPage::coerce_ref(&freelist_page_ref).unwrap();
freelist.reload(freelist_page);
}
}
let free_list_free_n = freelist.free_count();
let free_list_pending_n = freelist.pending_count();
let free_list_alloc = freelist.size();
let new_meta = self.backend.meta();
state.current_meta = new_meta;
state.rwtx = None;
self.stats.set_free_page_n(free_list_free_n as i64);
self.stats.set_pending_page_n(free_list_pending_n as i64);
self
.stats
.set_free_alloc(((free_list_free_n + free_list_pending_n) * page_size as u64) as i64);
self.stats.set_free_list_in_use(free_list_alloc as i64);
self.stats.tx_stats.add_assign(&tx_stats);
}
fn grow(&self, size: u64) -> crate::Result<()> {
self.backend.grow(size)
}
}
impl<'tx> DbMutIApi<'tx> for DbShared {
fn mmap_to_new_size(&mut self, min_size: u64, tx: TxRwCell) -> crate::Result<()> {
self.backend.as_mut().mmap(min_size, tx)
}
}
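/// Options controlling how a database is opened.
///
/// Built via `BoltOptions::builder()`; the finished options can open a file
/// or an in-memory database directly. A sketch (illustrative only, not
/// compiled as a doctest):
///
/// ```ignore
/// let db = BoltOptions::builder()
///   .page_size(8192)
///   .build()
///   .open_mem()?;
/// ```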
#[derive(Clone, Default, Debug, PartialEq, Eq, TypedBuilder)]
#[builder(doc)]
pub struct BoltOptions {
#[cfg(timeout_supported)]
#[builder(
default,
setter(
strip_option,
skip,
doc = "Timeout is the amount of time to wait to obtain a file lock. \
When set to zero it will wait indefinitely."
)
)]
timeout: Option<Duration>,
#[builder(
default,
setter(
skip,
doc = "Sets the DB.NoGrowSync flag before memory mapping the file."
)
)]
no_grow_sync: bool,
#[builder(
default,
setter(
skip,
doc = "Do not sync freelist to disk.\
This improves the database write performance under normal operation,\
but requires a full database re-sync during recovery."
)
)]
no_freelist_sync: bool,
#[builder(setter(
strip_bool,
doc = "Sets whether to load the free pages when opening the db file.\
Note when opening db in write mode, bbolt will always load the free pages."
))]
preload_freelist: bool,
#[builder(
default,
setter(
strip_option,
doc = "InitialMmapSize is the initial mmap size of the database in bytes. \
Read transactions won't block write transaction if the InitialMmapSize is \
large enough to hold database mmap size."
)
)]
initial_mmap_size: Option<u64>,
#[builder(default, setter(strip_option))]
page_size: Option<usize>,
#[builder(setter(strip_bool))]
no_sync: bool,
#[cfg(mlock_supported)]
#[builder(setter(strip_bool))]
mlock: bool,
#[builder(
default,
setter(strip_option, doc = "max_batch_size is the maximum size of a batch.")
)]
max_batch_size: Option<u32>,
#[builder(
default,
setter(
strip_option,
doc = "max_batch_delay is the maximum delay before a batch starts."
)
)]
max_batch_delay: Option<Duration>,
#[builder(default = false, setter(skip))]
read_only: bool,
}
impl BoltOptions {
  // `cfg!` does not strip the field access, so provide cfg-gated bodies.
  #[cfg(timeout_supported)]
  #[inline]
  pub(crate) fn timeout(&self) -> Option<Duration> {
    self.timeout
  }
  #[cfg(not(timeout_supported))]
  #[inline]
  pub(crate) fn timeout(&self) -> Option<Duration> {
    None
  }
#[inline]
pub(crate) fn no_grow_sync(&self) -> bool {
self.no_grow_sync
}
#[inline]
pub(crate) fn no_freelist_sync(&self) -> bool {
self.no_freelist_sync
}
#[inline]
pub(crate) fn preload_freelist(&self) -> bool {
if self.read_only {
self.preload_freelist
} else {
true
}
}
#[inline]
pub(crate) fn initial_map_size(&self) -> Option<u64> {
self.initial_mmap_size
}
#[inline]
pub(crate) fn page_size(&self) -> Option<usize> {
self.page_size
}
#[inline]
pub(crate) fn no_sync(&self) -> bool {
self.no_sync
}
  // As with `timeout`, the field only exists under the cfg flag.
  #[cfg(mlock_supported)]
  #[inline]
  pub(crate) fn mlock(&self) -> bool {
    self.mlock
  }
  #[cfg(not(mlock_supported))]
  #[inline]
  pub(crate) fn mlock(&self) -> bool {
    false
  }
#[inline]
pub(crate) fn read_only(&self) -> bool {
self.read_only
}
pub fn open<T: AsRef<Path>>(self, path: T) -> crate::Result<Bolt> {
Bolt::open_path(path, self)
}
pub fn open_ro<T: AsRef<Path>>(mut self, path: T) -> crate::Result<impl DbApi> {
self.read_only = true;
Bolt::open_path(path, self)
}
pub fn open_mem(self) -> crate::Result<Bolt> {
Bolt::new_mem_with_options(self)
}
}
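// Batching support for `DbRwAPI::batch`: queued closures are collected into a
// `ScheduledBatch` and run inside a single read-write transaction, either when
// the timer fires after `max_batch_delay` or once more than `max_batch_size`
// calls are queued. A closure that fails is removed from the batch and
// signalled with `Error::TrySolo`, after which its caller retries it in a solo
// transaction while the remaining calls are retried together.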
type BatchFn = dyn FnMut(&mut TxRwRef) -> crate::Result<()> + Send + Sync + 'static;
struct Call {
f: Box<BatchFn>,
err: SyncSender<crate::Result<()>>,
}
struct ScheduledBatch {
timer_guard: Option<Guard>,
calls: Vec<Call>,
}
impl ScheduledBatch {
fn cancel_schedule(&mut self) {
if let Some(guard) = self.timer_guard.take() {
guard.ignore()
}
}
fn run(&mut self, db: &mut Bolt) {
'retry: loop {
if self.calls.is_empty() {
break;
}
let mut fail_idx = None;
let _ = db.update(|mut tx| {
for (i, call) in self.calls.iter_mut().enumerate() {
let result = (call.f)(&mut tx);
if result.is_err() {
fail_idx = Some(i);
return result;
}
}
Ok(())
});
if let Some(idx) = fail_idx {
let call = self.calls.remove(idx);
call.err.send(Err(Error::TrySolo)).unwrap();
continue 'retry;
}
for call in &self.calls {
call.err.send(Ok(())).unwrap()
}
break;
}
}
}
struct InnerBatcher {
timer: Timer,
batch_pool: Arc<SyncPool<ScheduledBatch>>,
scheduled: Mutex<SyncReusable<ScheduledBatch>>,
}
impl InnerBatcher {
fn new(parent: &Arc<Batcher>) -> InnerBatcher {
let b = Arc::downgrade(parent);
let timer = Timer::new();
let batch_pool = SyncPool::new(
|| ScheduledBatch {
timer_guard: None,
calls: Vec::with_capacity(0),
},
|batch| {
batch.timer_guard = None;
batch.calls.clear();
},
);
let guard = timer.schedule_with_delay(parent.max_batch_delay, move || {
let batcher = b.upgrade().unwrap();
let mut db = Bolt {
inner: batcher.db.upgrade().unwrap(),
};
batcher.take_batch().run(&mut db)
});
let mut scheduled = batch_pool.pull();
scheduled.timer_guard = Some(guard);
InnerBatcher {
timer,
batch_pool,
scheduled: scheduled.into(),
}
}
}
struct Batcher {
inner: OnceLock<InnerBatcher>,
db: Weak<InnerDB>,
max_batch_delay: Duration,
max_batch_size: u32,
}
impl Batcher {
fn new(db: Weak<InnerDB>, max_batch_delay: Duration, max_batch_size: u32) -> Arc<Batcher> {
Arc::new(Batcher {
inner: Default::default(),
db,
max_batch_delay,
max_batch_size,
})
}
fn inner<'a>(self: &'a Arc<Batcher>) -> &'a InnerBatcher {
self.inner.get_or_init(move || InnerBatcher::new(self))
}
fn batch<F>(self: &Arc<Batcher>, mut db: Bolt, mut f: F) -> crate::Result<()>
where
F: FnMut(&mut TxRwRef) -> crate::Result<()> + Send + Sync + Clone + 'static,
{
if self.max_batch_size == 0 || self.max_batch_delay.is_zero() {
return Err(Error::BatchDisabled);
}
let inner = self.inner();
let (call_len, rx) = {
let mut batch = inner.scheduled.lock();
let (tx, rx): (SyncSender<crate::Result<()>>, Receiver<crate::Result<()>>) =
mpsc::sync_channel(1);
batch.calls.push(Call {
f: Box::new(f.clone()),
err: tx,
});
(batch.calls.len(), rx)
};
if call_len > self.max_batch_size as usize {
let mut immediate = self.take_batch();
if !immediate.calls.is_empty() {
let mut i_db = db.clone();
thread::spawn(move || immediate.run(&mut i_db));
}
}
let result = rx.recv().unwrap();
    if matches!(result, Err(Error::TrySolo)) {
db.update(|mut tx| f(&mut tx))?;
}
Ok(())
}
fn schedule_batch(self: &Arc<Batcher>) -> Guard {
let inner = self.inner();
let b = Arc::downgrade(self);
inner
.timer
.schedule_with_delay(self.max_batch_delay, move || {
let batcher = b.upgrade().unwrap();
let mut db = Bolt {
inner: batcher.db.upgrade().unwrap(),
};
batcher.take_batch().run(&mut db)
})
}
fn take_batch(self: &Arc<Batcher>) -> SyncReusable<ScheduledBatch> {
let inner = self.inner();
let mut swap_batch = inner.batch_pool.pull();
let mut lock = inner.scheduled.lock();
mem::swap(&mut swap_batch, &mut *lock);
swap_batch.cancel_schedule();
let guard = self.schedule_batch();
lock.timer_guard = Some(guard);
swap_batch
}
}
pub struct InnerDB {
path: Arc<DbPath>,
bump_pool: Arc<SyncPool<Pin<Box<PinBump>>>>,
db: RwLock<DbShared>,
stats: Arc<DbStats>,
db_state: Arc<Mutex<DbState>>,
batcher: Arc<Batcher>,
}
unsafe impl Send for InnerDB {}
unsafe impl Sync for InnerDB {}
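/// A cloneable handle to an open database.
///
/// Cloning is cheap (the handle wraps an `Arc`) and every clone refers to the
/// same underlying database. A typical round trip (illustrative only, not
/// compiled as a doctest):
///
/// ```ignore
/// let mut db = Bolt::open("my.db")?;
/// db.update(|mut tx| {
///   let mut b = tx.create_bucket_if_not_exists("widgets")?;
///   b.put("foo", "bar")?;
///   Ok(())
/// })?;
/// db.view(|tx| {
///   let b = tx.bucket("widgets").unwrap();
///   assert_eq!(Some(b"bar".as_slice()), b.get("foo"));
///   Ok(())
/// })?;
/// ```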
#[derive(Clone)]
pub struct Bolt {
inner: Arc<InnerDB>,
}
impl Bolt {
pub fn open<T: AsRef<Path>>(path: T) -> crate::Result<Self> {
Bolt::open_path(path, BoltOptions::default())
}
pub fn open_ro<T: AsRef<Path>>(path: T) -> crate::Result<impl DbApi> {
Bolt::open_path(
path,
BoltOptions {
read_only: true,
..Default::default()
},
)
}
fn new_file_backend(path: &Path, bolt_options: BoltOptions) -> crate::Result<Bolt> {
let read_only = bolt_options.read_only();
let mut file = if bolt_options.read_only() {
let file = fs::OpenOptions::new().read(true).open(path)?;
file.lock_shared()?;
file
} else {
      let mut file = fs::OpenOptions::new()
        .create(true)
        .write(true)
        .read(true)
        .open(path)?;
file.lock_exclusive()?;
if !path.exists() || path.metadata()?.len() == 0 {
let page_size = bolt_options
.page_size()
.unwrap_or(DEFAULT_PAGE_SIZE.bytes() as usize);
Bolt::init(path, &mut file, page_size)?;
}
file
};
let page_size = FileBackend::get_page_size(&mut file)?;
assert!(page_size > 0, "invalid page size");
let file_size = file.metadata()?.len();
let data_size = if let Some(initial_mmap_size) = bolt_options.initial_map_size() {
file_size.max(initial_mmap_size)
} else {
file_size
};
let options = MmapOptions::new()
.offset(0)
.len(data_size as usize)
.to_owned();
let mmap = if read_only {
options.map_raw_read_only(&file)?
} else {
options.map_raw(&file)?
};
#[cfg(mlock_supported)]
if bolt_options.mlock() {
mmap.lock()?;
}
#[cfg(mmap_advise_supported)]
mmap.advise(Advice::Random)?;
let backend = FileBackend {
path: Arc::new(path.into()),
file: Mutex::new(FileState { file, file_size }),
page_size,
mmap: Some(mmap),
freelist: OnceLock::new(),
alloc_size: DEFAULT_ALLOC_SIZE.bytes() as u64,
data_size,
use_mlock: bolt_options.mlock(),
grow_async: !bolt_options.no_grow_sync(),
read_only,
};
backend.file_size()?;
let backend = Box::new(backend);
Self::new_db(DbPath::FilePath(path.into()), bolt_options, backend)
}
fn new_mem_with_options(bolt_options: BoltOptions) -> crate::Result<Bolt> {
let page_size = bolt_options
.page_size()
.unwrap_or(DEFAULT_PAGE_SIZE.bytes() as usize);
let mut mmap = Bolt::init_page(page_size);
let file_size = mmap.len() as u64;
let data_size = if let Some(initial_mmap_size) = bolt_options.initial_map_size() {
file_size.max(initial_mmap_size)
} else {
file_size
};
if file_size < data_size {
let mut new_mmap = AlignedBytes::new_zeroed(data_size as usize);
new_mmap
.split_at_mut(file_size as usize)
.0
.copy_from_slice(&mmap);
mmap = new_mmap;
}
let backend = MemBackend {
mmap: Mutex::new(mmap),
freelist: OnceLock::new(),
page_size,
alloc_size: DEFAULT_ALLOC_SIZE.bytes() as u64,
file_size,
data_size,
};
let backend = Box::new(backend);
Self::new_db(DbPath::Memory, bolt_options, backend)
}
fn new_db(
db_path: DbPath, bolt_options: BoltOptions, backend: Box<dyn DBBackend>,
) -> crate::Result<Self> {
backend.validate_meta()?;
let mut free_count = 0u64;
if bolt_options.preload_freelist() {
free_count = backend.freelist().free_count();
}
let meta = backend.meta();
if meta.free_list() == PGID_NO_FREE_LIST {
return Err(Error::Other(anyhow!(
"PGID_NO_FREE_LIST not currently supported"
)));
}
let db_state = Arc::new(Mutex::new(DbState::new(meta)));
let stats = DbStats {
free_page_n: (free_count as i64).into(),
..Default::default()
};
let arc_stats = Arc::new(stats);
let bump_pool = SyncPool::new(
|| Box::pin(PinBump::default()),
|bump| Pin::as_mut(bump).reset(),
);
let inner = Arc::new_cyclic(|weak| InnerDB {
path: Arc::new(db_path),
bump_pool,
db: RwLock::new(DbShared {
stats: arc_stats.clone(),
db_state: db_state.clone(),
backend,
page_pool: Mutex::new(vec![]),
options: bolt_options.clone(),
}),
stats: arc_stats,
db_state,
batcher: Arc::new(Batcher {
inner: Default::default(),
db: weak.clone(),
max_batch_delay: bolt_options
.max_batch_delay
.unwrap_or(DEFAULT_MAX_BATCH_DELAY),
max_batch_size: bolt_options
.max_batch_size
.unwrap_or(DEFAULT_MAX_BATCH_SIZE),
}),
});
Ok(Bolt { inner })
}
fn open_path<T: AsRef<Path>>(path: T, db_options: BoltOptions) -> crate::Result<Self> {
let pref = path.as_ref();
Self::new_file_backend(pref, db_options)
}
pub fn open_mem() -> crate::Result<Self> {
Bolt::new_mem_with_options(BoltOptions::default())
}
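  /// Lays out the four pages of a fresh database: two meta pages (txid 0 and
  /// 1), a freelist page at pgid 2, and an empty leaf page at pgid 3 that
  /// serves as the root bucket.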
fn init_page(page_size: usize) -> AlignedBytes<alignment::Page> {
let mut buffer = AlignedBytes::<alignment::Page>::new_zeroed(page_size * 4);
for (i, page_bytes) in buffer.chunks_mut(page_size).enumerate() {
let mut page = MutPage::new(page_bytes.as_mut_ptr());
if i < 2 {
let meta_page = MappedMetaPage::mut_into(&mut page);
let ph = &mut meta_page.page;
ph.id = PgId(i as u64);
ph.count = 0;
ph.overflow = 0;
let meta = &mut meta_page.meta;
meta.set_magic(MAGIC);
meta.set_version(VERSION);
meta.set_page_size(page_size as u32);
meta.set_free_list(PgId(2));
meta.set_root(BucketHeader::new(PgId(3), 0));
meta.set_pgid(PgId(4));
meta.set_txid(TxId(i as u64));
meta.set_checksum(meta.sum64());
} else if i == 2 {
let free_list = MappedFreeListPage::mut_into(&mut page);
free_list.id = PgId(2);
free_list.count = 0;
free_list.overflow = 0;
} else if i == 3 {
let leaf_page = MappedLeafPage::mut_into(&mut page);
leaf_page.id = PgId(3);
leaf_page.count = 0;
leaf_page.overflow = 0;
}
}
buffer
}
fn init(path: &Path, db: &mut File, page_size: usize) -> io::Result<usize> {
let buffer = Bolt::init_page(page_size);
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let metadata = db.metadata()?;
let mut permissions = metadata.permissions();
permissions.set_mode(0o600);
fs::set_permissions(path, permissions)?;
}
db.write_all(&buffer)?;
db.flush()?;
Ok(buffer.len())
}
fn require_open(state: &DbState) -> crate::Result<()> {
if !state.is_open {
return Err(Error::DatabaseNotOpen);
}
Ok(())
}
pub(crate) fn begin_tx(&self) -> crate::Result<TxImpl> {
    // Take the db read lock before the state lock, matching the order used by
    // `begin_rw_tx` and `close`, so the two locks cannot deadlock each other.
    let lock = self.inner.db.read();
    let mut state = self.inner.db_state.lock();
    Bolt::require_open(&state)?;
let bump = self.inner.bump_pool.pull();
let meta = state.current_meta;
let txid = meta.txid();
state.txs.push(txid);
self.inner.stats.inc_tx_n(1);
self
.inner
.stats
.open_tx_n
.store(state.txs.len() as i64, Ordering::Release);
Ok(TxImpl::new(bump, lock, meta))
}
#[cfg(feature = "try-begin")]
pub(crate) fn try_begin_tx<'a, F>(&'a self, f: F) -> crate::Result<Option<TxImpl>>
where
F: Fn() -> Option<RwLockReadGuard<'a, DbShared>>,
{
let mut state = self.inner.db_state.lock();
Bolt::require_open(&state)?;
if let Some(lock) = f() {
let bump = self.inner.bump_pool.pull();
let meta = state.current_meta;
let txid = meta.txid();
state.txs.push(txid);
self.inner.stats.inc_tx_n(1);
self
.inner
.stats
.open_tx_n
.store(state.txs.len() as i64, Ordering::Release);
Ok(Some(TxImpl::new(bump, lock, meta)))
} else {
Ok(None)
}
}
pub(crate) fn begin_rw_tx(&mut self) -> crate::Result<TxRwImpl> {
let lock = self.inner.db.upgradable_read();
let mut state = self.inner.db_state.lock();
Bolt::require_open(&state)?;
lock.free_pages(&mut state);
let bump = self.inner.bump_pool.pull();
let mut meta = state.current_meta;
let txid = meta.txid() + 1;
meta.set_txid(txid);
state.rwtx = Some(txid);
Ok(TxRwImpl::new(bump, lock, meta))
}
#[cfg(feature = "try-begin")]
pub(crate) fn try_begin_rw_tx<'a, F>(&'a self, f: F) -> crate::Result<Option<TxRwImpl>>
where
F: Fn() -> Option<RwLockUpgradableReadGuard<'a, DbShared>>,
{
    if let Some(lock) = f() {
      let mut state = self.inner.db_state.lock();
      Bolt::require_open(&state)?;
      lock.free_pages(&mut state);
let bump = self.inner.bump_pool.pull();
let mut meta = state.current_meta;
let txid = meta.txid() + 1;
meta.set_txid(txid);
state.rwtx = Some(txid);
Ok(Some(TxRwImpl::new(bump, lock, meta)))
} else {
Ok(None)
}
}
}
impl DbApi for Bolt {
fn begin(&self) -> crate::Result<impl TxApi> {
self.begin_tx()
}
#[cfg(feature = "try-begin")]
fn try_begin(&self) -> crate::Result<Option<impl TxApi>> {
self.try_begin_tx(|| self.inner.db.try_read())
}
#[cfg(feature = "try-begin")]
fn try_begin_for(&self, duration: Duration) -> crate::Result<Option<impl TxApi>> {
self.try_begin_tx(|| self.inner.db.try_read_for(duration))
}
#[cfg(feature = "try-begin")]
fn try_begin_until(&self, instant: Instant) -> crate::Result<Option<impl TxApi>> {
self.try_begin_tx(|| self.inner.db.try_read_until(instant))
}
  // The trait declares `F: Fn`, so the impl must use the same bound; the
  // closure is only called once.
  fn view<'tx, F: Fn(TxRef<'tx>) -> crate::Result<()>>(&'tx self, f: F) -> crate::Result<()> {
    let tx = self.begin_tx()?;
    let tx_ref = tx.get_ref();
    f(tx_ref)
  }
fn stats(&self) -> Arc<DbStats> {
self.inner.stats.clone()
}
fn path(&self) -> &DbPath {
&self.inner.path
}
fn info(&self) -> DbInfo {
DbInfo {
page_size: self.inner.db.read().backend.page_size(),
}
}
fn close(self) {
let mut lock = self.inner.db.write();
let mut state = self.inner.db_state.lock();
if Bolt::require_open(&state).is_ok() {
state.is_open = false;
let mut closed_db: Box<dyn DBBackend> = Box::new(ClosedBackend {});
mem::swap(&mut closed_db, &mut lock.backend);
lock.page_pool.lock().clear();
self.inner.bump_pool.clear();
if let Some(inner_batcher) = self.inner.batcher.inner.get() {
inner_batcher.batch_pool.clear();
}
}
}
}
impl DbRwAPI for Bolt {
fn begin_rw(&mut self) -> crate::Result<impl TxRwApi> {
self.begin_rw_tx()
}
#[cfg(feature = "try-begin")]
fn try_begin_rw(&self) -> crate::Result<Option<impl TxRwApi>> {
self.try_begin_rw_tx(|| self.inner.db.try_upgradable_read())
}
#[cfg(feature = "try-begin")]
fn try_begin_rw_for(&self, duration: Duration) -> crate::Result<Option<impl TxRwApi>> {
self.try_begin_rw_tx(|| self.inner.db.try_upgradable_read_for(duration))
}
#[cfg(feature = "try-begin")]
fn try_begin_rw_until(&self, instant: Instant) -> crate::Result<Option<impl TxRwApi>> {
self.try_begin_rw_tx(|| self.inner.db.try_upgradable_read_until(instant))
}
fn update<'tx, F: FnMut(TxRwRef<'tx>) -> crate::Result<()>>(
&'tx mut self, mut f: F,
) -> crate::Result<()> {
let txrw = self.begin_rw_tx()?;
let tx_ref = txrw.get_ref();
match f(tx_ref) {
Ok(_) => {
txrw.commit()?;
Ok(())
}
Err(e) => {
let _ = txrw.rollback();
Err(e)
}
}
}
fn batch<F>(&mut self, f: F) -> crate::Result<()>
where
F: FnMut(&mut TxRwRef) -> crate::Result<()> + Send + Sync + Clone + 'static,
{
self.inner.batcher.batch(self.clone(), f)
}
fn sync(&mut self) -> crate::Result<()> {
self.inner.db.write().backend.fsync()?;
Ok(())
}
}
#[cfg(test)]
mod test {
use crate::common::defaults::DEFAULT_PAGE_SIZE;
use crate::common::meta::MappedMetaPage;
use crate::db::DbStats;
use crate::test_support::{temp_file, TestDb};
use crate::{
Bolt, BoltOptions, BucketApi, BucketRwApi, DbApi, DbPath, DbRwAPI, Error, PgId, TxApi, TxCheck,
TxRwApi, TxRwRefApi,
};
use aligners::{alignment, AlignedBytes};
use std::io::{Read, Seek, SeekFrom, Write};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread;
#[test]
#[cfg(not(miri))]
fn test_open() -> crate::Result<()> {
let db = TestDb::new()?;
db.clone_db().close();
Ok(())
}
#[test]
#[cfg(feature = "long-tests")]
#[cfg(not(any(miri, feature = "test-mem-backend")))]
fn test_open_multiple_threads() -> crate::Result<()> {
let instances = 30;
let iterations = 30;
let mut threads = Vec::new();
let temp_file = Arc::new(temp_file()?);
let (tx, rx) = channel();
for _ in 0..iterations {
for _ in 0..instances {
let t_file = temp_file.clone();
let t_tx = tx.clone();
let handle = thread::spawn(move || {
let db = Bolt::open(t_file.path());
if let Some(error) = db.err() {
t_tx.send(error).unwrap();
}
});
threads.push(handle);
}
while let Some(handle) = threads.pop() {
handle.join().unwrap();
}
}
drop(tx);
if let Ok(error) = rx.try_recv() {
panic!("Fatal error: {}", error);
}
Ok(())
}
#[test]
#[cfg(not(any(miri, feature = "test-mem-backend")))]
fn test_open_err_path_required() -> crate::Result<()> {
let r = Bolt::open("");
assert!(r.is_err());
Ok(())
}
#[test]
#[cfg(not(any(miri, feature = "test-mem-backend")))]
fn test_open_err_not_exists() -> crate::Result<()> {
let file = temp_file()?;
let path = file.path().join("bad-path");
let r = Bolt::open(path);
assert!(r.is_err());
Ok(())
}
#[test]
#[cfg(not(any(miri, feature = "test-mem-backend")))]
fn test_open_err_invalid() -> crate::Result<()> {
let mut file = temp_file()?;
file
.as_file_mut()
.write_all(b"this is not a bolt database")?;
let r = Bolt::open(file.path());
assert_eq!(Some(Error::InvalidDatabase(false)), r.err());
Ok(())
}
#[test]
#[cfg(not(any(miri, feature = "test-mem-backend")))]
fn test_open_err_version_mismatch() -> crate::Result<()> {
let mut file = temp_file()?;
let db = Bolt::open(file.path())?;
db.close();
let mut bytes = AlignedBytes::<alignment::Page>::new_zeroed(4096 * 2);
file.seek(SeekFrom::Start(0))?;
file.read_exact(&mut bytes)?;
let mut meta_0 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr()) };
let meta0_version = meta_0.meta.version();
meta_0.meta.set_version(meta0_version + 1);
let mut meta_1 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr().add(4096)) };
let meta1_version = meta_1.meta.version();
meta_1.meta.set_version(meta1_version + 1);
file.seek(SeekFrom::Start(0))?;
file.write_all(&bytes)?;
file.flush()?;
let r = Bolt::open(file.path());
assert_eq!(Some(Error::VersionMismatch), r.err());
Ok(())
}
#[test]
#[cfg(not(any(miri, feature = "test-mem-backend")))]
fn test_open_err_checksum() -> crate::Result<()> {
let mut file = temp_file()?;
let db = Bolt::open(file.path())?;
db.close();
let mut bytes = AlignedBytes::<alignment::Page>::new_zeroed(4096 * 2);
file.seek(SeekFrom::Start(0))?;
file.read_exact(&mut bytes)?;
let mut meta_0 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr()) };
let meta0_pgid = meta_0.meta.pgid();
meta_0.meta.set_pgid(meta0_pgid + 1);
let mut meta_1 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr().add(4096)) };
let meta1_pgid = meta_1.meta.pgid();
meta_1.meta.set_pgid(meta1_pgid + 1);
file.seek(SeekFrom::Start(0))?;
file.write_all(&bytes)?;
file.flush()?;
let r = Bolt::open(file.path());
assert_eq!(Some(Error::ChecksumMismatch), r.err());
Ok(())
}
#[test]
#[cfg(not(any(miri, feature = "test-mem-backend")))]
fn test_open_read_page_size_from_meta1_os() -> crate::Result<()> {
let mut file = temp_file()?;
let db = Bolt::open(file.path())?;
db.close();
let mut bytes = AlignedBytes::<alignment::Page>::new_zeroed(4096 * 2);
file.seek(SeekFrom::Start(0))?;
file.read_exact(&mut bytes)?;
let mut meta_0 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr()) };
let meta0_pgid = meta_0.meta.pgid();
meta_0.meta.set_pgid(meta0_pgid + 1);
file.seek(SeekFrom::Start(0))?;
file.write_all(&bytes)?;
file.flush()?;
let db = Bolt::open(file.path())?;
assert_eq!(4096, db.info().page_size);
Ok(())
}
#[test]
#[cfg(not(any(miri, feature = "test-mem-backend")))]
fn test_open_read_page_size_from_meta1_given() -> crate::Result<()> {
for i in 0..=14usize {
let given_page_size = 1024usize << i;
let mut db = TestDb::with_options(BoltOptions::builder().page_size(given_page_size).build())?;
if i % 3 == 0 {
db.must_close();
let named_file = db.tmp_file.as_mut();
let file = named_file.unwrap();
let mut bytes = AlignedBytes::<alignment::Page>::new_zeroed(given_page_size * 2);
file.seek(SeekFrom::Start(0))?;
file.read_exact(&mut bytes)?;
let mut meta_0 = unsafe { MappedMetaPage::new(bytes.as_mut_ptr()) };
let meta0_pgid = meta_0.meta.pgid();
meta_0.meta.set_pgid(meta0_pgid + 1);
file.seek(SeekFrom::Start(0))?;
file.write_all(&bytes)?;
file.flush()?;
db.must_reopen();
}
assert_eq!(given_page_size, db.info().page_size);
}
Ok(())
}
#[test]
#[cfg(not(any(miri, feature = "test-mem-backend")))]
fn test_open_size() -> crate::Result<()> {
let mut db = TestDb::new()?;
let page_size = db.info().page_size;
let v = [0; 1000];
for _tx_ct in 0..1 {
db.update(|mut tx| {
let mut b = tx.create_bucket_if_not_exists("data")?;
for keys_ct in 0..10000 {
let k = format!("{:04}", keys_ct);
b.put(&k, &v)?;
}
Ok(())
})?;
}
db.must_close();
let file_size = {
let tmp_file = db.tmp_file.as_ref();
let file = tmp_file.unwrap();
file.path().metadata()?.len()
};
assert_ne!(0, file_size, "unexpected new file size");
db.must_reopen();
db.update(|mut tx| {
tx.bucket_mut("data").unwrap().put("0", "0")?;
Ok(())
})?;
db.must_close();
let new_size = {
let tmp_file = db.tmp_file.as_ref();
let file = tmp_file.unwrap();
file.path().metadata()?.len()
};
assert!(
file_size > (new_size - (5 * page_size) as u64),
"unexpected file growth: {} => {}",
file_size,
new_size
);
Ok(())
}
#[test]
#[cfg(not(any(miri, feature = "test-mem-backend")))]
#[cfg(feature = "long-tests")]
fn test_open_size_large() -> crate::Result<()> {
let mut db = TestDb::new()?;
let page_size = db.info().page_size;
let v = [0; 50];
for _tx_ct in 0..10000 {
db.update(|mut tx| {
let mut b = tx.create_bucket_if_not_exists("data")?;
for keys_ct in 0..1000u64 {
let k = keys_ct.to_be_bytes();
b.put(k, v)?;
}
Ok(())
})?;
}
db.must_close();
let file_size = {
let tmp_file = db.tmp_file.as_ref();
let file = tmp_file.unwrap();
file.path().metadata()?.len()
};
assert_ne!(0, file_size, "unexpected new file size");
db.must_reopen();
db.update(|mut tx| {
tx.bucket_mut("data").unwrap().put("0", "0")?;
Ok(())
})?;
db.must_close();
let new_size = {
let tmp_file = db.tmp_file.as_ref();
let file = tmp_file.unwrap();
file.path().metadata()?.len()
};
assert!(
file_size > (new_size - (5 * page_size) as u64),
"unexpected file growth: {} => {}",
file_size,
new_size
);
Ok(())
}
#[test]
#[cfg(not(any(miri, feature = "test-mem-backend")))]
fn test_open_check() -> crate::Result<()> {
let mut db = TestDb::new()?;
db.view(|tx| {
assert!(tx.check().is_empty());
Ok(())
})?;
db.must_close();
db.must_reopen();
db.view(|tx| {
assert!(tx.check().is_empty());
Ok(())
})?;
Ok(())
}
#[test]
#[ignore]
fn test_open_meta_init_write_error() {
todo!("pending in go")
}
#[test]
#[cfg(not(any(miri, feature = "test-mem-backend")))]
fn test_open_file_too_small() -> crate::Result<()> {
let mut db = TestDb::new()?;
db.must_close();
{
let temp_file = db.tmp_file.as_mut();
let file = temp_file.unwrap();
file.as_file_mut().set_len(4096)?;
}
assert_eq!(Some(Error::FileSizeTooSmall(4096)), db.reopen().err());
Ok(())
}
#[test]
#[ignore]
fn test_db_open_initial_mmap_size() {
todo!()
}
#[test]
#[cfg(not(any(miri, feature = "test-mem-backend")))]
fn test_db_open_read_only() -> crate::Result<()> {
let mut db = TestDb::new()?;
db.update(|mut tx| {
let mut b = tx.create_bucket("widgets")?;
b.put("foo", "bar")?;
Ok(())
})?;
let path = db.path().clone();
db.must_close();
let ro = match path {
DbPath::Memory => panic!("Path is DbPath::Memory"),
DbPath::FilePath(path) => Bolt::open_ro(path)?,
};
ro.view(|tx| {
let b = tx.bucket("widgets").unwrap();
assert_eq!(Some(b"bar".as_slice()), b.get("foo"));
Ok(())
})?;
ro.close();
Ok(())
}
#[test]
#[cfg(not(any(miri, feature = "test-mem-backend")))]
fn test_open_big_page() -> crate::Result<()> {
let page_size = DEFAULT_PAGE_SIZE.bytes() as usize;
let options = BoltOptions::builder().page_size(page_size * 2).build();
let db1 = TestDb::with_options(options.clone())?;
let options = BoltOptions::builder().page_size(page_size * 4).build();
let db2 = TestDb::with_options(options.clone())?;
let db1_len = db1.tmp_file.as_ref().unwrap().as_file().metadata()?.len();
let db2_len = db2.tmp_file.as_ref().unwrap().as_file().metadata()?.len();
assert!(db1_len < db2_len, "expected {} < {}", db1_len, db2_len);
Ok(())
}
#[test]
#[ignore]
fn test_open_recover_free_list() {
todo!()
}
#[test]
fn test_db_begin_err_database_not_open() -> crate::Result<()> {
let db = TestDb::new()?;
let t_db = db.clone_db();
t_db.close();
let r = db.begin_tx();
assert_eq!(Some(Error::DatabaseNotOpen), r.err());
Ok(())
}
#[test]
fn test_db_begin_rw() -> crate::Result<()> {
let mut db = TestDb::new()?;
let tx = db.begin_rw()?;
assert!(tx.writable());
tx.commit()?;
Ok(())
}
#[test]
#[ignore]
fn test_db_concurrent_write_to() {
todo!()
}
#[test]
fn test_db_begin_rw_closed() -> crate::Result<()> {
let mut db = TestDb::new()?;
let t_db = db.clone_db();
t_db.close();
let r = db.begin_rw_tx();
assert_eq!(Some(Error::DatabaseNotOpen), r.err());
Ok(())
}
#[test]
#[ignore]
fn test_db_close_pending_tx_rw() {
todo!()
}
#[test]
#[ignore]
fn test_db_close_pending_tx_ro() {
todo!()
}
#[test]
fn test_db_update() -> crate::Result<()> {
let mut db = TestDb::new()?;
db.update(|mut tx| {
let mut b = tx.create_bucket("widgets")?;
b.put("foo", "bar")?;
b.put("baz", "bat")?;
b.delete("foo")?;
Ok(())
})?;
db.view(|tx| {
let b = tx.bucket("widgets").unwrap();
assert_eq!(None, b.get("foo"));
assert_eq!(Some(b"bat".as_slice()), b.get("baz"));
Ok(())
})?;
Ok(())
}
#[test]
fn test_db_update_closed() -> crate::Result<()> {
let mut db = TestDb::new()?;
let t_db = db.clone_db();
t_db.close();
let r = db.update(|mut tx| {
tx.create_bucket("widgets")?;
Ok(())
});
assert_eq!(Some(Error::DatabaseNotOpen), r.err());
Ok(())
}
#[test]
#[ignore]
fn test_db_update_panic() -> crate::Result<()> {
todo!()
}
#[test]
fn test_db_view_error() -> crate::Result<()> {
let db = TestDb::new()?;
let r = db.view(|_| Err(Error::InvalidDatabase(false))).err();
assert_eq!(Some(Error::InvalidDatabase(false)), r);
Ok(())
}
#[test]
#[ignore]
fn test_db_view_panic() {
todo!()
}
#[test]
fn test_db_stats() -> crate::Result<()> {
let mut db = TestDb::new()?;
db.update(|mut tx| {
tx.create_bucket("widgets")?;
Ok(())
})?;
let stats = db.stats();
assert_eq!(2, stats.tx_stats.page_count());
assert_eq!(0, stats.free_page_n());
assert_eq!(2, stats.pending_page_n());
Ok(())
}
#[test]
fn test_db_consistency() -> crate::Result<()> {
let mut db = TestDb::new()?;
db.update(|mut tx| {
tx.create_bucket("widgets")?;
Ok(())
})?;
for _ in 0..10 {
db.update(|mut tx| {
tx.bucket_mut("widgets").unwrap().put("foo", "bar")?;
Ok(())
})?;
}
db.update(|tx| {
let p = tx.page(PgId(0)).expect("expected page");
assert_eq!("meta", p.t);
let p = tx.page(PgId(1)).expect("expected page");
assert_eq!("meta", p.t);
let p = tx.page(PgId(2)).expect("expected page");
assert_eq!("free", p.t);
let p = tx.page(PgId(3)).expect("expected page");
assert_eq!("free", p.t);
let p = tx.page(PgId(4)).expect("expected page");
assert_eq!("leaf", p.t);
let p = tx.page(PgId(5)).expect("expected page");
assert_eq!("freelist", p.t);
assert_eq!(None, tx.page(PgId(6)));
Ok(())
})?;
Ok(())
}
#[test]
fn test_dbstats_sub() {
let a = DbStats::default();
let b = DbStats::default();
a.tx_stats.inc_page_count(3);
a.set_free_page_n(4);
b.tx_stats.inc_page_count(10);
b.set_free_page_n(14);
let diff = b.sub(&a);
assert_eq!(7, diff.tx_stats.page_count());
assert_eq!(14, diff.free_page_n());
}
#[test]
fn test_db_batch() -> crate::Result<()> {
let mut db = TestDb::new()?;
db.update(|mut tx| {
let _ = tx.create_bucket("widgets")?;
Ok(())
})?;
let n = 2;
let mut threads = Vec::with_capacity(n);
for i in 0..n {
let mut t_db = db.clone_db();
let join = thread::spawn(move || {
t_db.batch(move |tx| {
let mut b = tx.bucket_mut("widgets").unwrap();
b.put(format!("{}", i), "")
})
});
threads.push(join);
}
for t in threads {
t.join().unwrap()?;
}
db.view(|tx| {
let b = tx.bucket("widgets").unwrap();
for i in 0..n {
let g = b.get(format!("{}", i));
assert!(g.is_some(), "key not found {}", i);
}
Ok(())
})?;
Ok(())
}
#[test]
#[ignore]
fn test_db_batch_panic() {
todo!()
}
#[test]
#[ignore]
fn test_db_batch_full() {
todo!()
}
#[test]
#[ignore]
fn test_db_batch_time() {
todo!()
}
#[test]
#[ignore]
fn test_dbunmap() {
todo!()
}
#[test]
#[ignore]
fn benchmark_dbbatch_automatic() {
todo!()
}
#[test]
#[ignore]
fn benchmark_dbbatch_single() {
todo!()
}
#[test]
#[ignore]
fn benchmark_dbbatch_manual10x100() {
todo!()
}
}