async_skipdb/serializable/
optimistic.rs

1use async_txn::{error::WtmError, PwmComparableRange};
2use skipdb_core::rev_range::WriteTransactionRevRange;
3
4use std::{convert::Infallible, future::Future};
5
6use super::*;
7
8#[cfg(all(test, any(feature = "tokio", feature = "smol", feature = "async-std")))]
9mod tests;
10
11/// A optimistic concurrency control transaction over the [`SerializableDb`].
12pub struct OptimisticTransaction<K, V, S: AsyncSpawner> {
13  pub(super) db: SerializableDb<K, V, S>,
14  pub(super) wtm: AsyncWtm<K, V, BTreeCm<K>, BTreePwm<K, V>, S>,
15}
16
17impl<K, V, S> OptimisticTransaction<K, V, S>
18where
19  K: CheapClone + Ord,
20  S: AsyncSpawner,
21{
22  #[inline]
23  pub(super) async fn new(db: SerializableDb<K, V, S>) -> Self {
24    let wtm = db
25      .inner
26      .tm
27      .write_with_blocking_cm_and_pwm((), ())
28      .await
29      .unwrap();
30    Self { db, wtm }
31  }
32}
33
34impl<K, V, S> OptimisticTransaction<K, V, S>
35where
36  K: CheapClone + Ord + Send + Sync + 'static,
37  V: Send + Sync + 'static,
38  S: AsyncSpawner,
39{
40  /// Commits the transaction, following these steps:
41  ///
42  /// 1. If there are no writes, return immediately.
43  ///
44  /// 2. Check if read rows were updated since txn started. If so, return `TransactionError::Conflict`.
45  ///
46  /// 3. If no conflict, generate a commit timestamp and update written rows' commit ts.
47  ///
48  /// 4. Batch up all writes, write them to database.
49  ///
50  /// 5. If callback is provided, database will return immediately after checking
51  /// for conflicts. Writes to the database will happen in the background.  If
52  /// there is a conflict, an error will be returned and the callback will not
53  /// run. If there are no conflicts, the callback will be called in the
54  /// background upon successful completion of writes or any error during write.
55  #[inline]
56  pub async fn commit(&mut self) -> Result<(), WtmError<Infallible, Infallible, Infallible>> {
57    let db = self.db.clone();
58    self
59      .wtm
60      .commit(|ents| async move {
61        db.inner.map.apply(ents);
62        Ok(())
63      })
64      .await
65  }
66}
67
68impl<K, V, S> OptimisticTransaction<K, V, S>
69where
70  K: CheapClone + Ord + Send + Sync + 'static,
71  V: Send + Sync + 'static,
72  S: AsyncSpawner,
73{
74  /// Acts like [`commit`](WriteTransaction::commit), but takes a callback, which gets run via a
75  /// thread to avoid blocking this function. Following these steps:
76  ///
77  /// 1. If there are no writes, return immediately, callback will be invoked.
78  ///
79  /// 2. Check if read rows were updated since txn started. If so, return `TransactionError::Conflict`.
80  ///
81  /// 3. If no conflict, generate a commit timestamp and update written rows' commit ts.
82  ///
83  /// 4. Batch up all writes, write them to database.
84  ///
85  /// 5. Return immediately after checking for conflicts.
86  /// If there is a conflict, an error will be returned immediately and the callback will not
87  /// run. If there are no conflicts, the callback will be called in the
88  /// background upon successful completion of writes or any error during write.
89  #[inline]
90  pub async fn commit_with_task<Fut, E, R>(
91    &mut self,
92    callback: impl FnOnce(Result<(), E>) -> Fut + Send + 'static,
93  ) -> Result<S::JoinHandle<R>, WtmError<Infallible, Infallible, E>>
94  where
95    E: std::error::Error + Send,
96    Fut: Future<Output = R> + Send + 'static,
97    R: Send + 'static,
98  {
99    let db = self.db.clone();
100
101    self
102      .wtm
103      .commit_with_task(
104        move |ents| async move {
105          db.inner.map.apply(ents);
106          Ok(())
107        },
108        callback,
109      )
110      .await
111  }
112}
113
114impl<K, V, S> OptimisticTransaction<K, V, S>
115where
116  K: CheapClone + Ord,
117  S: AsyncSpawner,
118{
119  /// Returns the read version of the transaction.
120  #[inline]
121  pub fn version(&self) -> u64 {
122    self.wtm.version()
123  }
124
125  /// Rollback the transaction.
126  #[inline]
127  pub fn rollback(&mut self) -> Result<(), TransactionError<Infallible, Infallible>> {
128    self.wtm.rollback_blocking()
129  }
130
131  /// Returns true if the given key exists in the database.
132  #[inline]
133  pub fn contains_key(
134    &mut self,
135    key: &K,
136  ) -> Result<bool, TransactionError<Infallible, Infallible>> {
137    let version = self.wtm.version();
138    match self.wtm.contains_key_blocking(key)? {
139      Some(true) => Ok(true),
140      Some(false) => Ok(false),
141      None => Ok(self.db.inner.map.contains_key(key, version)),
142    }
143  }
144
145  /// Get a value from the database.
146  #[inline]
147  pub fn get<'a, 'b: 'a>(
148    &'a mut self,
149    key: &'b K,
150  ) -> Result<Option<Ref<'a, K, V>>, TransactionError<Infallible, Infallible>> {
151    let version = self.wtm.version();
152    match self.wtm.get_blocking(key)? {
153      Some(v) => {
154        if v.value().is_some() {
155          Ok(Some(v.into()))
156        } else {
157          Ok(None)
158        }
159      }
160      None => Ok(self.db.inner.map.get(key, version).map(Into::into)),
161    }
162  }
163
164  /// Insert a new key-value pair.
165  #[inline]
166  pub fn insert(
167    &mut self,
168    key: K,
169    value: V,
170  ) -> Result<(), TransactionError<Infallible, Infallible>> {
171    self.wtm.insert_blocking(key, value)
172  }
173
174  /// Remove a key.
175  #[inline]
176  pub fn remove(&mut self, key: K) -> Result<(), TransactionError<Infallible, Infallible>> {
177    self.wtm.remove_blocking(key)
178  }
179
180  /// Iterate over the entries of the write transaction.
181  #[inline]
182  pub fn iter(
183    &mut self,
184  ) -> Result<TransactionIter<'_, K, V, BTreeCm<K>>, TransactionError<Infallible, Infallible>> {
185    let version = self.wtm.version();
186    let (marker, pm) = self
187      .wtm
188      .blocking_marker_with_pm()
189      .ok_or(TransactionError::Discard)?;
190    let committed = self.db.inner.map.iter(version);
191    let pendings = pm.iter();
192
193    Ok(TransactionIter::new(pendings, committed, Some(marker)))
194  }
195
196  /// Iterate over the entries of the write transaction in reverse order.
197  #[inline]
198  pub fn iter_rev(
199    &mut self,
200  ) -> Result<WriteTransactionRevIter<'_, K, V, BTreeCm<K>>, TransactionError<Infallible, Infallible>>
201  {
202    let version = self.wtm.version();
203    let (marker, pm) = self
204      .wtm
205      .blocking_marker_with_pm()
206      .ok_or(TransactionError::Discard)?;
207
208    let committed = self.db.inner.map.iter_rev(version);
209    let pendings = pm.iter().rev();
210
211    Ok(WriteTransactionRevIter::new(
212      pendings,
213      committed,
214      Some(marker),
215    ))
216  }
217
218  /// Returns an iterator over the subset of entries of the database.
219  #[inline]
220  pub fn range<'a, Q, R>(
221    &'a mut self,
222    range: R,
223  ) -> Result<TransactionRange<'a, Q, R, K, V, BTreeCm<K>>, TransactionError<Infallible, Infallible>>
224  where
225    K: Borrow<Q>,
226    R: RangeBounds<Q> + 'a,
227    Q: Ord + ?Sized,
228  {
229    let version = self.wtm.version();
230    let (marker, pm) = self
231      .wtm
232      .blocking_marker_with_pm()
233      .ok_or(TransactionError::Discard)?;
234    let start = range.start_bound();
235    let end = range.end_bound();
236    let pendings = pm.range_comparable((start, end));
237    let committed = self.db.inner.map.range(range, version);
238
239    Ok(TransactionRange::new(pendings, committed, Some(marker)))
240  }
241
242  /// Returns an iterator over the subset of entries of the database in reverse order.
243  #[inline]
244  pub fn range_rev<'a, Q, R>(
245    &'a mut self,
246    range: R,
247  ) -> Result<
248    WriteTransactionRevRange<'a, Q, R, K, V, BTreeCm<K>>,
249    TransactionError<Infallible, Infallible>,
250  >
251  where
252    K: Borrow<Q>,
253    R: RangeBounds<Q> + 'a,
254    Q: Ord + ?Sized,
255  {
256    let version = self.wtm.version();
257    let (marker, pm) = self
258      .wtm
259      .blocking_marker_with_pm()
260      .ok_or(TransactionError::Discard)?;
261    let start = range.start_bound();
262    let end = range.end_bound();
263    let pendings = pm.range_comparable((start, end)).rev();
264    let committed = self.db.inner.map.range_rev(range, version);
265
266    Ok(WriteTransactionRevRange::new(
267      pendings,
268      committed,
269      Some(marker),
270    ))
271  }
272}