#![allow(clippy::module_name_repetitions)]
use std::{cell::RefCell, fmt, rc::Rc};
#[cfg(not(feature = "testing"))]
use std::collections::HashMap as Map;
#[cfg(feature = "testing")]
use std::collections::BTreeMap as Map;
use crate::{
concurrency_control, pin, Batch, Error, Guard, IVec, Protector, Result,
Tree,
};
#[derive(Clone)]
pub struct TransactionalTree {
pub(super) tree: Tree,
pub(super) writes: Rc<RefCell<Map<IVec, Option<IVec>>>>,
pub(super) read_cache: Rc<RefCell<Map<IVec, Option<IVec>>>>,
pub(super) flush_on_commit: Rc<RefCell<bool>>,
}
#[derive(Debug, Clone, PartialEq)]
pub enum UnabortableTransactionError {
Conflict,
Storage(Error),
}
impl fmt::Display for UnabortableTransactionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use UnabortableTransactionError::*;
match self {
Conflict => write!(f, "Conflict during transaction"),
Storage(e) => e.fmt(f),
}
}
}
impl std::error::Error for UnabortableTransactionError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
UnabortableTransactionError::Storage(ref e) => Some(e),
_ => None,
}
}
}
pub(crate) type UnabortableTransactionResult<T> =
std::result::Result<T, UnabortableTransactionError>;
impl From<Error> for UnabortableTransactionError {
fn from(error: Error) -> Self {
UnabortableTransactionError::Storage(error)
}
}
impl<E> From<UnabortableTransactionError> for ConflictableTransactionError<E> {
fn from(error: UnabortableTransactionError) -> Self {
match error {
UnabortableTransactionError::Conflict => {
ConflictableTransactionError::Conflict
}
UnabortableTransactionError::Storage(error) => {
ConflictableTransactionError::Storage(error)
}
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum ConflictableTransactionError<T = Error> {
Abort(T),
#[doc(hidden)]
Conflict,
Storage(Error),
}
impl<E: fmt::Display> fmt::Display for ConflictableTransactionError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use ConflictableTransactionError::*;
match self {
Abort(e) => e.fmt(f),
Conflict => write!(f, "Conflict during transaction"),
Storage(e) => e.fmt(f),
}
}
}
impl<E: std::error::Error> std::error::Error
for ConflictableTransactionError<E>
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
ConflictableTransactionError::Storage(ref e) => Some(e),
_ => None,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum TransactionError<T = Error> {
Abort(T),
Storage(Error),
}
impl<E: fmt::Display> fmt::Display for TransactionError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use TransactionError::*;
match self {
Abort(e) => e.fmt(f),
Storage(e) => e.fmt(f),
}
}
}
impl<E: std::error::Error> std::error::Error for TransactionError<E> {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
TransactionError::Storage(ref e) => Some(e),
_ => None,
}
}
}
pub type ConflictableTransactionResult<T, E = ()> =
std::result::Result<T, ConflictableTransactionError<E>>;
impl<T> From<Error> for ConflictableTransactionError<T> {
fn from(error: Error) -> Self {
ConflictableTransactionError::Storage(error)
}
}
pub type TransactionResult<T, E = ()> =
std::result::Result<T, TransactionError<E>>;
impl<T> From<Error> for TransactionError<T> {
fn from(error: Error) -> Self {
TransactionError::Storage(error)
}
}
impl TransactionalTree {
pub fn insert<K, V>(
&self,
key: K,
value: V,
) -> UnabortableTransactionResult<Option<IVec>>
where
K: AsRef<[u8]> + Into<IVec>,
V: Into<IVec>,
{
let old = self.get(key.as_ref())?;
let mut writes = self.writes.borrow_mut();
let _last_write =
writes.insert(key.into(), Some(value.into()));
Ok(old)
}
pub fn remove<K>(
&self,
key: K,
) -> UnabortableTransactionResult<Option<IVec>>
where
K: AsRef<[u8]> + Into<IVec>,
{
let old = self.get(key.as_ref());
let mut writes = self.writes.borrow_mut();
let _last_write = writes.insert(key.into(), None);
old
}
pub fn get<K: AsRef<[u8]>>(
&self,
key: K,
) -> UnabortableTransactionResult<Option<IVec>> {
let writes = self.writes.borrow();
if let Some(first_try) = writes.get(key.as_ref()) {
return Ok(first_try.clone());
}
let mut reads = self.read_cache.borrow_mut();
if let Some(second_try) = reads.get(key.as_ref()) {
return Ok(second_try.clone());
}
let mut guard = pin();
let get = loop {
if let Ok(get) = self.tree.get_inner(key.as_ref(), &mut guard)? {
break get;
}
};
let last = reads.insert(key.as_ref().into(), get.clone());
assert!(last.is_none());
Ok(get)
}
pub fn apply_batch(
&self,
batch: &Batch,
) -> UnabortableTransactionResult<()> {
for (k, v_opt) in &batch.writes {
if let Some(v) = v_opt {
let _old = self.insert(k, v)?;
} else {
let _old = self.remove(k)?;
}
}
Ok(())
}
pub fn flush(&self) {
*self.flush_on_commit.borrow_mut() = true;
}
pub fn generate_id(&self) -> Result<u64> {
self.tree.context.pagecache.generate_id_inner()
}
fn unstage(&self) {
unimplemented!()
}
const fn validate(&self) -> bool {
true
}
fn commit(&self) -> Result<()> {
let writes = self.writes.borrow();
let mut guard = pin();
for (k, v_opt) in &*writes {
while self.tree.insert_inner(k, v_opt.clone(), &mut guard)?.is_err()
{
}
}
Ok(())
}
fn from_tree(tree: &Tree) -> Self {
Self {
tree: tree.clone(),
writes: Default::default(),
read_cache: Default::default(),
flush_on_commit: Default::default(),
}
}
}
pub struct TransactionalTrees {
inner: Vec<TransactionalTree>,
}
impl TransactionalTrees {
fn stage(&self) -> UnabortableTransactionResult<Protector<'_>> {
Ok(concurrency_control::write())
}
fn unstage(&self) {
for tree in &self.inner {
tree.unstage();
}
}
fn validate(&self) -> bool {
for tree in &self.inner {
if !tree.validate() {
return false;
}
}
true
}
fn commit(&self, guard: &Guard) -> Result<()> {
let peg = self.inner[0].tree.context.pin_log(guard)?;
for tree in &self.inner {
tree.commit()?;
}
peg.seal_batch()
}
fn flush_if_configured(&self) -> Result<()> {
let mut should_flush = None;
for tree in &self.inner {
if *tree.flush_on_commit.borrow() {
should_flush = Some(tree);
break;
}
}
if let Some(tree) = should_flush {
tree.tree.flush()?;
}
Ok(())
}
}
pub fn abort<A, T>(t: T) -> ConflictableTransactionResult<A, T> {
Err(ConflictableTransactionError::Abort(t))
}
pub trait Transactional<E = ()> {
type View;
fn make_overlay(&self) -> Result<TransactionalTrees>;
fn view_overlay(overlay: &TransactionalTrees) -> Self::View;
fn transaction<F, A>(&self, f: F) -> TransactionResult<A, E>
where
F: Fn(&Self::View) -> ConflictableTransactionResult<A, E>,
{
loop {
let tt = self.make_overlay()?;
let view = Self::view_overlay(&tt);
let locks = if let Ok(l) = tt.stage() {
l
} else {
tt.unstage();
continue;
};
let ret = f(&view);
if !tt.validate() {
tt.unstage();
continue;
}
match ret {
Ok(r) => {
let guard = pin();
tt.commit(&guard)?;
drop(locks);
tt.flush_if_configured()?;
return Ok(r);
}
Err(ConflictableTransactionError::Abort(e)) => {
return Err(TransactionError::Abort(e));
}
Err(ConflictableTransactionError::Conflict) => continue,
Err(ConflictableTransactionError::Storage(other)) => {
return Err(TransactionError::Storage(other));
}
}
}
}
}
impl<E> Transactional<E> for &Tree {
type View = TransactionalTree;
fn make_overlay(&self) -> Result<TransactionalTrees> {
Ok(TransactionalTrees {
inner: vec![TransactionalTree::from_tree(self)],
})
}
fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
overlay.inner[0].clone()
}
}
impl<E> Transactional<E> for &&Tree {
type View = TransactionalTree;
fn make_overlay(&self) -> Result<TransactionalTrees> {
Ok(TransactionalTrees {
inner: vec![TransactionalTree::from_tree(*self)],
})
}
fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
overlay.inner[0].clone()
}
}
impl<E> Transactional<E> for Tree {
type View = TransactionalTree;
fn make_overlay(&self) -> Result<TransactionalTrees> {
Ok(TransactionalTrees {
inner: vec![TransactionalTree::from_tree(self)],
})
}
fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
overlay.inner[0].clone()
}
}
impl<E> Transactional<E> for [Tree] {
type View = Vec<TransactionalTree>;
fn make_overlay(&self) -> Result<TransactionalTrees> {
let same_db = self.windows(2).all(|w| {
let path_1 = w[0].context.get_path();
let path_2 = w[1].context.get_path();
path_1 == path_2
});
if !same_db {
return Err(Error::Unsupported(
"cannot use trees from multiple databases in the same transaction".into(),
));
}
Ok(TransactionalTrees {
inner: self
.iter()
.map(|t| TransactionalTree::from_tree(t))
.collect(),
})
}
fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
overlay.inner.clone()
}
}
impl<E> Transactional<E> for [&Tree] {
type View = Vec<TransactionalTree>;
fn make_overlay(&self) -> Result<TransactionalTrees> {
let same_db = self.windows(2).all(|w| {
let path_1 = w[0].context.get_path();
let path_2 = w[1].context.get_path();
path_1 == path_2
});
if !same_db {
return Err(Error::Unsupported(
"cannot use trees from multiple databases in the same transaction".into(),
));
}
Ok(TransactionalTrees {
inner: self
.iter()
.map(|&t| TransactionalTree::from_tree(t))
.collect(),
})
}
fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
overlay.inner.clone()
}
}
macro_rules! repeat_type {
($t:ty, ($literal:literal)) => {
($t,)
};
($t:ty, ($($literals:literal),+)) => {
repeat_type!(IMPL $t, (), ($($literals),*))
};
(IMPL $t:ty, (), ($first:literal, $($rest:literal),*)) => {
repeat_type!(IMPL $t, ($t), ($($rest),*))
};
(IMPL $t:ty, ($($partial:tt),*), ($first:literal, $($rest:literal),*)) => {
repeat_type!(IMPL $t, ($t, $($partial),*), ($($rest),*))
};
(IMPL $t:ty, ($($partial:tt),*), ($last:literal)) => {
($($partial),*, $t)
};
}
macro_rules! impl_transactional_tuple_trees {
($($indices:tt),+) => {
impl<E> Transactional<E> for repeat_type!(&Tree, ($($indices),+)) {
type View = repeat_type!(TransactionalTree, ($($indices),+));
fn make_overlay(&self) -> Result<TransactionalTrees> {
let mut paths = vec![];
$(
paths.push(self.$indices.context.get_path());
)+
if !paths.windows(2).all(|w| {
w[0] == w[1]
}) {
return Err(Error::Unsupported(
"cannot use trees from multiple databases in the same transaction".into(),
));
}
Ok(TransactionalTrees {
inner: vec![
$(
TransactionalTree::from_tree(self.$indices)
),+
],
})
}
fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
(
$(
overlay.inner[$indices].clone()
),+,
)
}
}
};
}
impl_transactional_tuple_trees!(0);
impl_transactional_tuple_trees!(0, 1);
impl_transactional_tuple_trees!(0, 1, 2);
impl_transactional_tuple_trees!(0, 1, 2, 3);
impl_transactional_tuple_trees!(0, 1, 2, 3, 4);
impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5);
impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6);
impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7);
impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8);
impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13);