use std::{convert::Infallible, future::Future};
use async_txn::{error::WtmError, PwmComparableRange};
use skipdb_core::rev_range::WriteTransactionRevRange;
use super::*;
pub struct OptimisticTransaction<K, V, SP: AsyncSpawner, S = RandomState> {
db: OptimisticDb<K, V, SP, S>,
pub(super) wtm: AsyncWtm<K, V, HashCm<K, S>, BTreePwm<K, V>, SP>,
}
impl<K, V, SP, S> OptimisticTransaction<K, V, SP, S>
where
K: Ord + Hash + Eq,
S: BuildHasher + Clone,
SP: AsyncSpawner,
{
#[inline]
pub(super) async fn new(db: OptimisticDb<K, V, SP, S>, cap: Option<usize>) -> Self {
let wtm = db
.inner
.tm
.write_with_blocking_cm_and_pwm(
(),
HashCmOptions::with_capacity(db.inner.hasher.clone(), cap.unwrap_or(8)),
)
.await
.unwrap();
Self { db, wtm }
}
}
impl<K, V, SP, S> OptimisticTransaction<K, V, SP, S>
where
K: Ord + Hash + Eq + Send + Sync + 'static,
V: Send + Sync + 'static,
S: BuildHasher + Send + Sync + 'static,
SP: AsyncSpawner,
{
#[inline]
pub async fn commit(
&mut self,
) -> Result<(), WtmError<Infallible, Infallible, core::convert::Infallible>> {
let db = self.db.clone();
self
.wtm
.commit(|ents| async move {
db.inner.map.apply(ents);
Ok(())
})
.await
}
}
impl<K, V, SP, S> OptimisticTransaction<K, V, SP, S>
where
K: Ord + Hash + Eq + Send + Sync + 'static,
V: Send + Sync + 'static,
S: BuildHasher + Send + Sync + 'static,
SP: AsyncSpawner,
{
#[inline]
pub async fn commit_with_task<Fut, E, R>(
&mut self,
callback: impl FnOnce(Result<(), E>) -> Fut + Send + 'static,
) -> Result<SP::JoinHandle<R>, WtmError<Infallible, Infallible, E>>
where
Fut: Future<Output = R> + Send + 'static,
E: std::error::Error + Send,
R: Send + 'static,
{
let db = self.db.clone();
self
.wtm
.commit_with_task(
move |ents| async move {
db.inner.map.apply(ents);
Ok(())
},
callback,
)
.await
}
}
impl<K, V, SP, S> OptimisticTransaction<K, V, SP, S>
where
K: Ord + Hash + Eq,
V: 'static,
S: BuildHasher,
SP: AsyncSpawner,
{
#[inline]
pub fn version(&self) -> u64 {
self.wtm.version()
}
#[inline]
pub fn rollback(&mut self) -> Result<(), TransactionError<Infallible, Infallible>> {
self.wtm.rollback_blocking()
}
#[inline]
pub fn contains_key<Q>(
&mut self,
key: &Q,
) -> Result<bool, TransactionError<Infallible, Infallible>>
where
K: Borrow<Q>,
Q: Hash + Eq + Ord + ?Sized,
{
let version = self.wtm.version();
match self
.wtm
.contains_key_equivalent_cm_comparable_pm_blocking(key)?
{
Some(true) => Ok(true),
Some(false) => Ok(false),
None => Ok(self.db.inner.map.contains_key(key, version)),
}
}
#[inline]
pub fn get<'a, 'b: 'a, Q>(
&'a mut self,
key: &'b Q,
) -> Result<Option<Ref<'a, K, V>>, TransactionError<Infallible, Infallible>>
where
K: Borrow<Q>,
Q: Hash + Eq + Ord + ?Sized,
{
let version = self.wtm.version();
match self.wtm.get_equivalent_cm_comparable_pm_blocking(key)? {
Some(v) => {
if v.value().is_some() {
Ok(Some(v.into()))
} else {
Ok(None)
}
}
None => Ok(self.db.inner.map.get(key, version).map(Into::into)),
}
}
#[inline]
pub fn insert(
&mut self,
key: K,
value: V,
) -> Result<(), TransactionError<Infallible, Infallible>> {
self.wtm.insert_blocking(key, value)
}
#[inline]
pub fn remove(&mut self, key: K) -> Result<(), TransactionError<Infallible, Infallible>> {
self.wtm.remove_blocking(key)
}
#[inline]
pub fn iter(
&mut self,
) -> Result<TransactionIter<'_, K, V, HashCm<K, S>>, TransactionError<Infallible, Infallible>> {
let version = self.wtm.version();
let (marker, pm) = self
.wtm
.blocking_marker_with_pm()
.ok_or(TransactionError::Discard)?;
let committed = self.db.inner.map.iter(version);
let pendings = pm.iter();
Ok(TransactionIter::new(pendings, committed, Some(marker)))
}
#[inline]
pub fn iter_rev(
&mut self,
) -> Result<
WriteTransactionRevIter<'_, K, V, HashCm<K, S>>,
TransactionError<Infallible, Infallible>,
> {
let version = self.wtm.version();
let (marker, pm) = self
.wtm
.blocking_marker_with_pm()
.ok_or(TransactionError::Discard)?;
let committed = self.db.inner.map.iter_rev(version);
let pendings = pm.iter().rev();
Ok(WriteTransactionRevIter::new(
pendings,
committed,
Some(marker),
))
}
#[inline]
pub fn range<'a, Q, R>(
&'a mut self,
range: R,
) -> Result<
TransactionRange<'a, Q, R, K, V, HashCm<K, S>>,
TransactionError<Infallible, Infallible>,
>
where
K: Borrow<Q>,
R: RangeBounds<Q> + 'a,
Q: Ord + ?Sized,
{
let version = self.wtm.version();
let (marker, pm) = self
.wtm
.blocking_marker_with_pm()
.ok_or(TransactionError::Discard)?;
let start = range.start_bound();
let end = range.end_bound();
let pendings = pm.range_comparable((start, end));
let committed = self.db.inner.map.range(range, version);
Ok(TransactionRange::new(pendings, committed, Some(marker)))
}
#[inline]
pub fn range_rev<'a, Q, R>(
&'a mut self,
range: R,
) -> Result<
WriteTransactionRevRange<'a, Q, R, K, V, HashCm<K, S>>,
TransactionError<Infallible, Infallible>,
>
where
K: Borrow<Q>,
R: RangeBounds<Q> + 'a,
Q: Ord + ?Sized,
{
let version = self.wtm.version();
let (marker, pm) = self
.wtm
.blocking_marker_with_pm()
.ok_or(TransactionError::Discard)?;
let start = range.start_bound();
let end = range.end_bound();
let pendings = pm.range_comparable((start, end));
let committed = self.db.inner.map.range_rev(range, version);
Ok(WriteTransactionRevRange::new(
pendings.rev(),
committed,
Some(marker),
))
}
}