use std::convert::Infallible;
use skipdb_core::rev_range::WriteTransactionRevRange;
use txn::{error::WtmError, HashCmOptions, PwmComparableRange};
use super::*;
/// A transaction handle that pairs an [`OptimisticDb`] with the write-transaction
/// state (`Wtm`) opened on it.
///
/// Reads consult the transaction's own pending writes first and fall back to the
/// database's committed map; commits hand the pending entries to that same map.
pub struct OptimisticTransaction<K, V, S = RandomState> {
/// Handle to the database this transaction was opened on. Committed data is read
/// from, and applied to, `db.inner.map`.
db: OptimisticDb<K, V, S>,
/// Write-transaction state, parameterised with `HashCm` and `BTreePwm`
/// (presumably the conflict manager and pending-writes manager; confirm against
/// the `txn` crate).
pub(super) wtm: Wtm<K, V, HashCm<K, S>, BTreePwm<K, V>>,
}
impl<K, V, S> OptimisticTransaction<K, V, S>
where
    K: Ord + Hash + Eq,
    S: BuildHasher + Clone,
{
    /// Opens a new write transaction on `db`.
    ///
    /// `cap` is forwarded to `HashCmOptions::with_capacity` together with a clone of
    /// the database's hasher; when `cap` is `None`, a capacity of `8` is used.
    ///
    /// # Panics
    ///
    /// Panics if the transaction manager's `write` call returns an error.
    #[inline]
    pub(super) fn new(db: OptimisticDb<K, V, S>, cap: Option<usize>) -> Self {
        // Fall back to a small default when the caller gives no capacity.
        let capacity = cap.unwrap_or(8);
        let cm_options = HashCmOptions::with_capacity(db.inner.hasher.clone(), capacity);
        let wtm = db.inner.tm.write((), cm_options).unwrap();
        Self { db, wtm }
    }
}
impl<K, V, S> OptimisticTransaction<K, V, S>
where
    K: Ord + Hash + Eq,
    V: Send + 'static,
    S: BuildHasher,
{
    /// Commits the transaction.
    ///
    /// The work is delegated to `Wtm::commit`; the closure supplied here takes the
    /// entries it is handed and applies them to the database's committed map.
    ///
    /// # Errors
    ///
    /// Returns whatever error `Wtm::commit` reports. The apply step itself always
    /// returns `Ok(())`.
    #[inline]
    pub fn commit(&mut self) -> Result<(), WtmError<Infallible, Infallible, Infallible>> {
        // Borrow the map up front; it lives in `self.db`, disjoint from `self.wtm`.
        let committed_map = &self.db.inner.map;
        self.wtm.commit(|entries| {
            committed_map.apply(entries);
            Ok(())
        })
    }
}
impl<K, V, S> OptimisticTransaction<K, V, S>
where
    K: Ord + Hash + Eq + Send + Sync + 'static,
    V: Send + Sync + 'static,
    S: BuildHasher + Send + Sync + 'static,
{
    /// Commits the transaction and hands `callback` to `Wtm::commit_with_callback`.
    ///
    /// The apply step writes the entries it receives into the database's committed
    /// map. Because that closure must own everything it touches, it works on a clone
    /// of the database handle rather than borrowing `self`.
    ///
    /// `callback` receives a `Result<(), E>` and produces an `R`; on success this
    /// method returns a [`std::thread::JoinHandle`] typed over that `R`.
    /// (Presumably the callback runs on a spawned thread; confirm against the `txn`
    /// crate.)
    ///
    /// # Errors
    ///
    /// Returns whatever error `Wtm::commit_with_callback` reports.
    #[inline]
    pub fn commit_with_callback<E, R>(
        &mut self,
        callback: impl FnOnce(Result<(), E>) -> R + Send + 'static,
    ) -> Result<std::thread::JoinHandle<R>, WtmError<Infallible, Infallible, E>>
    where
        E: std::error::Error,
        R: Send + 'static,
    {
        // Owned handle that is moved into the apply closure below.
        let db_handle = Clone::clone(&self.db);
        self.wtm.commit_with_callback(
            move |entries| {
                db_handle.inner.map.apply(entries);
                Ok(())
            },
            callback,
        )
    }
}
impl<K, V, S> OptimisticTransaction<K, V, S>
where
    K: Ord + Hash + Eq,
    V: 'static,
    S: BuildHasher,
{
    /// Returns the version reported by the underlying write-transaction state.
    ///
    /// This is the version used for every lookup against the committed map.
    #[inline]
    pub fn version(&self) -> u64 {
        self.wtm.version()
    }

    /// Rolls the transaction back by delegating to `Wtm::rollback`.
    #[inline]
    pub fn rollback(&mut self) -> Result<(), TransactionError<Infallible, Infallible>> {
        self.wtm.rollback()
    }

    /// Reports whether `key` is present as seen by this transaction.
    ///
    /// The transaction's own state is consulted first. If it has a definite answer
    /// (`Some(_)`), that answer is returned as-is; otherwise the committed map is
    /// queried at this transaction's version.
    #[inline]
    pub fn contains_key<Q>(
        &mut self,
        key: &Q,
    ) -> Result<bool, TransactionError<Infallible, Infallible>>
    where
        K: Borrow<Q>,
        Q: Hash + Eq + Ord + ?Sized,
    {
        let read_version = self.wtm.version();
        if let Some(present) = self.wtm.contains_key_equivalent_cm_comparable_pm(key)? {
            return Ok(present);
        }
        Ok(self.db.inner.map.contains_key(key, read_version))
    }

    /// Looks up `key` as seen by this transaction.
    ///
    /// A pending entry for the key takes precedence: if it carries a value it is
    /// returned, and if it carries none the result is `Ok(None)` (presumably a
    /// pending removal). Without a pending entry, the committed map is read at this
    /// transaction's version.
    #[inline]
    pub fn get<'a, 'b: 'a, Q>(
        &'a mut self,
        key: &'b Q,
    ) -> Result<Option<Ref<'a, K, V>>, TransactionError<Infallible, Infallible>>
    where
        K: Borrow<Q>,
        Q: Hash + Eq + Ord + ?Sized,
    {
        let read_version = self.wtm.version();
        match self.wtm.get_equivalent_cm_comparable_pm(key)? {
            Some(pending) if pending.value().is_some() => Ok(Some(pending.into())),
            // A pending entry without a value hides any committed value.
            Some(_) => Ok(None),
            None => Ok(self
                .db
                .inner
                .map
                .get(key, read_version)
                .map(|committed| committed.into())),
        }
    }

    /// Records an insertion of `key` with `value` by delegating to `Wtm::insert`.
    #[inline]
    pub fn insert(
        &mut self,
        key: K,
        value: V,
    ) -> Result<(), TransactionError<Infallible, Infallible>> {
        self.wtm.insert(key, value)
    }

    /// Records a removal of `key` by delegating to `Wtm::remove`.
    #[inline]
    pub fn remove(&mut self, key: K) -> Result<(), TransactionError<Infallible, Infallible>> {
        self.wtm.remove(key)
    }

    /// Builds an iterator over this transaction's pending writes together with the
    /// committed entries at this transaction's version.
    ///
    /// # Errors
    ///
    /// Returns `TransactionError::Discard` when `marker_with_pm` yields `None`.
    #[inline]
    pub fn iter(
        &mut self,
    ) -> Result<TransactionIter<'_, K, V, HashCm<K, S>>, TransactionError<Infallible, Infallible>>
    {
        let read_version = self.wtm.version();
        let (marker, pending_writes) = match self.wtm.marker_with_pm() {
            Some(parts) => parts,
            None => return Err(TransactionError::Discard),
        };
        let committed_iter = self.db.inner.map.iter(read_version);
        let pending_iter = pending_writes.iter();
        Ok(TransactionIter::new(
            pending_iter,
            committed_iter,
            Some(marker),
        ))
    }

    /// Reverse-order counterpart of [`iter`](Self::iter): both the pending writes
    /// and the committed entries are walked in reverse.
    ///
    /// # Errors
    ///
    /// Returns `TransactionError::Discard` when `marker_with_pm` yields `None`.
    #[inline]
    pub fn iter_rev(
        &mut self,
    ) -> Result<
        WriteTransactionRevIter<'_, K, V, HashCm<K, S>>,
        TransactionError<Infallible, Infallible>,
    > {
        let read_version = self.wtm.version();
        let (marker, pending_writes) = match self.wtm.marker_with_pm() {
            Some(parts) => parts,
            None => return Err(TransactionError::Discard),
        };
        let committed_iter = self.db.inner.map.iter_rev(read_version);
        let pending_iter = pending_writes.iter().rev();
        Ok(WriteTransactionRevIter::new(
            pending_iter,
            committed_iter,
            Some(marker),
        ))
    }

    /// Builds an iterator restricted to `range`, covering this transaction's pending
    /// writes together with the committed entries at this transaction's version.
    ///
    /// The same bounds are applied to both sources.
    ///
    /// # Errors
    ///
    /// Returns `TransactionError::Discard` when `marker_with_pm` yields `None`.
    #[inline]
    pub fn range<'a, Q, R>(
        &'a mut self,
        range: R,
    ) -> Result<
        TransactionRange<'a, Q, R, K, V, HashCm<K, S>>,
        TransactionError<Infallible, Infallible>,
    >
    where
        K: Borrow<Q>,
        R: RangeBounds<Q> + 'a,
        Q: Ord + ?Sized,
    {
        let read_version = self.wtm.version();
        let (marker, pending_writes) = match self.wtm.marker_with_pm() {
            Some(parts) => parts,
            None => return Err(TransactionError::Discard),
        };
        // The pending range is built from borrowed bounds before `range` itself is
        // moved into the committed-map lookup.
        let pending_range =
            pending_writes.range_comparable((range.start_bound(), range.end_bound()));
        let committed_range = self.db.inner.map.range(range, read_version);
        Ok(TransactionRange::new(
            pending_range,
            committed_range,
            Some(marker),
        ))
    }

    /// Reverse-order counterpart of [`range`](Self::range): both the pending writes
    /// and the committed entries within `range` are walked in reverse.
    ///
    /// # Errors
    ///
    /// Returns `TransactionError::Discard` when `marker_with_pm` yields `None`.
    #[inline]
    pub fn range_rev<'a, Q, R>(
        &'a mut self,
        range: R,
    ) -> Result<
        WriteTransactionRevRange<'a, Q, R, K, V, HashCm<K, S>>,
        TransactionError<Infallible, Infallible>,
    >
    where
        K: Borrow<Q>,
        R: RangeBounds<Q> + 'a,
        Q: Ord + ?Sized,
    {
        let read_version = self.wtm.version();
        let (marker, pending_writes) = match self.wtm.marker_with_pm() {
            Some(parts) => parts,
            None => return Err(TransactionError::Discard),
        };
        // The pending range is built from borrowed bounds before `range` itself is
        // moved into the committed-map lookup.
        let pending_range =
            pending_writes.range_comparable((range.start_bound(), range.end_bound()));
        let committed_range = self.db.inner.map.range_rev(range, read_version);
        Ok(WriteTransactionRevRange::new(
            pending_range.rev(),
            committed_range,
            Some(marker),
        ))
    }
}