async_txn/
lib.rs

1//! A generic optimistic transaction manger, which is ACID, concurrent with SSI (Serializable Snapshot Isolation).
2//!
3//! For sync version, please see [`txn`](https://crates.io/crates/txn)
4//!
5#![allow(clippy::type_complexity)]
6#![forbid(unsafe_code)]
7#![deny(warnings, missing_docs)]
8#![cfg_attr(docsrs, feature(doc_cfg))]
9#![cfg_attr(docsrs, allow(unused_attributes))]
10
11use std::sync::Arc;
12
13use core::mem;
14
15use error::TransactionError;
16pub use smallvec_wrapper::OneOrMore;
17
18pub use wmark::{AsyncSpawner, Detach};
19
20#[cfg(feature = "smol")]
21pub use wmark::SmolSpawner;
22
23#[cfg(feature = "async-std")]
24pub use wmark::AsyncStdSpawner;
25
26#[cfg(feature = "tokio")]
27pub use wmark::TokioSpawner;
28
29#[cfg(feature = "wasm")]
30pub use wmark::WasmSpawner;
31
32/// Error types for the [`async-txn`] crate.
33pub use txn_core::error;
34
35mod oracle;
36use oracle::*;
37mod read;
38pub use read::*;
39mod write;
40pub use write::*;
41
42pub use txn_core::{
43  future::*,
44  sync::{
45    BTreeCm, BTreePwm, Cm, CmComparable, CmEquivalent, HashCm, HashCmOptions, IndexMapPwm, Marker,
46    Pwm, PwmComparable, PwmComparableRange, PwmEquivalent, PwmEquivalentRange, PwmRange,
47  },
48  types::*,
49};
50
51/// A multi-writer multi-reader MVCC, ACID, Serializable Snapshot Isolation transaction manager.
52pub struct AsyncTm<K, V, C, P, S>
53where
54  S: AsyncSpawner,
55{
56  inner: Arc<Oracle<C, S>>,
57  _phantom: std::marker::PhantomData<(K, V, P)>,
58}
59
60impl<K, V, C, P, S> Clone for AsyncTm<K, V, C, P, S>
61where
62  S: AsyncSpawner,
63{
64  fn clone(&self) -> Self {
65    Self {
66      inner: self.inner.clone(),
67      _phantom: std::marker::PhantomData,
68    }
69  }
70}
71
72impl<K, V, C, P, S> AsyncTm<K, V, C, P, S>
73where
74  C: AsyncCm<Key = K>,
75  P: AsyncPwm<Key = K, Value = V>,
76  S: AsyncSpawner,
77{
78  /// Create a new writable transaction with
79  /// the default pending writes manager to store the pending writes.
80  pub async fn write(
81    &self,
82    pending_manager_opts: P::Options,
83    conflict_manager_opts: C::Options,
84  ) -> Result<AsyncWtm<K, V, C, P, S>, TransactionError<C::Error, P::Error>> {
85    let read_ts = self.inner.read_ts().await;
86    Ok(AsyncWtm {
87      orc: self.inner.clone(),
88      read_ts,
89      size: 0,
90      count: 0,
91      conflict_manager: Some(
92        C::new(conflict_manager_opts)
93          .await
94          .map_err(TransactionError::conflict)?,
95      ),
96      pending_writes: Some(
97        P::new(pending_manager_opts)
98          .await
99          .map_err(TransactionError::pending)?,
100      ),
101      duplicate_writes: OneOrMore::new(),
102      discarded: false,
103      done_read: false,
104    })
105  }
106}
107
108impl<K, V, C, P, S> AsyncTm<K, V, C, P, S>
109where
110  S: AsyncSpawner,
111{
112  /// Create a new transaction manager with the given name (just for logging or debugging, use your crate name is enough)
113  /// and the current version (provided by the database).
114  #[inline]
115  pub async fn new(name: &str, current_version: u64) -> Self {
116    Self {
117      inner: Arc::new({
118        let next_ts = current_version;
119        let orc = Oracle::new(
120          format!("{}.pending_reads", name).into(),
121          format!("{}.txn_timestamps", name).into(),
122          next_ts,
123        );
124        orc.read_mark.done(next_ts).unwrap();
125        orc.txn_mark.done(next_ts).unwrap();
126        orc.increment_next_ts().await;
127        orc
128      }),
129      _phantom: std::marker::PhantomData,
130    }
131  }
132
133  /// Returns the current read version of the database.
134  #[inline]
135  pub async fn version(&self) -> u64 {
136    self.inner.read_ts().await
137  }
138
139  /// Close the transaction manager.
140  #[inline]
141  pub async fn close(&self) {
142    self.inner.stop().await;
143  }
144}
145
146impl<K, V, C, P, S> AsyncTm<K, V, C, P, S>
147where
148  S: AsyncSpawner,
149{
150  /// Returns a timestamp which hints that any versions under this timestamp can be discard.
151  /// This is useful when users want to implement compaction/merge functionality.
152  pub fn discard_hint(&self) -> u64 {
153    self.inner.discard_at_or_below()
154  }
155
156  /// Create a new writable transaction.
157  pub async fn read(&self) -> AsyncRtm<K, V, C, P, S> {
158    AsyncRtm {
159      db: self.clone(),
160      read_ts: self.inner.read_ts().await,
161    }
162  }
163}