#![allow(unused)]
#![allow(missing_docs)]
use std::{cell::RefCell, collections::HashMap, sync::Arc};
use super::*;
pub struct TransactionalTree<'a> {
pub(super) tree: &'a Tree,
pub(super) writes: RefCell<HashMap<IVec, Option<IVec>>>,
pub(super) read_cache: RefCell<HashMap<IVec, Option<IVec>>>,
pub(super) locks: RefCell<Vec<parking_lot::RwLockWriteGuard<'a, ()>>>,
}
#[derive(Debug, Clone, PartialEq)]
pub enum TransactionError {
Conflict,
Abort,
Storage(Error),
}
pub type TransactionResult<T> = std::result::Result<T, TransactionError>;
fn abort() -> TransactionError {
TransactionError::Abort
}
impl From<Error> for TransactionError {
fn from(error: Error) -> Self {
TransactionError::Storage(error)
}
}
impl<'a> TransactionalTree<'a> {
pub fn insert<K, V>(
&self,
key: K,
value: V,
) -> TransactionResult<Option<IVec>>
where
IVec: From<K>,
IVec: From<V>,
K: AsRef<[u8]>,
{
let old = self.get(key.as_ref())?;
let mut writes = self.writes.borrow_mut();
writes.insert(IVec::from(key), Some(IVec::from(value)));
Ok(old)
}
pub fn remove<K>(&self, key: K) -> TransactionResult<Option<IVec>>
where
IVec: From<K>,
K: AsRef<[u8]>,
{
let old = self.get(key.as_ref());
let mut writes = self.writes.borrow_mut();
writes.insert(IVec::from(key), None);
old
}
pub fn get<K: AsRef<[u8]>>(
&self,
key: K,
) -> TransactionResult<Option<IVec>> {
let writes = self.writes.borrow();
if let Some(first_try) = writes.get(key.as_ref()) {
return Ok(first_try.clone());
}
let mut reads = self.read_cache.borrow_mut();
if let Some(second_try) = reads.get(key.as_ref()) {
return Ok(second_try.clone());
}
let get = self.tree.get_inner(key.as_ref())?;
reads.insert(key.as_ref().into(), get.clone());
Ok(get)
}
pub fn apply_batch(&self, batch: Batch) -> TransactionResult<()> {
for (k, v_opt) in batch.writes {
if let Some(v) = v_opt {
self.insert(k, v)?;
} else {
self.remove(k)?;
}
}
Ok(())
}
fn stage(&self) -> bool {
let mut locks = self.locks.borrow_mut();
let guard = self.tree.concurrency_control.write();
locks.push(guard);
true
}
fn unstage(&self) {
unimplemented!()
}
fn validate(&self) -> bool {
true
}
fn commit(&self) -> Result<()> {
let mut writes = self.writes.borrow_mut();
for (k, v_opt) in &*writes {
if let Some(v) = v_opt {
self.tree.insert_inner(k, v)?;
} else {
self.tree.remove_inner(k)?;
}
}
Ok(())
}
}
pub struct TransactionalTrees<'a> {
inner: Vec<TransactionalTree<'a>>,
}
impl<'a> TransactionalTrees<'a> {
fn stage(&self) -> bool {
let mut tree_idxs: Vec<(&[u8], usize)> = self
.inner
.iter()
.enumerate()
.map(|(idx, t)| (&*t.tree.tree_id, idx))
.collect();
tree_idxs.sort_unstable();
let mut last_idx = usize::max_value();
for (_, idx) in tree_idxs.into_iter() {
if idx == last_idx {
continue;
}
last_idx = idx;
if !self.inner[idx].stage() {
return false;
}
}
true
}
fn unstage(&self) {
for tree in &self.inner {
tree.unstage();
}
}
fn validate(&self) -> bool {
for tree in &self.inner {
if !tree.validate() {
return false;
}
}
true
}
fn commit(self) -> Result<()> {
let peg = self.inner[0].tree.context.pin_log()?;
for tree in &self.inner {
tree.commit()?;
}
peg.seal_batch()
}
}
pub trait Transactional {
type View;
fn make_overlay(&self) -> TransactionalTrees<'_>;
fn view_overlay(overlay: &TransactionalTrees<'_>) -> Self::View;
fn transaction<F, R>(&self, f: F) -> TransactionResult<R>
where
F: Fn(Self::View) -> TransactionResult<R>,
{
loop {
let tt = self.make_overlay();
let view = Self::view_overlay(&tt);
if !tt.stage() {
tt.unstage();
continue;
}
let ret = f(view);
if !tt.validate() {
tt.unstage();
continue;
}
match ret {
Ok(r) => {
tt.commit()?;
return Ok(r);
}
Err(TransactionError::Abort) => {
return Err(TransactionError::Abort);
}
Err(TransactionError::Conflict) => continue,
Err(TransactionError::Storage(e)) => {
return Err(TransactionError::Storage(e));
}
}
}
}
}
impl<'a> Transactional for &'a Tree {
type View = &'static TransactionalTree<'static>;
fn make_overlay(&self) -> TransactionalTrees<'_> {
TransactionalTrees {
inner: vec![TransactionalTree {
tree: &self,
writes: Default::default(),
read_cache: Default::default(),
locks: Default::default(),
}],
}
}
fn view_overlay(overlay: &TransactionalTrees<'_>) -> Self::View {
unsafe {
let unsafe_ptr: &'static TransactionalTree<'static> =
std::mem::transmute(&overlay.inner[0]);
&*unsafe_ptr
}
}
}
impl<A, B> Transactional for (A, B)
where
A: AsRef<Tree>,
B: AsRef<Tree>,
{
type View = (
&'static TransactionalTree<'static>,
&'static TransactionalTree<'static>,
);
fn make_overlay(&self) -> TransactionalTrees<'_> {
TransactionalTrees {
inner: vec![
TransactionalTree {
tree: self.0.as_ref(),
writes: Default::default(),
read_cache: Default::default(),
locks: Default::default(),
},
TransactionalTree {
tree: self.1.as_ref(),
writes: Default::default(),
read_cache: Default::default(),
locks: Default::default(),
},
],
}
}
fn view_overlay(overlay: &TransactionalTrees<'_>) -> Self::View {
let t1 = unsafe {
let unsafe_ptr: &'static TransactionalTree<'static> =
std::mem::transmute(&overlay.inner[0]);
&*unsafe_ptr
};
let t2 = unsafe {
let unsafe_ptr: &'static TransactionalTree<'static> =
std::mem::transmute(&overlay.inner[1]);
&*unsafe_ptr
};
(t1, t2)
}
}