stm_core/transaction/
mod.rs

1// Copyright 2015-2016 rust-stm Developers
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9pub mod control_block;
10pub mod log_var;
11
12use std::collections::BTreeMap;
13use std::collections::btree_map::Entry::*;
14use std::mem;
15use std::sync::Arc;
16use std::any::Any;
17use std::cell::Cell;
18
19use self::log_var::LogVar;
20use self::log_var::LogVar::*;
21use self::control_block::ControlBlock;
22use super::tvar::{TVar, VarControlBlock};
23use super::result::*;
24use super::result::StmError::*;
25
thread_local! {
    /// Per-thread flag that is `true` while a `TransactionGuard` is alive
    /// on the current thread.
    static TRANSACTION_RUNNING: Cell<bool> = Cell::new(false);
}

/// `TransactionGuard` checks against nested STM calls.
///
/// Creating a guard marks the current thread as running a transaction and
/// dropping it clears the mark again, so the flag is also reset when the
/// transaction unwinds because of a panic.
///
/// The guard has to be bound to a variable (`let _guard = ...`); a guard
/// that is not bound is dropped at once and protects nothing.
#[must_use = "the guard clears the running flag as soon as it is dropped"]
struct TransactionGuard;

impl TransactionGuard {
    /// Mark the current thread as running a transaction.
    ///
    /// # Panics
    ///
    /// Panics with "STM: Nested Transaction" if the flag is already set,
    /// i.e. another guard is still alive on this thread.
    pub fn new() -> TransactionGuard {
        TRANSACTION_RUNNING.with(|running| {
            // `replace` sets the flag and hands back the previous state in
            // one step. If the flag was already set it simply stays set.
            assert!(!running.replace(true), "STM: Nested Transaction");
        });
        TransactionGuard
    }
}

