use crate::transaction_tracker::{TransactionId, TransactionTracker};
use crate::tree_store::{
Btree, BtreeMut, FreedTableKey, InternalTableDefinition, PageHint, PageNumber, TableTree,
TableType, TransactionalMemory,
};
use crate::types::{RedbKey, RedbValue};
use crate::{
Database, Error, MultimapTable, MultimapTableDefinition, ReadOnlyMultimapTable, ReadOnlyTable,
Result, Savepoint, Table, TableDefinition,
};
#[cfg(feature = "logging")]
use log::{info, warn};
use std::cell::RefCell;
use std::cmp::min;
use std::collections::HashMap;
use std::mem::size_of;
use std::ops::RangeFull;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::{panic, thread};
/// Point-in-time storage statistics for a database, produced by
/// [`WriteTransaction::stats`].
#[derive(Debug)]
pub struct DatabaseStats {
    // Height of the data tree (max traversal depth).
    pub(crate) tree_height: usize,
    // Total pages currently allocated in the database.
    pub(crate) allocated_pages: usize,
    // Leaf page count of the data tree.
    pub(crate) leaf_pages: usize,
    // Branch (internal) page count of the data tree.
    pub(crate) branch_pages: usize,
    // Bytes of key/value data stored in data-tree leaves.
    pub(crate) stored_leaf_bytes: usize,
    // Bookkeeping bytes; per stats(), this includes the freed tree's
    // metadata *and* stored bytes.
    pub(crate) metadata_bytes: usize,
    // Bytes allocated but not usefully occupied, across data and freed trees.
    pub(crate) fragmented_bytes: usize,
    // Page size in bytes, as reported by the underlying memory layer.
    pub(crate) page_size: usize,
}
impl DatabaseStats {
    /// Height (max depth) of the data tree.
    pub fn tree_height(&self) -> usize {
        self.tree_height
    }

    /// Total number of pages allocated in the database.
    pub fn allocated_pages(&self) -> usize {
        self.allocated_pages
    }

    /// Number of leaf pages in the data tree.
    pub fn leaf_pages(&self) -> usize {
        self.leaf_pages
    }

    /// Number of branch pages in the data tree.
    pub fn branch_pages(&self) -> usize {
        self.branch_pages
    }

    /// Bytes of key/value data stored in leaf pages.
    pub fn stored_bytes(&self) -> usize {
        self.stored_leaf_bytes
    }

    /// Bytes of bookkeeping overhead (includes freed-tree storage).
    pub fn metadata_bytes(&self) -> usize {
        self.metadata_bytes
    }

    /// Bytes allocated but not occupied by data or metadata.
    pub fn fragmented_bytes(&self) -> usize {
        self.fragmented_bytes
    }

