mwmr/
lib.rs

//! A generic optimistic transaction manager, which is ACID, concurrent with SSI (Serializable Snapshot Isolation).
//!
//! For the tokio runtime, please see [`tokio-mwmr`](https://crates.io/crates/tokio-mwmr).
//!
//! For other async runtimes, please see [`async-mwmr`](https://crates.io/crates/async-mwmr).
6#![allow(clippy::type_complexity)]
7#![forbid(unsafe_code)]
8#![deny(warnings, missing_docs)]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10#![cfg_attr(docsrs, allow(unused_attributes))]
11
12use std::{cell::RefCell, sync::Arc};
13
14use core::{hash::BuildHasher, mem};
15
16use either::Either;
17use indexmap::{IndexMap, IndexSet};
18use smallvec_wrapper::MediumVec;
19pub use smallvec_wrapper::OneOrMore;
20
21/// Error types for the [`mwmr`] crate.
22pub mod error;
23
24/// Generic unit tests for users to test their database implementation based on `mwmr`.
25#[cfg(any(feature = "test", test))]
26#[cfg_attr(docsrs, doc(cfg(feature = "test")))]
27pub mod tests;
28
29mod oracle;
30use oracle::*;
31mod read;
32pub use read::*;
33mod write;
34pub use write::*;
35
36pub use mwmr_core::{sync::*, types::*};
37
/// Options for the [`TransactionDB`].
#[derive(Debug, Clone)]
pub struct Options {
  /// Whether transactions are checked for conflicts.
  detect_conflicts: bool,
}

impl Default for Options {
  /// Same as [`Options::new`].
  fn default() -> Self {
    Options::new()
  }
}

impl Options {
  /// Builds the default options: conflict detection is enabled.
  #[inline]
  pub const fn new() -> Self {
    Options {
      detect_conflicts: true,
    }
  }

  /// Reports whether transactions will be checked for conflicts.
  #[inline]
  pub const fn detect_conflicts(&self) -> bool {
    let Options { detect_conflicts } = self;
    *detect_conflicts
  }

  /// Enables or disables conflict detection in place.
  ///
  /// Returns `self` so that setters can be chained.
  #[inline]
  pub fn set_detect_conflicts(&mut self, detect_conflicts: bool) -> &mut Self {
    self.detect_conflicts = detect_conflicts;
    self
  }

  /// Enables or disables conflict detection, consuming and returning the options
  /// (builder style).
  #[inline]
  pub const fn with_detect_conflicts(mut self, detect_conflicts: bool) -> Self {
    self.detect_conflicts = detect_conflicts;
    self
  }
}
79
80struct Inner<D, S = std::hash::RandomState> {
81  db: D,
82  /// Determines whether the transactions would be checked for conflicts.
83  /// The transactions can be processed at a higher rate when conflict detection is disabled.
84  opts: Options,
85  orc: Oracle<S>,
86  hasher: S,
87}
88/// A multi-writer multi-reader MVCC, ACID, Serializable Snapshot Isolation transaction manager.
89pub struct TransactionDB<D, S = std::hash::RandomState> {
90  inner: Arc<Inner<D, S>>,
91}
92
93impl<D, S> Clone for TransactionDB<D, S> {
94  fn clone(&self) -> Self {
95    Self {
96      inner: self.inner.clone(),
97    }
98  }
99}
100
101impl<D: Database, S: BuildHasher + Default + Clone + 'static> TransactionDB<D, S>
102where
103  D::Key: Eq + core::hash::Hash,
104{
105  /// Create a new writable transaction with
106  /// the default pending writes manager to store the pending writes.
107  pub fn write(&self) -> WriteTransaction<D, IndexMapManager<D::Key, D::Value, S>, S> {
108    WriteTransaction {
109      db: self.clone(),
110      read_ts: self.inner.orc.read_ts(),
111      size: 0,
112      count: 0,
113      reads: MediumVec::new(),
114      conflict_keys: if self.inner.opts.detect_conflicts {
115        Some(IndexSet::with_hasher(self.inner.hasher.clone()))
116      } else {
117        None
118      },
119      pending_writes: Some(IndexMap::with_hasher(S::default())),
120      duplicate_writes: OneOrMore::new(),
121      discarded: false,
122      done_read: false,
123    }
124  }
125}
126
127impl<D: Database, S: Clone + 'static> TransactionDB<D, S> {
128  /// Create a new writable transaction with the given pending writes manager to store the pending writes.
129  pub fn write_by<W: PendingManager>(&self, backend: W) -> WriteTransaction<D, W, S> {
130    WriteTransaction {
131      db: self.clone(),
132      read_ts: self.inner.orc.read_ts(),
133      size: 0,
134      count: 0,
135      reads: MediumVec::new(),
136      conflict_keys: if self.inner.opts.detect_conflicts {
137        Some(IndexSet::with_hasher(self.inner.hasher.clone()))
138      } else {
139        None
140      },
141      pending_writes: Some(backend),
142      duplicate_writes: OneOrMore::new(),
143      discarded: false,
144      done_read: false,
145    }
146  }
147}
148
149impl<D: Database, S: Default> TransactionDB<D, S> {
150  /// Open the database with the given options.
151  pub fn new(transaction_opts: Options, database_opts: D::Options) -> Result<Self, D::Error> {
152    Self::with_hasher(transaction_opts, database_opts, S::default())
153  }
154}
155
156impl<D: Database, S> TransactionDB<D, S> {
157  /// Open the database with the given options.
158  pub fn with_hasher(
159    transaction_opts: Options,
160    database_opts: D::Options,
161    hasher: S,
162  ) -> Result<Self, D::Error> {
163    D::open(database_opts).map(|db| Self {
164      inner: Arc::new(Inner {
165        orc: {
166          let next_ts = db.maximum_version();
167          let orc = Oracle::new(
168            format!("{}.pending_reads", core::any::type_name::<D>()).into(),
169            format!("{}.txn_timestamps", core::any::type_name::<D>()).into(),
170            transaction_opts.detect_conflicts(),
171            next_ts,
172          );
173          orc.read_mark.done_unchecked(next_ts);
174          orc.txn_mark.done_unchecked(next_ts);
175          orc.increment_next_ts();
176          orc
177        },
178        db,
179        opts: transaction_opts,
180        hasher,
181      }),
182    })
183  }
184
185  /// Returns a timestamp which hints that any versions under this timestamp can be discard.
186  /// This is useful when users want to implement compaction/merge functionality.
187  pub fn discard_hint(&self) -> u64 {
188    self.inner.orc.discard_at_or_below()
189  }
190
191  /// Returns the options of the database.
192  pub fn database_options(&self) -> &D::Options {
193    self.inner.db.options()
194  }
195
196  /// Returns the options of the transaction.
197  pub fn transaction_options(&self) -> &Options {
198    &self.inner.opts
199  }
200
201  /// Returns underlying database.
202  ///
203  /// **Note**: You should not use this method get the underlying database and read/write directly.
204  /// This method is only for you to implement advanced functionalities, such as compaction, merge, etc.
205  pub fn database(&self) -> &D {
206    &self.inner.db
207  }
208
209  /// Create a new writable transaction.
210  pub fn read(&self) -> ReadTransaction<D, S> {
211    ReadTransaction {
212      db: self.clone(),
213      read_ts: self.inner.orc.read_ts(),
214    }
215  }
216
217  #[inline]
218  fn orc(&self) -> &Oracle<S> {
219    &self.inner.orc
220  }
221}