txn/
lib.rs

//! A generic optimistic transaction manager, which is ACID, concurrent with SSI (Serializable Snapshot Isolation).
2//!
//! For async runtimes, see [`async-txn`](https://crates.io/crates/async-txn).
4#![allow(clippy::type_complexity)]
5#![forbid(unsafe_code)]
6#![deny(warnings, missing_docs)]
7#![cfg_attr(docsrs, feature(doc_cfg))]
8#![cfg_attr(docsrs, allow(unused_attributes))]
9
10use std::sync::Arc;
11
12use core::mem;
13
14pub use smallvec_wrapper::OneOrMore;
15use txn_core::error::TransactionError;
16
17/// Error types for the [`txn`] crate.
18pub use txn_core::error;
19
20mod oracle;
21use oracle::*;
22mod read;
23pub use read::*;
24mod write;
25pub use write::*;
26
27pub use txn_core::{sync::*, types::*};
28
29/// A multi-writer multi-reader MVCC, ACID, Serializable Snapshot Isolation transaction manager.
30pub struct Tm<K, V, C, P> {
31  inner: Arc<Oracle<C>>,
32  _phantom: std::marker::PhantomData<(K, V, P)>,
33}
34
35impl<K, V, C, P> Clone for Tm<K, V, C, P> {
36  fn clone(&self) -> Self {
37    Self {
38      inner: self.inner.clone(),
39      _phantom: std::marker::PhantomData,
40    }
41  }
42}
43
44impl<K, V, C, P> Tm<K, V, C, P>
45where
46  C: Cm<Key = K>,
47  P: Pwm<Key = K, Value = V>,
48{
49  /// Create a new writable transaction with
50  /// the default pending writes manager to store the pending writes.
51  pub fn write(
52    &self,
53    pending_manager_opts: P::Options,
54    conflict_manager_opts: C::Options,
55  ) -> Result<Wtm<K, V, C, P>, TransactionError<C::Error, P::Error>> {
56    let read_ts = self.inner.read_ts();
57    Ok(Wtm {
58      orc: self.inner.clone(),
59      read_ts,
60      size: 0,
61      count: 0,
62      conflict_manager: Some(C::new(conflict_manager_opts).map_err(TransactionError::conflict)?),
63      pending_writes: Some(P::new(pending_manager_opts).map_err(TransactionError::pending)?),
64      duplicate_writes: OneOrMore::new(),
65      discarded: false,
66      done_read: false,
67    })
68  }
69}
70
71impl<K, V, C, P> Tm<K, V, C, P> {
72  /// Create a new transaction manager with the given name (just for logging or debugging, use your crate name is enough)
73  /// and the current version (provided by the database).
74  #[inline]
75  pub fn new(name: &str, current_version: u64) -> Self {
76    Self {
77      inner: Arc::new({
78        let next_ts = current_version;
79        let orc = Oracle::new(
80          format!("{}.pending_reads", name).into(),
81          format!("{}.txn_timestamps", name).into(),
82          next_ts,
83        );
84        orc.read_mark.done(next_ts).unwrap();
85        orc.txn_mark.done(next_ts).unwrap();
86        orc.increment_next_ts();
87        orc
88      }),
89      _phantom: std::marker::PhantomData,
90    }
91  }
92
93  /// Returns the current read version of the transaction manager.
94  #[inline]
95  pub fn version(&self) -> u64 {
96    self.inner.read_ts()
97  }
98}
99
100impl<K, V, C, P> Tm<K, V, C, P> {
101  /// Returns a timestamp which hints that any versions under this timestamp can be discard.
102  /// This is useful when users want to implement compaction/merge functionality.
103  pub fn discard_hint(&self) -> u64 {
104    self.inner.discard_at_or_below()
105  }
106
107  /// Create a new writable transaction.
108  pub fn read(&self) -> Rtm<K, V, C, P> {
109    Rtm {
110      db: self.clone(),
111      read_ts: self.inner.read_ts(),
112    }
113  }
114}