    /// Page size in bytes.
    pub fn page_size(&self) -> usize {
        self.page_size
    }
}
/// Durability level applied when a [`WriteTransaction`] commits.
///
/// `PartialEq`/`Eq` are derived so callers can compare and assert on the
/// configured level (backward-compatible addition).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Durability {
    /// Non-durable commit: changes are applied via the non-durable commit
    /// path and are presumably lost if the process crashes before a durable
    /// commit follows.
    None,
    /// Durable commit whose persistence is deferred (the durable commit path
    /// is invoked with `eventual = true`).
    Eventual,
    /// Fully durable commit before `commit()` returns (the durable commit
    /// path is invoked with `eventual = false`). This is the default.
    Immediate,
}
/// An exclusive read-write transaction.
///
/// At most one write transaction exists per database at a time; exclusivity
/// is enforced by holding the `live_write_transaction` mutex guard for the
/// transaction's entire lifetime.
pub struct WriteTransaction<'db> {
    db: &'db Database,
    // Shared tracker of live read transactions and savepoints.
    transaction_tracker: Arc<Mutex<TransactionTracker>>,
    mem: &'db TransactionalMemory,
    transaction_id: TransactionId,
    // Working view of all tables; staged root updates are flushed at commit.
    table_tree: RefCell<TableTree<'db>>,
    // Persistent tree of pages freed by past transactions, keyed by
    // (transaction id, pagination id).
    freed_tree: BtreeMut<'db, FreedTableKey, &'static [u8]>,
    // Pages freed during this transaction, pending storage in `freed_tree`.
    freed_pages: Rc<RefCell<Vec<PageNumber>>>,
    // Names of tables currently open in this transaction, mapped to the
    // caller location that opened them (reported in TableAlreadyOpen errors).
    open_tables: RefCell<HashMap<String, &'static panic::Location<'static>>>,
    // Set once the transaction commits or aborts; suppresses the automatic
    // abort performed by Drop.
    completed: bool,
    // Set once the transaction has made (or may make) modifications;
    // savepoint() refuses to run on a dirty transaction.
    dirty: AtomicBool,
    // Durability used at commit time; defaults to Immediate.
    durability: Durability,
    // Guard that provides write exclusivity; cleared and released in Drop.
    live_write_transaction: MutexGuard<'db, Option<TransactionId>>,
}
impl<'db> WriteTransaction<'db> {
    /// Begins a new write transaction on `db`.
    ///
    /// Acquires the database's `live_write_transaction` lock and holds the
    /// guard inside the returned transaction, so no second write transaction
    /// can start until this one is dropped.
    pub(crate) fn new(db: &'db Database) -> Result<Self> {
        let mut live_write_transaction = db.live_write_transaction.lock().unwrap();
        // Holding the mutex guarantees exclusivity; a live id here is a bug.
        assert!(live_write_transaction.is_none());
        let transaction_id = db.increment_transaction_id();
        #[cfg(feature = "logging")]
        info!("Beginning write transaction id={:?}", transaction_id);
        *live_write_transaction = Some(transaction_id);
        unsafe {
            // NOTE(review): presumed sound because the guard above makes this
            // the only live write transaction -- confirm against
            // mark_transaction's safety contract.
            db.get_memory().mark_transaction(transaction_id);
        }
        // Snapshot the current data and freed-tree roots; all edits in this
        // transaction build on these.
        let root_page = db.get_memory().get_data_root();
        let freed_root = db.get_memory().get_freed_root();
        // Pages freed during this transaction; shared with the table tree and
        // freed tree so they can record pages they release.
        let freed_pages = Rc::new(RefCell::new(vec![]));
        Ok(Self {
            db,
            transaction_tracker: db.transaction_tracker(),
            mem: db.get_memory(),
            transaction_id,
            table_tree: RefCell::new(TableTree::new(
                root_page,
                db.get_memory(),
                freed_pages.clone(),
            )),
            freed_tree: BtreeMut::new(freed_root, db.get_memory(), freed_pages.clone()),
            freed_pages,
            open_tables: RefCell::new(Default::default()),
            completed: false,
            dirty: AtomicBool::new(false),
            // Safest default; callers may relax it via set_durability().
            durability: Durability::Immediate,
            live_write_transaction,
        })
    }

    /// Creates a savepoint capturing the database state as of the start of
    /// this transaction.
    ///
    /// # Errors
    /// Returns [`Error::InvalidSavepoint`] if this transaction has already
    /// made modifications (is dirty).
    pub fn savepoint(&self) -> Result<Savepoint> {
        if self.dirty.load(Ordering::Acquire) {
            return Err(Error::InvalidSavepoint);
        }
        let (id, transaction_id) = self.db.allocate_savepoint()?;
        #[cfg(feature = "logging")]
        info!(
            "Creating savepoint id={:?}, txn_id={:?}",
            id, transaction_id
        );
        // Record everything needed for a later restore: raw allocator state
        // plus the data and freed-tree roots.
        let regional_allocators = self.mem.get_raw_allocator_states();
        let root = self.mem.get_data_root();
        let freed_root = self.mem.get_freed_root();
        let savepoint = Savepoint::new(
            self.db,
            id,
            transaction_id,
            root,
            freed_root,
            regional_allocators,
        );
        Ok(savepoint)
    }

