use std::{
cell::RefCell,
collections::HashSet,
fs::File,
io::{Seek, SeekFrom, Write},
marker::PhantomData,
rc::Rc,
sync::{MutexGuard, RwLockReadGuard},
};
use crate::{
bucket::{Bucket, BucketMeta, InnerBucket},
bytes::ToBytes,
cursor::ToBuckets,
db::{DB, MIN_ALLOC_SIZE},
errors::{Error, Result},
freelist::TxFreelist,
meta::Meta,
node::Node,
page::{Page, PageID, Pages},
BucketName,
};
/// The lock a transaction holds for as long as it is alive.
///
/// The variant also encodes the transaction's mode: a transaction holding
/// the file mutex may write, one holding the shared read guard may not.
pub(crate) enum TxLock<'tx> {
    /// Guard over the mutex-protected database file (writable transaction).
    Rw(MutexGuard<'tx, File>),
    /// Shared read guard (read-only transaction).
    Ro(RwLockReadGuard<'tx, ()>),
}

impl<'tx> TxLock<'tx> {
    /// Returns `true` only when this is the [`TxLock::Rw`] variant.
    fn writable(&self) -> bool {
        matches!(self, Self::Rw(_))
    }
}
/// A transaction on a [`DB`].
///
/// All of the transaction's state lives in a [`TxInner`] behind a `RefCell`,
/// so the methods on `Tx` take `&self` and borrow that state at runtime.
pub struct Tx<'tx> {
/// Runtime-borrowed transaction state.
pub(crate) inner: RefCell<TxInner<'tx>>,
}
/// The state behind a [`Tx`].
///
/// When a read-only `TxInner` is dropped, its transaction id is removed from
/// the database's list of open read-only transactions.
pub(crate) struct TxInner<'tx> {
/// The database this transaction was opened on.
pub(crate) db: &'tx DB,
/// Lock held for the whole transaction; its variant decides writability.
pub(crate) lock: TxLock<'tx>,
/// Root bucket, built from `meta.root` when the transaction is created.
pub(crate) root: Rc<RefCell<InnerBucket<'tx>>>,
/// This transaction's copy of the database meta. For a writable
/// transaction `tx_id` is one higher than the value read from the database.
pub(crate) meta: Meta,
/// Transaction-local freelist; the `Rc` is cloned into every `Bucket`
/// handed out by this transaction.
pub(crate) freelist: Rc<RefCell<TxFreelist>>,
/// Page view over the database data, used to read pages.
pages: Pages,
/// `overflow + 1` of the freelist page at the time the transaction was
/// created; passed to `freelist.free` when the freelist is rewritten.
num_freelist_pages: u64,
}
impl<'tx> Tx<'tx> {
/// Opens a transaction on `db`.
///
/// A writable transaction takes the file mutex, bumps `meta.tx_id` and
/// calls `release` on its freelist copy with the id of the oldest open
/// read-only transaction, or with its own new id when none is open. A
/// read-only transaction takes the shared read lock and records its
/// `tx_id` in `db.inner.open_ro_txs`, which is kept sorted.
///
/// Every fallible step runs *before* a read-only transaction registers
/// itself. Registration is only undone by `TxInner`'s `Drop`, so an error
/// returned after registering (but before the `TxInner` exists) would leave
/// a stale id in `open_ro_txs` forever.
///
/// # Errors
/// Returns an error if one of the `?`-propagated locks cannot be acquired
/// or the meta cannot be read.
///
/// # Panics
/// Panics if the `open_ro_txs` mutex is poisoned.
pub(crate) fn new(db: &'tx DB, writable: bool) -> Result<Tx<'tx>> {
    let lock = if writable {
        TxLock::Rw(db.inner.file.lock()?)
    } else {
        TxLock::Ro(db.inner.mmap_lock.read()?)
    };
    let mut freelist = db.inner.freelist.lock()?.clone();
    let mut meta = db.inner.meta()?;
    debug_assert!(meta.valid());
    // Last fallible step: fetch the data handle before touching open_ro_txs.
    let data = db.inner.data.lock()?.clone();
    {
        let mut open_ro_txs = db.inner.open_ro_txs.lock().unwrap();
        if writable {
            meta.tx_id += 1;
            // The list is sorted, so the first entry is the oldest open
            // read-only transaction; fall back to our own id when empty.
            let release_id = open_ro_txs.first().copied().unwrap_or(meta.tx_id);
            freelist.release(release_id);
        } else {
            open_ro_txs.push(meta.tx_id);
            open_ro_txs.sort_unstable();
        }
    }
    let freelist = Rc::new(RefCell::new(TxFreelist::new(meta.clone(), freelist)));
    let pages = Pages::new(data, db.inner.pagesize);
    let num_freelist_pages = pages.page(meta.freelist_page).overflow + 1;
    let root = InnerBucket::from_meta(meta.root, pages.clone());
    let root = Rc::new(RefCell::new(root));
    let inner = TxInner {
        db,
        lock,
        root,
        meta,
        freelist,
        num_freelist_pages,
        pages,
    };
    Ok(Tx {
        inner: RefCell::new(inner),
    })
}
/// Reports whether this transaction holds the writable lock variant.
pub(crate) fn writable(&self) -> bool {
    let state = self.inner.borrow();
    state.lock.writable()
}
pub fn get_bucket<'b, T: ToBytes<'tx>>(&'b self, name: T) -> Result<Bucket<'b, 'tx>> {
let tx = self.inner.borrow();
let mut root = tx.root.borrow_mut();
let inner = root.get_bucket(name)?;
Ok(Bucket {
inner,
freelist: tx.freelist.clone(),
writable: tx.lock.writable(),
_phantom: PhantomData,
})
}
pub fn create_bucket<'b, T: ToBytes<'tx>>(&'b self, name: T) -> Result<Bucket<'b, 'tx>> {
let tx = self.inner.borrow();
if !tx.lock.writable() {
return Err(Error::ReadOnlyTx);
}
let mut root = tx.root.borrow_mut();
let inner = root.create_bucket(name)?;
Ok(Bucket {
inner,
freelist: tx.freelist.clone(),
writable: true,
_phantom: PhantomData,
})
}
pub fn get_or_create_bucket<'b, T: ToBytes<'tx>>(&'b self, name: T) -> Result<Bucket<'b, 'tx>> {
let tx = self.inner.borrow();
if !tx.lock.writable() {
return Err(Error::ReadOnlyTx);
}
let mut root = tx.root.borrow_mut();
let inner = root.get_or_create_bucket(name)?;
Ok(Bucket {
inner,
freelist: tx.freelist.clone(),
writable: true,
_phantom: PhantomData,
})
}
/// Deletes the bucket stored under `key` through the root bucket, passing
/// it this transaction's freelist.
///
/// # Errors
/// Returns [`Error::ReadOnlyTx`] on a read-only transaction; otherwise
/// propagates the error returned by the root bucket's `delete_bucket`.
pub fn delete_bucket<T: ToBytes<'tx>>(&self, key: T) -> Result<()> {
    let state = self.inner.borrow();
    if !state.lock.writable() {
        return Err(Error::ReadOnlyTx);
    }
    // `state` outlives both borrows, so the freelist can be borrowed in place.
    let mut free_pages = state.freelist.borrow_mut();
    let mut root_bucket = state.root.borrow_mut();
    root_bucket.delete_bucket(key, &mut free_pages)
}
pub fn buckets<'b>(&'b self) -> impl Iterator<Item = (BucketName<'b, 'tx>, Bucket<'b, 'tx>)> {
let tx = self.inner.borrow();
let bucket = Bucket {
inner: tx.root.clone(),
freelist: tx.freelist.clone(),
writable: tx.lock.writable(),
_phantom: PhantomData,
};
bucket.cursor().to_buckets()
}
pub fn commit(self) -> Result<()> {
if !self.writable() {
return Err(Error::ReadOnlyTx);
}
let mut tx = self.inner.borrow_mut();
let freelist = tx.freelist.clone();
let mut freelist = freelist.borrow_mut();
let meta = {
let mut root = tx.root.borrow_mut();
root.rebalance(&mut freelist)?;
root.spill(&mut freelist)?
};
tx.meta.root = meta;
tx.write_data(&mut freelist)
}
/// Runs the consistency check of the underlying [`TxInner`].
pub(crate) fn check(&self) -> Result<()> {
    let state = self.inner.borrow();
    state.check()
}
}
impl<'tx> TxInner<'tx> {
/// Writes this transaction's changes to the database file.
///
/// In order: stores the freelist in newly allocated page(s), grows the file
/// when the new page count no longer fits, writes every page held in
/// `freelist.pages`, runs `check` when strict mode is enabled, writes the
/// meta page, flushes and syncs the file, and finally replaces the freelist
/// kept in the shared `DB` state.
///
/// # Errors
/// Propagates errors from page allocation, file I/O, resizing, the
/// strict-mode check and locking the shared freelist.
///
/// # Panics
/// Reaches `unreachable!()` when the transaction does not hold
/// `TxLock::Rw` (after the optional strict-mode check).
fn write_data(&mut self, freelist: &mut TxFreelist) -> Result<()> {
if let TxLock::Rw(file) = &mut self.lock {
{
// Give up the pages that held the previous freelist, then allocate a
// page sized for the current one and make it the new freelist page.
freelist.free(self.meta.freelist_page, self.num_freelist_pages);
let freelist_size = freelist.inner.size();
let page = freelist.allocate(freelist_size)?;
self.meta.freelist_page = page.id;
let free_page_ids = freelist.inner.pages();
page.page_type = Page::TYPE_FREELIST;
// `count` is set before `freelist_mut()` is used as the copy target.
// NOTE(review): presumably `freelist_mut()` sizes its slice from `count` — confirm.
page.count = free_page_ids.len() as u64;
page.freelist_mut()
.copy_from_slice(free_page_ids.as_slice());
}
// Allocation may have raised the page count; take it over into our meta.
self.meta.num_pages = freelist.meta.num_pages;
let required_size = self.meta.num_pages * self.db.inner.pagesize;
let current_size = file.metadata()?.len();
if current_size < required_size {
// Grow by the shortfall rounded up to the next multiple of MIN_ALLOC_SIZE
// (a full extra MIN_ALLOC_SIZE when the shortfall is already a multiple).
let size_diff = required_size - current_size;
let alloc_size = ((size_diff / MIN_ALLOC_SIZE) + 1) * MIN_ALLOC_SIZE;
let data = self.db.inner.resize(file, current_size + alloc_size)?;
// Rebuild the page view over the data returned by the resize.
self.pages = Pages::new(data, self.db.inner.pagesize);
}
{
// Write each page tracked by the freelist at its offset in the file.
for (page_id, (ptr, size)) in freelist.pages.iter() {
// NOTE(review): relies on every (ptr, size) pair in `freelist.pages`
// describing a live, initialized buffer — confirm in TxFreelist.
let buf = unsafe { std::slice::from_raw_parts(ptr.as_ptr(), *size) };
file.seek(SeekFrom::Start(self.db.inner.pagesize * page_id))?;
file.write_all(buf)?;
}
}
}
// The first `if let` has ended, so `self` can be borrowed immutably here.
// This runs after the data pages are written and before the meta page.
if self.db.inner.flags.strict_mode {
self.check()?;
}
if let TxLock::Rw(file) = &mut self.lock {
{
// Build the meta page in a zeroed, page-sized scratch buffer.
let mut buf = vec![0; self.db.inner.pagesize as usize];
// The byte buffer is reinterpreted as a `Page`; the lint is silenced
// because a `Vec<u8>` gives no alignment guarantee for `Page`.
#[allow(clippy::cast_ptr_alignment)]
let page = unsafe { &mut *(&mut buf[0] as *mut u8 as *mut Page) };
// Target page 1 when this meta has `meta_page == 0`, otherwise page 0.
let meta_page_id = u64::from(self.meta.meta_page == 0);
page.id = meta_page_id;
page.page_type = Page::TYPE_META;
let m = page.meta_mut();
m.meta_page = meta_page_id as u32;
m.magic = self.meta.magic;
m.version = self.meta.version;
m.pagesize = self.meta.pagesize;
m.root = self.meta.root;
m.num_pages = self.meta.num_pages;
m.freelist_page = self.meta.freelist_page;
m.tx_id = self.meta.tx_id;
// The hash is computed last, after every other field is filled in.
m.hash = m.hash_self();
file.seek(SeekFrom::Start(self.db.inner.pagesize * meta_page_id))?;
file.write_all(buf.as_slice())?;
}
file.flush()?;
file.sync_all()?;
// Publish the updated freelist to the shared database state.
let mut lock = self.db.inner.freelist.lock()?;
*lock = freelist.inner.clone();
Ok(())
} else {
unreachable!()
}
}
fn check(&self) -> Result<()> {
let mut unused_pages: HashSet<PageID> = (2..self.meta.num_pages).collect();
let mut page_stack = Vec::new();
page_stack.push(self.meta.root.root_page);
page_stack.push(self.meta.freelist_page);
while let Some(page_id) = page_stack.pop() {
if !unused_pages.remove(&page_id) {
return Err(Error::InvalidDB(format!(
"Page {} missing from unused_pages",
page_id,
)));
}
let page = self.pages.page(page_id);
for i in 0..page.overflow {
let page_id = page_id + i + 1;
if !unused_pages.remove(&page_id) {
return Err(Error::InvalidDB(format!(
"Overflow Page {} from missing from unused_pages",
page_id,
)));
}
}
match page.page_type {
Page::TYPE_BRANCH => {
let mut last: Option<&[u8]> = None;
for b in page.branch_elements().iter() {
page_stack.push(b.page);
if let Some(last) = last {
if last >= b.key() {
return Err(Error::InvalidDB(format!(
"Branch page {} contains unsorted elements",
page_id
)));
}
}
last = Some(b.key());
}
}
Page::TYPE_LEAF => {
let mut last: Option<&[u8]> = None;
for (i, leaf) in page.leaf_elements().iter().enumerate() {
match leaf.node_type {
Node::TYPE_BUCKET => {
let meta: BucketMeta = leaf.value().into();
page_stack.push(meta.root_page);
}
Node::TYPE_DATA => (),
_ => {
return Err(Error::InvalidDB(format!(
"Page {} index {} has an invalid leaf node type {}",
page_id, i, leaf.node_type,
)))
}
}
if let Some(last) = last {
if last >= leaf.key() {
return Err(Error::InvalidDB(format!(
"Leaf page {} contains unsorted elements",
page_id
)));
}
}
last = Some(leaf.key());
}
}
Page::TYPE_FREELIST => {
if page_id != self.meta.freelist_page {
return Err(Error::InvalidDB(format!(
"Found Invalid Freelist Page {}",
page_id
)));
}
for page_id in page.freelist() {
if !unused_pages.remove(page_id) {
return Err(Error::InvalidDB(format!(
"Page {} from freelist missing from unused_pages",
page_id,
)));
}
}
}
_ => {
return Err(Error::InvalidDB(format!(
"Invalid page type {} for page {}",
page.page_type, page_id,
)))
}
}
}
if !unused_pages.is_empty() {
return Err(Error::InvalidDB(format!(
"Unreachable pages {:?}",
unused_pages,
)));
}
Ok(())
}
}
impl<'tx> Drop for TxInner<'tx> {
    /// Removes a read-only transaction's id from the database's sorted
    /// `open_ro_txs` list. Writable transactions are never in that list, so
    /// nothing happens for them.
    ///
    /// Panics if the `open_ro_txs` mutex is poisoned.
    fn drop(&mut self) {
        if self.lock.writable() {
            return;
        }
        let mut open_txs = self.db.inner.open_ro_txs.lock().unwrap();
        // A missing id is silently ignored.
        if let Ok(index) = open_txs.binary_search(&self.meta.tx_id) {
            open_txs.remove(index);
        }
    }
}
#[cfg(test)]
mod tests {
use std::mem::size_of;
use super::*;
use crate::{
db::{OpenOptions, DB},
testutil::RandomFile,
};
/// A read-only transaction must reject every mutating operation with
/// `Error::ReadOnlyTx`, while still being able to look up existing buckets.
#[test]
fn test_ro_txs() -> Result<()> {
    let db_file = RandomFile::new();
    let db = DB::open(&db_file)?;
    // Seed the database with one bucket through a writable transaction.
    {
        let writer = db.tx(true)?;
        assert!(writer.create_bucket("abc").is_ok());
        writer.commit()?;
    }
    let reader = db.tx(false)?;
    // Creating a bucket at the top level is refused...
    assert!(reader.create_bucket("def").is_err());
    // ...and so is every write through a bucket obtained from the reader.
    let bucket = reader.get_bucket("abc")?;
    assert_eq!(bucket.put("key", "value"), Err(Error::ReadOnlyTx));
    assert_eq!(bucket.delete("key"), Err(Error::ReadOnlyTx));
    assert_eq!(bucket.create_bucket("dev").err(), Some(Error::ReadOnlyTx));
    // Committing a read-only transaction is an error as well.
    assert_eq!(reader.commit(), Err(Error::ReadOnlyTx));
    Ok(())
}
// Exercises a read-only transaction that stays open while two writable
// transactions commit, then checks the freelist a later writer sees once
// the reader is gone.
#[test]
fn test_concurrent_txs() -> Result<()> {
let random_file = RandomFile::new();
// 1024-byte pages, 10 pages to start with.
let db = OpenOptions::new()
.pagesize(1024)
.num_pages(10)
.open(&random_file)?;
{
// Open a read-only transaction and keep it alive for this whole scope.
let tx = db.tx(false)?;
assert!(!tx.writable());
// Shadow `tx` with a borrow of its inner state to inspect private fields.
let tx = tx.inner.borrow_mut();
assert_eq!(tx.pages.data.len(), 1024 * 10);
assert!(!tx.lock.writable());
{
// The reader must be the only entry in the open read-only list.
let open_ro_txs = tx.db.inner.open_ro_txs.lock().unwrap();
assert_eq!(open_ro_txs.len(), 1);
assert_eq!(open_ro_txs[0], tx.meta.tx_id);
}
{
// First writer: gets tx_id 1 and starts with an empty freelist.
let tx = db.tx(true)?;
assert!(tx.writable());
{
{
let inner = tx.inner.borrow_mut();
assert_eq!(inner.meta.tx_id, 1);
let freelist = inner.freelist.borrow();
assert_eq!(freelist.inner.pages(), vec![]);
}
let b = tx.create_bucket("abc")?;
b.put("123", "456")?;
}
tx.commit()?;
}
{
// Second writer: gets tx_id 2 and its freelist lists pages 2 and 3.
let tx = db.tx(true)?;
assert!(tx.writable());
{
{
let inner = tx.inner.borrow_mut();
let freelist = inner.freelist.borrow();
assert_eq!(inner.meta.tx_id, 2);
assert_eq!(freelist.inner.pages(), vec![2, 3]);
}
let b = tx.get_bucket("abc")?;
b.put("123", "456")?;
}
tx.commit()?;
}
}
{
// The reader has been dropped; a new writer's freelist lists pages 2 to 6.
// NOTE(review): presumably these become allocatable only now that no
// read-only transaction is open — confirm against TxFreelist::release.
let tx = db.tx(true)?;
assert!(tx.writable());
let inner = tx.inner.borrow_mut();
let mut freelist = inner.freelist.borrow_mut();
assert_eq!(freelist.inner.pages(), vec![2, 3, 4, 5, 6]);
assert_eq!(freelist.meta.num_pages, 10);
// Five single-page allocations are served from the freelist in ascending
// order, each without overflow.
let page = freelist.allocate(size_of::<Page>() as u64)?;
assert!(page.id == 2);
assert!(page.overflow == 0);
let page = freelist.allocate(size_of::<Page>() as u64)?;
assert!(page.id == 3);
assert!(page.overflow == 0);
let page = freelist.allocate(size_of::<Page>() as u64)?;
assert!(page.id == 4);
assert!(page.overflow == 0);
let page = freelist.allocate(size_of::<Page>() as u64)?;
assert!(page.id == 5);
assert!(page.overflow == 0);
let page = freelist.allocate(size_of::<Page>() as u64)?;
assert!(page.id == 6);
assert!(page.overflow == 0);
// The page count has not changed so far.
assert_eq!(freelist.meta.num_pages, 10);
// The next allocation gets id 10 and raises the page count to 11.
let page = freelist.allocate(size_of::<Page>() as u64)?;
assert!(page.id == 10);
assert!(page.overflow == 0);
assert_eq!(freelist.meta.num_pages, 11);
assert_eq!(freelist.inner.pages(), vec![]);
}
Ok(())
}
}