use super::*;
impl<K, V, C, P, S> AsyncTm<K, V, C, P, S>
where
  C: Cm<Key = K>,
  P: Pwm<Key = K, Value = V>,
  S: AsyncSpawner,
{
  /// Starts a write transaction whose conflict manager and pending-writes
  /// manager are built from the supplied options.
  ///
  /// The read timestamp is obtained first by awaiting `self.inner.read_ts()`;
  /// the managers are constructed afterwards (conflict manager, then
  /// pending-writes manager). The returned transaction starts with zero
  /// `size`/`count`, no duplicate writes, and both `discarded` and
  /// `done_read` cleared.
  ///
  /// # Errors
  ///
  /// Returns the failure of `C::new` mapped through
  /// `TransactionError::conflict`, or the failure of `P::new` mapped through
  /// `TransactionError::pending`.
  pub async fn write_with_blocking_cm_and_pwm(
    &self,
    pending_manager_opts: P::Options,
    conflict_manager_opts: C::Options,
  ) -> Result<AsyncWtm<K, V, C, P, S>, TransactionError<C::Error, P::Error>> {
    let read_ts = self.inner.read_ts().await;
    let orc = self.inner.clone();

    // Neither manager is created until after the await above, so they are
    // never held across a suspension point.
    let cm = C::new(conflict_manager_opts).map_err(TransactionError::conflict)?;
    let pwm = P::new(pending_manager_opts).map_err(TransactionError::pending)?;

    Ok(AsyncWtm {
      orc,
      read_ts,
      size: 0,
      count: 0,
      conflict_manager: Some(cm),
      pending_writes: Some(pwm),
      duplicate_writes: OneOrMore::new(),
      discarded: false,
      done_read: false,
    })
  }
}
impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
where
  C: CmComparable<Key = K>,
  S: AsyncSpawner,
{
  /// Records `k` as read in the conflict manager via `mark_read_comparable`.
  ///
  /// Does nothing when the transaction has no conflict manager.
  pub fn mark_read_comparable_blocking<Q>(&mut self, k: &Q)
  where
    K: Borrow<Q>,
    Q: ?Sized + Ord,
  {
    match self.conflict_manager.as_mut() {
      Some(cm) => cm.mark_read_comparable(k),
      None => {}
    }
  }

  /// Records `k` as a conflict key in the conflict manager via
  /// `mark_conflict_comparable`.
  ///
  /// Does nothing when the transaction has no conflict manager.
  pub fn mark_conflict_comparable_blocking<Q>(&mut self, k: &Q)
  where
    K: Borrow<Q>,
    Q: ?Sized + Ord,
  {
    match self.conflict_manager.as_mut() {
      Some(cm) => cm.mark_conflict_comparable(k),
      None => {}
    }
  }
}
impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
where
  C: CmEquivalent<Key = K>,
  S: AsyncSpawner,
{
  /// Records `k` as read in the conflict manager via `mark_read_equivalent`.
  ///
  /// Does nothing when the transaction has no conflict manager.
  pub fn mark_read_equivalent_blocking<Q>(&mut self, k: &Q)
  where
    K: Borrow<Q>,
    Q: ?Sized + Hash + Eq,
  {
    match self.conflict_manager.as_mut() {
      Some(cm) => cm.mark_read_equivalent(k),
      None => {}
    }
  }

  /// Records `k` as a conflict key in the conflict manager via
  /// `mark_conflict_equivalent`.
  ///
  /// Does nothing when the transaction has no conflict manager.
  pub fn mark_conflict_equivalent_blocking<Q>(&mut self, k: &Q)
  where
    K: Borrow<Q>,
    Q: ?Sized + Hash + Eq,
  {
    match self.conflict_manager.as_mut() {
      Some(cm) => cm.mark_conflict_equivalent(k),
      None => {}
    }
  }
}
impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
where
  C: Cm<Key = K>,
  S: AsyncSpawner,
{
  /// Returns a [`Marker`] wrapping the conflict manager, or `None` when the
  /// transaction has no conflict manager.
  pub fn blocking_marker(&mut self) -> Option<Marker<'_, C>> {
    let cm = self.conflict_manager.as_mut()?;
    Some(Marker::new(cm))
  }

  /// Returns a [`Marker`] wrapping the conflict manager together with a shared
  /// reference to the pending-writes manager, or `None` when the transaction
  /// has no conflict manager.
  ///
  /// # Panics
  ///
  /// Panics if a conflict manager is present but `self.pending_writes` is
  /// `None`.
  pub fn blocking_marker_with_pm(&mut self) -> Option<(Marker<'_, C>, &P)> {
    // Bail out before touching `pending_writes`, so the unwrap below is only
    // reached when a conflict manager exists.
    let cm = self.conflict_manager.as_mut()?;
    let pwm = self.pending_writes.as_ref().unwrap();
    Some((Marker::new(cm), pwm))
  }

  /// Records `k` as read in the conflict manager via `mark_read`.
  ///
  /// Does nothing when the transaction has no conflict manager.
  pub fn mark_read_blocking(&mut self, k: &K) {
    match self.conflict_manager.as_mut() {
      Some(cm) => cm.mark_read(k),
      None => {}
    }
  }

  /// Records `k` as a conflict key in the conflict manager via
  /// `mark_conflict`.
  ///
  /// Does nothing when the transaction has no conflict manager.
  pub fn mark_conflict_blocking(&mut self, k: &K) {
    match self.conflict_manager.as_mut() {
      Some(cm) => cm.mark_conflict(k),
      None => {}
    }
  }
}
impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
where
  C: Cm<Key = K>,
  P: Pwm<Key = K, Value = V>,
  S: AsyncSpawner,
{
  /// Rolls back the pending-writes manager and then, when one is present, the
  /// conflict manager.
  ///
  /// # Errors
  ///
  /// - `TransactionError::Discard` if the transaction is already discarded.
  /// - `TransactionError::Pwm` if the pending-writes manager fails to roll back.
  /// - `TransactionError::Cm` if the conflict manager fails to roll back.
  ///
  /// # Panics
  ///
  /// Panics if `self.pending_writes` is `None`.
  #[inline]
  pub fn rollback_blocking(&mut self) -> Result<(), TransactionError<C::Error, P::Error>> {
    if self.discarded {
      return Err(TransactionError::Discard);
    }

    self
      .pending_writes
      .as_mut()
      .unwrap()
      .rollback()
      .map_err(TransactionError::Pwm)?;

    // Every other method in this impl treats the conflict manager as optional;
    // do the same here instead of panicking when it is absent.
    if let Some(conflict_manager) = self.conflict_manager.as_mut() {
      conflict_manager.rollback().map_err(TransactionError::Cm)?;
    }

    Ok(())
  }

  /// Buffers an insert of `key` → `value` in this transaction.
  ///
  /// # Errors
  ///
  /// Propagates every error of the internal `modify_blocking` step:
  /// `Discard`, `Pwm` (validation, removal or insertion failure) and
  /// `LargeTxn`.
  pub fn insert_blocking(
    &mut self,
    key: K,
    value: V,
  ) -> Result<(), TransactionError<C::Error, P::Error>> {
    self.insert_with_blocking_in(key, value)
  }

  /// Buffers a removal of `key` in this transaction.
  ///
  /// # Errors
  ///
  /// Propagates every error of the internal `modify_blocking` step:
  /// `Discard`, `Pwm` (validation, removal or insertion failure) and
  /// `LargeTxn`.
  pub fn remove_blocking(&mut self, key: K) -> Result<(), TransactionError<C::Error, P::Error>> {
    // NOTE(review): removals are stamped with version 0 whereas inserts use
    // `self.read_ts`; `modify_blocking` compares versions when deciding whether
    // a replaced entry is recorded as a duplicate write. Confirm the asymmetry
    // is intended.
    self.modify_blocking(Entry {
      data: EntryData::Remove(key),
      version: 0,
    })
  }

  /// Checks whether `key` is present in the transaction's pending writes.
  ///
  /// Returns:
  /// - `Ok(Some(true))` when a pending entry for `key` carries a value,
  /// - `Ok(Some(false))` when a pending entry for `key` carries no value,
  /// - `Ok(None)` when there is no pending entry; in that case the key is
  ///   also marked as read in the conflict manager (if any).
  ///
  /// # Errors
  ///
  /// - `TransactionError::Discard` if the transaction is already discarded.
  /// - The pending-writes manager's lookup error, mapped through
  ///   `TransactionError::pending`.
  ///
  /// # Panics
  ///
  /// Panics if `self.pending_writes` is `None`.
  pub fn contains_key_blocking(
    &mut self,
    key: &K,
  ) -> Result<Option<bool>, TransactionError<C::Error, P::Error>> {
    if self.discarded {
      return Err(TransactionError::Discard);
    }

    let pending = self
      .pending_writes
      .as_ref()
      .unwrap()
      .get(key)
      .map_err(TransactionError::pending)?;

    if let Some(ent) = pending {
      // Answered from the write buffer: no read tracking needed.
      return Ok(Some(ent.value.is_some()));
    }

    if let Some(conflict_manager) = self.conflict_manager.as_mut() {
      conflict_manager.mark_read(key);
    }
    Ok(None)
  }

  /// Looks `key` up in the transaction's pending writes.
  ///
  /// Returns `Ok(Some(entry))` when a pending entry with a value exists.
  /// Returns `Ok(None)` when the pending entry has no value, or when there is
  /// no pending entry at all; only in the latter case is the key marked as
  /// read in the conflict manager (if any).
  ///
  /// NOTE(review): a pending entry without a value and a complete miss are
  /// indistinguishable to the caller here (unlike `contains_key_blocking`).
  /// Confirm callers do not need to tell them apart.
  ///
  /// # Errors
  ///
  /// - `TransactionError::Discard` if the transaction is already discarded.
  /// - `TransactionError::Pwm` if the pending-writes lookup fails.
  ///
  /// # Panics
  ///
  /// Panics if `self.pending_writes` is `None`.
  pub fn get_blocking<'a, 'b: 'a>(
    &'a mut self,
    key: &'b K,
  ) -> Result<Option<EntryRef<'a, K, V>>, TransactionError<C::Error, P::Error>> {
    if self.discarded {
      return Err(TransactionError::Discard);
    }

    match self
      .pending_writes
      .as_ref()
      .unwrap()
      .get(key)
      .map_err(TransactionError::Pwm)?
    {
      Some(e) => match &e.value {
        // Answered from the write buffer.
        Some(value) => Ok(Some(EntryRef {
          data: EntryDataRef::Insert { key, value },
          version: e.version,
        })),
        // Pending entry without a value.
        None => Ok(None),
      },
      None => {
        if let Some(conflict_manager) = self.conflict_manager.as_mut() {
          conflict_manager.mark_read(key);
        }
        Ok(None)
      }
    }
  }

  /// Builds an insert entry stamped with the transaction's read timestamp and
  /// hands it to `modify_blocking`.
  fn insert_with_blocking_in(
    &mut self,
    key: K,
    value: V,
  ) -> Result<(), TransactionError<C::Error, P::Error>> {
    let ent = Entry {
      data: EntryData::Insert { key, value },
      version: self.read_ts,
    };
    self.modify_blocking(ent)
  }

  /// Validates `ent`, accounts for it against the batch limits, marks its key
  /// as a conflict key, and stores it in the pending writes.
  ///
  /// If an entry for the same key was already pending it is replaced; the old
  /// entry is appended to `duplicate_writes` only when its version differs
  /// from the new entry's version.
  ///
  /// # Errors
  ///
  /// - `TransactionError::Discard` if the transaction is already discarded.
  /// - `TransactionError::Pwm` if validation, removal of the previous entry,
  ///   or insertion fails.
  /// - `TransactionError::LargeTxn` if the new entry count or estimated size
  ///   reaches the pending-writes manager's batch limits.
  ///
  /// # Panics
  ///
  /// Panics if `self.pending_writes` is `None`.
  fn modify_blocking(
    &mut self,
    ent: Entry<K, V>,
  ) -> Result<(), TransactionError<C::Error, P::Error>> {
    if self.discarded {
      return Err(TransactionError::Discard);
    }

    let pending_writes = self.pending_writes.as_mut().unwrap();
    pending_writes
      .validate_entry(&ent)
      .map_err(TransactionError::Pwm)?;

    // Compute the would-be totals first; they are committed to `self` only if
    // the entry fits within the batch limits.
    let cnt = self.count + 1;
    let size = self.size + pending_writes.estimate_size(&ent);
    if cnt >= pending_writes.max_batch_entries() || size >= pending_writes.max_batch_size() {
      return Err(TransactionError::LargeTxn);
    }
    self.count = cnt;
    self.size = size;

    if let Some(conflict_manager) = self.conflict_manager.as_mut() {
      conflict_manager.mark_conflict(ent.key());
    }

    let eversion = ent.version;
    let (ek, ev) = ent.split();

    // Replace any previously pending entry for this key, keeping the old one
    // as a duplicate write only when the versions differ.
    if let Some((old_key, old_value)) = pending_writes
      .remove_entry(&ek)
      .map_err(TransactionError::Pwm)?
    {
      if old_value.version != eversion {
        self
          .duplicate_writes
          .push(Entry::unsplit(old_key, old_value));
      }
    }

    pending_writes
      .insert(ek, ev)
      .map_err(TransactionError::Pwm)?;
    Ok(())
  }
}
impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
where
  C: CmComparable<Key = K>,
  P: PwmEquivalent<Key = K, Value = V>,
  S: AsyncSpawner,
{
  /// Checks whether `key` is present in the transaction's pending writes,
  /// using the pending-writes manager's `get_equivalent` lookup and the
  /// conflict manager's `mark_read_comparable` tracking.
  ///
  /// Returns:
  /// - `Ok(Some(true))` when a pending entry for `key` carries a value,
  /// - `Ok(Some(false))` when a pending entry for `key` carries no value,
  /// - `Ok(None)` when there is no pending entry; in that case the key is
  ///   also marked as read in the conflict manager (if any).
  ///
  /// # Errors
  ///
  /// - `TransactionError::Discard` if the transaction is already discarded.
  /// - The pending-writes manager's lookup error, mapped through
  ///   `TransactionError::pending`.
  ///
  /// # Panics
  ///
  /// Panics if `self.pending_writes` is `None`.
  pub fn contains_key_comparable_cm_equivalent_pm_blocking<'a, 'b: 'a, Q>(
    &'a mut self,
    key: &'b Q,
  ) -> Result<Option<bool>, TransactionError<C::Error, P::Error>>
  where
    K: Borrow<Q>,
    Q: ?Sized + Eq + Ord + Hash,
  {
    // Reject use of a discarded transaction up front, as the other fallible
    // operations on this type do.
    if self.discarded {
      return Err(TransactionError::Discard);
    }

    let pending = self
      .pending_writes
      .as_ref()
      .unwrap()
      .get_equivalent(key)
      .map_err(TransactionError::pending)?;

    if let Some(ent) = pending {
      // Answered from the write buffer: no read tracking needed.
      return Ok(Some(ent.value.is_some()));
    }

    if let Some(conflict_manager) = self.conflict_manager.as_mut() {
      conflict_manager.mark_read_comparable(key);
    }
    Ok(None)
  }

  /// Looks `key` up in the transaction's pending writes, using the
  /// pending-writes manager's `get_entry_equivalent` lookup and the conflict
  /// manager's `mark_read_comparable` tracking.
  ///
  /// Returns `Ok(Some(entry))` when a pending entry with a value exists.
  /// Returns `Ok(None)` when the pending entry has no value, or when there is
  /// no pending entry at all; only in the latter case is the key marked as
  /// read in the conflict manager (if any).
  ///
  /// NOTE(review): a pending entry without a value and a complete miss are
  /// indistinguishable to the caller here. Confirm callers do not need to
  /// tell them apart.
  ///
  /// # Errors
  ///
  /// - `TransactionError::Discard` if the transaction is already discarded.
  /// - `TransactionError::Pwm` if the pending-writes lookup fails.
  ///
  /// # Panics
  ///
  /// Panics if `self.pending_writes` is `None`.
  pub fn get_comparable_cm_equivalent_pm_blocking<'a, 'b: 'a, Q>(
    &'a mut self,
    key: &'b Q,
  ) -> Result<Option<EntryRef<'a, K, V>>, TransactionError<C::Error, P::Error>>
  where
    K: Borrow<Q>,
    Q: ?Sized + Eq + Ord + Hash,
  {
    // Reject use of a discarded transaction up front, as the other fallible
    // operations on this type do.
    if self.discarded {
      return Err(TransactionError::Discard);
    }

    match self
      .pending_writes
      .as_ref()
      .unwrap()
      .get_entry_equivalent(key)
      .map_err(TransactionError::Pwm)?
    {
      Some((k, e)) => match &e.value {
        // Answered from the write buffer.
        Some(value) => Ok(Some(EntryRef {
          data: EntryDataRef::Insert { key: k, value },
          version: e.version,
        })),
        // Pending entry without a value.
        None => Ok(None),
      },
      None => {
        if let Some(conflict_manager) = self.conflict_manager.as_mut() {
          conflict_manager.mark_read_comparable(key);
        }
        Ok(None)
      }
    }
  }
}
impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
where
  C: CmEquivalent<Key = K>,
  P: PwmComparable<Key = K, Value = V>,
  S: AsyncSpawner,
{
  /// Checks whether `key` is present in the transaction's pending writes,
  /// using the pending-writes manager's `get_comparable` lookup and the
  /// conflict manager's `mark_read_equivalent` tracking.
  ///
  /// Returns:
  /// - `Ok(Some(true))` when a pending entry for `key` carries a value,
  /// - `Ok(Some(false))` when a pending entry for `key` carries no value,
  /// - `Ok(None)` when there is no pending entry; in that case the key is
  ///   also marked as read in the conflict manager (if any).
  ///
  /// # Errors
  ///
  /// - `TransactionError::Discard` if the transaction is already discarded.
  /// - The pending-writes manager's lookup error, mapped through
  ///   `TransactionError::pending`.
  ///
  /// # Panics
  ///
  /// Panics if `self.pending_writes` is `None`.
  pub fn contains_key_equivalent_cm_comparable_pm_blocking<'a, 'b: 'a, Q>(
    &'a mut self,
    key: &'b Q,
  ) -> Result<Option<bool>, TransactionError<C::Error, P::Error>>
  where
    K: Borrow<Q>,
    Q: ?Sized + Eq + Ord + Hash,
  {
    // Reject use of a discarded transaction up front, as the other fallible
    // operations on this type do.
    if self.discarded {
      return Err(TransactionError::Discard);
    }

    let pending = self
      .pending_writes
      .as_ref()
      .unwrap()
      .get_comparable(key)
      .map_err(TransactionError::pending)?;

    if let Some(ent) = pending {
      // Answered from the write buffer: no read tracking needed.
      return Ok(Some(ent.value.is_some()));
    }

    if let Some(conflict_manager) = self.conflict_manager.as_mut() {
      conflict_manager.mark_read_equivalent(key);
    }
    Ok(None)
  }

  /// Looks `key` up in the transaction's pending writes, using the
  /// pending-writes manager's `get_entry_comparable` lookup and the conflict
  /// manager's `mark_read_equivalent` tracking.
  ///
  /// Returns `Ok(Some(entry))` when a pending entry with a value exists.
  /// Returns `Ok(None)` when the pending entry has no value, or when there is
  /// no pending entry at all; only in the latter case is the key marked as
  /// read in the conflict manager (if any).
  ///
  /// NOTE(review): a pending entry without a value and a complete miss are
  /// indistinguishable to the caller here. Confirm callers do not need to
  /// tell them apart.
  ///
  /// # Errors
  ///
  /// - `TransactionError::Discard` if the transaction is already discarded.
  /// - `TransactionError::Pwm` if the pending-writes lookup fails.
  ///
  /// # Panics
  ///
  /// Panics if `self.pending_writes` is `None`.
  pub fn get_equivalent_cm_comparable_pm_blocking<'a, 'b: 'a, Q>(
    &'a mut self,
    key: &'b Q,
  ) -> Result<Option<EntryRef<'a, K, V>>, TransactionError<C::Error, P::Error>>
  where
    K: Borrow<Q>,
    Q: ?Sized + Eq + Ord + Hash,
  {
    // Reject use of a discarded transaction up front, as the other fallible
    // operations on this type do.
    if self.discarded {
      return Err(TransactionError::Discard);
    }

    match self
      .pending_writes
      .as_ref()
      .unwrap()
      .get_entry_comparable(key)
      .map_err(TransactionError::Pwm)?
    {
      Some((k, e)) => match &e.value {
        // Answered from the write buffer.
        Some(value) => Ok(Some(EntryRef {
          data: EntryDataRef::Insert { key: k, value },
          version: e.version,
        })),
        // Pending entry without a value.
        None => Ok(None),
      },
      None => {
        if let Some(conflict_manager) = self.conflict_manager.as_mut() {
          conflict_manager.mark_read_equivalent(key);
        }
        Ok(None)
      }
    }
  }
}
impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
where
  C: CmComparable<Key = K>,
  P: PwmComparable<Key = K, Value = V>,
  S: AsyncSpawner,
{
  /// Checks whether `key` is present in the transaction's pending writes,
  /// using the pending-writes manager's `get_comparable` lookup and the
  /// conflict manager's `mark_read_comparable` tracking.
  ///
  /// Returns:
  /// - `Ok(Some(true))` when a pending entry for `key` carries a value,
  /// - `Ok(Some(false))` when a pending entry for `key` carries no value,
  /// - `Ok(None)` when there is no pending entry; in that case the key is
  ///   also marked as read in the conflict manager (if any).
  ///
  /// # Errors
  ///
  /// - `TransactionError::Discard` if the transaction is already discarded.
  /// - The pending-writes manager's lookup error, mapped through
  ///   `TransactionError::pending`.
  ///
  /// # Panics
  ///
  /// Panics if `self.pending_writes` is `None`.
  pub fn contains_key_comparable_blocking<'a, 'b: 'a, Q>(
    &'a mut self,
    key: &'b Q,
  ) -> Result<Option<bool>, TransactionError<C::Error, P::Error>>
  where
    K: Borrow<Q>,
    Q: ?Sized + Ord,
  {
    // Reject use of a discarded transaction up front, as the other fallible
    // operations on this type do.
    if self.discarded {
      return Err(TransactionError::Discard);
    }

    let pending = self
      .pending_writes
      .as_ref()
      .unwrap()
      .get_comparable(key)
      .map_err(TransactionError::pending)?;

    if let Some(ent) = pending {
      // Answered from the write buffer: no read tracking needed.
      return Ok(Some(ent.value.is_some()));
    }

    if let Some(conflict_manager) = self.conflict_manager.as_mut() {
      conflict_manager.mark_read_comparable(key);
    }
    Ok(None)
  }

  /// Looks `key` up in the transaction's pending writes, using the
  /// pending-writes manager's `get_entry_comparable` lookup and the conflict
  /// manager's `mark_read_comparable` tracking.
  ///
  /// Returns `Ok(Some(entry))` when a pending entry with a value exists.
  /// Returns `Ok(None)` when the pending entry has no value, or when there is
  /// no pending entry at all; only in the latter case is the key marked as
  /// read in the conflict manager (if any).
  ///
  /// NOTE(review): a pending entry without a value and a complete miss are
  /// indistinguishable to the caller here. Confirm callers do not need to
  /// tell them apart.
  ///
  /// # Errors
  ///
  /// - `TransactionError::Discard` if the transaction is already discarded.
  /// - `TransactionError::Pwm` if the pending-writes lookup fails.
  ///
  /// # Panics
  ///
  /// Panics if `self.pending_writes` is `None`.
  pub fn get_comparable_blocking<'a, 'b: 'a, Q>(
    &'a mut self,
    key: &'b Q,
  ) -> Result<Option<EntryRef<'a, K, V>>, TransactionError<C::Error, P::Error>>
  where
    K: Borrow<Q>,
    Q: ?Sized + Ord,
  {
    // Reject use of a discarded transaction up front, as the other fallible
    // operations on this type do.
    if self.discarded {
      return Err(TransactionError::Discard);
    }

    match self
      .pending_writes
      .as_ref()
      .unwrap()
      .get_entry_comparable(key)
      .map_err(TransactionError::Pwm)?
    {
      Some((k, e)) => match &e.value {
        // Answered from the write buffer.
        Some(value) => Ok(Some(EntryRef {
          data: EntryDataRef::Insert { key: k, value },
          version: e.version,
        })),
        // Pending entry without a value.
        None => Ok(None),
      },
      None => {
        if let Some(conflict_manager) = self.conflict_manager.as_mut() {
          conflict_manager.mark_read_comparable(key);
        }
        Ok(None)
      }
    }
  }
}