    /// Rolls the database state back to the given savepoint.
    ///
    /// # Errors
    /// Returns [`Error::InvalidSavepoint`] if the savepoint is no longer
    /// valid (e.g. it was invalidated by a previous restore).
    ///
    /// # Panics
    /// Panics if `savepoint` belongs to a different database instance, or if
    /// the file version / checksum type changed since it was taken.
    pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result {
        // The savepoint must have been created by this same database instance.
        assert_eq!(
            self.db.transaction_tracker().as_ref() as *const _,
            savepoint.db_address()
        );
        if !self
            .transaction_tracker
            .lock()
            .unwrap()
            .is_valid_savepoint(savepoint.get_id())
        {
            return Err(Error::InvalidSavepoint);
        }
        #[cfg(feature = "logging")]
        info!(
            "Beginning savepoint restore (id={:?}) in transaction id={:?}",
            savepoint.get_id(),
            self.transaction_id
        );
        // Restoring across a format-version or checksum-type change is
        // unsupported.
        assert_eq!(self.db.get_memory().get_version(), savepoint.get_version());
        assert_eq!(
            self.db.get_memory().checksum_type(),
            savepoint.get_checksum_type()
        );
        self.dirty.store(true, Ordering::Release);
        // Every page allocated after the savepoint must be released:
        // uncommitted pages can be freed immediately; committed ones go
        // through the deferred freed-pages mechanism.
        let allocated_since_savepoint = self
            .mem
            .pages_allocated_since_raw_state(savepoint.get_regional_allocator_states());
        let mut freed_pages = vec![];
        for page in allocated_since_savepoint {
            if self.mem.uncommitted(page) {
                unsafe {
                    // NOTE(review): presumed sound because the page is
                    // uncommitted and thus invisible to readers -- confirm
                    // free()'s safety contract.
                    self.mem.free(page)?;
                }
            } else {
                freed_pages.push(page);
            }
        }
        *self.freed_pages.borrow_mut() = freed_pages;
        // Point the table tree at the savepoint's data root.
        self.table_tree = RefCell::new(TableTree::new(
            savepoint.get_root(),
            self.mem,
            self.freed_pages.clone(),
        ));
        // Oldest transaction with still-unprocessed freed pages, read from
        // the *current* freed tree before it is replaced below.
        let oldest_unprocessed_transaction = if let Some(entry) = self
            .freed_tree
            .range::<RangeFull, FreedTableKey>(..)?
            .next()
        {
            FreedTableKey::from_bytes(entry.key()).transaction_id
        } else {
            self.transaction_id.0
        };
        self.freed_tree = BtreeMut::new(
            savepoint.get_freed_root(),
            self.mem,
            self.freed_pages.clone(),
        );
        // NOTE(review): entries in the restored freed tree older than the
        // oldest unprocessed key appear to have been processed (their pages
        // freed) since the savepoint; removing them avoids double-freeing --
        // confirm against process_freed_pages.
        let lookup_key = FreedTableKey {
            transaction_id: oldest_unprocessed_transaction,
            pagination_id: 0,
        };
        let mut to_remove = vec![];
        for entry in self.freed_tree.range(..lookup_key)? {
            to_remove.push(FreedTableKey::from_bytes(entry.key()));
        }
        for key in to_remove {
            unsafe { self.freed_tree.remove(&key)? };
        }
        // Savepoints newer than the restored one now describe a rolled-back
        // state and must be invalidated.
        self.transaction_tracker
            .lock()
            .unwrap()
            .invalidate_savepoints_after(savepoint.get_id());
        Ok(())
    }

    /// Sets the durability level used when this transaction commits.
    pub fn set_durability(&mut self, durability: Durability) {
        self.durability = durability;
    }