impl Drop for TransactionGuard {
    /// Mark the current thread as no longer running a transaction.
    fn drop(&mut self) {
        TRANSACTION_RUNNING.with(|running| running.set(false));
    }
}
50
/// Decision returned by the control function of `Transaction::with_control`
/// after the transaction body returned an error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionControl {
    /// Keep going: the transaction body is run again.
    Retry,
    /// Give up: `Transaction::with_control` stops and returns `None`.
    Abort,
}
55
56/// Transaction tracks all the read and written variables.
57///
58/// It is used for checking vars, to ensure atomicity.
59pub struct Transaction {
60
61    /// Map of all vars that map the `VarControlBlock` of a var to a `LogVar`.
62    /// The `VarControlBlock` is unique because it uses it's address for comparing.
63    ///
64    /// The logs need to be accessed in a order to prevend dead-locks on locking.
65    vars: BTreeMap<Arc<VarControlBlock>, LogVar>,
66}
67
68impl Transaction {
69    /// Create a new log.
70    ///
71    /// Normally you don't need to call this directly.
72    /// Use `atomically` instead.
73    fn new() -> Transaction {
74        Transaction { vars: BTreeMap::new() }
75    }
76
77    /// Run a function with a transaction.
78    ///
79    /// It is equivalent to `atomically`.
80    pub fn with<T, F>(f: F) -> T 
81    where F: Fn(&mut Transaction) -> StmResult<T>,
82    {
83        match Transaction::with_control(|_| TransactionControl::Retry, f) {
84            Some(t) => t,
85            None    => unreachable!()
86        }
87    }
88
89    /// Run a function with a transaction.
90    ///
91    /// `with_control` takes another control function, that
92    /// can steer the control flow and possible terminate early.
93    ///
94    /// `control` can react to counters, timeouts or external inputs.
95    ///
96    /// It allows the user to fall back to another strategy, like a global lock
97    /// in the case of too much contention.
98    ///
99    /// Please not, that the transaction may still infinitely wait for changes when `retry` is
100    /// called and `control` does not abort.
101    /// If you need a timeout, another thread should signal this through a TVar.
102    pub fn with_control<T, F, C>(mut control: C, f: F) -> Option<T>
103    where F: Fn(&mut Transaction) -> StmResult<T>,
104          C: FnMut(StmError) -> TransactionControl,
105    {
106        let _guard = TransactionGuard::new();
107
108        // create a log guard for initializing and cleaning up
109        // the log
110        let mut transaction = Transaction::new();
111
112        // loop until success
113        loop {
114            // run the computation
115            match f(&mut transaction) {
116                // on success exit loop
117                Ok(t) => {
118                    if transaction.commit() {
119                        return Some(t);
120                    }
121                }
122
123                Err(e) => {
124                    // Check if the user wants to abort the transaction.
125                    if let TransactionControl::Abort = control(e) {
126                        return None;
127                    }
128
129                    // on retry wait for changes
130                    if let Retry = e {
131                        transaction.wait_for_change();
132                    }
133                }
134            }
135
136            // clear log before retrying computation
137            transaction.clear();
138        }
139    }
140
141    /// Perform a downcast on a var.
142    fn downcast<T: Any + Clone>(var: Arc<Any>) -> T {
143        match var.downcast_ref::<T>() {
144            Some(s) => s.clone(),
145            None    => unreachable!("TVar has wrong type")
146        }
147    }
148
149    /// Read a variable and return the value.
150    ///
151    /// The returned value is not always consistent with the current value of the var,
152    /// but may be an outdated or or not yet commited value.
153    ///
154    /// The used code should be capable of handling inconsistent states
155    /// without running into infinite loops.
156    /// Just the commit of wrong values is prevented by STM.
157    pub fn read<T: Send + Sync + Any + Clone>(&mut self, var: &TVar<T>) -> StmResult<T> {
158        let ctrl = var.control_block().clone();
159        // Check if the same var was written before.
160        let value = match self.vars.entry(ctrl) {
161
162            // If the variable has been accessed before, then load that value.
163            Occupied(mut entry) => entry.get_mut().read(),
164
165            // Else load the variable statically.
166            Vacant(entry) => {
167                // Read the value from the var.
168                let value = var.read_ref_atomic();
169
170                // Store in in an entry.
171                entry.insert(Read(value.clone()));
172                value
173            }
174        };
175
176        // For now always succeeds, but that may change later.
177        Ok(Transaction::downcast(value))
178    }
179
180    /// Write a variable.
181    ///
182    /// The write is not immediately visible to other threads,
183    /// but atomically commited at the end of the computation.
184    pub fn write<T: Any + Send + Sync + Clone>(&mut self, var: &TVar<T>, value: T) -> StmResult<()> {
185        // box the value
186        let boxed = Arc::new(value);
187
188        // new control block
189        let ctrl = var.control_block().clone();
190        // update or create new entry
191        match self.vars.entry(ctrl) {
192            Occupied(mut entry)     => entry.get_mut().write(boxed),
193            Vacant(entry)       => { entry.insert(Write(boxed)); }
194        }
195
196        // For now always succeeds, but that may change later.
197        Ok(())
198    }
199
200    /// Combine two calculations. When one blocks with `retry`, 
201    /// run the other, but don't commit the changes in the first.
202    ///
203    /// If both block, `Transaction::or` still waits for `TVar`s in both functions.
204    /// Use `Transaction::or` instead of handling errors directly with the `Result::or`.
205    /// The later does not handle all the blocking correctly.
206    pub fn or<T, F1, F2>(&mut self, first: F1, second: F2) -> StmResult<T>
207        where F1: Fn(&mut Transaction) -> StmResult<T>,
208              F2: Fn(&mut Transaction) -> StmResult<T>,
209    {
210        // Create a backup of the log.
211        let mut copy = Transaction {
212            vars: self.vars.clone()
213        };
214
215        // Run the first computation.
216        let f = first(self);
217
218        match f {
219            // Run other on manual retry call.
220            Err(Retry)      => {
221                // swap, so that self is the current run
222                mem::swap(self, &mut copy);
223
224                // Run other action.
225                let s = second(self);
226
227                // If both called retry then exit.
228                match s {
229                    Err(Failure)        => Err(Failure),
230                    s => {
231                        self.combine(copy);
232                        s
233                    }
234                }
235            }
236
237            // Return success and failure directly
238            x               => x,
239        }
240    }
241
242    /// Combine two logs into a single log, to allow waiting for all reads.
243    fn combine(&mut self, other: Transaction) {
244        // combine reads
245        for (var, value) in other.vars {
246            // only insert new values
247            if let Some(value) = value.obsolete() {
248                self.vars.entry(var).or_insert(value);
249            }
250        }
251    }
252
253    /// Clear the log's data.
254    ///
255    /// This should be used before redoing a computation, but
256    /// nowhere else.
257    fn clear(&mut self) {
258        self.vars.clear();
259    }
260
261    /// Wait for any variable to change,
262    /// because the change may lead to a new calculation result.
263    fn wait_for_change(&mut self) {
264        // Create control block for waiting.
265        let ctrl = Arc::new(ControlBlock::new());
266
267        let vars = mem::replace(&mut self.vars, BTreeMap::new());
268        let mut reads = Vec::with_capacity(self.vars.len());
269            
270        let blocking = vars.into_iter()
271            .filter_map(|(a, b)| {
272                b.into_read_value()
273                    .map(|b| (a, b))
274            })
275            // Check for consistency.
276            .all(|(var, value)| {
277                var.wait(&ctrl);
278                let x = {
279                    // Take read lock and read value.
280                    let guard = var.value.read();
281                    Arc::ptr_eq(&value, &guard)
282                };
283                reads.push(var);
284                x
285            });
286
287        // If no var has changed, then block.
288        if blocking {
289            // Propably wait until one var has changed.
290            ctrl.wait();
291        }
292
293        // Let others know that ctrl is dead.
294        // It does not matter, if we set too many
295        // to dead since it may slightly reduce performance
296        // but not break the semantics.
297        for var in &reads {
298            var.set_dead();
299        }
300    }
301
302    /// Write the log back to the variables.
303    ///
304    /// Return true for success and false, if a read var has changed
305    fn commit(&mut self) -> bool {
306        // Use two phase locking for safely writing data back to the vars.
307
308        // First phase: acquire locks.
309        // Check for consistency of all the reads and perform
310        // an early return if something is not consistent.
311
312        // Created arrays for storing the locks
313        // vector of locks.
314        let mut read_vec = Vec::with_capacity(self.vars.len());
315
316        // vector of tuple (value, lock)
317        let mut write_vec = Vec::with_capacity(self.vars.len());
318
319        // vector of written variables
320        let mut written = Vec::with_capacity(self.vars.len());
321
322
323        for (var, value) in &self.vars {
324            // lock the variable and read the value
325
326            match *value {
327                // We need to take a write lock.
328                Write(ref w) | ReadObsoleteWrite(_,ref w)=> {
329                    // take write lock
330                    let lock = var.value.write();
331                    // add all data to the vector
332                    write_vec.push((w, lock));
333                    written.push(var);
334                }
335                
336                // We need to check for consistency and
337                // take a write lock.
338                ReadWrite(ref original,ref w) => {
339                    // take write lock
340                    let lock = var.value.write();
341
342                    if !Arc::ptr_eq(&lock, original) {
343                        return false;
344                    }
345                    // add all data to the vector
346                    write_vec.push((w, lock));
347                    written.push(var);
348                }
349                // Nothing to do. ReadObsolete is only needed for blocking, not
350                // for consistency checks.
351                ReadObsolete(_) => { }
352                // Take read lock and check for consistency.
353                Read(ref original) => {
354                    // Take a read lock.
355                    let lock = var.value.read();
356
357                    if !Arc::ptr_eq(&lock, original) {
358                        return false;
359                    }
360
361                    read_vec.push(lock);
362                }
363            }
364        }
365
366        // Second phase: write back and release
367
368        // Release the reads first.
369        // This allows other threads to continue quickly.
370        drop(read_vec);
371
372        for (value, mut lock) in write_vec {
373            // Commit value.
374            *lock = value.clone();
375        }
376        
377        for var in written {
378            // Unblock all threads waiting for it.
379            var.wake_all();
380        }
381
382        // Commit succeded.
383        true
384    }
385}
386
#[cfg(test)]
mod test {
    use super::*;

