//! `mvcc_cell` is a transactional-memory system for Rust based on
//! multiversion concurrency control (MVCC).
//!
//! Transactions are fully isolated and serialized with respect
//! to controlled values:
//! * Each transaction sees a fixed snapshot as of its start time
//! * Any concurrent commits to cells that a transaction has accessed will
//! prevent that transaction from committing
//!
//! ## Example Usage
//! ```
//! use mvcc_cell::{Mvcc,MvccCell};
//! let mvcc = Mvcc::new();
//!
//! // Create a transactional slot
//! let slot = MvccCell::new(&mvcc, Box::new(0));
//!
//! // Start concurrent transactions
//! let mut t1 = mvcc.begin();
//! let mut t2 = mvcc.begin();
//! let ro = mvcc.begin();
//!
//! // Uncommitted values are not visible outside the transaction
//! t1[&slot] = 42;
//! assert_eq!(t2[&slot], 0);
//!
//! // First committer wins, regardless of actual modification order
//! t2[&slot] = 7;
//! assert!(t2.try_commit().is_ok());
//!
//! // Failed commits return the transaction, so you can still read
//! // the computed values
//! let t1_fail = t1.try_commit().unwrap_err();
//! assert_eq!(t1_fail[&slot], 42);
//!
//! // Transactions always read the values that were current when
//! // begin() was called.
//! assert_eq!(ro[&slot], 0);
//! assert_eq!(mvcc.begin()[&slot], 7);
//! ```
mod histogram;
mod history;
mod cell;
mod vacuum;
#[cfg(test)] mod tests;
use histogram::{TokenHist, Token};
use guest_cell::{Guest, id::{UniqueId,Id}};
use parking_lot::Mutex;
use std::collections::HashSet;
use cell::ErasedCell;
pub use cell::MvccCell;
pub use vacuum::Vacuum;
// Integer type used for transaction sequence numbers: `u64` by default,
// `u128` when the `wide_id` feature is enabled.
#[cfg(not(feature = "wide_id"))] type TxnId = u64;
#[cfg(feature = "wide_id")] type TxnId = u128;
/// Controller for a set of `MvccCell`s
///
/// See the main crate documentation for more details.
pub struct Mvcc {
/// Copy of the id held inside `guard`, taken in `new()`; returned by `id()`.
id: Id,
/// Locked for the whole of `Transaction::try_commit`, so commits run one at a time.
guard: Mutex<UniqueId>,
/// Sequence number of the most recently published commit. Starts at `0`,
/// is read by `begin()` and advanced at the end of a successful `try_commit`.
current: Mutex<TxnId>,
/// Issues the `Token` each transaction holds for its starting sequence number
/// (see `begin()`).
txns: TokenHist<TxnId>
}
/// A mutable snapshot of the controlled `MvccCell`s. Created via `Mvcc::begin()`.
pub struct Transaction<'a> {
/// The controller that created this transaction.
mvcc: &'a Mvcc,
/// Token for the sequence number that was current when the transaction began;
/// `can_commit` compares each cell's `last_changed()` against it.
seq: Token<'a,TxnId>,
/// Passed to every cell's `commit` during `try_commit`.
/// NOTE(review): presumably holds this transaction's uncommitted writes --
/// confirm against the `cell` module.
pending: Guest<'a>,
/// Cells this transaction has seen: checked for conflicts by `can_commit` and
/// drained by `try_commit`. Starts empty; it is populated by code outside
/// this file.
visible: Mutex<HashSet<ErasedCell<'a>>>,
}
impl Mvcc {
    /// Create a new controller.
    ///
    /// The controller starts at sequence number `0` and takes its id from
    /// `UniqueId::default()`.
    pub fn new() -> Self {
        let unique_id = UniqueId::default();
        Mvcc {
            // Copy the plain id out before `unique_id` is moved into the mutex.
            id: *unique_id,
            guard: Mutex::new(unique_id),
            current: Mutex::new(0),
            txns: TokenHist::default(),
        }
    }

    /// Take a snapshot of the transactional state
    ///
    /// The returned transaction is pinned to the sequence number that is
    /// current at the time of the call, and starts with an empty set of
    /// seen cells.
    pub fn begin(&self) -> Transaction<'_> {
        // The lock on `current` is released at the end of this statement.
        // NOTE(review): the token is therefore created after the lock is
        // dropped -- confirm that nothing relies on the token already existing
        // at the instant the sequence number is read.
        let current: TxnId = *self.current.lock();
        Transaction {
            mvcc: self,
            seq: self.txns.make_token(current),
            pending: Guest::default(),
            visible: Mutex::new(HashSet::new()),
        }
    }

    /// The id stored for this controller when it was created.
    pub(crate) fn id(&self) -> Id {
        self.id
    }

    /// Repeat an operation until the transaction commits successfully
    ///
    /// `f` is called with a fresh transaction on every attempt, so it may run
    /// more than once; only the value produced by the attempt that commits is
    /// returned. The loop has no retry limit.
    pub fn update<O>(&self, mut f: impl FnMut(&mut Transaction<'_>) -> O) -> O {
        loop {
            let mut txn = self.begin();
            let out = f(&mut txn);
            if txn.try_commit().is_ok() {
                return out;
            }
            // Commit failed: discard `out` and retry against a newer snapshot.
        }
    }
}

impl Default for Mvcc {
    /// Equivalent to [`Mvcc::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl Transaction<'_> {
    /// Try to publish this transaction's changes.
    ///
    /// On success the transaction is consumed and `Ok(())` is returned. If any
    /// `MvccCell` seen by this transaction has had changes committed since
    /// `begin()` was called, nothing is published and the transaction is
    /// handed back as `Err(self)`.
    pub fn try_commit(mut self) -> Result<(), Self> {
        // Held until this function returns, so commits are fully serialized.
        let mut commit_lock = self.mvcc.guard.lock();

        if self.can_commit() {
            // Sequence number this commit will be published under.
            let next = self.mvcc.current.lock().checked_add(1).unwrap();

            let seen = self.visible.get_mut();
            for ErasedCell(cell) in seen.drain() {
                cell.commit(&mut commit_lock, next, &mut self.pending);
            }

            // Every cell has been committed; new readers may now observe `next`.
            *self.mvcc.current.lock() = next;
            Ok(())
        } else {
            Err(self)
        }
    }

    /// Reports whether this transaction is currently free of conflicts.
    ///
    /// Returns `false` as soon as one seen cell reports a `last_changed()`
    /// later than this transaction's starting sequence number.
    ///
    /// Because of TOCTOU, a `true` result does not guarantee that a later
    /// `try_commit` will succeed.
    pub fn can_commit(&self) -> bool {
        let seen = self.visible.lock();
        let conflict = seen
            .iter()
            .any(|ErasedCell(cell)| cell.last_changed() > *self.seq);
        !conflict
    }
}