async_txn/write/
blocking.rs

1use super::*;
2
3impl<K, V, C, P, S> AsyncTm<K, V, C, P, S>
4where
5  C: Cm<Key = K>,
6  P: Pwm<Key = K, Value = V>,
7  S: AsyncSpawner,
8{
9  /// Create a new writable transaction with
10  /// the default pending writes manager to store the pending writes.
11  pub async fn write_with_blocking_cm_and_pwm(
12    &self,
13    pending_manager_opts: P::Options,
14    conflict_manager_opts: C::Options,
15  ) -> Result<AsyncWtm<K, V, C, P, S>, TransactionError<C::Error, P::Error>> {
16    let read_ts = self.inner.read_ts().await;
17    Ok(AsyncWtm {
18      orc: self.inner.clone(),
19      read_ts,
20      size: 0,
21      count: 0,
22      conflict_manager: Some(C::new(conflict_manager_opts).map_err(TransactionError::conflict)?),
23      pending_writes: Some(P::new(pending_manager_opts).map_err(TransactionError::pending)?),
24      duplicate_writes: OneOrMore::new(),
25      discarded: false,
26      done_read: false,
27    })
28  }
29}
30
31impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
32where
33  C: CmComparable<Key = K>,
34  S: AsyncSpawner,
35{
36  /// Marks a key is read.
37  pub fn mark_read_comparable_blocking<Q>(&mut self, k: &Q)
38  where
39    K: Borrow<Q>,
40    Q: ?Sized + Ord,
41  {
42    if let Some(ref mut conflict_manager) = self.conflict_manager {
43      conflict_manager.mark_read_comparable(k);
44    }
45  }
46
47  /// Marks a key is conflict.
48  pub fn mark_conflict_comparable_blocking<Q>(&mut self, k: &Q)
49  where
50    K: Borrow<Q>,
51    Q: ?Sized + Ord,
52  {
53    if let Some(ref mut conflict_manager) = self.conflict_manager {
54      conflict_manager.mark_conflict_comparable(k);
55    }
56  }
57}
58
59impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
60where
61  C: CmEquivalent<Key = K>,
62  S: AsyncSpawner,
63{
64  /// Marks a key is read.
65  pub fn mark_read_equivalent_blocking<Q>(&mut self, k: &Q)
66  where
67    K: Borrow<Q>,
68    Q: ?Sized + Hash + Eq,
69  {
70    if let Some(ref mut conflict_manager) = self.conflict_manager {
71      conflict_manager.mark_read_equivalent(k);
72    }
73  }
74
75  /// Marks a key is conflict.
76  pub fn mark_conflict_equivalent_blocking<Q>(&mut self, k: &Q)
77  where
78    K: Borrow<Q>,
79    Q: ?Sized + Hash + Eq,
80  {
81    if let Some(ref mut conflict_manager) = self.conflict_manager {
82      conflict_manager.mark_conflict_equivalent(k);
83    }
84  }
85}
86
87impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
88where
89  C: Cm<Key = K>,
90  S: AsyncSpawner,
91{
92  /// This method is used to create a marker for the keys that are operated.
93  /// It must be used to mark keys when end user is implementing iterators to
94  /// make sure the transaction manager works correctly.
95  ///
96  /// `None` means the transaction has already been discarded.
97  pub fn blocking_marker(&mut self) -> Option<Marker<'_, C>> {
98    self.conflict_manager.as_mut().map(Marker::new)
99  }
100
101  /// Returns a marker for the keys that are operated and the pending writes manager.
102  ///
103  /// `None` means the transaction has already been discarded.
104  ///
105  /// As Rust's borrow checker does not allow to borrow mutable marker and the immutable pending writes manager at the same
106  /// time, this method is used to solve this problem.
107  pub fn blocking_marker_with_pm(&mut self) -> Option<(Marker<'_, C>, &P)> {
108    self
109      .conflict_manager
110      .as_mut()
111      .map(|marker| (Marker::new(marker), self.pending_writes.as_ref().unwrap()))
112  }
113
114  /// Marks a key is read.
115  pub fn mark_read_blocking(&mut self, k: &K) {
116    if let Some(ref mut conflict_manager) = self.conflict_manager {
117      conflict_manager.mark_read(k);
118    }
119  }
120
121  /// Marks a key is conflict.
122  pub fn mark_conflict_blocking(&mut self, k: &K) {
123    if let Some(ref mut conflict_manager) = self.conflict_manager {
124      conflict_manager.mark_conflict(k);
125    }
126  }
127}
128
129impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
130where
131  C: Cm<Key = K>,
132  P: Pwm<Key = K, Value = V>,
133  S: AsyncSpawner,
134{
135  /// Rolls back the transaction.
136  #[inline]
137  pub fn rollback_blocking(&mut self) -> Result<(), TransactionError<C::Error, P::Error>> {
138    if self.discarded {
139      return Err(TransactionError::Discard);
140    }
141
142    self
143      .pending_writes
144      .as_mut()
145      .unwrap()
146      .rollback()
147      .map_err(TransactionError::Pwm)?;
148    self
149      .conflict_manager
150      .as_mut()
151      .unwrap()
152      .rollback()
153      .map_err(TransactionError::Cm)?;
154    Ok(())
155  }
156
157  /// Insert a key-value pair to the transaction.
158  pub fn insert_blocking(
159    &mut self,
160    key: K,
161    value: V,
162  ) -> Result<(), TransactionError<C::Error, P::Error>> {
163    self.insert_with_blocking_in(key, value)
164  }
165
166  /// Removes a key.
167  ///
168  /// This is done by adding a delete marker for the key at commit timestamp.  Any
169  /// reads happening before this timestamp would be unaffected. Any reads after
170  /// this commit would see the deletion.
171  pub fn remove_blocking(&mut self, key: K) -> Result<(), TransactionError<C::Error, P::Error>> {
172    self.modify_blocking(Entry {
173      data: EntryData::Remove(key),
174      version: 0,
175    })
176  }
177
178  /// Returns `true` if the pending writes contains the key.
179  pub fn contains_key_blocking(
180    &mut self,
181    key: &K,
182  ) -> Result<Option<bool>, TransactionError<C::Error, P::Error>> {
183    if self.discarded {
184      return Err(TransactionError::Discard);
185    }
186
187    match self
188      .pending_writes
189      .as_ref()
190      .unwrap()
191      .get(key)
192      .map_err(TransactionError::pending)?
193    {
194      Some(ent) => {
195        // If the value is None, it means that the key is removed.
196        if ent.value.is_none() {
197          return Ok(Some(false));
198        }
199
200        // Fulfill from buffer.
201        Ok(Some(true))
202      }
203      None => {
204        // track reads. No need to track read if txn serviced it
205        // internally.
206        if let Some(ref mut conflict_manager) = self.conflict_manager {
207          conflict_manager.mark_read(key);
208        }
209
210        Ok(None)
211      }
212    }
213  }
214
215  /// Looks for the key in the pending writes, if such key is not in the pending writes,
216  /// the end user can read the key from the database.
217  pub fn get_blocking<'a, 'b: 'a>(
218    &'a mut self,
219    key: &'b K,
220  ) -> Result<Option<EntryRef<'a, K, V>>, TransactionError<C::Error, P::Error>> {
221    if self.discarded {
222      return Err(TransactionError::Discard);
223    }
224
225    if let Some(e) = self
226      .pending_writes
227      .as_ref()
228      .unwrap()
229      .get(key)
230      .map_err(TransactionError::Pwm)?
231    {
232      // If the value is None, it means that the key is removed.
233      if e.value.is_none() {
234        return Ok(None);
235      }
236
237      // Fulfill from buffer.
238      Ok(Some(EntryRef {
239        data: match &e.value {
240          Some(value) => EntryDataRef::Insert { key, value },
241          None => EntryDataRef::Remove(key),
242        },
243        version: e.version,
244      }))
245    } else {
246      // track reads. No need to track read if txn serviced it
247      // internally.
248      if let Some(ref mut conflict_manager) = self.conflict_manager {
249        conflict_manager.mark_read(key);
250      }
251
252      Ok(None)
253    }
254  }
255
256  fn insert_with_blocking_in(
257    &mut self,
258    key: K,
259    value: V,
260  ) -> Result<(), TransactionError<C::Error, P::Error>> {
261    let ent = Entry {
262      data: EntryData::Insert { key, value },
263      version: self.read_ts,
264    };
265
266    self.modify_blocking(ent)
267  }
268
269  fn modify_blocking(
270    &mut self,
271    ent: Entry<K, V>,
272  ) -> Result<(), TransactionError<C::Error, P::Error>> {
273    if self.discarded {
274      return Err(TransactionError::Discard);
275    }
276
277    let pending_writes = self.pending_writes.as_mut().unwrap();
278    pending_writes
279      .validate_entry(&ent)
280      .map_err(TransactionError::Pwm)?;
281
282    let cnt = self.count + 1;
283    // Extra bytes for the version in key.
284    let size = self.size + pending_writes.estimate_size(&ent);
285    if cnt >= pending_writes.max_batch_entries() || size >= pending_writes.max_batch_size() {
286      return Err(TransactionError::LargeTxn);
287    }
288
289    self.count = cnt;
290    self.size = size;
291
292    // The conflict_manager is used for conflict detection. If conflict detection
293    // is disabled, we don't need to store key hashes in the conflict_manager.
294    if let Some(ref mut conflict_manager) = self.conflict_manager {
295      conflict_manager.mark_conflict(ent.key());
296    }
297
298    // If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice.
299    // Add the entry to duplicateWrites only if both the entries have different versions. For
300    // same versions, we will overwrite the existing entry.
301    let eversion = ent.version;
302    let (ek, ev) = ent.split();
303
304    if let Some((old_key, old_value)) = pending_writes
305      .remove_entry(&ek)
306      .map_err(TransactionError::Pwm)?
307    {
308      if old_value.version != eversion {
309        self
310          .duplicate_writes
311          .push(Entry::unsplit(old_key, old_value));
312      }
313    }
314    pending_writes
315      .insert(ek, ev)
316      .map_err(TransactionError::Pwm)?;
317
318    Ok(())
319  }
320}
321
322impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
323where
324  C: CmComparable<Key = K>,
325  P: PwmEquivalent<Key = K, Value = V>,
326  S: AsyncSpawner,
327{
328  /// Returns `true` if the pending writes contains the key.
329  ///
330  /// - `Ok(None)`: means the key is not in the pending writes, the end user can read the key from the database.
331  /// - `Ok(Some(true))`: means the key is in the pending writes.
332  /// - `Ok(Some(false))`: means the key is in the pending writes and but is a remove entry.
333  pub fn contains_key_comparable_cm_equivalent_pm_blocking<'a, 'b: 'a, Q>(
334    &'a mut self,
335    key: &'b Q,
336  ) -> Result<Option<bool>, TransactionError<C::Error, P::Error>>
337  where
338    K: Borrow<Q>,
339    Q: ?Sized + Eq + Ord + Hash,
340  {
341    match self
342      .pending_writes
343      .as_ref()
344      .unwrap()
345      .get_equivalent(key)
346      .map_err(TransactionError::pending)?
347    {
348      Some(ent) => {
349        // If the value is None, it means that the key is removed.
350        if ent.value.is_none() {
351          return Ok(Some(false));
352        }
353
354        // Fulfill from buffer.
355        Ok(Some(true))
356      }
357      None => {
358        // track reads. No need to track read if txn serviced it
359        // internally.
360        if let Some(ref mut conflict_manager) = self.conflict_manager {
361          conflict_manager.mark_read_comparable(key);
362        }
363
364        Ok(None)
365      }
366    }
367  }
368
369  /// Looks for the key in the pending writes, if such key is not in the pending writes,
370  /// the end user can read the key from the database.
371  pub fn get_comparable_cm_equivalent_pm_blocking<'a, 'b: 'a, Q>(
372    &'a mut self,
373    key: &'b Q,
374  ) -> Result<Option<EntryRef<'a, K, V>>, TransactionError<C::Error, P::Error>>
375  where
376    K: Borrow<Q>,
377    Q: ?Sized + Eq + Ord + Hash,
378  {
379    if let Some((k, e)) = self
380      .pending_writes
381      .as_ref()
382      .unwrap()
383      .get_entry_equivalent(key)
384      .map_err(TransactionError::Pwm)?
385    {
386      // If the value is None, it means that the key is removed.
387      if e.value.is_none() {
388        return Ok(None);
389      }
390
391      // Fulfill from buffer.
392      Ok(Some(EntryRef {
393        data: match &e.value {
394          Some(value) => EntryDataRef::Insert { key: k, value },
395          None => EntryDataRef::Remove(k),
396        },
397        version: e.version,
398      }))
399    } else {
400      // track reads. No need to track read if txn serviced it
401      // internally.
402      if let Some(ref mut conflict_manager) = self.conflict_manager {
403        conflict_manager.mark_read_comparable(key);
404      }
405
406      Ok(None)
407    }
408  }
409}
410
411impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
412where
413  C: CmEquivalent<Key = K>,
414  P: PwmComparable<Key = K, Value = V>,
415  S: AsyncSpawner,
416{
417  /// Returns `true` if the pending writes contains the key.
418  ///
419  /// - `Ok(None)`: means the key is not in the pending writes, the end user can read the key from the database.
420  /// - `Ok(Some(true))`: means the key is in the pending writes.
421  /// - `Ok(Some(false))`: means the key is in the pending writes and but is a remove entry.
422  pub fn contains_key_equivalent_cm_comparable_pm_blocking<'a, 'b: 'a, Q>(
423    &'a mut self,
424    key: &'b Q,
425  ) -> Result<Option<bool>, TransactionError<C::Error, P::Error>>
426  where
427    K: Borrow<Q>,
428    Q: ?Sized + Eq + Ord + Hash,
429  {
430    match self
431      .pending_writes
432      .as_ref()
433      .unwrap()
434      .get_comparable(key)
435      .map_err(TransactionError::pending)?
436    {
437      Some(ent) => {
438        // If the value is None, it means that the key is removed.
439        if ent.value.is_none() {
440          return Ok(Some(false));
441        }
442
443        // Fulfill from buffer.
444        Ok(Some(true))
445      }
446      None => {
447        // track reads. No need to track read if txn serviced it
448        // internally.
449        if let Some(ref mut conflict_manager) = self.conflict_manager {
450          conflict_manager.mark_read_equivalent(key);
451        }
452
453        Ok(None)
454      }
455    }
456  }
457
458  /// Looks for the key in the pending writes, if such key is not in the pending writes,
459  /// the end user can read the key from the database.
460  pub fn get_equivalent_cm_comparable_pm_blocking<'a, 'b: 'a, Q>(
461    &'a mut self,
462    key: &'b Q,
463  ) -> Result<Option<EntryRef<'a, K, V>>, TransactionError<C::Error, P::Error>>
464  where
465    K: Borrow<Q>,
466    Q: ?Sized + Eq + Ord + Hash,
467  {
468    if let Some((k, e)) = self
469      .pending_writes
470      .as_ref()
471      .unwrap()
472      .get_entry_comparable(key)
473      .map_err(TransactionError::Pwm)?
474    {
475      // If the value is None, it means that the key is removed.
476      if e.value.is_none() {
477        return Ok(None);
478      }
479
480      // Fulfill from buffer.
481      Ok(Some(EntryRef {
482        data: match &e.value {
483          Some(value) => EntryDataRef::Insert { key: k, value },
484          None => EntryDataRef::Remove(k),
485        },
486        version: e.version,
487      }))
488    } else {
489      // track reads. No need to track read if txn serviced it
490      // internally.
491      if let Some(ref mut conflict_manager) = self.conflict_manager {
492        conflict_manager.mark_read_equivalent(key);
493      }
494
495      Ok(None)
496    }
497  }
498}
499
500impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
501where
502  C: CmComparable<Key = K>,
503  P: PwmComparable<Key = K, Value = V>,
504  S: AsyncSpawner,
505{
506  /// Returns `true` if the pending writes contains the key.
507  ///
508  /// - `Ok(None)`: means the key is not in the pending writes, the end user can read the key from the database.
509  /// - `Ok(Some(true))`: means the key is in the pending writes.
510  /// - `Ok(Some(false))`: means the key is in the pending writes and but is a remove entry.
511  pub fn contains_key_comparable_blocking<'a, 'b: 'a, Q>(
512    &'a mut self,
513    key: &'b Q,
514  ) -> Result<Option<bool>, TransactionError<C::Error, P::Error>>
515  where
516    K: Borrow<Q>,
517    Q: ?Sized + Ord,
518  {
519    match self
520      .pending_writes
521      .as_ref()
522      .unwrap()
523      .get_comparable(key)
524      .map_err(TransactionError::pending)?
525    {
526      Some(ent) => {
527        // If the value is None, it means that the key is removed.
528        if ent.value.is_none() {
529          return Ok(Some(false));
530        }
531
532        // Fulfill from buffer.
533        Ok(Some(true))
534      }
535      None => {
536        // track reads. No need to track read if txn serviced it
537        // internally.
538        if let Some(ref mut conflict_manager) = self.conflict_manager {
539          conflict_manager.mark_read_comparable(key);
540        }
541
542        Ok(None)
543      }
544    }
545  }
546
547  /// Looks for the key in the pending writes, if such key is not in the pending writes,
548  /// the end user can read the key from the database.
549  pub fn get_comparable_blocking<'a, 'b: 'a, Q>(
550    &'a mut self,
551    key: &'b Q,
552  ) -> Result<Option<EntryRef<'a, K, V>>, TransactionError<C::Error, P::Error>>
553  where
554    K: Borrow<Q>,
555    Q: ?Sized + Ord,
556  {
557    if let Some((k, e)) = self
558      .pending_writes
559      .as_ref()
560      .unwrap()
561      .get_entry_comparable(key)
562      .map_err(TransactionError::Pwm)?
563    {
564      // If the value is None, it means that the key is removed.
565      if e.value.is_none() {
566        return Ok(None);
567      }
568
569      // Fulfill from buffer.
570      Ok(Some(EntryRef {
571        data: match &e.value {
572          Some(value) => EntryDataRef::Insert { key: k, value },
573          None => EntryDataRef::Remove(k),
574        },
575        version: e.version,
576      }))
577    } else {
578      // track reads. No need to track read if txn serviced it
579      // internally.
580      if let Some(ref mut conflict_manager) = self.conflict_manager {
581        conflict_manager.mark_read_comparable(key);
582      }
583
584      Ok(None)
585    }
586  }
587}