use async_txn::{error::WtmError, PwmComparableRange};
use skipdb_core::rev_range::WriteTransactionRevRange;
use std::{convert::Infallible, future::Future};
use super::*;
/// A write transaction over a [`ComparableDb`].
///
/// Reads, writes, commit and rollback are delegated to the wrapped `AsyncWtm`;
/// the database handle is used to read committed data and to apply entries
/// when the transaction commits.
pub struct WriteTransaction<K, V, S>
where
  S: AsyncSpawner,
{
  /// Handle to the database this transaction operates on.
  pub(super) db: ComparableDb<K, V, S>,
  /// Transaction-manager state backing this transaction, using `BTreeCm` and
  /// `BTreePwm` as its two manager type parameters.
  pub(super) wtm: AsyncWtm<K, V, BTreeCm<K>, BTreePwm<K, V>, S>,
}
impl<K, V, S> WriteTransaction<K, V, S>
where
  K: CheapClone + Ord,
  S: AsyncSpawner,
{
  /// Opens a new write transaction on `db`.
  ///
  /// # Panics
  ///
  /// Panics if `write_with_blocking_cm_and_pwm` returns an error, because the
  /// result is unwrapped.
  #[inline]
  pub(super) async fn new(db: ComparableDb<K, V, S>) -> Self {
    // NOTE(review): the two `()` arguments are presumably the options for the
    // conflict manager and the pending-writes manager — confirm against `async_txn`.
    let opening = db.inner.tm.write_with_blocking_cm_and_pwm((), ());
    let wtm = opening.await.unwrap();
    Self { db, wtm }
  }
}
impl<K, V, S> WriteTransaction<K, V, S>
where
  K: CheapClone + Ord + Send + Sync + 'static,
  V: Send + Sync + 'static,
  S: AsyncSpawner,
{
  /// Commits the transaction.
  ///
  /// The commit itself is driven by `AsyncWtm::commit`; this method supplies
  /// the callback that applies the entries handed over by the transaction
  /// manager to the database's map.
  ///
  /// # Errors
  ///
  /// Returns whatever `WtmError` the underlying `AsyncWtm::commit` reports.
  /// The apply callback itself always returns `Ok(())`.
  #[inline]
  pub async fn commit(&mut self) -> Result<(), WtmError<Infallible, Infallible, Infallible>> {
    // The callback must own its database handle, so clone it up front.
    let db = self.db.clone();
    let committing = self.wtm.commit(|entries| async move {
      db.inner.map.apply(entries);
      Ok(())
    });
    committing.await
  }
}
impl<K, V, S> WriteTransaction<K, V, S>
where
  K: CheapClone + Ord + Send + Sync + 'static,
  V: Send + Sync + 'static,
  S: AsyncSpawner,
{
  /// Commits the transaction through `AsyncWtm::commit_with_task`.
  ///
  /// Two closures are handed to the transaction manager: an apply step that
  /// writes the committed entries into the database's map, and the
  /// caller-supplied `callback`.
  ///
  /// NOTE(review): presumably `callback` receives the outcome of the apply
  /// step and runs on a task spawned via `S`, with the returned join handle
  /// resolving to its output — confirm against `async_txn`.
  ///
  /// # Errors
  ///
  /// Returns whatever `WtmError` the underlying `commit_with_task` reports.
  #[inline]
  pub async fn commit_with_task<Fut, E, R>(
    &mut self,
    callback: impl FnOnce(Result<(), E>) -> Fut + Send + 'static,
  ) -> Result<S::JoinHandle<R>, WtmError<Infallible, Infallible, E>>
  where
    E: std::error::Error + Send,
    Fut: Future<Output = R> + Send + 'static,
    R: Send + 'static,
  {
    // The apply closure must own its database handle, so clone it up front.
    let db = self.db.clone();
    let committing = self.wtm.commit_with_task(
      move |entries| async move {
        db.inner.map.apply(entries);
        Ok(())
      },
      callback,
    );
    committing.await
  }
}
impl<K, V, S> WriteTransaction<K, V, S>
where
  K: CheapClone + Ord,
  S: AsyncSpawner,
{
  /// Returns the version reported by the underlying transaction manager.
  ///
  /// This is the version used for every committed-data lookup made by this
  /// transaction.
  #[inline]
  pub fn version(&self) -> u64 {
    self.wtm.version()
  }

  /// Rolls back the transaction by delegating to `AsyncWtm::rollback_blocking`.
  ///
  /// # Errors
  ///
  /// Propagates the `TransactionError` returned by the transaction manager.
  #[inline]
  pub fn rollback(&mut self) -> Result<(), TransactionError<Infallible, Infallible>> {
    self.wtm.rollback_blocking()
  }

  /// Returns whether `key` exists from this transaction's point of view.
  ///
  /// The transaction manager is asked first; only when it has no answer
  /// (`None`) is the committed map consulted at this transaction's version.
  ///
  /// # Errors
  ///
  /// Propagates the `TransactionError` returned by `contains_key_blocking`.
  #[inline]
  pub fn contains_key(
    &mut self,
    key: &K,
  ) -> Result<bool, TransactionError<Infallible, Infallible>> {
    // Read the version before taking the mutable borrow of `wtm` below.
    let version = self.wtm.version();
    match self.wtm.contains_key_blocking(key)? {
      Some(found) => Ok(found),
      None => Ok(self.db.inner.map.contains_key(key, version)),
    }
  }

  /// Looks up `key` from this transaction's point of view.
  ///
  /// The transaction manager is asked first. An entry it returns whose value
  /// is absent yields `Ok(None)` (presumably a pending removal — confirm
  /// against `async_txn`). When the manager has no entry at all, the
  /// committed map is read at this transaction's version.
  ///
  /// # Errors
  ///
  /// Propagates the `TransactionError` returned by `get_blocking`.
  #[inline]
  pub fn get<'a, 'b: 'a>(
    &'a mut self,
    key: &'b K,
  ) -> Result<Option<Ref<'a, K, V>>, TransactionError<Infallible, Infallible>> {
    // Read the version before taking the mutable borrow of `wtm` below.
    let version = self.wtm.version();
    match self.wtm.get_blocking(key)? {
      Some(pending) if pending.value().is_some() => Ok(Some(pending.into())),
      // An entry without a value hides whatever the committed map holds.
      Some(_) => Ok(None),
      None => Ok(self.db.inner.map.get(key, version).map(Into::into)),
    }
  }

  /// Records an insertion of `key` → `value` in this transaction by
  /// delegating to `AsyncWtm::insert_blocking`.
  ///
  /// # Errors
  ///
  /// Propagates the `TransactionError` returned by the transaction manager.
  #[inline]
  pub fn insert(
    &mut self,
    key: K,
    value: V,
  ) -> Result<(), TransactionError<Infallible, Infallible>> {
    self.wtm.insert_blocking(key, value)
  }

  /// Records a removal of `key` in this transaction by delegating to
  /// `AsyncWtm::remove_blocking`.
  ///
  /// # Errors
  ///
  /// Propagates the `TransactionError` returned by the transaction manager.
  #[inline]
  pub fn remove(&mut self, key: K) -> Result<(), TransactionError<Infallible, Infallible>> {
    self.wtm.remove_blocking(key)
  }

  /// Builds a forward iterator from this transaction's pending writes, the
  /// committed entries visible at its version, and the transaction's marker.
  ///
  /// # Errors
  ///
  /// Returns `TransactionError::Discard` when `blocking_marker_with_pm`
  /// yields `None`.
  #[inline]
  pub fn iter(
    &mut self,
  ) -> Result<WriteTransactionIter<'_, K, V, BTreeCm<K>>, TransactionError<Infallible, Infallible>>
  {
    // Read the version before `wtm` is mutably borrowed for the marker.
    let version = self.wtm.version();
    let Some((marker, pm)) = self.wtm.blocking_marker_with_pm() else {
      return Err(TransactionError::Discard);
    };
    let committed = self.db.inner.map.iter(version);
    Ok(WriteTransactionIter::new(pm.iter(), committed, Some(marker)))
  }

  /// Builds a reverse iterator from this transaction's pending writes (in
  /// reverse order), the committed entries visible at its version (in
  /// reverse order), and the transaction's marker.
  ///
  /// # Errors
  ///
  /// Returns `TransactionError::Discard` when `blocking_marker_with_pm`
  /// yields `None`.
  #[inline]
  pub fn iter_rev(
    &mut self,
  ) -> Result<WriteTransactionRevIter<'_, K, V, BTreeCm<K>>, TransactionError<Infallible, Infallible>>
  {
    // Read the version before `wtm` is mutably borrowed for the marker.
    let version = self.wtm.version();
    let Some((marker, pm)) = self.wtm.blocking_marker_with_pm() else {
      return Err(TransactionError::Discard);
    };
    let committed = self.db.inner.map.iter_rev(version);
    Ok(WriteTransactionRevIter::new(
      pm.iter().rev(),
      committed,
      Some(marker),
    ))
  }

  /// Builds a forward iterator restricted to `range`, from this
  /// transaction's pending writes, the committed entries visible at its
  /// version, and the transaction's marker.
  ///
  /// # Errors
  ///
  /// Returns `TransactionError::Discard` when `blocking_marker_with_pm`
  /// yields `None`.
  #[inline]
  pub fn range<'a, Q, R>(
    &'a mut self,
    range: R,
  ) -> Result<
    WriteTransactionRange<'a, Q, R, K, V, BTreeCm<K>>,
    TransactionError<Infallible, Infallible>,
  >
  where
    K: Borrow<Q>,
    R: RangeBounds<Q> + 'a,
    Q: Ord + ?Sized,
  {
    // Read the version before `wtm` is mutably borrowed for the marker.
    let version = self.wtm.version();
    let Some((marker, pm)) = self.wtm.blocking_marker_with_pm() else {
      return Err(TransactionError::Discard);
    };
    // Query the pending writes through borrowed bounds first: `range` itself
    // is moved into the committed-map query on the next line.
    let pendings = pm.range_comparable((range.start_bound(), range.end_bound()));
    let committed = self.db.inner.map.range(range, version);
    Ok(WriteTransactionRange::new(pendings, committed, Some(marker)))
  }

  /// Builds a reverse iterator restricted to `range`, from this
  /// transaction's pending writes (in reverse order), the committed entries
  /// visible at its version (in reverse order), and the transaction's marker.
  ///
  /// # Errors
  ///
  /// Returns `TransactionError::Discard` when `blocking_marker_with_pm`
  /// yields `None`.
  #[inline]
  pub fn range_rev<'a, Q, R>(
    &'a mut self,
    range: R,
  ) -> Result<
    WriteTransactionRevRange<'a, Q, R, K, V, BTreeCm<K>>,
    TransactionError<Infallible, Infallible>,
  >
  where
    K: Borrow<Q>,
    R: RangeBounds<Q> + 'a,
    Q: Ord + ?Sized,
  {
    // Read the version before `wtm` is mutably borrowed for the marker.
    let version = self.wtm.version();
    let Some((marker, pm)) = self.wtm.blocking_marker_with_pm() else {
      return Err(TransactionError::Discard);
    };
    // Query the pending writes through borrowed bounds first: `range` itself
    // is moved into the committed-map query on the next line.
    let pendings = pm
      .range_comparable((range.start_bound(), range.end_bound()))
      .rev();
    let committed = self.db.inner.map.range_rev(range, version);
    Ok(WriteTransactionRevRange::new(
      pendings,
      committed,
      Some(marker),
    ))
  }
}