use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
use crate::tree_store::{
AllPageNumbersBtreeIter, BtreeRangeIter, FreedTableKey, InternalTableDefinition, RawBtree,
TableType, TransactionalMemory, PAGE_SIZE,
};
use crate::types::{RedbKey, RedbValue};
use crate::Error;
use crate::{ReadTransaction, Result, WriteTransaction};
use std::fmt::{Display, Formatter};
use std::fs::{File, OpenOptions};
use std::io;
use std::io::ErrorKind;
use std::marker::PhantomData;
use std::ops::RangeFull;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use crate::multimap_table::parse_subtree_roots;
#[cfg(feature = "logging")]
use log::{info, warn};
struct AtomicTransactionId {
inner: AtomicU64,
}
impl AtomicTransactionId {
fn new(last_id: TransactionId) -> Self {
Self {
inner: AtomicU64::new(last_id.0),
}
}
fn next(&self) -> TransactionId {
let id = self.inner.fetch_add(1, Ordering::AcqRel);
TransactionId(id)
}
}
pub struct TableDefinition<'a, K: RedbKey + ?Sized, V: RedbValue + ?Sized> {
name: &'a str,
_key_type: PhantomData<K>,
_value_type: PhantomData<V>,
}
impl<'a, K: RedbKey + ?Sized, V: RedbValue + ?Sized> TableDefinition<'a, K, V> {
pub const fn new(name: &'a str) -> Self {
assert!(!name.is_empty());
assert!(K::ALIGNMENT == 1);
assert!(V::ALIGNMENT == 1);
Self {
name,
_key_type: PhantomData,
_value_type: PhantomData,
}
}
pub fn name(&self) -> &str {
self.name
}
}
impl<'a, K: RedbKey + ?Sized, V: RedbValue + ?Sized> Clone for TableDefinition<'a, K, V> {
fn clone(&self) -> Self {
*self
}
}
impl<'a, K: RedbKey + ?Sized, V: RedbValue + ?Sized> Copy for TableDefinition<'a, K, V> {}
impl<'a, K: RedbKey + ?Sized, V: RedbValue + ?Sized> Display for TableDefinition<'a, K, V> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}<{}, {}>",
self.name,
K::type_name().name(),
V::type_name().name()
)
}
}
pub struct MultimapTableDefinition<'a, K: RedbKey + ?Sized, V: RedbKey + ?Sized> {
name: &'a str,
_key_type: PhantomData<K>,
_value_type: PhantomData<V>,
}
impl<'a, K: RedbKey + ?Sized, V: RedbKey + ?Sized> MultimapTableDefinition<'a, K, V> {
pub const fn new(name: &'a str) -> Self {
assert!(!name.is_empty());
assert!(K::ALIGNMENT == 1);
assert!(V::ALIGNMENT == 1);
Self {
name,
_key_type: PhantomData,
_value_type: PhantomData,
}
}
pub fn name(&self) -> &str {
self.name
}
}
impl<'a, K: RedbKey + ?Sized, V: RedbKey + ?Sized> Clone for MultimapTableDefinition<'a, K, V> {
fn clone(&self) -> Self {
*self
}
}
impl<'a, K: RedbKey + ?Sized, V: RedbKey + ?Sized> Copy for MultimapTableDefinition<'a, K, V> {}
impl<'a, K: RedbKey + ?Sized, V: RedbKey + ?Sized> Display for MultimapTableDefinition<'a, K, V> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}<{}, {}>",
self.name,
K::type_name().name(),
V::type_name().name()
)
}
}
pub struct Database {
mem: TransactionalMemory,
next_transaction_id: AtomicTransactionId,
transaction_tracker: Arc<Mutex<TransactionTracker>>,
pub(crate) live_write_transaction: Mutex<Option<TransactionId>>,
}
impl Database {
pub fn create(path: impl AsRef<Path>) -> Result<Database> {
Self::builder().create(path)
}
pub fn open(path: impl AsRef<Path>) -> Result<Database> {
Self::builder().open(path)
}
pub(crate) fn get_memory(&self) -> &TransactionalMemory {
&self.mem
}
fn verify_primary_checksums(mem: &TransactionalMemory) -> Result<bool> {
let (root, root_checksum) = mem
.get_data_root()
.expect("Tried to repair an empty database");
if !RawBtree::new(
Some((root, root_checksum)),
<&str>::fixed_width(),
InternalTableDefinition::fixed_width(),
mem,
)
.verify_checksum()?
{
return Ok(false);
}
if let Some((freed_root, freed_checksum)) = mem.get_freed_root() {
if !RawBtree::new(
Some((freed_root, freed_checksum)),
FreedTableKey::fixed_width(),
None,
mem,
)
.verify_checksum()?
{
return Ok(false);
}
}
let iter: BtreeRangeIter<&str, InternalTableDefinition> =
BtreeRangeIter::new::<RangeFull, &str>(.., Some(root), mem)?;
for entry in iter {
let definition = InternalTableDefinition::from_bytes(entry.value());
if let Some((table_root, table_checksum)) = definition.get_root() {
if !RawBtree::new(
Some((table_root, table_checksum)),
definition.get_fixed_key_size(),
definition.get_fixed_value_size(),
mem,
)
.verify_checksum()?
{
return Ok(false);
}
}
}
Ok(true)
}
#[allow(clippy::too_many_arguments)]
fn new(
file: File,
use_mmap: bool,
page_size: usize,
region_size: Option<usize>,
initial_size: Option<u64>,
read_cache_size_bytes: usize,
write_cache_size_bytes: usize,
write_strategy: Option<WriteStrategy>,
) -> Result<Self> {
#[cfg(feature = "logging")]
let file_path = format!("{:?}", &file);
#[cfg(feature = "logging")]
info!("Opening database {:?}", &file_path);
let mut mem = TransactionalMemory::new(
file,
use_mmap,
page_size,
region_size,
initial_size,
read_cache_size_bytes,
write_cache_size_bytes,
write_strategy,
)?;
if mem.needs_repair()? {
#[cfg(feature = "logging")]
warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
if mem.needs_checksum_verification()? && !Self::verify_primary_checksums(&mem)? {
mem.repair_primary_corrupted();
assert!(Self::verify_primary_checksums(&mem)?);
}
mem.begin_repair()?;
let (root, root_checksum) = mem
.get_data_root()
.expect("Tried to repair an empty database");
let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, &mem)?;
mem.mark_pages_allocated(master_pages_iter)?;
let iter: BtreeRangeIter<&str, InternalTableDefinition> =
BtreeRangeIter::new::<RangeFull, &str>(.., Some(root), &mem)?;
for entry in iter {
let definition = InternalTableDefinition::from_bytes(entry.value());
if let Some((table_root, _)) = definition.get_root() {
let table_pages_iter = AllPageNumbersBtreeIter::new(
table_root,
definition.get_fixed_key_size(),
definition.get_fixed_value_size(),
&mem,
)?;
mem.mark_pages_allocated(table_pages_iter)?;
if definition.get_type() == TableType::Multimap {
let table_pages_iter = AllPageNumbersBtreeIter::new(
table_root,
definition.get_fixed_key_size(),
definition.get_fixed_value_size(),
&mem,
)?;
for table_page in table_pages_iter {
let page = mem.get_page(table_page)?;
let mut subtree_roots = parse_subtree_roots(
&page,
definition.get_fixed_key_size(),
definition.get_fixed_value_size(),
);
mem.mark_pages_allocated(subtree_roots.drain(..))?;
}
}
}
}
mem.end_repair()?;
let transaction_id = mem.get_last_committed_transaction_id()?.next();
mem.commit(
Some((root, root_checksum)),
None,
transaction_id,
false,
None,
)?;
}
mem.begin_writable()?;
let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
Ok(Database {
mem,
next_transaction_id: AtomicTransactionId::new(next_transaction_id),
transaction_tracker: Arc::new(Mutex::new(TransactionTracker::new())),
live_write_transaction: Mutex::new(None),
})
}
pub(crate) fn transaction_tracker(&self) -> Arc<Mutex<TransactionTracker>> {
self.transaction_tracker.clone()
}
fn allocate_read_transaction(&self) -> Result<TransactionId> {
let mut guard = self.transaction_tracker.lock().unwrap();
let id = self.mem.get_last_committed_transaction_id()?;
guard.register_read_transaction(id);
Ok(id)
}
pub(crate) fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
let id = self
.transaction_tracker
.lock()
.unwrap()
.allocate_savepoint();
Ok((id, self.allocate_read_transaction()?))
}
pub(crate) fn increment_transaction_id(&self) -> TransactionId {
self.next_transaction_id.next()
}
pub fn builder() -> Builder {
Builder::new()
}
pub fn set_write_strategy(&self, strategy: WriteStrategy) -> Result {
let mut tracker = self.transaction_tracker.lock().unwrap();
tracker.invalidate_all_savepoints();
let guard = self.live_write_transaction.lock().unwrap();
assert!(guard.is_none());
assert!(matches!(strategy, WriteStrategy::TwoPhase));
let id = self.increment_transaction_id();
let root_page = self.mem.get_data_root();
let freed_root = self.mem.get_freed_root();
self.mem
.commit(root_page, freed_root, id, false, Some(strategy.into()))?;
drop(guard);
drop(tracker);
Ok(())
}
pub fn begin_write(&self) -> Result<WriteTransaction> {
WriteTransaction::new(self)
}
pub fn begin_read(&self) -> Result<ReadTransaction> {
let id = self.allocate_read_transaction()?;
#[cfg(feature = "logging")]
info!("Beginning read transaction id={:?}", id);
Ok(ReadTransaction::new(self, id))
}
}
#[derive(Copy, Clone)]
pub enum WriteStrategy {
Checksum,
TwoPhase,
}
pub struct Builder {
page_size: usize,
region_size: Option<usize>,
initial_size: Option<u64>,
read_cache_size_bytes: usize,
write_cache_size_bytes: usize,
write_strategy: Option<WriteStrategy>,
}
impl Builder {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Self {
page_size: PAGE_SIZE,
region_size: None,
initial_size: None,
read_cache_size_bytes: 1024 * 1024 * 1024,
write_cache_size_bytes: 100 * 1024 * 1024,
write_strategy: None,
}
}
#[cfg(any(fuzzing, test))]
pub fn set_page_size(&mut self, size: usize) -> &mut Self {
assert!(size.is_power_of_two());
self.page_size = std::cmp::max(size, 512);
self
}
pub fn set_write_strategy(&mut self, write_strategy: WriteStrategy) -> &mut Self {
self.write_strategy = Some(write_strategy);
self
}
pub fn set_read_cache_size(&mut self, bytes: usize) -> &mut Self {
self.read_cache_size_bytes = bytes;
self
}
pub fn set_write_cache_size(&mut self, bytes: usize) -> &mut Self {
self.write_cache_size_bytes = bytes;
self
}
#[cfg(test)]
#[cfg(unix)]
fn set_region_size(&mut self, size: usize) -> &mut Self {
assert!(size.is_power_of_two());
self.region_size = Some(size);
self
}
pub fn set_initial_size(&mut self, size: u64) -> &mut Self {
self.initial_size = Some(size);
self
}
pub fn create(&self, path: impl AsRef<Path>) -> Result<Database> {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path)?;
Database::new(
file,
false,
self.page_size,
self.region_size,
self.initial_size,
self.read_cache_size_bytes,
self.write_cache_size_bytes,
self.write_strategy,
)
}
pub unsafe fn create_mmapped(&self, path: impl AsRef<Path>) -> Result<Database> {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path)?;
Database::new(
file,
true,
self.page_size,
self.region_size,
self.initial_size,
self.read_cache_size_bytes,
self.write_cache_size_bytes,
self.write_strategy,
)
}
pub fn open(&self, path: impl AsRef<Path>) -> Result<Database> {
if !path.as_ref().exists() {
Err(Error::Io(ErrorKind::NotFound.into()))
} else if File::open(path.as_ref())?.metadata()?.len() > 0 {
let file = OpenOptions::new().read(true).write(true).open(path)?;
Database::new(
file,
false,
self.page_size,
None,
self.initial_size,
self.read_cache_size_bytes,
self.write_cache_size_bytes,
None,
)
} else {
Err(Error::Io(io::Error::from(ErrorKind::InvalidData)))
}
}
pub unsafe fn open_mmapped(&self, path: impl AsRef<Path>) -> Result<Database> {
if !path.as_ref().exists() {
Err(Error::Io(ErrorKind::NotFound.into()))
} else if File::open(path.as_ref())?.metadata()?.len() > 0 {
let file = OpenOptions::new().read(true).write(true).open(path)?;
Database::new(
file,
true,
self.page_size,
None,
self.initial_size,
self.read_cache_size_bytes,
self.write_cache_size_bytes,
None,
)
} else {
Err(Error::Io(io::Error::from(ErrorKind::InvalidData)))
}
}
}
impl std::fmt::Debug for Database {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Database").finish()
}
}
#[cfg(test)]
mod test {
use tempfile::NamedTempFile;
use crate::{Database, Durability, ReadableTable, TableDefinition, WriteStrategy};
#[test]
fn small_pages() {
let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
let db = Database::builder()
.set_page_size(512)
.create(tmpfile.path())
.unwrap();
let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
let txn = db.begin_write().unwrap();
{
txn.open_table(table_definition).unwrap();
}
txn.commit().unwrap();
}
#[test]
fn small_pages2() {
let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
let db = Database::builder()
.set_write_strategy(WriteStrategy::TwoPhase)
.set_page_size(512)
.create(tmpfile.path())
.unwrap();
let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
let tx = db.begin_write().unwrap();
let savepoint0 = tx.savepoint().unwrap();
{
tx.open_table(table_def).unwrap();
}
tx.commit().unwrap();
let mut tx = db.begin_write().unwrap();
let savepoint1 = tx.savepoint().unwrap();
tx.restore_savepoint(&savepoint0).unwrap();
tx.set_durability(Durability::None);
{
let mut t = tx.open_table(table_def).unwrap();
t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
assert!(t.remove(&291295).unwrap().is_none());
}
tx.commit().unwrap();
let mut tx = db.begin_write().unwrap();
tx.restore_savepoint(&savepoint0).unwrap();
{
tx.open_table(table_def).unwrap();
}
tx.commit().unwrap();
let mut tx = db.begin_write().unwrap();
let savepoint2 = tx.savepoint().unwrap();
drop(savepoint0);
tx.restore_savepoint(&savepoint2).unwrap();
{
let mut t = tx.open_table(table_def).unwrap();
assert!(t.get(&2059).unwrap().is_none());
assert!(t.remove(&145227).unwrap().is_none());
assert!(t.remove(&145227).unwrap().is_none());
}
tx.commit().unwrap();
let mut tx = db.begin_write().unwrap();
let savepoint3 = tx.savepoint().unwrap();
drop(savepoint1);
tx.restore_savepoint(&savepoint3).unwrap();
{
tx.open_table(table_def).unwrap();
}
tx.commit().unwrap();
let mut tx = db.begin_write().unwrap();
let savepoint4 = tx.savepoint().unwrap();
drop(savepoint2);
tx.restore_savepoint(&savepoint3).unwrap();
tx.set_durability(Durability::None);
{
let mut t = tx.open_table(table_def).unwrap();
assert!(t.remove(&207936).unwrap().is_none());
}
tx.abort().unwrap();
let mut tx = db.begin_write().unwrap();
let savepoint5 = tx.savepoint().unwrap();
drop(savepoint3);
assert!(tx.restore_savepoint(&savepoint4).is_err());
{
tx.open_table(table_def).unwrap();
}
tx.commit().unwrap();
let mut tx = db.begin_write().unwrap();
tx.restore_savepoint(&savepoint5).unwrap();
tx.set_durability(Durability::None);
{
tx.open_table(table_def).unwrap();
}
tx.commit().unwrap();
}
#[test]
fn small_pages3() {
let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
let db = Database::builder()
.set_write_strategy(WriteStrategy::Checksum)
.set_page_size(1024)
.create(tmpfile.path())
.unwrap();
let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
let mut tx = db.begin_write().unwrap();
let _savepoint0 = tx.savepoint().unwrap();
tx.set_durability(Durability::None);
{
let mut t = tx.open_table(table_def).unwrap();
let value = vec![0; 306];
t.insert(&539717, value.as_slice()).unwrap();
}
tx.abort().unwrap();
let mut tx = db.begin_write().unwrap();
let savepoint1 = tx.savepoint().unwrap();
tx.restore_savepoint(&savepoint1).unwrap();
tx.set_durability(Durability::None);
{
let mut t = tx.open_table(table_def).unwrap();
let value = vec![0; 2008];
t.insert(&784384, value.as_slice()).unwrap();
}
tx.abort().unwrap();
}
#[test]
fn small_pages4() {
let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
let db = Database::builder()
.set_write_strategy(WriteStrategy::Checksum)
.set_read_cache_size(1024 * 1024)
.set_write_cache_size(0)
.set_page_size(1024)
.create(tmpfile.path())
.unwrap();
let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
let tx = db.begin_write().unwrap();
{
tx.open_table(table_def).unwrap();
}
tx.commit().unwrap();
let tx = db.begin_write().unwrap();
{
let mut t = tx.open_table(table_def).unwrap();
assert!(t.get(&131072).unwrap().is_none());
let value = vec![0xFF; 1130];
t.insert(&42394, value.as_slice()).unwrap();
t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
assert!(t.get(&0).unwrap().is_none());
}
tx.abort().unwrap();
let tx = db.begin_write().unwrap();
{
let mut t = tx.open_table(table_def).unwrap();
t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
}
tx.abort().unwrap();
}
#[test]
#[cfg(unix)]
fn dynamic_shrink() {
let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
let big_value = vec![0u8; 1024];
let db = Database::builder()
.set_region_size(1024 * 1024)
.create(tmpfile.path())
.unwrap();
let txn = db.begin_write().unwrap();
{
let mut table = txn.open_table(table_definition).unwrap();
for i in 0..2048 {
table.insert(&i, big_value.as_slice()).unwrap();
}
}
txn.commit().unwrap();
let file_size = tmpfile.as_file().metadata().unwrap().len();
let txn = db.begin_write().unwrap();
{
let mut table = txn.open_table(table_definition).unwrap();
for i in 0..2048 {
table.remove(&i).unwrap();
}
}
txn.commit().unwrap();
let txn = db.begin_write().unwrap();
{
let mut table = txn.open_table(table_definition).unwrap();
table.insert(0, [].as_slice()).unwrap();
}
txn.commit().unwrap();
let txn = db.begin_write().unwrap();
{
let mut table = txn.open_table(table_definition).unwrap();
table.remove(0).unwrap();
}
txn.commit().unwrap();
let txn = db.begin_write().unwrap();
txn.commit().unwrap();
let final_file_size = tmpfile.as_file().metadata().unwrap().len();
assert!(final_file_size < file_size);
}
}