1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
//! `mvcc_cell` is a multiversion concurrency-control based
//! transactional-memory system for Rust.
//!
//! Transactions are fully isolated and serialized with respect
//! to controlled values:
//! * Each transaction sees a fixed snapshot as of its start time
//! * Any concurrent commits to cells that a transaction has accessed will
//!   prevent that transaction from committing
//!
//! ## Example Usage
//! ```
//! use mvcc_cell::{Mvcc,MvccCell};
//! let mvcc = Mvcc::new();
//!
//! // Create a transactional slot
//! let slot = MvccCell::new(&mvcc, Box::new(0));
//!
//! // Start concurrent transactions
//! let mut t1 = mvcc.begin();
//! let mut t2 = mvcc.begin();
//! let ro = mvcc.begin();
//!
//! // Uncommitted values are not visible outside the transaction
//! t1[&slot] = 42;
//! assert_eq!(t2[&slot], 0);
//!
//! // First committer wins, regardless of actual modification order
//! t2[&slot] = 7;
//! assert!(t2.try_commit().is_ok());
//! 
//! // Failed commits return the transaction, so you can still read
//! // the computed values
//! let t1_fail = t1.try_commit().unwrap_err();
//! assert_eq!(t1_fail[&slot], 42);
//!
//! // Transactions always read the values that were current when
//! // begin() was called.
//! assert_eq!(ro[&slot], 0);
//! assert_eq!(mvcc.begin()[&slot], 7);
//! ```
mod histogram;
mod history;
mod cell;
mod vacuum;
#[cfg(test)] mod tests;

use histogram::{TokenHist, Token};
use guest_cell::{Guest, id::{UniqueId,Id}};
use parking_lot::Mutex;
use std::collections::HashSet;
use cell::ErasedCell;

pub use cell::MvccCell;
pub use vacuum::Vacuum;

// Integer type used for commit sequence numbers: `u64` by default, `u128`
// when the `wide_id` cargo feature is enabled.
#[cfg(not(feature = "wide_id"))] type TxnId = u64;
#[cfg(feature = "wide_id")] type TxnId = u128;

/// Controller for a set of `MvccCell`s
///
/// Holds the commit sequence counter and the lock that serializes commits.
/// Snapshots of the controlled cells are taken with `Mvcc::begin()`.
///
/// See the main crate documentation for more details.
pub struct Mvcc {
    /// Identifier obtained by dereferencing the `UniqueId` stored in `guard`;
    /// readable without taking any lock.
    id: Id,
    /// Locked for the whole of `Transaction::try_commit`, so that commits are
    /// fully serialized.  The locked value is handed to each cell's `commit`.
    guard: Mutex<UniqueId>,
    /// Sequence number of the most recently published commit; starts at 0.
    current: Mutex<TxnId>,
    /// Issues the sequence-number token held by every `Transaction`.
    /// NOTE(review): presumably tracks which snapshot numbers are still in
    /// use by live transactions — confirm in the `histogram` module.
    txns: TokenHist<TxnId>
}

/// A mutable snapshot of the controlled `MvccCell`s.  Created via `Mvcc::begin()`.
///
/// Values written through a transaction are not visible outside it until
/// `try_commit` succeeds.
pub struct Transaction<'a> {
    /// Controller this snapshot was taken from.
    mvcc: &'a Mvcc,
    /// Token for the commit sequence number that was current when `begin()`
    /// ran; a cell whose last change is newer than this counts as a conflict.
    seq: Token<'a,TxnId>,
    /// Passed to each cell's `commit` during `try_commit`.
    /// NOTE(review): presumably stores this transaction's uncommitted
    /// values — confirm in the `cell` module.
    pending: Guest<'a>,
    /// Cells seen by this transaction: scanned by `can_commit` for conflicts
    /// and drained by `try_commit` when committing.
    visible: Mutex<HashSet<ErasedCell<'a>>>,
}

impl Mvcc {
    /// Create a new controller whose commit sequence starts at 0.
    pub fn new() -> Self {
        let unique_id = UniqueId::default();
        // Copy the plain id out before the `UniqueId` is moved into the mutex.
        let id = *unique_id;
        Self {
            id,
            guard: Mutex::new(unique_id),
            current: Mutex::new(0),
            txns: TokenHist::default(),
        }
    }

    /// Take a snapshot of the transactional state
    ///
    /// The returned transaction is tagged with the commit sequence number
    /// that was current at the moment of the call.
    pub fn begin(&self) -> Transaction<'_> {
        // Copy the number out so the `current` lock is released before the
        // token is created.
        let snapshot: TxnId = {
            let latest = self.current.lock();
            *latest
        };
        let seq = self.txns.make_token(snapshot);

        Transaction {
            mvcc: self,
            seq,
            pending: Guest::default(),
            visible: Mutex::new(HashSet::new()),
        }
    }

    /// Identifier of this controller.
    pub(crate) fn id(&self) -> Id {
        self.id
    }

    /// Repeat an operation until the transaction commits successfully
    ///
    /// `f` is invoked once per attempt, each time with a freshly begun
    /// transaction, so it may run more than once.  The value produced by the
    /// attempt that commits is returned.
    pub fn update<O>(&self, mut f: impl FnMut(&mut Transaction<'_>) -> O) -> O {
        loop {
            let mut attempt = self.begin();
            let result = f(&mut attempt);
            match attempt.try_commit() {
                Ok(()) => return result,
                // Conflict: discard the failed transaction and start over.
                Err(_) => {}
            }
        }
    }
}

impl Transaction<'_> {
    /// Attempt to commit changes
    ///
    /// On success, every cell seen by this transaction is committed under a
    /// new sequence number, which is then published as the current one.
    ///
    /// Returns `Err(self)` if any `MvccCell` seen by this transaction has committed
    /// changes since `begin()` was called.
    ///
    /// # Panics
    ///
    /// Panics if incrementing the sequence number would overflow `TxnId`.
    pub fn try_commit(mut self) -> Result<(), Self> {
        // Only one commit may run at a time; this lock is held until return.
        let mut commit_lock = self.mvcc.guard.lock();

        let conflicted = !self.can_commit();
        if conflicted {
            return Err(self);
        }

        // Work out the next sequence number without publishing it yet.
        let next_seq = {
            let latest = self.mvcc.current.lock();
            latest.checked_add(1).unwrap()
        };

        let cells = self.visible.get_mut();
        for ErasedCell(cell) in cells.drain() {
            cell.commit(&mut commit_lock, next_seq, &mut self.pending);
        }

        // Every cell is committed, so new readers may now observe the result.
        *self.mvcc.current.lock() = next_seq;

        Ok(())
    }

    /// Checks to see if there are any conflicts preventing this transaction
    /// from committing.
    ///
    /// A conflict is a seen cell whose last change is newer than this
    /// transaction's snapshot sequence number.
    ///
    /// Note that, because of TOCTOU, `try_commit` may still fail after this
    /// returns `true`.
    pub fn can_commit(&self) -> bool {
        let seen = self.visible.lock();
        let has_conflict = seen
            .iter()
            .any(|ErasedCell(cell)| cell.last_changed() > *self.seq);
        !has_conflict
    }
}