use crate::multimap_table::MultimapTable;
use crate::table::{ReadOnlyTable, Table};
use crate::tree_store::{
expand_db_size, get_db_size, DbStats, PageNumber, Storage, TableType, FREED_TABLE,
};
use crate::types::{RedbKey, RedbValue};
use crate::{Error, ReadOnlyMultimapTable};
use memmap2::MmapRaw;
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::panic;
use std::path::Path;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
pub struct Database {
storage: Storage,
}
impl Database {
pub unsafe fn open(path: impl AsRef<Path>, db_size: usize) -> Result<Database, Error> {
let file = if path.as_ref().exists() && File::open(path.as_ref())?.metadata()?.len() > 0 {
let existing_size = get_db_size(path.as_ref())?;
if existing_size != db_size {
return Err(Error::DbSizeMismatch {
path: path.as_ref().to_string_lossy().to_string(),
size: existing_size,
requested_size: db_size,
});
}
OpenOptions::new().read(true).write(true).open(path)?
} else {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path)?;
file.set_len(db_size as u64)?;
file
};
let mmap = MmapRaw::map_raw(&file)?;
let storage = Storage::new(mmap, None)?;
Ok(Database { storage })
}
pub unsafe fn resize(path: impl AsRef<Path>, new_size: usize) -> Result<(), Error> {
expand_db_size(path.as_ref(), new_size)?;
Self::open(path, new_size)?;
Ok(())
}
pub fn begin_write(&self) -> Result<DatabaseTransaction, Error> {
DatabaseTransaction::new(&self.storage)
}
pub fn begin_read(&self) -> Result<ReadOnlyDatabaseTransaction, Error> {
Ok(ReadOnlyDatabaseTransaction::new(&self.storage))
}
pub fn stats(&self) -> Result<DbStats, Error> {
self.storage.storage_stats()
}
pub fn print_debug(&self) {
self.storage.print_debug()
}
}
pub struct DatabaseBuilder {
page_size: Option<usize>,
}
impl DatabaseBuilder {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Self { page_size: None }
}
pub fn set_page_size(&mut self, size: usize) -> &mut Self {
assert!(size.is_power_of_two());
self.page_size = Some(size);
self
}
pub unsafe fn open(&self, path: impl AsRef<Path>, db_size: usize) -> Result<Database, Error> {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path)?;
file.set_len(db_size as u64)?;
let mmap = MmapRaw::map_raw(&file)?;
let storage = Storage::new(mmap, self.page_size)?;
Ok(Database { storage })
}
}
#[allow(clippy::type_complexity)]
pub struct DatabaseTransaction<'a> {
storage: &'a Storage,
transaction_id: u128,
root_page: Cell<Option<PageNumber>>,
pending_table_root_changes:
RefCell<HashMap<(Vec<u8>, TableType), Rc<Cell<Option<PageNumber>>>>>,
open_tables: RefCell<HashMap<(Vec<u8>, TableType), &'static panic::Location<'static>>>,
completed: AtomicBool,
}
impl<'a> DatabaseTransaction<'a> {
fn new(storage: &'a Storage) -> Result<Self, Error> {
let transaction_id = storage.allocate_write_transaction()?;
let root_page = storage.get_root_page_number();
Ok(Self {
storage,
transaction_id,
root_page: Cell::new(root_page),
pending_table_root_changes: RefCell::new(Default::default()),
open_tables: RefCell::new(Default::default()),
completed: Default::default(),
})
}
pub fn open_table<K: RedbKey + ?Sized, V: RedbValue + ?Sized>(
&self,
name: impl AsRef<[u8]>,
) -> Result<Table<'a, K, V>, Error> {
assert!(!name.as_ref().is_empty());
assert_ne!(name.as_ref(), FREED_TABLE);
let key = (name.as_ref().to_vec(), TableType::Normal);
if let Some(location) = self.open_tables.borrow().get(&key) {
return Err(Error::TableAlreadyOpen(location));
}
self.open_tables
.borrow_mut()
.insert(key.clone(), panic::Location::caller());
let (definition, root) = self.storage.get_or_create_table(
name.as_ref(),
TableType::Normal,
self.transaction_id,
self.root_page.get(),
)?;
self.root_page.set(Some(root));
let root = self
.pending_table_root_changes
.borrow_mut()
.entry(key)
.or_insert_with(|| Rc::new(Cell::new(definition.get_root())))
.clone();
Ok(Table::new(self.transaction_id, root, self.storage))
}
pub fn open_multimap_table<K: RedbKey + ?Sized, V: RedbKey + ?Sized>(
&self,
name: impl AsRef<[u8]>,
) -> Result<MultimapTable<'a, K, V>, Error> {
assert!(!name.as_ref().is_empty());
assert_ne!(name.as_ref(), FREED_TABLE);
let key = (name.as_ref().to_vec(), TableType::Multimap);
if let Some(location) = self.open_tables.borrow().get(&key) {
return Err(Error::TableAlreadyOpen(location));
}
self.open_tables
.borrow_mut()
.insert(key.clone(), panic::Location::caller());
let (definition, root) = self.storage.get_or_create_table(
name.as_ref(),
TableType::Multimap,
self.transaction_id,
self.root_page.get(),
)?;
self.root_page.set(Some(root));
let root = self
.pending_table_root_changes
.borrow_mut()
.entry(key)
.or_insert_with(|| Rc::new(Cell::new(definition.get_root())))
.clone();
Ok(MultimapTable::new(self.transaction_id, root, self.storage))
}
pub fn delete_table(&self, name: impl AsRef<[u8]>) -> Result<bool, Error> {
assert!(!name.as_ref().is_empty());
let original_root = self.root_page.get();
let (root, found) = self.storage.delete_table(
name.as_ref(),
TableType::Normal,
self.transaction_id,
original_root,
)?;
self.root_page.set(root);
Ok(found)
}
pub fn delete_multimap_table(&self, name: impl AsRef<[u8]>) -> Result<bool, Error> {
assert!(!name.as_ref().is_empty());
let original_root = self.root_page.get();
let (root, found) = self.storage.delete_table(
name.as_ref(),
TableType::Multimap,
self.transaction_id,
original_root,
)?;
self.root_page.set(root);
Ok(found)
}
pub fn commit(self) -> Result<(), Error> {
self.commit_helper(false)
}
pub fn non_durable_commit(self) -> Result<(), Error> {
self.commit_helper(true)
}
pub fn commit_helper(self, non_durable: bool) -> Result<(), Error> {
match self.commit_helper_inner(non_durable) {
Ok(_) => Ok(()),
Err(err) => match err {
Error::OutOfSpace => {
self.abort()?;
Err(err)
}
err => Err(err),
},
}
}
pub fn commit_helper_inner(&self, non_durable: bool) -> Result<(), Error> {
for ((name, table_type), update) in self.pending_table_root_changes.borrow_mut().drain() {
let new_root = self.storage.update_table_root(
&name,
table_type,
update.get(),
self.transaction_id,
self.root_page.get(),
)?;
self.root_page.set(Some(new_root));
}
if non_durable {
self.storage
.non_durable_commit(self.root_page.get(), self.transaction_id)?;
} else {
self.storage
.commit(self.root_page.get(), self.transaction_id)?;
}
self.completed.store(true, Ordering::SeqCst);
Ok(())
}
pub fn abort(self) -> Result<(), Error> {
self.storage
.rollback_uncommited_writes(self.transaction_id)?;
self.completed.store(true, Ordering::SeqCst);
Ok(())
}
}
impl<'a> Drop for DatabaseTransaction<'a> {
fn drop(&mut self) {
if !self.completed.load(Ordering::SeqCst) {
self.storage
.record_leaked_write_transaction(self.transaction_id);
}
}
}
pub struct ReadOnlyDatabaseTransaction<'a> {
storage: &'a Storage,
transaction_id: u128,
root_page: Option<PageNumber>,
}
impl<'a> ReadOnlyDatabaseTransaction<'a> {
fn new(storage: &'a Storage) -> Self {
let transaction_id = storage.allocate_read_transaction();
let root_page = storage.get_root_page_number();
Self {
storage,
transaction_id,
root_page,
}
}
pub fn open_table<K: RedbKey + ?Sized, V: RedbValue + ?Sized>(
&self,
name: impl AsRef<[u8]>,
) -> Result<ReadOnlyTable<'a, K, V>, Error> {
assert!(!name.as_ref().is_empty());
assert_ne!(name.as_ref(), FREED_TABLE);
let definition = self
.storage
.get_table(name.as_ref(), TableType::Normal, self.root_page)?
.ok_or_else(|| {
Error::DoesNotExist(format!(
"Table '{}' does not exist",
String::from_utf8_lossy(name.as_ref())
))
})?;
Ok(ReadOnlyTable::new(definition.get_root(), self.storage))
}
pub fn open_multimap_table<K: RedbKey + ?Sized, V: RedbKey + ?Sized>(
&self,
name: impl AsRef<[u8]>,
) -> Result<ReadOnlyMultimapTable<'a, K, V>, Error> {
assert!(!name.as_ref().is_empty());
assert_ne!(name.as_ref(), FREED_TABLE);
let definition = self
.storage
.get_table(name.as_ref(), TableType::Multimap, self.root_page)?
.ok_or_else(|| {
Error::DoesNotExist(format!(
"Table '{}' does not exist",
String::from_utf8_lossy(name.as_ref())
))
})?;
Ok(ReadOnlyMultimapTable::new(
definition.get_root(),
self.storage,
))
}
}
impl<'a> Drop for ReadOnlyDatabaseTransaction<'a> {
fn drop(&mut self) {
self.storage
.deallocate_read_transaction(self.transaction_id);
}
}
#[cfg(test)]
mod test {
use crate::{Database, Table};
use tempfile::NamedTempFile;
#[test]
fn transaction_id_persistence() {
let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
let db = unsafe { Database::open(tmpfile.path(), 1024 * 1024).unwrap() };
let write_txn = db.begin_write().unwrap();
let mut table: Table<[u8], [u8]> = write_txn.open_table(b"x").unwrap();
table.insert(b"hello", b"world").unwrap();
let first_txn_id = write_txn.transaction_id;
write_txn.commit().unwrap();
drop(db);
let db2 = unsafe { Database::open(tmpfile.path(), 1024 * 1024).unwrap() };
let write_txn = db2.begin_write().unwrap();
assert!(write_txn.transaction_id > first_txn_id);
}
}