    /// A fresh log can read a var.
    #[test]
    fn read() {
        let mut log = Transaction::new();
        let var = TVar::new(vec![1, 2, 3, 4]);

        // The variable can be read.
        assert_eq!(&*log.read(&var).unwrap(), &[1, 2, 3, 4]);
    }

    /// A write is visible to later reads of the same log, but not outside.
    #[test]
    fn write_read() {
        let mut log = Transaction::new();
        let var = TVar::new(vec![1, 2]);

        log.write(&var, vec![1, 2, 3, 4]).unwrap();

        // Consecutive reads get the updated version.
        assert_eq!(log.read(&var).unwrap(), [1, 2, 3, 4]);

        // The original value is still preserved.
        assert_eq!(var.read_atomic(), [1, 2]);
    }

    /// A transaction that touches no var returns its result.
    #[test]
    fn transaction_simple() {
        let x = Transaction::with(|_| Ok(42));
        assert_eq!(x, 42);
    }

    /// A transaction can read a var.
    #[test]
    fn transaction_read() {
        let read = TVar::new(42);

        let x = Transaction::with(|trans| {
            read.read(trans)
        });

        assert_eq!(x, 42);
    }

    /// Run a transaction with a control function, that always aborts.
    /// The transaction still tries to run a single time and should successfully 
    /// commit in this test.
    #[test]
    fn transaction_with_control_abort_on_single_run() {
        let read = TVar::new(42);

        let x = Transaction::with_control(|_| TransactionControl::Abort, |tx| {
            read.read(tx)
        });

        assert_eq!(x, Some(42));
    }