    /// Opens the given table for reading and writing, creating it if needed.
    ///
    /// # Errors
    /// Returns [`Error::TableAlreadyOpen`] (with the location of the first
    /// open) if this transaction already has the table open.
    pub fn open_table<'txn, K: RedbKey + ?Sized, V: RedbValue + ?Sized>(
        &'txn self,
        definition: TableDefinition<K, V>,
    ) -> Result<Table<'db, 'txn, K, V>> {
        #[cfg(feature = "logging")]
        info!("Opening table: {}", definition);
        if let Some(location) = self.open_tables.borrow().get(definition.name()) {
            return Err(Error::TableAlreadyOpen(
                definition.name().to_string(),
                location,
            ));
        }
        self.dirty.store(true, Ordering::Release);
        // Remember where the table was opened so a conflicting second open
        // can report the original call site.
        self.open_tables
            .borrow_mut()
            .insert(definition.name().to_string(), panic::Location::caller());
        let internal_table = self
            .table_tree
            .borrow_mut()
            .get_or_create_table::<K, V>(definition.name(), TableType::Normal)?;
        Ok(Table::new(
            definition.name(),
            internal_table.get_root(),
            self.freed_pages.clone(),
            self.mem,
            self,
        ))
    }

    /// Opens the given multimap table, creating it if needed.
    ///
    /// # Errors
    /// Returns [`Error::TableAlreadyOpen`] (with the location of the first
    /// open) if this transaction already has the table open.
    pub fn open_multimap_table<'txn, K: RedbKey + ?Sized, V: RedbKey + ?Sized>(
        &'txn self,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<MultimapTable<'db, 'txn, K, V>> {
        #[cfg(feature = "logging")]
        info!("Opening multimap table: {}", definition);
        if let Some(location) = self.open_tables.borrow().get(definition.name()) {
            return Err(Error::TableAlreadyOpen(
                definition.name().to_string(),
                location,
            ));
        }
        self.dirty.store(true, Ordering::Release);
        // Remember where the table was opened so a conflicting second open
        // can report the original call site.
        self.open_tables
            .borrow_mut()
            .insert(definition.name().to_string(), panic::Location::caller());
        let internal_table = self
            .table_tree
            .borrow_mut()
            .get_or_create_table::<K, V>(definition.name(), TableType::Multimap)?;
        Ok(MultimapTable::new(
            definition.name(),
            internal_table.get_root(),
            self.freed_pages.clone(),
            self.mem,
            self,
        ))
    }

    /// Unregisters an open table and stages its latest root so the table
    /// tree picks it up at commit.
    ///
    /// # Panics
    /// Panics if `name` is not currently registered in `open_tables`.
    pub(crate) fn close_table<K: RedbKey + ?Sized, V: RedbValue + ?Sized>(
        &self,
        name: &str,
        table: &mut BtreeMut<K, V>,
    ) {
        self.open_tables.borrow_mut().remove(name).unwrap();
        self.table_tree
            .borrow_mut()
            .stage_update_table_root(name, table.get_root());
    }

    /// Deletes the given table. Returns whether the table existed.
    pub fn delete_table<K: RedbKey + ?Sized, V: RedbValue + ?Sized>(
        &self,
        definition: TableDefinition<K, V>,
    ) -> Result<bool> {
        #[cfg(feature = "logging")]
        info!("Deleting table: {}", definition);
        self.dirty.store(true, Ordering::Release);
        self.table_tree
            .borrow_mut()
            .delete_table::<K, V>(definition.name(), TableType::Normal)
    }

    /// Deletes the given multimap table. Returns whether the table existed.
    pub fn delete_multimap_table<K: RedbKey + ?Sized, V: RedbKey + ?Sized>(
        &self,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<bool> {
        #[cfg(feature = "logging")]
        info!("Deleting multimap table: {}", definition);
        self.dirty.store(true, Ordering::Release);
        self.table_tree
            .borrow_mut()
            .delete_table::<K, V>(definition.name(), TableType::Multimap)
    }

    /// Lists the names of all normal (non-multimap) tables.
    pub fn list_tables(&self) -> Result<impl Iterator<Item = String> + '_> {
        self.table_tree
            .borrow()
            .list_tables(TableType::Normal)
            .map(|x| x.into_iter())
    }

    /// Lists the names of all multimap tables.
    pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = String> + '_> {
        self.table_tree
            .borrow()
            .list_tables(TableType::Multimap)
            .map(|x| x.into_iter())
    }

    /// Commits the transaction using the configured [`Durability`].
    pub fn commit(mut self) -> Result {
        // Flush staged table roots first; commit_inner flushes again, which
        // is then a no-op for these updates.
        self.table_tree.borrow_mut().flush_table_root_updates()?;
        self.commit_inner()
    }

    /// Dispatches to the durable or non-durable commit path and marks the
    /// transaction completed so Drop does not auto-abort.
    fn commit_inner(&mut self) -> Result {
        #[cfg(feature = "logging")]
        info!(
            "Committing transaction id={:?} with durability={:?}",
            self.transaction_id, self.durability
        );
        match self.durability {
            Durability::None => self.non_durable_commit()?,
            Durability::Eventual => self.durable_commit(true)?,
            Durability::Immediate => self.durable_commit(false)?,
        }
        self.completed = true;
        #[cfg(feature = "logging")]
        info!(
            "Finished commit of transaction id={:?}",
            self.transaction_id
        );
        Ok(())
    }

    /// Aborts the transaction, discarding all of its changes.
    pub fn abort(mut self) -> Result {
        self.abort_inner()
    }

    /// Discards staged table-root updates, rolls back uncommitted writes,
    /// and marks the transaction completed.
    fn abort_inner(&mut self) -> Result {
        #[cfg(feature = "logging")]
        info!("Aborting transaction id={:?}", self.transaction_id);
        self.table_tree.borrow_mut().clear_table_root_updates();
        self.mem.rollback_uncommitted_writes()?;
        self.completed = true;
        #[cfg(feature = "logging")]
        info!("Finished abort of transaction id={:?}", self.transaction_id);
        Ok(())
    }

    /// Commits durably: reclaims freed pages no reader can still see, stores
    /// this transaction's freed pages, then writes the commit record.
    ///
    /// `eventual` selects deferred rather than immediate persistence.
    pub(crate) fn durable_commit(&mut self, eventual: bool) -> Result {
        // Pages may only be reclaimed once no live read transaction could
        // still reference them; fall back to our own id if none are live.
        let oldest_live_read = self
            .transaction_tracker
            .lock()
            .unwrap()
            .oldest_live_read_transaction()
            .unwrap_or(self.transaction_id);
        unsafe {
            // NOTE(review): presumably reclaims mmap resources unreachable by
            // transactions >= oldest_live_read -- confirm mmap_gc's contract.
            self.mem.mmap_gc(oldest_live_read)?;
        }
        let root = self.table_tree.borrow_mut().flush_table_root_updates()?;
        self.process_freed_pages(oldest_live_read)?;
        self.store_freed_pages()?;
        let freed_root = self.freed_tree.get_root();
        self.mem
            .commit(root, freed_root, self.transaction_id, eventual, None)?;
        Ok(())
    }

    /// Commits without durability: stores freed pages and records the new
    /// roots via the non-durable commit path. Note that, unlike the durable
    /// path, no previously-freed pages are reclaimed here.
    pub(crate) fn non_durable_commit(&mut self) -> Result {
        let root = self.table_tree.borrow_mut().flush_table_root_updates()?;
        self.store_freed_pages()?;
        let freed_root = self.freed_tree.get_root();
        self.mem
            .non_durable_commit(root, freed_root, self.transaction_id)?;
        Ok(())
    }

    /// Frees all pages recorded by transactions strictly older than
    /// `oldest_live_read`, then removes those entries from the freed tree.
    ///
    /// Entry values are laid out as a little-endian u64 count followed by
    /// `count` 8-byte serialized `PageNumber`s (written by
    /// `store_freed_pages`).
    fn process_freed_pages(&mut self, oldest_live_read: TransactionId) -> Result {
        // The fixed 8-byte stride below depends on this.
        assert_eq!(PageNumber::serialized_size(), 8);
        let lookup_key = FreedTableKey {
            transaction_id: oldest_live_read.0,
            pagination_id: 0,
        };
        let mut to_remove = vec![];
        for entry in self.freed_tree.range(..lookup_key)? {
            to_remove.push(FreedTableKey::from_bytes(entry.key()));
            let value = entry.value();
            let length: usize = u64::from_le_bytes(value[..size_of::<u64>()].try_into().unwrap())
                .try_into()
                .unwrap();
            // Entries 1..=length: page numbers at 8-byte offsets after the count.
            for i in 1..=length {
                let page = PageNumber::from_le_bytes(value[i * 8..(i + 1) * 8].try_into().unwrap());
                unsafe {
                    // NOTE(review): presumed sound because no transaction
                    // older than oldest_live_read is still live -- confirm
                    // free()'s safety contract.
                    self.mem.free(page)?;
                }
            }
        }
        // Remove after iterating so the range borrow has ended.
        for key in to_remove {
            unsafe { self.freed_tree.remove(&key)? };
        }
        Ok(())
    }

    /// Persists this transaction's pending freed pages into the freed tree,
    /// keyed by (transaction id, pagination counter), in chunks of up to 100
    /// pages per entry.
    fn store_freed_pages(&mut self) -> Result {
        // The fixed 8-byte stride below depends on this.
        assert_eq!(PageNumber::serialized_size(), 8);
        let mut pagination_counter = 0u64;
        // NOTE(review): emptiness is re-checked each pass -- presumably
        // because insert_reserve can itself free pages, pushing new entries
        // into freed_pages; confirm.
        while !self.freed_pages.borrow().is_empty() {
            let chunk_size = 100;
            let buffer_size = size_of::<u64>() + 8 * chunk_size;
            let key = FreedTableKey {
                transaction_id: self.transaction_id.0,
                pagination_id: pagination_counter,
            };
            let mut access_guard = unsafe { self.freed_tree.insert_reserve(&key, buffer_size)? };
            let len = self.freed_pages.borrow().len();
            // First 8 bytes: little-endian count of page entries that follow.
            access_guard.as_mut()[..8]
                .copy_from_slice(&min(len as u64, chunk_size as u64).to_le_bytes());
            // Drain up to chunk_size pages from the tail of the pending list,
            // writing each at the next 8-byte slot.
            for (i, page) in self
                .freed_pages
                .borrow_mut()
                .drain(len - min(len, chunk_size)..)
                .enumerate()
            {
                access_guard.as_mut()[(i + 1) * 8..(i + 2) * 8]
                    .copy_from_slice(&page.to_le_bytes());
            }
            drop(access_guard);
            pagination_counter += 1;
        }
        Ok(())
    }

    /// Computes storage statistics. The freed tree's storage is counted as
    /// metadata rather than stored data.
    pub fn stats(&self) -> Result<DatabaseStats> {
        let table_tree = self.table_tree.borrow();
        let data_tree_stats = table_tree.stats()?;
        let freed_tree_stats = self.freed_tree.stats();
        // Freed-tree bytes (both metadata and leaf data) are bookkeeping
        // from the user's perspective.
        let total_metadata_bytes = data_tree_stats.metadata_bytes()
            + freed_tree_stats.metadata_bytes
            + freed_tree_stats.stored_leaf_bytes;
        let total_fragmented =
            data_tree_stats.fragmented_bytes() + freed_tree_stats.fragmented_bytes;
        Ok(DatabaseStats {
            tree_height: data_tree_stats.tree_height(),
            allocated_pages: self.mem.count_allocated_pages()?,
            leaf_pages: data_tree_stats.leaf_pages(),
            branch_pages: data_tree_stats.branch_pages(),
            stored_leaf_bytes: data_tree_stats.stored_bytes(),
            metadata_bytes: total_metadata_bytes,
            fragmented_bytes: total_fragmented,
            page_size: self.mem.get_page_size(),
        })
    }

    /// Debug helper: flushes staged roots and dumps the master table tree to
    /// stderr, if a root exists.
    #[allow(dead_code)]
    pub(crate) fn print_debug(&self) {
        if let Some(page) = self
            .table_tree
            .borrow_mut()
            .flush_table_root_updates()
            .unwrap()
        {
            eprintln!("Master tree:");
            let master_tree: Btree<&str, InternalTableDefinition> =
                Btree::new(Some(page), PageHint::None, self.mem);
            master_tree.print_debug(true);
        }
    }
}
impl<'a> Drop for WriteTransaction<'a> {
    fn drop(&mut self) {
        // Clear the exclusive write slot before any cleanup; the guard itself
        // is released when the struct is fully dropped.
        *self.live_write_transaction = None;
        // A transaction that already committed/aborted needs no cleanup, and
        // we skip the automatic abort while unwinding from a panic.
        if self.completed || thread::panicking() {
            return;
        }
        #[allow(unused_variables)]
        if let Err(error) = self.abort_inner() {
            #[cfg(feature = "logging")]
            warn!("Failure automatically aborting transaction: {}", error);
        }
    }
}
/// A read-only transaction pinned to the data root captured at creation.
pub struct ReadTransaction<'a> {
    db: &'a Database,
    // Table tree rooted at the snapshot taken when the transaction began.
    tree: TableTree<'a>,
    // Id registered with the transaction tracker; deallocated in Drop.
    transaction_id: TransactionId,
}
impl<'db> ReadTransaction<'db> {
pub(crate) fn new(db: &'db Database, transaction_id: TransactionId) -> Self {
let root_page = db.get_memory().get_data_root();
Self {
db,
tree: TableTree::new(root_page, db.get_memory(), Default::default()),
transaction_id,
}
}
pub fn open_table<K: RedbKey + ?Sized, V: RedbValue + ?Sized>(
&self,
definition: TableDefinition<K, V>,
) -> Result<ReadOnlyTable<K, V>> {
let header = self
.tree
.get_table::<K, V>(definition.name(), TableType::Normal)?
.ok_or_else(|| Error::TableDoesNotExist(definition.name().to_string()))?;
Ok(ReadOnlyTable::new(
header.get_root(),
PageHint::Clean,
self.db.get_memory(),
))
}
pub fn open_multimap_table<K: RedbKey + ?Sized, V: RedbKey + ?Sized>(
&self,
definition: MultimapTableDefinition<K, V>,
) -> Result<ReadOnlyMultimapTable<K, V>> {
let header = self
.tree
.get_table::<K, V>(definition.name(), TableType::Multimap)?
.ok_or_else(|| Error::TableDoesNotExist(definition.name().to_string()))?;
Ok(ReadOnlyMultimapTable::new(
header.get_root(),
PageHint::Clean,
self.db.get_memory(),
))
}
pub fn list_tables(&self) -> Result<impl Iterator<Item = String>> {
self.tree
.list_tables(TableType::Normal)
.map(|x| x.into_iter())
}
pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = String>> {
self.tree
.list_tables(TableType::Multimap)
.map(|x| x.into_iter())
}
}
impl<'a> Drop for ReadTransaction<'a> {
    fn drop(&mut self) {
        // Unregister from the tracker so our snapshot's pages become
        // reclaimable by future write transactions.
        let tracker = self.db.transaction_tracker();
        tracker
            .lock()
            .unwrap()
            .deallocate_read_transaction(self.transaction_id);
    }
}
#[cfg(test)]
mod test {
    use crate::{Database, TableDefinition};
    use tempfile::NamedTempFile;

    const X: TableDefinition<&[u8], &[u8]> = TableDefinition::new("x");

    #[test]
    fn transaction_id_persistence() {
        // Write some data and remember the committing transaction's id.
        let tmpfile = NamedTempFile::new().unwrap();
        let db = Database::create(tmpfile.path()).unwrap();
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(X).unwrap();
            table.insert(b"hello", b"world").unwrap();
        }
        let first_txn_id = txn.transaction_id;
        txn.commit().unwrap();
        drop(db);

        // Reopen the same file: ids must keep increasing across restarts.
        let db2 = Database::create(tmpfile.path()).unwrap();
        let txn = db2.begin_write().unwrap();
        assert!(txn.transaction_id > first_txn_id);
    }
}