    /// Run a transaction with a control function, that always aborts.
    /// The transaction retries infinitely often. The control function will abort this loop.
    #[test]
    fn transaction_with_control_abort_on_retry() {
        let x: Option<i32> = Transaction::with_control(|_| TransactionControl::Abort, |_| {
            Err(Retry)
        });

        assert_eq!(x, None);
    }

    /// A committed write is visible outside of the transaction.
    #[test]
    fn transaction_write() {
        let write = TVar::new(42);

        Transaction::with(|trans| {
            write.write(trans, 0)
        });

        assert_eq!(write.read_atomic(), 0);
    }

    /// Copy the value of one var into another one.
    #[test]
    fn transaction_copy() {
        let read = TVar::new(42);
        let write = TVar::new(0);

        Transaction::with(|trans| {
            let r = read.read(trans)?;
            write.write(trans, r)
        });

        assert_eq!(write.read_atomic(), 42);
    }

    /// Copy the value of one var into another one through `with_control`.
    /// The body never fails, so the transaction commits and the result is
    /// wrapped in `Some`.
    #[test]
    fn transaction_with_control_retry_commits() {
        let read = TVar::new(42);
        let write = TVar::new(0);

        let x = Transaction::with_control(|_| TransactionControl::Retry, |trans| {
            let r = read.read(trans)?;
            write.write(trans, r)
        });

        assert_eq!(x, Some(()));
        assert_eq!(write.read_atomic(), 42);
    }

    /// Test if nested transactions are correctly detected.
    #[test]
    #[should_panic(expected = "STM: Nested Transaction")]
    fn transaction_nested_fail() {
        Transaction::with(|_| {
            Transaction::with(|_| Ok(42));
            Ok(1)
        });
    }
}
503}